diff --git a/presence/conversation.rkt b/presence/conversation.rkt index 9bdea58..dcd0bb2 100644 --- a/presence/conversation.rkt +++ b/presence/conversation.rkt @@ -254,8 +254,8 @@ (define (co-topic t) (struct-copy topic t [role (co-role (topic-role t))])) -(define (refine-topic t new-pattern) - (struct-copy topic t [pattern new-pattern] [virtual? #f])) +(define (refine-topic remote-topic new-pattern) + (struct-copy topic remote-topic [pattern new-pattern])) (define (roles-intersect? l r) (eq? l (co-role r))) @@ -269,11 +269,19 @@ (define (binding-has-topic? b topic) (set-member? (binding-topics b) topic)) +;; True iff the flow between remote-topic and local-topic should be +;; visible to the local peer. This is the case when either local-topic +;; is virtual (in which case everything is seen) or otherwise if +;; remote-topic is also not virtual. +(define (flow-visible? local-topic remote-topic) + (or (topic-virtual? local-topic) + (not (topic-virtual? remote-topic)))) + ;; Inserts a new flow in the records of b1, and signals b1 if the flow ;; is new to it. ;; Topics and flow must all be canonicalized. (define (insert-flow! b1 topic1 flow topic2 b2) - (when (not (topic-virtual? topic2)) + (when (flow-visible? topic1 topic2) (define old-count (hash-ref (binding-flows b1) flow 0)) ;;(write `(count for flow ,flow at ,(binding-debug-name b1) was ,old-count)) (newline) (when (zero? old-count) @@ -286,7 +294,7 @@ ;; ended after the removal. ;; Topic and flow must be canonicalized. (define (remove-flow! why b1 topic1 flow topic2 b2) - (when (not (topic-virtual? topic2)) + (when (flow-visible? topic1 topic2) (define old-flows (binding-flows b1)) (define old-count (hash-ref old-flows flow)) ;; error if absent (define new-count (- old-count 1)) @@ -312,20 +320,21 @@ [other-topic (binding-topics other-binding)]) (let ((flow-pattern (topic-intersection arriving-topic other-topic))) (when flow-pattern - (define flow (refine-topic arriving-topic flow-pattern)) - (insert-flow! b arriving-topic (co-topic flow) other-topic other-binding) - (insert-flow! other-binding other-topic flow arriving-topic b)))))) + (define flow1 (refine-topic other-topic flow-pattern)) + (define flow2 (refine-topic arriving-topic flow-pattern)) + (insert-flow! b arriving-topic flow1 other-topic other-binding) + (insert-flow! other-binding other-topic flow2 arriving-topic b)))))) ;; Topic must be canonicalized. (define (depart! b departing-topic why) (when (binding-has-topic? b departing-topic) - (define co-departing-topic (co-topic departing-topic)) - (for* ([(co-flow old-routes) (binding-routes b)] - #:when (specialization? co-flow co-departing-topic) + (for* ([(flow1 old-routes) (binding-routes b)] + #:when (specialization? (topic-pattern flow1) (topic-pattern departing-topic)) [r old-routes] #:when (equal? (route-local-topic r) departing-topic)) (match-define (route _ remote-topic remote-binding) r) - (remove-flow! why b departing-topic co-flow remote-topic remote-binding) - (remove-flow! why remote-binding remote-topic (co-topic co-flow) departing-topic b)) + (define flow2 (refine-topic departing-topic (topic-pattern flow1))) + (remove-flow! why b departing-topic flow1 remote-topic remote-binding) + (remove-flow! why remote-binding remote-topic flow2 departing-topic b)) (set-binding-topics! b (set-remove (binding-topics b) departing-topic)))) (define (((handle-binding-message b) message) state) diff --git a/presence/test-conversation-socket.rkt b/presence/test-conversation-socket.rkt index 2f9e617..bd34a75 100644 --- a/presence/test-conversation-socket.rkt +++ b/presence/test-conversation-socket.rkt @@ -51,7 +51,8 @@ (lambda () (define h (join-room pool 'LISTEN-THREAD)) (define server-address (tcp-address #f port-no)) - (send h assert! (topic-subscriber (tcp-stream (wild-address) server-address))) + (send h assert! (topic-subscriber (tcp-stream (wild-address) server-address) + #:virtual? #t)) (define (quit-proc) (send h depart)) (let loop () (match (send h listen) @@ -59,7 +60,7 @@ (topic 'publisher (and inbound-stream (tcp-stream (tcp-address (? non-wild?) (? non-wild?)) (== server-address))) - _)) + #f)) (write `(starting session for ,inbound-stream)) (newline) (standard-thread (lambda () (session inbound-stream quit-proc))) (loop)] @@ -78,18 +79,21 @@ (listen port-number) (define (wait-until-pool-empty) - (define h (join-room pool 'WAITER)) + (define h (join-room pool 'POOL-WAITER)) (send h assert! (topic-publisher (wild) #:virtual? #t)) (send h assert! (topic-subscriber (wild) #:virtual? #t)) (let loop ((show-count #t) (count 0)) (when show-count (write `(pool has ,count members)) (newline)) (match (send h listen) + [(arrived (and x (topic _ _ #t))) + (write `(ignoring arrival of ,x)) (newline) + (loop #f count)] [(arrived x) (write `(,x arrived in pool)) (newline) (loop #t (+ count 1))] - [(departed _ _) (if (= count 1) - 'done - (loop #t (- count 1)))] + [(departed (topic _ _ #f) _) (if (= count 1) + 'done + (loop #t (- count 1)))] [_ (loop #f count)]))) (wait-until-pool-empty)