From 7e0a6228c9defe21eeb5c45d0f06c2d2a0fad411 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 11 Mar 2012 10:09:23 -0400 Subject: [PATCH] Virtual topics give a capability similar to that of invisible room membership in the previous broadcast implementation of conversation.rkt. They are useful for on-demand resource allocation, for supervision, and for debugging. --- presence/conversation.rkt | 56 +++++++++++++++++++++++----------- presence/test-conversation.rkt | 19 +++++++----- 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/presence/conversation.rkt b/presence/conversation.rkt index 568afd1..21d5403 100644 --- a/presence/conversation.rkt +++ b/presence/conversation.rkt @@ -22,6 +22,8 @@ ;; Presence, Advertisement and Subscription (struct-out topic) + topic-publisher + topic-subscriber co-role co-topic ) @@ -30,7 +32,13 @@ (struct departed (who why) #:prefab) ;; someone departed with a reason (struct says (who what) #:prefab) ;; someone said something -(struct topic (role pattern) #:prefab) +(struct topic (role pattern virtual?) #:prefab) + +(define (topic-publisher pattern #:virtual? [virtual? #f]) + (topic 'publisher pattern virtual?)) + +(define (topic-subscriber pattern #:virtual? [virtual? #f]) + (topic 'subscriber pattern virtual?)) ;; A flow is an intersection between topic patterns: a substitution ;; produced by unify, roughly. Flows are signalled embedded in @@ -56,7 +64,8 @@ (struct room-state (name ch members) #:transparent) (struct route (local-topic remote-topic remote-binding) #:transparent) (struct binding ([topics #:mutable] ;; set of advertised topics - [flows #:mutable] ;; map from flow to set of route + [flows #:mutable] ;; map from signalled (i.e. non-virtual) flow to ref count + [routes #:mutable] ;; map from flow to set of route in-ch ;; sync channel out-ch ;; sync channel disconnect-box ;; blocking-box @@ -207,6 +216,7 @@ (match message [(join-message in-ch out-ch disconnect-box thread exit-status) (define b (binding (set) + (hash) (hash) in-ch out-ch @@ -230,10 +240,10 @@ [else #f])) (define (co-topic t) - (topic (co-role (topic-role t)) (topic-pattern t))) + (struct-copy topic t [role (co-role (topic-role t))])) (define (refine-topic t new-pattern) - (topic (topic-role t) new-pattern)) + (struct-copy topic t [pattern new-pattern] [virtual? #f])) (define (roles-intersect? l r) (eq? l (co-role r))) @@ -251,24 +261,34 @@ ;; is new to it. ;; Topics and flow must all be canonicalized. (define (insert-flow! b1 topic1 flow topic2 b2) - (define old-routes (hash-ref (binding-flows b1) flow set)) - (when (set-empty? old-routes) - (enqueue-message! b1 (arrived flow))) - (set-binding-flows! b1 (hash-set (binding-flows b1) flow - (set-add old-routes (route topic1 topic2 b2))))) + (when (not (topic-virtual? topic2)) + (define old-count (hash-ref (binding-flows b1) flow 0)) + (when (zero? old-count) + (enqueue-message! b1 (arrived flow))) + (set-binding-flows! b1 (hash-set (binding-flows b1) flow (+ old-count 1)))) + (let ((new-routes (set-add (hash-ref (binding-routes b1) flow set) (route topic1 topic2 b2)))) + (set-binding-routes! b1 (hash-set (binding-routes b1) flow new-routes)))) ;; Removes a flow from the records of b1, signalling b1 if the flow ;; ended after the removal. ;; Topic and flow must be canonicalized. (define (remove-flow! why b1 topic1 flow topic2 b2) - (define old-flows (binding-flows b1)) - (define old-routes (hash-ref old-flows flow set)) + (when (not (topic-virtual? topic2)) + (define old-flows (binding-flows b1)) + (define old-count (hash-ref old-flows flow)) ;; error if absent + (define new-count (- old-count 1)) + (define new-flows (if (zero? new-count) + (begin (enqueue-message! b1 (departed flow why)) + (hash-remove old-flows flow)) + (hash-set old-flows flow new-count))) + (set-binding-flows! b1 new-flows)) + (define old-route-map (binding-routes b1)) + (define old-routes (hash-ref old-route-map flow set)) (define new-routes (set-remove old-routes (route topic1 topic2 b2))) - (define new-flows (if (set-empty? new-routes) - (begin (enqueue-message! b1 (departed flow why)) - (hash-remove old-flows flow)) - (hash-set old-flows flow new-routes))) - (set-binding-flows! b1 new-flows)) + (define new-route-map (if (set-empty? new-routes) + (hash-remove old-route-map flow) + (hash-set old-route-map flow new-routes))) + (set-binding-routes! b1 new-route-map)) ;; Topic must be canonicalized. (define (arrive! b arriving-topic all-bindings) @@ -286,7 +306,7 @@ (define (depart! b departing-topic why) (when (binding-has-topic? b departing-topic) (define co-departing-topic (co-topic departing-topic)) - (for* ([(co-flow old-routes) (binding-flows b)] + (for* ([(co-flow old-routes) (binding-routes b)] #:when (specialization? co-flow co-departing-topic) [r old-routes] #:when (equal? (route-local-topic r) departing-topic)) (match-define (route _ remote-topic remote-binding) r) @@ -307,7 +327,7 @@ state] [(says full-topic what) (define remote-bindings - (for*/set ([(co-flow routes) (binding-flows b)] + (for*/set ([(co-flow routes) (binding-routes b)] #:when (topic-intersection full-topic co-flow) [r routes]) (route-remote-binding r))) diff --git a/presence/test-conversation.rkt b/presence/test-conversation.rkt index 7c4abb5..fe6d396 100644 --- a/presence/test-conversation.rkt +++ b/presence/test-conversation.rkt @@ -14,16 +14,19 @@ (standard-thread (lambda () (define handle (join-room r)) - (send handle assert! (topic 'subscriber 'Any)) - (send handle assert! (topic 'publisher 'Any)) + (send handle assert! (topic-subscriber 'Any #:virtual? #t)) + (send handle assert! (topic-publisher 'Any #:virtual? #t)) (let loop () (define m (send handle listen)) ;;(write `(robot heard ,m)) (newline) (match m + [(arrived who) + (write `(robot hears arrival ,who)) + (newline)] [(says _ "die") (error 'robot "Following orders!")] - [(says (topic 'publisher _) _) - (send handle say (topic 'subscriber 'Any) `(robot hears ,m))] + [(says (topic 'publisher _ _) _) + (send handle say (topic-subscriber 'Any) `(robot hears ,m))] [else (void)]) (loop)))) @@ -36,8 +39,8 @@ (newline o) (flush-output o)) (let ((handle (join-room r))) - (define talk-topic (topic 'publisher (list name 'Sink 'speech))) - (define listen-topic (topic 'subscriber (list 'Speaker name 'speech))) + (define talk-topic (topic-publisher (list name 'Sink 'speech))) + (define listen-topic (topic-subscriber (list 'Speaker name 'speech))) (send handle assert! talk-topic) (send handle assert! listen-topic) (let loop () @@ -50,10 +53,10 @@ (write `(,name hears ,m) o) (newline o) (match m - [(says (topic 'publisher (list (== name) _ _)) _) + [(says (topic 'publisher (list (== name) _ _) _) _) (write `(,name not acking own utterance) o) (newline o)] - [(says (and specific-topic (topic 'publisher _)) _) + [(says (and specific-topic (topic 'publisher _ _)) _) (write `(,name acking) o) (newline o) (send handle say (co-topic specific-topic) (list name 'ack))]