racket-matrix-2012/presence/conversation.rkt

382 lines
13 KiB
Racket

#lang racket/base
(require racket/set)
(require racket/match)
(require racket/class)
(require racket/list)
(require "functional-queue.rkt")
(require "blocking-box.rkt")
(require "standard-thread.rkt")
(require "unify.rkt")
(provide make-room
room?
room-name
join-room
;; Management and communication
(struct-out arrived)
(struct-out departed)
(struct-out says)
;; Presence, Advertisement and Subscription
(struct-out topic)
topic-publisher
topic-subscriber
co-roles
co-topic
;; Reexported from unify.rkt for convenience
wild
wild?
non-wild?
)
(struct arrived (who) #:prefab) ;; someone arrived
(struct departed (who why) #:prefab) ;; someone departed with a reason
(struct says (who what) #:prefab) ;; someone said something
(struct topic (role pattern virtual?) #:prefab)
(define (topic-publisher pattern #:virtual? [virtual? #f])
(topic 'publisher pattern virtual?))
(define (topic-subscriber pattern #:virtual? [virtual? #f])
(topic 'subscriber pattern virtual?))
;; A flow is an intersection between topic patterns: a substitution
;; produced by unify, roughly. Flows are signalled embedded in
;; appropriate topic records, since they're a kind of filtered form of
;; an upstream topic advertisement.
;; The variables in an advertised topic are
;; universally-quantified. When we unify two such topics, we're asking
;; for the fewest extra restrictions on those variables that satisfy
;; *both* topic when seen as predicates. The remaining variables are
;; again to be interpreted as universally quantified.
(struct join-message (debug-name ;; any
member-to-room ;; sync channel
room-to-member ;; sync channel
disconnect-box ;; blocking-box
member-thread ;; thread
exit-status) ;; exit-status structure from standard-thread.rkt
#:prefab)
(struct leave-message (reason) #:prefab)
(struct room (name ch))
(struct room-state (name ch members) #:transparent)
(struct route (local-topic remote-topic remote-binding) #:transparent)
(struct binding (debug-name ;; any
[topics #:mutable] ;; set of advertised topics
[flows #:mutable] ;; map from signalled (i.e. non-virtual) flow to ref count
[routes #:mutable] ;; map from flow to set of route
in-ch ;; sync channel
out-ch ;; sync channel
disconnect-box ;; blocking-box
queue-box ;; box of functional queue
thread ;; thread
exit-status) ;; maybe exit-status
;; WARNING: do not make this transparent! The equivalence we
;; want for bindings is eq?, not equal?, because (sadly)
;; equal? descends into boxes! If we make this transparent,
;; then set-add (etc) on anything involving a binding will
;; take a long time, leading to exponential (?) behaviour on
;; arrivals.
)
(define (make-room [name (gensym 'room)])
(define ch (make-channel))
(thread (lambda () (room-main (room-state name ch '()))))
(room name ch))
(define (join-room room [debug-name-base 'handle])
(make-object membership% (room-ch room) (gensym debug-name-base)))
(define membership%
(class* object% ()
(init room-init)
(init debug-name-init)
(super-new)
(define room room-init)
(define debug-name debug-name-init)
(define names (set)) ;; set of all our advertised topics
(define flows (set)) ;; set of all current flows
(define in-ch (make-channel))
(define out-ch (make-channel))
(define disconnect-box (make-blocking-box))
(define connected #t)
(define reason #f)
(define the-disconnected-evt (wrap-evt (blocking-box-evt disconnect-box)
(lambda (v)
(set! connected #f)
(set! reason v)
v)))
(channel-put room (join-message debug-name out-ch in-ch disconnect-box
(current-thread)
(current-thread-exit-status)))
(define/public (connected?)
connected)
(define/public (disconnect-reason)
reason)
(define/public (disconnected-evt)
the-disconnected-evt)
(define/public (assert!-evt name)
(define cname (canonicalize name))
;;(write `(handle ,debug-name asserts ,cname)) (newline)
(choice-evt the-disconnected-evt
(wrap-evt (channel-put-evt out-ch (arrived cname))
(lambda (v)
(set! names (set-add names cname))))))
(define/public (assert! name)
(sync (assert!-evt name)))
(define/public (retract!-evt name [why #f])
(define cname (canonicalize name))
;;(write `(handle ,debug-name retracts ,cname)) (newline)
(choice-evt the-disconnected-evt
(wrap-evt (channel-put-evt out-ch (departed cname why))
(lambda (v)
(set! names (set-remove names cname))))))
(define/public (current-flows)
flows)
(define/public (retract! name)
(sync (retract!-evt name)))
(define/public (say-evt who what)
(define cname (canonicalize who))
(choice-evt the-disconnected-evt
(channel-put-evt out-ch (says cname what))))
(define/public (say who what)
(sync (say-evt who what)))
(define/public (depart-evt [why #f])
(choice-evt the-disconnected-evt
(wrap-evt (channel-put-evt out-ch (leave-message why))
(lambda (v)
(set! connected #f)
(set! reason why)))))
(define/public (depart [why #f])
(sync (depart-evt why)))
;; Maybe<(or arrived departed says)> -> Maybe<(or arrived departed says)>
;; Identity function with side-effects to update our set of flows as required.
(define (apply-message v)
(match v
((arrived who) (set! flows (set-add flows who)))
((departed who _) (set! flows (set-remove flows who)))
(_ (void)))
v)
(define/public (listen-evt)
;; we wrap this event for two reasons: firstly, because
;; otherwise we leak authority, and secondly, to update our set
;; of flows as arrived/departed events come by.
(wrap-evt in-ch apply-message))
(define/public (try-listen)
(apply-message (channel-try-get in-ch)))
(define/public (listen)
(sync (wrap-evt the-disconnected-evt
(lambda (reason)
(error 'listen "Disconnected with reason ~v while listening" reason)))
(listen-evt)))))
(define (room-main state)
;;(write `(room-main ,state)) (newline)
(define handler
(sync (foldl (lambda (b acc)
(choice-evt (let ((qb (binding-queue-box b)))
(if (queue-empty? (unbox qb))
acc
(choice-evt acc
(let-values (((first rest) (dequeue (unbox qb))))
(handle-evt (channel-put-evt (binding-out-ch b)
first)
(lambda (dummy)
(lambda (state)
(set-box! qb rest)
state)))))))
(handle-evt (binding-in-ch b)
(handle-binding-message b))
(handle-evt (thread-dead-evt (binding-thread b))
(handle-binding-death b))))
(handle-evt (room-state-ch state) join-message-handler)
(room-state-members state))))
(room-main (handler state)))
(define (((handle-binding-death b) dummy) state)
(part state b (binding-death-reason b)))
(define ((join-message-handler message) state)
(match message
[(join-message debug-name in-ch out-ch disconnect-box thread exit-status)
(define b (binding debug-name
(set)
(hash)
(hash)
in-ch
out-ch
disconnect-box
(box (make-queue))
thread
exit-status))
(add-binding state b)]
[unexpected (log-warning (format "room-main: unexpected message ~v" unexpected))
state]))
(define (binding-death-reason b)
(define es (binding-exit-status b))
(and es ;; some threads are not standard-threads
(exit-status-exception es)))
(define (co-roles r)
(case r
[(publisher) '(subscriber)]
[(subscriber) '(publisher)]
[else #f]))
(define (co-topic t new-role)
(struct-copy topic t [role new-role]))
(define (refine-topic remote-topic new-pattern)
(struct-copy topic remote-topic [pattern new-pattern]))
(define (roles-intersect? l r)
(memq l (co-roles r)))
;; Both left and right must be canonicalized.
(define (topic-intersection left right)
(and (roles-intersect? (topic-role left) (topic-role right))
(mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right)))))
;; Topic must be canonicalized.
(define (binding-has-topic? b topic)
(set-member? (binding-topics b) topic))
;; True iff the flow between remote-topic and local-topic should be
;; visible to the local peer. This is the case when either local-topic
;; is virtual (in which case everything is seen) or otherwise if
;; remote-topic is also not virtual.
(define (flow-visible? local-topic remote-topic)
(or (topic-virtual? local-topic)
(not (topic-virtual? remote-topic))))
;; Inserts a new flow in the records of b1, and signals b1 if the flow
;; is new to it.
;; Topics and flow must all be canonicalized.
(define (insert-flow! b1 topic1 flow topic2 b2)
(when (flow-visible? topic1 topic2)
(define old-count (hash-ref (binding-flows b1) flow 0))
;;(write `(count for flow ,flow at ,(binding-debug-name b1) was ,old-count)) (newline)
(when (zero? old-count)
(enqueue-message! b1 (arrived flow)))
(set-binding-flows! b1 (hash-set (binding-flows b1) flow (+ old-count 1))))
(let ((new-routes (set-add (hash-ref (binding-routes b1) flow set) (route topic1 topic2 b2))))
(set-binding-routes! b1 (hash-set (binding-routes b1) flow new-routes))))
;; Removes a flow from the records of b1, signalling b1 if the flow
;; ended after the removal.
;; Topic and flow must be canonicalized.
(define (remove-flow! why b1 topic1 flow topic2 b2)
(when (flow-visible? topic1 topic2)
(define old-flows (binding-flows b1))
(define old-count (hash-ref old-flows flow)) ;; error if absent
(define new-count (- old-count 1))
;;(write `(count for flow ,flow at ,(binding-debug-name b1) is now ,new-count)) (newline)
(define new-flows (if (zero? new-count)
(begin (enqueue-message! b1 (departed flow why))
(hash-remove old-flows flow))
(hash-set old-flows flow new-count)))
(set-binding-flows! b1 new-flows))
(define old-route-map (binding-routes b1))
(define old-routes (hash-ref old-route-map flow set))
(define new-routes (set-remove old-routes (route topic1 topic2 b2)))
(define new-route-map (if (set-empty? new-routes)
(hash-remove old-route-map flow)
(hash-set old-route-map flow new-routes)))
(set-binding-routes! b1 new-route-map))
;; Topic must be canonicalized.
(define (arrive! b arriving-topic all-bindings)
(when (not (binding-has-topic? b arriving-topic))
(set-binding-topics! b (set-add (binding-topics b) arriving-topic))
(for* ([other-binding all-bindings]
[other-topic (binding-topics other-binding)])
(let ((flow-pattern (topic-intersection arriving-topic other-topic)))
(when flow-pattern
(define flow1 (refine-topic other-topic flow-pattern))
(define flow2 (refine-topic arriving-topic flow-pattern))
(insert-flow! b arriving-topic flow1 other-topic other-binding)
(insert-flow! other-binding other-topic flow2 arriving-topic b))))))
;; Topic must be canonicalized.
(define (depart! b departing-topic why)
(when (binding-has-topic? b departing-topic)
(for* ([(flow1 old-routes) (binding-routes b)]
#:when (specialization? (topic-pattern flow1) (topic-pattern departing-topic))
[r old-routes] #:when (equal? (route-local-topic r) departing-topic))
(match-define (route _ remote-topic remote-binding) r)
(define flow2 (refine-topic departing-topic (topic-pattern flow1)))
(remove-flow! why b departing-topic flow1 remote-topic remote-binding)
(remove-flow! why remote-binding remote-topic flow2 departing-topic b))
(set-binding-topics! b (set-remove (binding-topics b) departing-topic))))
(define (((handle-binding-message b) message) state)
;;(write `(considering ,message from ,(binding-debug-name b))) (newline)
(match message
[(leave-message why)
(part state b why)]
[(arrived this-topic)
(arrive! b this-topic (room-state-members state))
state]
[(departed who why)
(depart! b who why)
state]
[(says full-topic what)
(define remote-bindings
(for*/set ([(co-flow routes) (binding-routes b)]
#:when (topic-intersection full-topic co-flow)
[r routes])
(route-remote-binding r)))
(for ([remote-binding remote-bindings])
(enqueue-message! remote-binding (says full-topic what)))
;;(write `(delivering ,full-topic ,what ,remote-bindings)) (newline)
state]
[else
(log-warning (format "handle-binding-message: unexpected message ~v" message))
state]))
(define (part state b why)
(set-blocking-box! (binding-disconnect-box b) why)
(for ([topic (binding-topics b)]) (depart! b topic why))
(remove-binding state b))
(define (add-binding state b)
(struct-copy room-state state
[members (cons b (room-state-members state))]))
(define (remove-binding state b)
(struct-copy room-state state
[members (remove b (room-state-members state) eq?)]))
(define (enqueue-message! b message)
(define qb (binding-queue-box b))
;;(write `(enqueued ,message for ,(binding-debug-name b))) (newline)
(set-box! qb (enqueue (unbox qb) message)))