From 3f8b7313dbe9acf5f8760902e99eae032cc203fb Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 13 Mar 2012 05:29:27 -0400 Subject: [PATCH] Permit clean shutdown by adjusting virtual-topic presence semantics. A flow is now visible if either both peer topics are non-virtual or if the local topic is virtual: this means that virtual topics see other virtual topics come and go as well as normal topics, but normal topics only see other normal topics. Adjusting the example program's session manager to subscribe virtually and then only pay attention to new normal flows solves the socket-shutdown problem: because all the wildcard, resource-managing peers are now virtually subscribed and the resources themselves are normally subscribed, the resources can use normal presence to manage their own lifecycles. --- presence/conversation.rkt | 33 +++++++++++++++++---------- presence/test-conversation-socket.rkt | 16 ++++++++----- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/presence/conversation.rkt b/presence/conversation.rkt index 9bdea58..dcd0bb2 100644 --- a/presence/conversation.rkt +++ b/presence/conversation.rkt @@ -254,8 +254,8 @@ (define (co-topic t) (struct-copy topic t [role (co-role (topic-role t))])) -(define (refine-topic t new-pattern) - (struct-copy topic t [pattern new-pattern] [virtual? #f])) +(define (refine-topic remote-topic new-pattern) + (struct-copy topic remote-topic [pattern new-pattern])) (define (roles-intersect? l r) (eq? l (co-role r))) @@ -269,11 +269,19 @@ (define (binding-has-topic? b topic) (set-member? (binding-topics b) topic)) +;; True iff the flow between remote-topic and local-topic should be +;; visible to the local peer. This is the case when either local-topic +;; is virtual (in which case everything is seen) or otherwise if +;; remote-topic is also not virtual. +(define (flow-visible? local-topic remote-topic) + (or (topic-virtual? local-topic) + (not (topic-virtual? remote-topic)))) + ;; Inserts a new flow in the records of b1, and signals b1 if the flow ;; is new to it. ;; Topics and flow must all be canonicalized. (define (insert-flow! b1 topic1 flow topic2 b2) - (when (not (topic-virtual? topic2)) + (when (flow-visible? topic1 topic2) (define old-count (hash-ref (binding-flows b1) flow 0)) ;;(write `(count for flow ,flow at ,(binding-debug-name b1) was ,old-count)) (newline) (when (zero? old-count) @@ -286,7 +294,7 @@ ;; ended after the removal. ;; Topic and flow must be canonicalized. (define (remove-flow! why b1 topic1 flow topic2 b2) - (when (not (topic-virtual? topic2)) + (when (flow-visible? topic1 topic2) (define old-flows (binding-flows b1)) (define old-count (hash-ref old-flows flow)) ;; error if absent (define new-count (- old-count 1)) @@ -312,20 +320,21 @@ [other-topic (binding-topics other-binding)]) (let ((flow-pattern (topic-intersection arriving-topic other-topic))) (when flow-pattern - (define flow (refine-topic arriving-topic flow-pattern)) - (insert-flow! b arriving-topic (co-topic flow) other-topic other-binding) - (insert-flow! other-binding other-topic flow arriving-topic b)))))) + (define flow1 (refine-topic other-topic flow-pattern)) + (define flow2 (refine-topic arriving-topic flow-pattern)) + (insert-flow! b arriving-topic flow1 other-topic other-binding) + (insert-flow! other-binding other-topic flow2 arriving-topic b)))))) ;; Topic must be canonicalized. (define (depart! b departing-topic why) (when (binding-has-topic? b departing-topic) - (define co-departing-topic (co-topic departing-topic)) - (for* ([(co-flow old-routes) (binding-routes b)] - #:when (specialization? co-flow co-departing-topic) + (for* ([(flow1 old-routes) (binding-routes b)] + #:when (specialization? (topic-pattern flow1) (topic-pattern departing-topic)) [r old-routes] #:when (equal? (route-local-topic r) departing-topic)) (match-define (route _ remote-topic remote-binding) r) - (remove-flow! why b departing-topic co-flow remote-topic remote-binding) - (remove-flow! why remote-binding remote-topic (co-topic co-flow) departing-topic b)) + (define flow2 (refine-topic departing-topic (topic-pattern flow1))) + (remove-flow! why b departing-topic flow1 remote-topic remote-binding) + (remove-flow! why remote-binding remote-topic flow2 departing-topic b)) (set-binding-topics! b (set-remove (binding-topics b) departing-topic)))) (define (((handle-binding-message b) message) state) diff --git a/presence/test-conversation-socket.rkt b/presence/test-conversation-socket.rkt index 2f9e617..bd34a75 100644 --- a/presence/test-conversation-socket.rkt +++ b/presence/test-conversation-socket.rkt @@ -51,7 +51,8 @@ (lambda () (define h (join-room pool 'LISTEN-THREAD)) (define server-address (tcp-address #f port-no)) - (send h assert! (topic-subscriber (tcp-stream (wild-address) server-address))) + (send h assert! (topic-subscriber (tcp-stream (wild-address) server-address) + #:virtual? #t)) (define (quit-proc) (send h depart)) (let loop () (match (send h listen) @@ -59,7 +60,7 @@ (topic 'publisher (and inbound-stream (tcp-stream (tcp-address (? non-wild?) (? non-wild?)) (== server-address))) - _)) + #f)) (write `(starting session for ,inbound-stream)) (newline) (standard-thread (lambda () (session inbound-stream quit-proc))) (loop)] @@ -78,18 +79,21 @@ (listen port-number) (define (wait-until-pool-empty) - (define h (join-room pool 'WAITER)) + (define h (join-room pool 'POOL-WAITER)) (send h assert! (topic-publisher (wild) #:virtual? #t)) (send h assert! (topic-subscriber (wild) #:virtual? #t)) (let loop ((show-count #t) (count 0)) (when show-count (write `(pool has ,count members)) (newline)) (match (send h listen) + [(arrived (and x (topic _ _ #t))) + (write `(ignoring arrival of ,x)) (newline) + (loop #f count)] [(arrived x) (write `(,x arrived in pool)) (newline) (loop #t (+ count 1))] - [(departed _ _) (if (= count 1) - 'done - (loop #t (- count 1)))] + [(departed (topic _ _ #f) _) (if (= count 1) + 'done + (loop #t (- count 1)))] [_ (loop #f count)]))) (wait-until-pool-empty)