Permit clean shutdown by adjusting virtual-topic presence semantics. A

flow is now visible if either both peer topics are non-virtual or if
the local topic is virtual: this means that virtual topics see other
virtual topics come and go as well as normal topics, but normal topics
only see other normal topics. Adjusting the example program's session
manager to subscribe virtually and then only pay attention to new
normal flows solves the socket-shutdown problem: because all the
wildcard, resource-managing peers are now virtually subscribed and the
resources themselves are normally subscribed, the resources can use
normal presence to manage their own lifecycles.
This commit is contained in:
Tony Garnock-Jones 2012-03-13 05:29:27 -04:00
parent 0820c7e572
commit 3f8b7313db
2 changed files with 31 additions and 18 deletions

View File

@ -254,8 +254,8 @@
(define (co-topic t) (define (co-topic t)
(struct-copy topic t [role (co-role (topic-role t))])) (struct-copy topic t [role (co-role (topic-role t))]))
(define (refine-topic t new-pattern) (define (refine-topic remote-topic new-pattern)
(struct-copy topic t [pattern new-pattern] [virtual? #f])) (struct-copy topic remote-topic [pattern new-pattern]))
(define (roles-intersect? l r) (define (roles-intersect? l r)
(eq? l (co-role r))) (eq? l (co-role r)))
@ -269,11 +269,19 @@
(define (binding-has-topic? b topic) (define (binding-has-topic? b topic)
(set-member? (binding-topics b) topic)) (set-member? (binding-topics b) topic))
;; True iff the flow between remote-topic and local-topic should be
;; visible to the local peer. This is the case when either local-topic
;; is virtual (in which case everything is seen) or otherwise if
;; remote-topic is also not virtual.
(define (flow-visible? local-topic remote-topic)
(or (topic-virtual? local-topic)
(not (topic-virtual? remote-topic))))
;; Inserts a new flow in the records of b1, and signals b1 if the flow ;; Inserts a new flow in the records of b1, and signals b1 if the flow
;; is new to it. ;; is new to it.
;; Topics and flow must all be canonicalized. ;; Topics and flow must all be canonicalized.
(define (insert-flow! b1 topic1 flow topic2 b2) (define (insert-flow! b1 topic1 flow topic2 b2)
(when (not (topic-virtual? topic2)) (when (flow-visible? topic1 topic2)
(define old-count (hash-ref (binding-flows b1) flow 0)) (define old-count (hash-ref (binding-flows b1) flow 0))
;;(write `(count for flow ,flow at ,(binding-debug-name b1) was ,old-count)) (newline) ;;(write `(count for flow ,flow at ,(binding-debug-name b1) was ,old-count)) (newline)
(when (zero? old-count) (when (zero? old-count)
@ -286,7 +294,7 @@
;; ended after the removal. ;; ended after the removal.
;; Topic and flow must be canonicalized. ;; Topic and flow must be canonicalized.
(define (remove-flow! why b1 topic1 flow topic2 b2) (define (remove-flow! why b1 topic1 flow topic2 b2)
(when (not (topic-virtual? topic2)) (when (flow-visible? topic1 topic2)
(define old-flows (binding-flows b1)) (define old-flows (binding-flows b1))
(define old-count (hash-ref old-flows flow)) ;; error if absent (define old-count (hash-ref old-flows flow)) ;; error if absent
(define new-count (- old-count 1)) (define new-count (- old-count 1))
@ -312,20 +320,21 @@
[other-topic (binding-topics other-binding)]) [other-topic (binding-topics other-binding)])
(let ((flow-pattern (topic-intersection arriving-topic other-topic))) (let ((flow-pattern (topic-intersection arriving-topic other-topic)))
(when flow-pattern (when flow-pattern
(define flow (refine-topic arriving-topic flow-pattern)) (define flow1 (refine-topic other-topic flow-pattern))
(insert-flow! b arriving-topic (co-topic flow) other-topic other-binding) (define flow2 (refine-topic arriving-topic flow-pattern))
(insert-flow! other-binding other-topic flow arriving-topic b)))))) (insert-flow! b arriving-topic flow1 other-topic other-binding)
(insert-flow! other-binding other-topic flow2 arriving-topic b))))))
;; Topic must be canonicalized. ;; Topic must be canonicalized.
(define (depart! b departing-topic why) (define (depart! b departing-topic why)
(when (binding-has-topic? b departing-topic) (when (binding-has-topic? b departing-topic)
(define co-departing-topic (co-topic departing-topic)) (for* ([(flow1 old-routes) (binding-routes b)]
(for* ([(co-flow old-routes) (binding-routes b)] #:when (specialization? (topic-pattern flow1) (topic-pattern departing-topic))
#:when (specialization? co-flow co-departing-topic)
[r old-routes] #:when (equal? (route-local-topic r) departing-topic)) [r old-routes] #:when (equal? (route-local-topic r) departing-topic))
(match-define (route _ remote-topic remote-binding) r) (match-define (route _ remote-topic remote-binding) r)
(remove-flow! why b departing-topic co-flow remote-topic remote-binding) (define flow2 (refine-topic departing-topic (topic-pattern flow1)))
(remove-flow! why remote-binding remote-topic (co-topic co-flow) departing-topic b)) (remove-flow! why b departing-topic flow1 remote-topic remote-binding)
(remove-flow! why remote-binding remote-topic flow2 departing-topic b))
(set-binding-topics! b (set-remove (binding-topics b) departing-topic)))) (set-binding-topics! b (set-remove (binding-topics b) departing-topic))))
(define (((handle-binding-message b) message) state) (define (((handle-binding-message b) message) state)

View File

@ -51,7 +51,8 @@
(lambda () (lambda ()
(define h (join-room pool 'LISTEN-THREAD)) (define h (join-room pool 'LISTEN-THREAD))
(define server-address (tcp-address #f port-no)) (define server-address (tcp-address #f port-no))
(send h assert! (topic-subscriber (tcp-stream (wild-address) server-address))) (send h assert! (topic-subscriber (tcp-stream (wild-address) server-address)
#:virtual? #t))
(define (quit-proc) (send h depart)) (define (quit-proc) (send h depart))
(let loop () (let loop ()
(match (send h listen) (match (send h listen)
@ -59,7 +60,7 @@
(topic 'publisher (topic 'publisher
(and inbound-stream (tcp-stream (tcp-address (? non-wild?) (? non-wild?)) (and inbound-stream (tcp-stream (tcp-address (? non-wild?) (? non-wild?))
(== server-address))) (== server-address)))
_)) #f))
(write `(starting session for ,inbound-stream)) (newline) (write `(starting session for ,inbound-stream)) (newline)
(standard-thread (lambda () (session inbound-stream quit-proc))) (standard-thread (lambda () (session inbound-stream quit-proc)))
(loop)] (loop)]
@ -78,18 +79,21 @@
(listen port-number) (listen port-number)
(define (wait-until-pool-empty) (define (wait-until-pool-empty)
(define h (join-room pool 'WAITER)) (define h (join-room pool 'POOL-WAITER))
(send h assert! (topic-publisher (wild) #:virtual? #t)) (send h assert! (topic-publisher (wild) #:virtual? #t))
(send h assert! (topic-subscriber (wild) #:virtual? #t)) (send h assert! (topic-subscriber (wild) #:virtual? #t))
(let loop ((show-count #t) (count 0)) (let loop ((show-count #t) (count 0))
(when show-count (when show-count
(write `(pool has ,count members)) (newline)) (write `(pool has ,count members)) (newline))
(match (send h listen) (match (send h listen)
[(arrived (and x (topic _ _ #t)))
(write `(ignoring arrival of ,x)) (newline)
(loop #f count)]
[(arrived x) [(arrived x)
(write `(,x arrived in pool)) (newline) (write `(,x arrived in pool)) (newline)
(loop #t (+ count 1))] (loop #t (+ count 1))]
[(departed _ _) (if (= count 1) [(departed (topic _ _ #f) _) (if (= count 1)
'done 'done
(loop #t (- count 1)))] (loop #t (- count 1)))]
[_ (loop #f count)]))) [_ (loop #f count)])))
(wait-until-pool-empty) (wait-until-pool-empty)