382 lines
13 KiB
Racket
382 lines
13 KiB
Racket
#lang racket/base
|
|
|
|
(require racket/set)
|
|
(require racket/match)
|
|
(require racket/class)
|
|
(require racket/list)
|
|
|
|
(require "functional-queue.rkt")
|
|
(require "blocking-box.rkt")
|
|
(require "standard-thread.rkt")
|
|
(require "unify.rkt")
|
|
|
|
(provide make-room
|
|
room?
|
|
room-name
|
|
join-room
|
|
|
|
;; Management and communication
|
|
(struct-out arrived)
|
|
(struct-out departed)
|
|
(struct-out says)
|
|
|
|
;; Presence, Advertisement and Subscription
|
|
(struct-out topic)
|
|
topic-publisher
|
|
topic-subscriber
|
|
co-roles
|
|
co-topic
|
|
|
|
;; Reexported from unify.rkt for convenience
|
|
wild
|
|
wild?
|
|
non-wild?
|
|
)
|
|
|
|
;; Announcement that `who` has appeared. A member sends it to the room to
;; advertise a topic (see assert!-evt); the room queues it for a member
;; when a flow first becomes visible to that member (see insert-flow!).
(struct arrived (who) #:prefab) ;; someone arrived
|
|
;; Announcement that `who` has gone away, with reason `why`. A member sends
;; it to the room to retract a topic (see retract!-evt); the room queues it
;; for a member when that member's reference count for a flow drops to
;; zero (see remove-flow!).
(struct departed (who why) #:prefab) ;; someone departed with a reason
|
|
;; A message `what` sent on topic `who`. Members send it via say-evt; the
;; room relays it to the remote bindings of every intersecting route (see
;; handle-binding-message).
(struct says (who what) #:prefab) ;; someone said something
|
|
|
|
;; An advertised topic.
;;   role     - 'publisher or 'subscriber when built by topic-publisher /
;;              topic-subscriber; co-roles returns #f for any other value
;;   pattern  - the pattern unified against other topics' patterns by
;;              topic-intersection
;;   virtual? - consulted by flow-visible? to decide which flows are
;;              signalled to which peer
(struct topic (role pattern virtual?) #:prefab)
|
|
|
|
;; Builds a topic record whose role is 'publisher for the given pattern.
;; The optional #:virtual? keyword (default #f) is stored in the topic's
;; virtual? field unchanged.
(define (topic-publisher pattern #:virtual? [virtual? #f])
  (define role 'publisher)
  (topic role pattern virtual?))
|
|
|
|
;; Builds a topic record whose role is 'subscriber for the given pattern.
;; The optional #:virtual? keyword (default #f) is stored in the topic's
;; virtual? field unchanged.
(define (topic-subscriber pattern #:virtual? [virtual? #f])
  (define role 'subscriber)
  (topic role pattern virtual?))
|
|
|
|
;; A flow is an intersection between topic patterns: a substitution
|
|
;; produced by unify, roughly. Flows are signalled embedded in
|
|
;; appropriate topic records, since they're a kind of filtered form of
|
|
;; an upstream topic advertisement.
|
|
|
|
;; The variables in an advertised topic are
|
|
;; universally-quantified. When we unify two such topics, we're asking
|
|
;; for the fewest extra restrictions on those variables that satisfy
|
|
;; *both* topics when seen as predicates. The remaining variables are
|
|
;; again to be interpreted as universally quantified.
|
|
|
|
;; Sent by a new member on the room's channel to request membership
;; (see the membership% constructor and join-message-handler).
(struct join-message
  (debug-name       ;; any value; used to label the member
   member-to-room   ;; sync channel: the member writes, the room reads
   room-to-member   ;; sync channel: the room writes, the member reads
   disconnect-box   ;; blocking-box, filled with the reason on disconnection
   member-thread    ;; the thread that created the membership
   exit-status)     ;; exit-status structure from standard-thread.rkt
  #:prefab)
|
|
;; Sent by a member on its member-to-room channel to leave the room
;; voluntarily (see depart-evt); handle-binding-message responds by calling
;; part with `reason`.
(struct leave-message (reason) #:prefab)
|
|
|
|
;; Public handle on a room, as returned by make-room. `ch` is the channel
;; on which join-message requests are delivered to the room's thread.
(struct room (name ch))
|
|
|
|
;; State threaded through the room-main loop. `members` is a list of
;; binding records (see add-binding / remove-binding); `ch` is the channel
;; on which join requests arrive.
(struct room-state (name ch members) #:transparent)
|
|
;; One contributor to a flow, stored in a binding's `routes` map by
;; insert-flow!: the local topic, the remote topic it intersects with, and
;; the binding that advertised the remote topic.
(struct route (local-topic remote-topic remote-binding) #:transparent)
|
|
;; The room's record of one member.
;;
;; WARNING: keep this struct opaque -- do NOT add #:transparent. Bindings
;; must be compared with eq?, not equal?, because (sadly) equal? descends
;; into boxes. If this struct were transparent, set-add (etc.) on anything
;; containing a binding would take a long time, leading to exponential (?)
;; behaviour on arrivals.
(struct binding
  (debug-name          ;; any value
   [topics #:mutable]  ;; set of advertised topics
   [flows #:mutable]   ;; map: signalled (i.e. non-virtual) flow -> ref count
   [routes #:mutable]  ;; map: flow -> set of route
   in-ch               ;; sync channel
   out-ch              ;; sync channel
   disconnect-box      ;; blocking-box
   queue-box           ;; box holding a functional queue
   thread              ;; thread
   exit-status))       ;; maybe exit-status
|
|
|
|
;; Creates a room: allocates its join channel, starts a thread running
;; room-main with an empty member list, and returns the public room handle.
;; `name` defaults to a fresh gensym.
(define (make-room [name (gensym 'room)])
  (let* ([ch (make-channel)]
         [initial-state (room-state name ch '())])
    (thread (lambda () (room-main initial-state)))
    (room name ch)))
|
|
|
|
;; Joins `room`, returning a membership% object. The member's debug name is
;; a gensym derived from `debug-name-base` (default 'handle).
(define (join-room room [debug-name-base 'handle])
  (new membership%
       [room-init (room-ch room)]
       [debug-name-init (gensym debug-name-base)]))
|
|
|
|
;; membership%: a member's handle on a room.
;;
;; Init arguments:
;;   room-init       - the room's join channel
;;   debug-name-init - any value used to label this member
;;
;; Construction blocks until the room's thread accepts the join-message.
;; The *-evt methods return events; the corresponding plain methods sync on
;; them. Every sending event is paired with the disconnection event, so a
;; sync on it completes (with the disconnect reason) once the room has
;; disconnected this member.
(define membership%
  (class* object% ()
    (init room-init)
    (init debug-name-init)

    (super-new)

    (define room room-init)
    (define debug-name debug-name-init)
    (define names (set)) ;; set of all our advertised topics
    (define flows (set)) ;; set of all current flows
    (define in-ch (make-channel))   ;; room -> member
    (define out-ch (make-channel))  ;; member -> room
    (define disconnect-box (make-blocking-box))
    (define connected #t)
    (define reason #f)

    ;; Becomes ready when the room fills the disconnect box. Syncing on it
    ;; records the disconnection locally and yields the reason.
    (define the-disconnected-evt
      (wrap-evt (blocking-box-evt disconnect-box)
                (lambda (v)
                  (set! connected #f)
                  (set! reason v)
                  v)))

    ;; Note the channel order: our out-ch is the room's inbound channel.
    (channel-put room (join-message debug-name out-ch in-ch disconnect-box
                                    (current-thread)
                                    (current-thread-exit-status)))

    ;; #t until a disconnection or departure has been observed through one
    ;; of this object's events.
    (define/public (connected?)
      connected)

    ;; The recorded disconnection/departure reason, or #f if none yet.
    (define/public (disconnect-reason)
      reason)

    (define/public (disconnected-evt)
      the-disconnected-evt)

    ;; Event that advertises topic `name` (canonicalized first) to the room.
    (define/public (assert!-evt name)
      (define cname (canonicalize name))
      ;;(write `(handle ,debug-name asserts ,cname)) (newline)
      (choice-evt the-disconnected-evt
                  (wrap-evt (channel-put-evt out-ch (arrived cname))
                            (lambda (v)
                              (set! names (set-add names cname))))))

    (define/public (assert! name)
      (sync (assert!-evt name)))

    ;; Event that retracts topic `name` (canonicalized first), passing
    ;; `why` along as the departure reason.
    (define/public (retract!-evt name [why #f])
      (define cname (canonicalize name))
      ;;(write `(handle ,debug-name retracts ,cname)) (newline)
      (choice-evt the-disconnected-evt
                  (wrap-evt (channel-put-evt out-ch (departed cname why))
                            (lambda (v)
                              (set! names (set-remove names cname))))))

    ;; The set of flows currently known via arrived/departed messages that
    ;; have passed through listen-evt / try-listen / listen.
    (define/public (current-flows)
      flows)

    ;; Synchronous retract. `why` is optional (default #f) and is forwarded
    ;; to retract!-evt, mirroring depart / depart-evt.
    (define/public (retract! name [why #f])
      (sync (retract!-evt name why)))

    ;; Event that sends `what` on topic `who` (canonicalized first).
    (define/public (say-evt who what)
      (define cname (canonicalize who))
      (choice-evt the-disconnected-evt
                  (channel-put-evt out-ch (says cname what))))

    (define/public (say who what)
      (sync (say-evt who what)))

    ;; Event that asks the room to remove this member; on success the
    ;; member is marked disconnected with reason `why`.
    (define/public (depart-evt [why #f])
      (choice-evt the-disconnected-evt
                  (wrap-evt (channel-put-evt out-ch (leave-message why))
                            (lambda (v)
                              (set! connected #f)
                              (set! reason why)))))

    (define/public (depart [why #f])
      (sync (depart-evt why)))

    ;; Maybe<(or arrived departed says)> -> Maybe<(or arrived departed says)>
    ;; Identity function with side-effects to update our set of flows as required.
    (define (apply-message v)
      (match v
        [(arrived who) (set! flows (set-add flows who))]
        [(departed who _) (set! flows (set-remove flows who))]
        [_ (void)])
      v)

    (define/public (listen-evt)
      ;; we wrap this event for two reasons: firstly, because
      ;; otherwise we leak authority, and secondly, to update our set
      ;; of flows as arrived/departed events come by.
      (wrap-evt in-ch apply-message))

    ;; Non-blocking receive: yields the next message, or #f if none is
    ;; immediately available.
    (define/public (try-listen)
      (apply-message (channel-try-get in-ch)))

    ;; Blocking receive. Raises an error if the member is disconnected
    ;; while waiting.
    (define/public (listen)
      (sync (wrap-evt the-disconnected-evt
                      (lambda (reason)
                        (error 'listen "Disconnected with reason ~v while listening" reason)))
            (listen-evt)))))
|
|
|
|
;; The room's event loop. Each iteration builds one combined event from the
;; join channel plus, for every member binding: delivery of the head of its
;; outbound queue (if any), receipt of a message from it, and the death of
;; its thread. The chosen event yields a state -> state function, which is
;; applied before looping.
(define (room-main state)
  ;;(write `(room-main ,state)) (newline)

  ;; Event delivering the head of b's queue, or #f when the queue is empty.
  ;; The queue is only shortened once the put has actually succeeded.
  (define (delivery-evt b)
    (define qb (binding-queue-box b))
    (define pending (unbox qb))
    (and (not (queue-empty? pending))
         (let-values ([(head tail) (dequeue pending)])
           (handle-evt (channel-put-evt (binding-out-ch b) head)
                       (lambda (_ignored)
                         (lambda (st)
                           (set-box! qb tail)
                           st))))))

  ;; Folds the events for binding b into the accumulated event.
  (define (add-binding-evts b acc)
    (define deliver (delivery-evt b))
    (choice-evt (if deliver (choice-evt acc deliver) acc)
                (handle-evt (binding-in-ch b)
                            (handle-binding-message b))
                (handle-evt (thread-dead-evt (binding-thread b))
                            (handle-binding-death b))))

  (define join-evt
    (handle-evt (room-state-ch state) join-message-handler))

  (define handler
    (sync (foldl add-binding-evts join-evt (room-state-members state))))

  (room-main (handler state)))
|
|
|
|
;; Curried handler for the death of b's thread: given the (ignored) event
;; result and then the room state, removes b from the room, using the
;; thread's recorded exit exception (if any) as the reason.
(define (handle-binding-death b)
  (lambda (_evt-result)
    (lambda (state)
      (part state b (binding-death-reason b)))))
|
|
|
|
;; Curried handler for a value received on the room's join channel. A
;; join-message yields a state with a fresh binding (no topics, flows or
;; routes, and an empty outbound queue) added; anything else is logged and
;; the state is returned unchanged.
(define (join-message-handler message)
  (lambda (state)
    (match message
      [(join-message debug-name in-ch out-ch disconnect-box member-thread exit-status)
       (add-binding state
                    (binding debug-name
                             (set)   ;; topics
                             (hash)  ;; flows
                             (hash)  ;; routes
                             in-ch
                             out-ch
                             disconnect-box
                             (box (make-queue))
                             member-thread
                             exit-status))]
      [unexpected
       (log-warning (format "room-main: unexpected message ~v" unexpected))
       state])))
|
|
|
|
;; The exception recorded in b's exit-status, or #f when b has no
;; exit-status (some threads are not standard-threads).
(define (binding-death-reason b)
  (cond
    [(binding-exit-status b) => exit-status-exception]
    [else #f]))
|
|
|
|
;; Maps a role to the list of roles it can interact with:
;; 'publisher <-> 'subscriber. Any other value yields #f.
(define (co-roles r)
  (cond
    [(eq? r 'publisher) '(subscriber)]
    [(eq? r 'subscriber) '(publisher)]
    [else #f]))
|
|
|
|
;; Returns a topic like `t` but with its role replaced by `new-role`.
(define (co-topic t new-role)
  (topic new-role (topic-pattern t) (topic-virtual? t)))
|
|
|
|
;; Returns a topic like `remote-topic` but with its pattern replaced by
;; `new-pattern`; role and virtual? are carried over.
(define (refine-topic remote-topic new-pattern)
  (topic (topic-role remote-topic)
         new-pattern
         (topic-virtual? remote-topic)))
|
|
|
|
;; True (a non-#f value) iff role `l` is one of the co-roles of role `r`.
;; co-roles returns #f for a role it does not know; passing that to memq
;; would raise, so an unknown `r` is treated as intersecting with nothing.
(define (roles-intersect? l r)
  (define counterparts (co-roles r))
  (and counterparts
       (memq l counterparts)))
|
|
|
|
;; Computes the flow pattern shared by two topics, or #f if there is none.
;; The roles must be complementary; the patterns are freshened and then
;; unified with mgu-canonical.
;; Both left and right must be canonicalized.
(define (topic-intersection left right)
  (cond
    [(roles-intersect? (topic-role left) (topic-role right))
     (let* ([left-pattern (freshen (topic-pattern left))]
            [right-pattern (freshen (topic-pattern right))])
       (mgu-canonical left-pattern right-pattern))]
    [else #f]))
|
|
|
|
;; True iff binding `b` currently advertises `topic`.
;; Topic must be canonicalized.
(define (binding-has-topic? b topic)
  (let ([advertised (binding-topics b)])
    (set-member? advertised topic)))
|
|
|
|
;; Decides whether the flow between remote-topic and local-topic should be
;; signalled to the local peer. A virtual local topic sees every flow; a
;; non-virtual local topic sees only flows whose remote topic is also
;; non-virtual.
(define (flow-visible? local-topic remote-topic)
  (cond
    [(topic-virtual? local-topic) => values]
    [else (not (topic-virtual? remote-topic))]))
|
|
|
|
;; Records a flow in b1. If the flow is visible to b1, its reference count
;; is incremented, and b1 is sent an `arrived` message when the count was
;; previously zero. The route (topic1, topic2, b2) is always added to b1's
;; route set for the flow, visible or not.
;; Topics and flow must all be canonicalized.
(define (insert-flow! b1 topic1 flow topic2 b2)
  (when (flow-visible? topic1 topic2)
    (let ([refcount (hash-ref (binding-flows b1) flow 0)])
      ;;(write `(count for flow ,flow at ,(binding-debug-name b1) was ,refcount)) (newline)
      (when (zero? refcount)
        (enqueue-message! b1 (arrived flow)))
      (set-binding-flows! b1 (hash-set (binding-flows b1) flow (add1 refcount)))))
  (define route-map (binding-routes b1))
  ;; `set` doubles as the failure thunk: a missing entry yields an empty set.
  (define existing (hash-ref route-map flow set))
  (set-binding-routes! b1
                       (hash-set route-map
                                 flow
                                 (set-add existing (route topic1 topic2 b2)))))
|
|
|
|
;; Removes one contribution to a flow from b1. If the flow is visible to
;; b1, its reference count is decremented; when it reaches zero the entry
;; is dropped and b1 is sent a `departed` message carrying `why`. The route
;; (topic1, topic2, b2) is always removed from b1's route set for the flow,
;; and the map entry is dropped once that set is empty.
;; Topic and flow must be canonicalized.
(define (remove-flow! why b1 topic1 flow topic2 b2)
  (when (flow-visible? topic1 topic2)
    (let* ([counts (binding-flows b1)]
           [remaining (sub1 (hash-ref counts flow))]) ;; error if absent
      ;;(write `(count for flow ,flow at ,(binding-debug-name b1) is now ,remaining)) (newline)
      (cond
        [(zero? remaining)
         (enqueue-message! b1 (departed flow why))
         (set-binding-flows! b1 (hash-remove counts flow))]
        [else
         (set-binding-flows! b1 (hash-set counts flow remaining))])))
  (let* ([route-map (binding-routes b1)]
         [surviving (set-remove (hash-ref route-map flow set)
                                (route topic1 topic2 b2))])
    (set-binding-routes! b1
                         (if (set-empty? surviving)
                             (hash-remove route-map flow)
                             (hash-set route-map flow surviving)))))
|
|
|
|
;; Advertises `arriving-topic` on behalf of binding `b`. Does nothing if b
;; already advertises it. Otherwise the topic is recorded and, for every
;; topic of every binding in `all-bindings` that intersects it, the flow is
;; inserted at both ends.
;; Topic must be canonicalized.
(define (arrive! b arriving-topic all-bindings)
  (unless (binding-has-topic? b arriving-topic)
    (set-binding-topics! b (set-add (binding-topics b) arriving-topic))
    (for* ([peer all-bindings]
           [peer-topic (binding-topics peer)])
      (define overlap (topic-intersection arriving-topic peer-topic))
      (when overlap
        ;; Each side is told about the *other* side's topic, narrowed to
        ;; the overlapping pattern.
        (define flow-for-b (refine-topic peer-topic overlap))
        (define flow-for-peer (refine-topic arriving-topic overlap))
        (insert-flow! b arriving-topic flow-for-b peer-topic peer)
        (insert-flow! peer peer-topic flow-for-peer arriving-topic b)))))
|
|
|
|
;; Retracts `departing-topic` on behalf of binding `b`, with reason `why`.
;; Does nothing if b does not advertise it. Otherwise every route of b
;; whose local topic is the departing topic is removed at both ends, and
;; the topic is then dropped from b's advertised set.
;; Topic must be canonicalized.
(define (depart! b departing-topic why)
  (when (binding-has-topic? b departing-topic)
    (define departing-pattern (topic-pattern departing-topic))
    ;; The route map is immutable, so iterating over the snapshot taken
    ;; here is unaffected by remove-flow! replacing b's map as we go.
    (for* ([(local-flow route-set) (binding-routes b)]
           #:when (specialization? (topic-pattern local-flow) departing-pattern)
           [rt route-set]
           #:when (equal? (route-local-topic rt) departing-topic))
      (define peer-topic (route-remote-topic rt))
      (define peer (route-remote-binding rt))
      (define peer-flow (refine-topic departing-topic (topic-pattern local-flow)))
      (remove-flow! why b departing-topic local-flow peer-topic peer)
      (remove-flow! why peer peer-topic peer-flow departing-topic b))
    (set-binding-topics! b (set-remove (binding-topics b) departing-topic))))
|
|
|
|
;; Curried handler for a message received from member binding `b`: given
;; the message and then the room state, returns the next room state.
;;   leave-message -> the member is removed from the room
;;   arrived       -> the topic is advertised for b
;;   departed      -> the topic is retracted for b
;;   says          -> the message is queued for the remote binding of every
;;                    route whose flow intersects the topic
;; Anything else is logged and ignored.
(define (handle-binding-message b)
  (lambda (message)
    (lambda (state)
      ;;(write `(considering ,message from ,(binding-debug-name b))) (newline)
      (match message
        [(leave-message why)
         (part state b why)]
        [(arrived this-topic)
         (arrive! b this-topic (room-state-members state))
         state]
        [(departed who why)
         (depart! b who why)
         state]
        [(says full-topic what)
         ;; Collected into a set so that each recipient gets one copy.
         (define recipients
           (for*/set ([(co-flow routes) (in-hash (binding-routes b))]
                      #:when (topic-intersection full-topic co-flow)
                      [r (in-set routes)])
             (route-remote-binding r)))
         (for ([recipient (in-set recipients)])
           (enqueue-message! recipient (says full-topic what)))
         ;;(write `(delivering ,full-topic ,what ,recipients)) (newline)
         state]
        [_
         (log-warning (format "handle-binding-message: unexpected message ~v" message))
         state]))))
|
|
|
|
;; Removes binding `b` from the room with reason `why`: fills b's
;; disconnect box, retracts every topic b advertises, and returns the state
;; without b.
(define (part state b why)
  (set-blocking-box! (binding-disconnect-box b) why)
  ;; The topic set is immutable, so this iterates over the topics held at
  ;; this point even though depart! replaces b's set as it goes.
  (for ([advertised (in-set (binding-topics b))])
    (depart! b advertised why))
  (remove-binding state b))
|
|
|
|
;; Returns a room state with binding `b` added at the front of the member list.
(define (add-binding state b)
  (define members+b (cons b (room-state-members state)))
  (struct-copy room-state state [members members+b]))
|
|
|
|
;; Returns a room state with the first member that is eq? to `b` removed.
(define (remove-binding state b)
  (define members-b (remq b (room-state-members state)))
  (struct-copy room-state state [members members-b]))
|
|
|
|
;; Appends `message` to binding b's outbound queue by replacing the
;; contents of its queue box. room-main delivers queued messages.
(define (enqueue-message! b message)
  ;;(write `(enqueued ,message for ,(binding-debug-name b))) (newline)
  (let* ([qb (binding-queue-box b)]
         [extended (enqueue (unbox qb) message)])
    (set-box! qb extended)))
|