diff --git a/presence/conversation.rkt b/presence/conversation.rkt index f8f7f3f..be27059 100644 --- a/presence/conversation.rkt +++ b/presence/conversation.rkt @@ -22,6 +22,8 @@ ;; Presence, Advertisement and Subscription (struct-out topic) + co-role + co-topic ) (struct arrived (who) #:prefab) ;; someone arrived @@ -52,9 +54,9 @@ (struct room (name ch)) (struct room-state (name ch members) #:transparent) -(struct binding (name ;; any - flows-box ;; map from advertised topic to - ;; map from flow to set of (cons advertised-co-topic binding) +(struct route (local-topic remote-topic remote-binding) #:transparent) +(struct binding ([topics #:mutable] ;; set of advertised topics + [flows #:mutable] ;; map from flow to set of route in-ch ;; sync channel out-ch ;; sync channel disconnect-box ;; blocking-box @@ -131,11 +133,7 @@ (sync (retract!-evt name))) (define/public (say-evt who what) - (define cname (upper-case-symbols->canonical who)) - (when (not (set-member? names cname)) - ;; TODO: Overly restrictive. Topics of conversation should be - ;; contained by registered topics, not equal to them. - (error 'say "Attempt to speak on unregistered topic ~v" cname)) + (define cname (upper-case-symbols->canonical (freshen who))) ;; TODO freshening is a bit weird (choice-evt the-disconnected-evt (channel-put-evt out-ch (says cname what)))) @@ -205,8 +203,8 @@ (define ((join-message-handler message) state) (match message [(join-message in-ch out-ch disconnect-box thread exit-status) - (define b (binding (gensym 'binding) - (box (hash)) + (define b (binding (set) + (hash) in-ch out-ch disconnect-box @@ -222,91 +220,97 @@ (and es ;; some threads are not standard-threads (exit-status-exception es))) +(define (co-role r) + (case r + [(publisher) 'subscriber] + [(subscriber) 'publisher] + [else #f])) + +(define (co-topic t) + (topic (co-role (topic-role t)) (topic-pattern t))) + +(define (refine-topic t new-pattern) + (topic (topic-role t) new-pattern)) + +(define (roles-intersect? l r) + (eq? l (co-role r))) + ;; Both left and right must be canonicalized. (define (topic-intersection left right) - (and ;; They are matching roles: - (or (and (eq? (topic-role left) 'publisher) - (eq? (topic-role right) 'subscriber)) - (and (eq? (topic-role left) 'subscriber) - (eq? (topic-role right) 'publisher))) - ;; They unify: + (and (roles-intersect? (topic-role left) (topic-role right)) (mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right))))) ;; Topic must be canonicalized. (define (binding-has-topic? b topic) - (hash-has-key? (unbox (binding-flows-box b)) topic)) - -(define (binding-topics b) - (hash-keys (unbox (binding-flows-box b)))) - -;; Topic must be canonicalized. -(define (binding-flows-for-topic b topic) - (hash-ref (unbox (binding-flows-box b)) topic hash)) - -;; Topic must be canonicalized. -(define (set-binding-flows-for-topic! b topic flows) - (set-box! (binding-flows-box b) - (hash-set (unbox (binding-flows-box b)) topic flows))) + (set-member? (binding-topics b) topic)) ;; Inserts a new flow in the records of b1, and signals b1 if the flow ;; is new to it. ;; Topics and flow must all be canonicalized. (define (insert-flow! b1 topic1 flow topic2 b2) - (set-binding-flows-for-topic! - b1 topic1 - (let ((old-flows (binding-flows-for-topic b1 topic1))) - (when (not (hash-has-key? old-flows flow)) - (enqueue-message! b1 (arrived (topic (topic-role topic2) flow)))) - (hash-update old-flows - flow - (lambda (old-counterparties) - (set-add old-counterparties (cons topic2 b2))) - set)))) + (define old-routes (hash-ref (binding-flows b1) flow set)) + (when (set-empty? old-routes) + (enqueue-message! b1 (arrived (topic (topic-role topic2) flow)))) + (set-binding-flows! b1 (hash-set (binding-flows b1) flow + (set-add old-routes (route topic1 topic2 b2))))) ;; Removes a flow from the records of b1, signalling b1 if the flow ;; ended after the removal. ;; Topic and flow must be canonicalized. (define (remove-flow! why b1 topic1 flow topic2 b2) - (define old-flows (binding-flows-for-topic b1 topic1)) - (define old-counterparties (hash-ref old-flows flow set)) - (define new-counterparties (set-remove old-counterparties (cons topic2 b2))) - (define new-flows (if (set-empty? new-counterparties) - (begin (enqueue-message! b1 (departed (topic (topic-role topic2) flow) why)) - (hash-remove old-flows flow)) - (hash-set old-flows flow new-counterparties))) - (set-binding-flows-for-topic! b1 topic1 new-flows)) + (define old-flows (binding-flows b1)) + (define old-routes (hash-ref old-flows flow set)) + (define new-routes (set-remove old-routes (route topic1 topic2 b2))) + (define new-flows (if (set-empty? new-routes) + (begin (enqueue-message! b1 (departed flow why)) + (hash-remove old-flows flow)) + (hash-set old-flows flow new-routes))) + (set-binding-flows! b1 new-flows)) -(define (depart! b topic why) - (for* ([(flow counterparties) (binding-flows-for-topic b topic)] - [counterparty counterparties]) - (define other-topic (car counterparty)) - (define other-binding (cdr counterparty)) - (remove-flow! why other-binding other-topic flow topic b)) - (set-box! (binding-flows-box b) (hash-remove (unbox (binding-flows-box b)) topic))) +;; Topic must be canonicalized. +(define (arrive! b arriving-topic all-bindings) + (when (not (binding-has-topic? b arriving-topic)) + (set-binding-topics! b (set-add (binding-topics b) arriving-topic)) + (for* ([other-binding all-bindings] + [other-topic (binding-topics other-binding)]) + (let ((flow-pattern (topic-intersection arriving-topic other-topic))) + (when flow-pattern + (define flow (refine-topic arriving-topic flow-pattern)) + (insert-flow! b arriving-topic (co-topic flow) other-topic other-binding) + (insert-flow! other-binding other-topic flow arriving-topic b)))))) + +;; Topic must be canonicalized. +(define (depart! b departing-topic why) + (when (binding-has-topic? b departing-topic) + (define co-departing-topic (co-topic departing-topic)) + (for* ([(co-flow old-routes) (binding-flows b)] + #:when (specialization? co-flow co-departing-topic) + [r old-routes] #:when (equal? (route-local-topic r) departing-topic)) + (match-define (route _ remote-topic remote-binding) r) + (remove-flow! why b departing-topic co-flow remote-topic remote-binding) + (remove-flow! why remote-binding remote-topic (co-topic co-flow) departing-topic b)) + (set-binding-topics! b (set-remove (binding-topics b) departing-topic)))) (define (((handle-binding-message b) message) state) ;;(write `(considering ,message from ,(binding-name b))) (newline) (match message - [(leave-message why) (part state b why)] + [(leave-message why) + (part state b why)] [(arrived this-topic) - (when (not (binding-has-topic? b this-topic)) - (set-binding-flows-for-topic! b this-topic (hash)) - (for* ([other-binding (room-state-members state)] - [other-topic (binding-topics other-binding)]) - (let ((flow (topic-intersection this-topic other-topic))) - (when flow - (insert-flow! b this-topic flow other-topic other-binding) - (insert-flow! other-binding other-topic flow this-topic b))))) + (arrive! b this-topic (room-state-members state)) state] [(departed who why) (depart! b who why) state] - [(says this-topic what) - (for* ([(flow counterparties) (binding-flows-for-topic b this-topic)] - [counterparty counterparties]) - (define other-topic (car counterparty)) - (define other-binding (cdr counterparty)) - (enqueue-message! other-binding (says (topic (topic-role this-topic) flow) what))) + [(says full-topic what) + (define remote-bindings + (for*/set ([(co-flow routes) (binding-flows b)] + #:when (topic-intersection full-topic co-flow) + [r routes]) + (route-remote-binding r))) + (for ([remote-binding remote-bindings]) + (enqueue-message! remote-binding (says full-topic what))) + ;;(write `(delivering ,full-topic ,what ,remote-bindings)) (newline) state] [else (log-warning (format "handle-binding-message: unexpected message ~v" message)) diff --git a/presence/test-conversation.rkt b/presence/test-conversation.rkt index 66b9aba..9051695 100644 --- a/presence/test-conversation.rkt +++ b/presence/test-conversation.rkt @@ -32,6 +32,7 @@ (define name (read-line i)) (if (eof-object? name) (begin (display "OK, bye then!" o) + (newline o) (flush-output o)) (let ((handle (join-room r))) (define talk-topic (topic 'publisher (list name 'Sink 'speech))) @@ -53,7 +54,7 @@ [(says (and specific-topic (topic 'publisher _)) _) (write `(,name acking) o) (newline o) - (send handle say specific-topic (list name 'ack))] + (send handle say (co-topic specific-topic) (list name 'ack))] [_ (void)]) (flush-output o) (loop))) @@ -61,13 +62,19 @@ (lambda (utterance) (when (equal? utterance "error") (error 'interaction "Following orders!")) - (when (not (eof-object? utterance)) - (send handle say talk-topic utterance) - (loop))))))))) + (if (eof-object? utterance) + (begin (display "Closing session." o) + (newline o) + (flush-output o) + (send handle depart)) + (begin (send handle say talk-topic utterance) + (loop)))))))))) (standard-thread (lambda () - (interaction (current-input-port) (current-output-port)))) + (let loop () + (interaction (current-input-port) (current-output-port)) + (loop)))) (define port-number 5001) (display "Listening on port ") diff --git a/presence/test-unify.rkt b/presence/test-unify.rkt index 2dea4f5..7fd5847 100644 --- a/presence/test-unify.rkt +++ b/presence/test-unify.rkt @@ -60,3 +60,12 @@ (check-equal? (unify/vars (x1 'A 'a) (x1 'b 'a)) '((A . b))) (let ((A (variable 'A))) (check-equal? (apply-subst `((,A . b)) (x1 A 'a)) (x1 'b 'a))) + +(check-true (specialization? (upper-case-symbols->canonical '(a A)) + (upper-case-symbols->canonical '(B C)))) +(check-false (specialization? (upper-case-symbols->canonical '(B C)) + (upper-case-symbols->canonical '(a A)))) +(check-true (specialization? (upper-case-symbols->canonical '(a A)) + (upper-case-symbols->canonical '(a A)))) +(check-false (specialization? (upper-case-symbols->canonical '(a A)) + (upper-case-symbols->canonical '(A a)))) diff --git a/presence/unify.rkt b/presence/unify.rkt index 65e58fb..a0e2abc 100644 --- a/presence/unify.rkt +++ b/presence/unify.rkt @@ -16,6 +16,7 @@ mgu-freshen mgu-canonical apply-subst + specialization? upper-case-symbols->variables upper-case-symbols->canonical) @@ -127,7 +128,7 @@ (define env (make-hash)) ;; cheeky use of mutation (let walk ((t t)) (cond - [(upper-case-symbol? t) + [(or (upper-case-symbol? t) (variable? t) (canonical-variable? t)) (cond [(hash-ref env t #f)] [else (define v (canonical-variable (hash-count env))) (hash-set! env t v) v])] [(pair? t) (cons (walk (car t)) (walk (cdr t)))] @@ -205,8 +206,7 @@ (define (canonicalize t) (freshen* t (lambda (var env) (canonical-variable (hash-count env))) - (lambda (var env) - (error 'canonicalize "Canonical variables are forbidden in input to canonicalize")))) + (lambda (var env) (canonical-variable (hash-count env))))) ;; Any Any -> Any ;; If the arguments unify, applies the substitution to one of them, @@ -232,8 +232,23 @@ [(struct? x) (struct-map walk x)] [else x]))) +;; True iff a is a specialization (or instance) of b. +(define (specialization? a b) + (let walk ((a a) (b b)) + (cond + [(or (variable? b) (canonical-variable? b)) #t] + [(or (variable? a) (canonical-variable? a)) #f] + [(and (pair? a) (pair? b)) + (and (walk (car a) (car b)) (walk (cdr a) (cdr b)))] + [(and (vector? a) (vector? b) (= (vector-length a) (vector-length b))) + (for/and ([aa a] [bb b]) (walk aa bb))] + [(and (struct? a) (struct? b)) + (walk (struct->vector a #f) (struct->vector b #f))] + [else (equal? a b)]))) + (require racket/trace) (trace ;;unify/env ;;upper-case-symbols->variables ;;apply-subst + ;;specialization? )