Virtual topics give a capability similar to that of invisible room

membership in the previous broadcast implementation of
conversation.rkt. They are useful for on-demand resource allocation,
for supervision, and for debugging.
This commit is contained in:
Tony Garnock-Jones 2012-03-11 10:09:23 -04:00
parent b4fe8dfa21
commit 7e0a6228c9
2 changed files with 49 additions and 26 deletions

View File

@ -22,6 +22,8 @@
;; Presence, Advertisement and Subscription ;; Presence, Advertisement and Subscription
(struct-out topic) (struct-out topic)
topic-publisher
topic-subscriber
co-role co-role
co-topic co-topic
) )
@ -30,7 +32,13 @@
(struct departed (who why) #:prefab) ;; someone departed with a reason (struct departed (who why) #:prefab) ;; someone departed with a reason
(struct says (who what) #:prefab) ;; someone said something (struct says (who what) #:prefab) ;; someone said something
(struct topic (role pattern) #:prefab) (struct topic (role pattern virtual?) #:prefab)
(define (topic-publisher pattern #:virtual? [virtual? #f])
(topic 'publisher pattern virtual?))
(define (topic-subscriber pattern #:virtual? [virtual? #f])
(topic 'subscriber pattern virtual?))
;; A flow is an intersection between topic patterns: a substitution ;; A flow is an intersection between topic patterns: a substitution
;; produced by unify, roughly. Flows are signalled embedded in ;; produced by unify, roughly. Flows are signalled embedded in
@ -56,7 +64,8 @@
(struct room-state (name ch members) #:transparent) (struct room-state (name ch members) #:transparent)
(struct route (local-topic remote-topic remote-binding) #:transparent) (struct route (local-topic remote-topic remote-binding) #:transparent)
(struct binding ([topics #:mutable] ;; set of advertised topics (struct binding ([topics #:mutable] ;; set of advertised topics
[flows #:mutable] ;; map from flow to set of route [flows #:mutable] ;; map from signalled (i.e. non-virtual) flow to ref count
[routes #:mutable] ;; map from flow to set of route
in-ch ;; sync channel in-ch ;; sync channel
out-ch ;; sync channel out-ch ;; sync channel
disconnect-box ;; blocking-box disconnect-box ;; blocking-box
@ -207,6 +216,7 @@
(match message (match message
[(join-message in-ch out-ch disconnect-box thread exit-status) [(join-message in-ch out-ch disconnect-box thread exit-status)
(define b (binding (set) (define b (binding (set)
(hash)
(hash) (hash)
in-ch in-ch
out-ch out-ch
@ -230,10 +240,10 @@
[else #f])) [else #f]))
(define (co-topic t) (define (co-topic t)
(topic (co-role (topic-role t)) (topic-pattern t))) (struct-copy topic t [role (co-role (topic-role t))]))
(define (refine-topic t new-pattern) (define (refine-topic t new-pattern)
(topic (topic-role t) new-pattern)) (struct-copy topic t [pattern new-pattern] [virtual? #f]))
(define (roles-intersect? l r) (define (roles-intersect? l r)
(eq? l (co-role r))) (eq? l (co-role r)))
@ -251,24 +261,34 @@
;; is new to it. ;; is new to it.
;; Topics and flow must all be canonicalized. ;; Topics and flow must all be canonicalized.
(define (insert-flow! b1 topic1 flow topic2 b2) (define (insert-flow! b1 topic1 flow topic2 b2)
(define old-routes (hash-ref (binding-flows b1) flow set)) (when (not (topic-virtual? topic2))
(when (set-empty? old-routes) (define old-count (hash-ref (binding-flows b1) flow 0))
(enqueue-message! b1 (arrived flow))) (when (zero? old-count)
(set-binding-flows! b1 (hash-set (binding-flows b1) flow (enqueue-message! b1 (arrived flow)))
(set-add old-routes (route topic1 topic2 b2))))) (set-binding-flows! b1 (hash-set (binding-flows b1) flow (+ old-count 1))))
(let ((new-routes (set-add (hash-ref (binding-routes b1) flow set) (route topic1 topic2 b2))))
(set-binding-routes! b1 (hash-set (binding-routes b1) flow new-routes))))
;; Removes a flow from the records of b1, signalling b1 if the flow ;; Removes a flow from the records of b1, signalling b1 if the flow
;; ended after the removal. ;; ended after the removal.
;; Topic and flow must be canonicalized. ;; Topic and flow must be canonicalized.
(define (remove-flow! why b1 topic1 flow topic2 b2) (define (remove-flow! why b1 topic1 flow topic2 b2)
(define old-flows (binding-flows b1)) (when (not (topic-virtual? topic2))
(define old-routes (hash-ref old-flows flow set)) (define old-flows (binding-flows b1))
(define old-count (hash-ref old-flows flow)) ;; error if absent
(define new-count (- old-count 1))
(define new-flows (if (zero? new-count)
(begin (enqueue-message! b1 (departed flow why))
(hash-remove old-flows flow))
(hash-set old-flows flow new-count)))
(set-binding-flows! b1 new-flows))
(define old-route-map (binding-routes b1))
(define old-routes (hash-ref old-route-map flow set))
(define new-routes (set-remove old-routes (route topic1 topic2 b2))) (define new-routes (set-remove old-routes (route topic1 topic2 b2)))
(define new-flows (if (set-empty? new-routes) (define new-route-map (if (set-empty? new-routes)
(begin (enqueue-message! b1 (departed flow why)) (hash-remove old-route-map flow)
(hash-remove old-flows flow)) (hash-set old-route-map flow new-routes)))
(hash-set old-flows flow new-routes))) (set-binding-routes! b1 new-route-map))
(set-binding-flows! b1 new-flows))
;; Topic must be canonicalized. ;; Topic must be canonicalized.
(define (arrive! b arriving-topic all-bindings) (define (arrive! b arriving-topic all-bindings)
@ -286,7 +306,7 @@
(define (depart! b departing-topic why) (define (depart! b departing-topic why)
(when (binding-has-topic? b departing-topic) (when (binding-has-topic? b departing-topic)
(define co-departing-topic (co-topic departing-topic)) (define co-departing-topic (co-topic departing-topic))
(for* ([(co-flow old-routes) (binding-flows b)] (for* ([(co-flow old-routes) (binding-routes b)]
#:when (specialization? co-flow co-departing-topic) #:when (specialization? co-flow co-departing-topic)
[r old-routes] #:when (equal? (route-local-topic r) departing-topic)) [r old-routes] #:when (equal? (route-local-topic r) departing-topic))
(match-define (route _ remote-topic remote-binding) r) (match-define (route _ remote-topic remote-binding) r)
@ -307,7 +327,7 @@
state] state]
[(says full-topic what) [(says full-topic what)
(define remote-bindings (define remote-bindings
(for*/set ([(co-flow routes) (binding-flows b)] (for*/set ([(co-flow routes) (binding-routes b)]
#:when (topic-intersection full-topic co-flow) #:when (topic-intersection full-topic co-flow)
[r routes]) [r routes])
(route-remote-binding r))) (route-remote-binding r)))

View File

@ -14,16 +14,19 @@
(standard-thread (standard-thread
(lambda () (lambda ()
(define handle (join-room r)) (define handle (join-room r))
(send handle assert! (topic 'subscriber 'Any)) (send handle assert! (topic-subscriber 'Any #:virtual? #t))
(send handle assert! (topic 'publisher 'Any)) (send handle assert! (topic-publisher 'Any #:virtual? #t))
(let loop () (let loop ()
(define m (send handle listen)) (define m (send handle listen))
;;(write `(robot heard ,m)) (newline) ;;(write `(robot heard ,m)) (newline)
(match m (match m
[(arrived who)
(write `(robot hears arrival ,who))
(newline)]
[(says _ "die") [(says _ "die")
(error 'robot "Following orders!")] (error 'robot "Following orders!")]
[(says (topic 'publisher _) _) [(says (topic 'publisher _ _) _)
(send handle say (topic 'subscriber 'Any) `(robot hears ,m))] (send handle say (topic-subscriber 'Any) `(robot hears ,m))]
[else (void)]) [else (void)])
(loop)))) (loop))))
@ -36,8 +39,8 @@
(newline o) (newline o)
(flush-output o)) (flush-output o))
(let ((handle (join-room r))) (let ((handle (join-room r)))
(define talk-topic (topic 'publisher (list name 'Sink 'speech))) (define talk-topic (topic-publisher (list name 'Sink 'speech)))
(define listen-topic (topic 'subscriber (list 'Speaker name 'speech))) (define listen-topic (topic-subscriber (list 'Speaker name 'speech)))
(send handle assert! talk-topic) (send handle assert! talk-topic)
(send handle assert! listen-topic) (send handle assert! listen-topic)
(let loop () (let loop ()
@ -50,10 +53,10 @@
(write `(,name hears ,m) o) (write `(,name hears ,m) o)
(newline o) (newline o)
(match m (match m
[(says (topic 'publisher (list (== name) _ _)) _) [(says (topic 'publisher (list (== name) _ _) _) _)
(write `(,name not acking own utterance) o) (write `(,name not acking own utterance) o)
(newline o)] (newline o)]
[(says (and specific-topic (topic 'publisher _)) _) [(says (and specific-topic (topic 'publisher _ _)) _)
(write `(,name acking) o) (write `(,name acking) o)
(newline o) (newline o)
(send handle say (co-topic specific-topic) (list name 'ack))] (send handle say (co-topic specific-topic) (list name 'ack))]