racket-matrix-2012/presence/conversation.rkt

332 lines
11 KiB
Racket

#lang racket/base
(require racket/set)
(require racket/match)
(require racket/class)
(require racket/list)
(require "functional-queue.rkt")
(require "blocking-box.rkt")
(require "standard-thread.rkt")
(require "unify.rkt")
(provide make-room
room?
room-name
join-room
;; Management and communication
(struct-out arrived)
(struct-out departed)
(struct-out says)
;; Presence, Advertisement and Subscription
(struct-out topic)
)
(struct arrived (who) #:prefab) ;; someone arrived
(struct departed (who why) #:prefab) ;; someone departed with a reason
(struct says (who what) #:prefab) ;; someone said something
(struct topic (role pattern) #:prefab)
;; A flow is an intersection between topic patterns: a substitution
;; produced by unify, roughly. Flows are signalled embedded in
;; appropriate topic records, since they're a kind of filtered form of
;; an upstream topic advertisement.
;; The variables in an advertised topic are
;; universally-quantified. When we unify two such topics, we're asking
;; for the fewest extra restrictions on those variables that satisfy
;; *both* topic when seen as predicates. The remaining variables are
;; again to be interpreted as universally quantified.
(struct join-message (member-to-room ;; sync channel
room-to-member ;; sync channel
disconnect-box ;; blocking-box
member-thread ;; thread
exit-status) ;; exit-status structure from standard-thread.rkt
#:prefab)
(struct leave-message (reason) #:prefab)
(struct room (name ch))
(struct room-state (name ch members) #:transparent)
(struct binding (name ;; any
flows-box ;; map from advertised topic to
;; map from flow to set of (cons advertised-co-topic binding)
in-ch ;; sync channel
out-ch ;; sync channel
disconnect-box ;; blocking-box
queue-box ;; box of functional queue
thread ;; thread
exit-status) ;; maybe exit-status
;; WARNING: do not make this transparent! The equivalence we
;; want for bindings is eq?, not equal?, because (sadly)
;; equal? descends into boxes! If we make this transparent,
;; then set-add (etc) on anything involving a binding will
;; take a long time, leading to exponential (?) behaviour on
;; arrivals.
)
(define (make-room [name (gensym 'room)])
(define ch (make-channel))
(thread (lambda () (room-main (room-state name ch '()))))
(room name ch))
(define (join-room room)
(make-object membership% (room-ch room)))
(define membership%
(class* object% ()
(init room-init)
(super-new)
(define room room-init)
(define names (set)) ;; set of all our advertised topics
(define flows (set)) ;; set of all current flows
(define in-ch (make-channel))
(define out-ch (make-channel))
(define disconnect-box (make-blocking-box))
(define connected #t)
(define reason #f)
(define the-disconnected-evt (wrap-evt (blocking-box-evt disconnect-box)
(lambda (v)
(set! connected #f)
(set! reason v)
v)))
(channel-put room (join-message out-ch in-ch disconnect-box
(current-thread)
(current-thread-exit-status)))
(define/public (connected?)
connected)
(define/public (disconnect-reason)
reason)
(define/public (disconnected-evt)
the-disconnected-evt)
(define/public (assert!-evt name)
(define cname (upper-case-symbols->canonical name))
(choice-evt the-disconnected-evt
(wrap-evt (channel-put-evt out-ch (arrived cname))
(lambda (v)
(set! names (set-add names cname))))))
(define/public (assert! name)
(sync (assert!-evt name)))
(define/public (retract!-evt name [why #f])
(define cname (upper-case-symbols->canonical name))
(choice-evt the-disconnected-evt
(wrap-evt (channel-put-evt out-ch (departed cname why))
(lambda (v)
(set! names (set-remove names cname))))))
(define/public (retract! name)
(sync (retract!-evt name)))
(define/public (say-evt who what)
(define cname (upper-case-symbols->canonical who))
(when (not (set-member? names cname))
;; TODO: Overly restrictive. Topics of conversation should be
;; contained by registered topics, not equal to them.
(error 'say "Attempt to speak on unregistered topic ~v" cname))
(choice-evt the-disconnected-evt
(channel-put-evt out-ch (says cname what))))
(define/public (say who what)
(sync (say-evt who what)))
(define/public (depart-evt [why #f])
(choice-evt the-disconnected-evt
(wrap-evt (channel-put-evt out-ch (leave-message why))
(lambda (v)
(set! connected #f)
(set! reason why)))))
(define/public (depart [why #f])
(sync (depart-evt why)))
;; Maybe<(or arrived departed says)> -> Maybe<(or arrived departed says)>
;; Identity function with side-effects to update our set of flows as required.
(define (apply-message v)
(match v
((arrived who) (set! flows (set-add flows who)))
((departed who _) (set! flows (set-remove flows who)))
(_ (void)))
v)
(define/public (listen-evt)
;; we wrap this event for two reasons: firstly, because
;; otherwise we leak authority, and secondly, to update our set
;; of flows as arrived/departed events come by.
(wrap-evt in-ch apply-message))
(define/public (try-listen)
(apply-message (channel-try-get in-ch)))
(define/public (listen)
(sync (wrap-evt the-disconnected-evt
(lambda (reason)
(error 'listen "Disconnected with reason ~v while listening" reason)))
(listen-evt)))))
(define (room-main state)
;;(write `(room-main ,state)) (newline)
(define handler
(sync (foldl (lambda (b acc)
(choice-evt (let ((qb (binding-queue-box b)))
(if (queue-empty? (unbox qb))
acc
(choice-evt acc
(let-values (((first rest) (dequeue (unbox qb))))
(handle-evt (channel-put-evt (binding-out-ch b)
first)
(lambda (dummy)
(lambda (state)
(set-box! qb rest)
state)))))))
(handle-evt (binding-in-ch b)
(handle-binding-message b))
(handle-evt (thread-dead-evt (binding-thread b))
(handle-binding-death b))))
(handle-evt (room-state-ch state) join-message-handler)
(room-state-members state))))
(room-main (handler state)))
(define (((handle-binding-death b) dummy) state)
(part state b (binding-death-reason b)))
(define ((join-message-handler message) state)
(match message
[(join-message in-ch out-ch disconnect-box thread exit-status)
(define b (binding (gensym 'binding)
(box (hash))
in-ch
out-ch
disconnect-box
(box (make-queue))
thread
exit-status))
(add-binding state b)]
[unexpected (log-warning (format "room-main: unexpected message ~v" unexpected))
state]))
(define (binding-death-reason b)
(define es (binding-exit-status b))
(and es ;; some threads are not standard-threads
(exit-status-exception es)))
;; Both left and right must be canonicalized.
(define (topic-intersection left right)
(and ;; They are matching roles:
(or (and (eq? (topic-role left) 'publisher)
(eq? (topic-role right) 'subscriber))
(and (eq? (topic-role left) 'subscriber)
(eq? (topic-role right) 'publisher)))
;; They unify:
(mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right)))))
;; Topic must be canonicalized.
(define (binding-has-topic? b topic)
(hash-has-key? (unbox (binding-flows-box b)) topic))
(define (binding-topics b)
(hash-keys (unbox (binding-flows-box b))))
;; Topic must be canonicalized.
(define (binding-flows-for-topic b topic)
(hash-ref (unbox (binding-flows-box b)) topic hash))
;; Topic must be canonicalized.
(define (set-binding-flows-for-topic! b topic flows)
(set-box! (binding-flows-box b)
(hash-set (unbox (binding-flows-box b)) topic flows)))
;; Inserts a new flow in the records of b1, and signals b1 if the flow
;; is new to it.
;; Topics and flow must all be canonicalized.
(define (insert-flow! b1 topic1 flow topic2 b2)
(set-binding-flows-for-topic!
b1 topic1
(let ((old-flows (binding-flows-for-topic b1 topic1)))
(when (not (hash-has-key? old-flows flow))
(enqueue-message! b1 (arrived (topic (topic-role topic2) flow))))
(hash-update old-flows
flow
(lambda (old-counterparties)
(set-add old-counterparties (cons topic2 b2)))
set))))
;; Removes a flow from the records of b1, signalling b1 if the flow
;; ended after the removal.
;; Topic and flow must be canonicalized.
(define (remove-flow! why b1 topic1 flow topic2 b2)
(define old-flows (binding-flows-for-topic b1 topic1))
(define old-counterparties (hash-ref old-flows flow set))
(define new-counterparties (set-remove old-counterparties (cons topic2 b2)))
(define new-flows (if (set-empty? new-counterparties)
(begin (enqueue-message! b1 (departed (topic (topic-role topic2) flow) why))
(hash-remove old-flows flow))
(hash-set old-flows flow new-counterparties)))
(set-binding-flows-for-topic! b1 topic1 new-flows))
(define (depart! b topic why)
(for* ([(flow counterparties) (binding-flows-for-topic b topic)]
[counterparty counterparties])
(define other-topic (car counterparty))
(define other-binding (cdr counterparty))
(remove-flow! why other-binding other-topic flow topic b))
(set-box! (binding-flows-box b) (hash-remove (unbox (binding-flows-box b)) topic)))
(define (((handle-binding-message b) message) state)
;;(write `(considering ,message from ,(binding-name b))) (newline)
(match message
[(leave-message why) (part state b why)]
[(arrived this-topic)
(when (not (binding-has-topic? b this-topic))
(set-binding-flows-for-topic! b this-topic (hash))
(for* ([other-binding (room-state-members state)]
[other-topic (binding-topics other-binding)])
(let ((flow (topic-intersection this-topic other-topic)))
(when flow
(insert-flow! b this-topic flow other-topic other-binding)
(insert-flow! other-binding other-topic flow this-topic b)))))
state]
[(departed who why)
(depart! b who why)
state]
[(says this-topic what)
(for* ([(flow counterparties) (binding-flows-for-topic b this-topic)]
[counterparty counterparties])
(define other-topic (car counterparty))
(define other-binding (cdr counterparty))
(enqueue-message! other-binding (says (topic (topic-role this-topic) flow) what)))
state]
[else
(log-warning (format "handle-binding-message: unexpected message ~v" message))
state]))
(define (part state b why)
(set-blocking-box! (binding-disconnect-box b) why)
(for ([topic (binding-topics b)]) (depart! b topic why))
(remove-binding state b))
(define (add-binding state b)
(struct-copy room-state state
[members (cons b (room-state-members state))]))
(define (remove-binding state b)
(struct-copy room-state state
[members (remove b (room-state-members state) eq?)]))
(define (enqueue-message! b message)
(define qb (binding-queue-box b))
;;(write `(enqueued ,message for ,(binding-name b))) (newline)
(set-box! qb (enqueue (unbox qb) message)))