#lang racket/base (require racket/set) (require racket/match) (require racket/class) (require racket/list) (require "functional-queue.rkt") (require "blocking-box.rkt") (require "standard-thread.rkt") (require "unify.rkt") (provide make-room room? room-name join-room ;; Management and communication (struct-out arrived) (struct-out departed) (struct-out says) ;; Presence, Advertisement and Subscription (struct-out topic) co-role co-topic ) (struct arrived (who) #:prefab) ;; someone arrived (struct departed (who why) #:prefab) ;; someone departed with a reason (struct says (who what) #:prefab) ;; someone said something (struct topic (role pattern) #:prefab) ;; A flow is an intersection between topic patterns: a substitution ;; produced by unify, roughly. Flows are signalled embedded in ;; appropriate topic records, since they're a kind of filtered form of ;; an upstream topic advertisement. ;; The variables in an advertised topic are ;; universally-quantified. When we unify two such topics, we're asking ;; for the fewest extra restrictions on those variables that satisfy ;; *both* topic when seen as predicates. The remaining variables are ;; again to be interpreted as universally quantified. (struct join-message (member-to-room ;; sync channel room-to-member ;; sync channel disconnect-box ;; blocking-box member-thread ;; thread exit-status) ;; exit-status structure from standard-thread.rkt #:prefab) (struct leave-message (reason) #:prefab) (struct room (name ch)) (struct room-state (name ch members) #:transparent) (struct route (local-topic remote-topic remote-binding) #:transparent) (struct binding ([topics #:mutable] ;; set of advertised topics [flows #:mutable] ;; map from flow to set of route in-ch ;; sync channel out-ch ;; sync channel disconnect-box ;; blocking-box queue-box ;; box of functional queue thread ;; thread exit-status) ;; maybe exit-status ;; WARNING: do not make this transparent! The equivalence we ;; want for bindings is eq?, not equal?, because (sadly) ;; equal? descends into boxes! If we make this transparent, ;; then set-add (etc) on anything involving a binding will ;; take a long time, leading to exponential (?) behaviour on ;; arrivals. ) (define (make-room [name (gensym 'room)]) (define ch (make-channel)) (thread (lambda () (room-main (room-state name ch '())))) (room name ch)) (define (join-room room) (make-object membership% (room-ch room))) (define membership% (class* object% () (init room-init) (super-new) (define room room-init) (define names (set)) ;; set of all our advertised topics (define flows (set)) ;; set of all current flows (define in-ch (make-channel)) (define out-ch (make-channel)) (define disconnect-box (make-blocking-box)) (define connected #t) (define reason #f) (define the-disconnected-evt (wrap-evt (blocking-box-evt disconnect-box) (lambda (v) (set! connected #f) (set! reason v) v))) (channel-put room (join-message out-ch in-ch disconnect-box (current-thread) (current-thread-exit-status))) (define/public (connected?) connected) (define/public (disconnect-reason) reason) (define/public (disconnected-evt) the-disconnected-evt) (define/public (assert!-evt name) (define cname (upper-case-symbols->canonical name)) (choice-evt the-disconnected-evt (wrap-evt (channel-put-evt out-ch (arrived cname)) (lambda (v) (set! names (set-add names cname)))))) (define/public (assert! name) (sync (assert!-evt name))) (define/public (retract!-evt name [why #f]) (define cname (upper-case-symbols->canonical name)) (choice-evt the-disconnected-evt (wrap-evt (channel-put-evt out-ch (departed cname why)) (lambda (v) (set! names (set-remove names cname)))))) (define/public (retract! name) (sync (retract!-evt name))) (define/public (say-evt who what) (define cname (upper-case-symbols->canonical (freshen who))) ;; TODO freshening is a bit weird (choice-evt the-disconnected-evt (channel-put-evt out-ch (says cname what)))) (define/public (say who what) (sync (say-evt who what))) (define/public (depart-evt [why #f]) (choice-evt the-disconnected-evt (wrap-evt (channel-put-evt out-ch (leave-message why)) (lambda (v) (set! connected #f) (set! reason why))))) (define/public (depart [why #f]) (sync (depart-evt why))) ;; Maybe<(or arrived departed says)> -> Maybe<(or arrived departed says)> ;; Identity function with side-effects to update our set of flows as required. (define (apply-message v) (match v ((arrived who) (set! flows (set-add flows who))) ((departed who _) (set! flows (set-remove flows who))) (_ (void))) v) (define/public (listen-evt) ;; we wrap this event for two reasons: firstly, because ;; otherwise we leak authority, and secondly, to update our set ;; of flows as arrived/departed events come by. (wrap-evt in-ch apply-message)) (define/public (try-listen) (apply-message (channel-try-get in-ch))) (define/public (listen) (sync (wrap-evt the-disconnected-evt (lambda (reason) (error 'listen "Disconnected with reason ~v while listening" reason))) (listen-evt))))) (define (room-main state) ;;(write `(room-main ,state)) (newline) (define handler (sync (foldl (lambda (b acc) (choice-evt (let ((qb (binding-queue-box b))) (if (queue-empty? (unbox qb)) acc (choice-evt acc (let-values (((first rest) (dequeue (unbox qb)))) (handle-evt (channel-put-evt (binding-out-ch b) first) (lambda (dummy) (lambda (state) (set-box! qb rest) state))))))) (handle-evt (binding-in-ch b) (handle-binding-message b)) (handle-evt (thread-dead-evt (binding-thread b)) (handle-binding-death b)))) (handle-evt (room-state-ch state) join-message-handler) (room-state-members state)))) (room-main (handler state))) (define (((handle-binding-death b) dummy) state) (part state b (binding-death-reason b))) (define ((join-message-handler message) state) (match message [(join-message in-ch out-ch disconnect-box thread exit-status) (define b (binding (set) (hash) in-ch out-ch disconnect-box (box (make-queue)) thread exit-status)) (add-binding state b)] [unexpected (log-warning (format "room-main: unexpected message ~v" unexpected)) state])) (define (binding-death-reason b) (define es (binding-exit-status b)) (and es ;; some threads are not standard-threads (exit-status-exception es))) (define (co-role r) (case r [(publisher) 'subscriber] [(subscriber) 'publisher] [else #f])) (define (co-topic t) (topic (co-role (topic-role t)) (topic-pattern t))) (define (refine-topic t new-pattern) (topic (topic-role t) new-pattern)) (define (roles-intersect? l r) (eq? l (co-role r))) ;; Both left and right must be canonicalized. (define (topic-intersection left right) (and (roles-intersect? (topic-role left) (topic-role right)) (mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right))))) ;; Topic must be canonicalized. (define (binding-has-topic? b topic) (set-member? (binding-topics b) topic)) ;; Inserts a new flow in the records of b1, and signals b1 if the flow ;; is new to it. ;; Topics and flow must all be canonicalized. (define (insert-flow! b1 topic1 flow topic2 b2) (define old-routes (hash-ref (binding-flows b1) flow set)) (when (set-empty? old-routes) (enqueue-message! b1 (arrived (topic (topic-role topic2) flow)))) (set-binding-flows! b1 (hash-set (binding-flows b1) flow (set-add old-routes (route topic1 topic2 b2))))) ;; Removes a flow from the records of b1, signalling b1 if the flow ;; ended after the removal. ;; Topic and flow must be canonicalized. (define (remove-flow! why b1 topic1 flow topic2 b2) (define old-flows (binding-flows b1)) (define old-routes (hash-ref old-flows flow set)) (define new-routes (set-remove old-routes (route topic1 topic2 b2))) (define new-flows (if (set-empty? new-routes) (begin (enqueue-message! b1 (departed flow why)) (hash-remove old-flows flow)) (hash-set old-flows flow new-routes))) (set-binding-flows! b1 new-flows)) ;; Topic must be canonicalized. (define (arrive! b arriving-topic all-bindings) (when (not (binding-has-topic? b arriving-topic)) (set-binding-topics! b (set-add (binding-topics b) arriving-topic)) (for* ([other-binding all-bindings] [other-topic (binding-topics other-binding)]) (let ((flow-pattern (topic-intersection arriving-topic other-topic))) (when flow-pattern (define flow (refine-topic arriving-topic flow-pattern)) (insert-flow! b arriving-topic (co-topic flow) other-topic other-binding) (insert-flow! other-binding other-topic flow arriving-topic b)))))) ;; Topic must be canonicalized. (define (depart! b departing-topic why) (when (binding-has-topic? b departing-topic) (define co-departing-topic (co-topic departing-topic)) (for* ([(co-flow old-routes) (binding-flows b)] #:when (specialization? co-flow co-departing-topic) [r old-routes] #:when (equal? (route-local-topic r) departing-topic)) (match-define (route _ remote-topic remote-binding) r) (remove-flow! why b departing-topic co-flow remote-topic remote-binding) (remove-flow! why remote-binding remote-topic (co-topic co-flow) departing-topic b)) (set-binding-topics! b (set-remove (binding-topics b) departing-topic)))) (define (((handle-binding-message b) message) state) ;;(write `(considering ,message from ,(binding-name b))) (newline) (match message [(leave-message why) (part state b why)] [(arrived this-topic) (arrive! b this-topic (room-state-members state)) state] [(departed who why) (depart! b who why) state] [(says full-topic what) (define remote-bindings (for*/set ([(co-flow routes) (binding-flows b)] #:when (topic-intersection full-topic co-flow) [r routes]) (route-remote-binding r))) (for ([remote-binding remote-bindings]) (enqueue-message! remote-binding (says full-topic what))) ;;(write `(delivering ,full-topic ,what ,remote-bindings)) (newline) state] [else (log-warning (format "handle-binding-message: unexpected message ~v" message)) state])) (define (part state b why) (set-blocking-box! (binding-disconnect-box b) why) (for ([topic (binding-topics b)]) (depart! b topic why)) (remove-binding state b)) (define (add-binding state b) (struct-copy room-state state [members (cons b (room-state-members state))])) (define (remove-binding state b) (struct-copy room-state state [members (remove b (room-state-members state) eq?)])) (define (enqueue-message! b message) (define qb (binding-queue-box b)) ;;(write `(enqueued ,message for ,(binding-name b))) (newline) (set-box! qb (enqueue (unbox qb) message)))