#lang racket/base

;; Rooms: a small publish/subscribe hub built on synchronous channels.
;;
;; A room is served by its own thread (see `room-main`).  Peers join via
;; `join-room`, which returns a `membership%` object used to say things,
;; listen, and depart.  Everything a visible member says, and every visible
;; arrival/departure, is queued for delivery to all *other* members.

(require racket/bool)
(require racket/match)
(require racket/class)

(require "functional-queue.rkt")
(require "blocking-box.rkt")
(require "standard-thread.rkt")

(provide make-room
         room?
         room-name
         join-room
         wait-for-members
         spy-on

         ;; Management and communication
         (struct-out arrived)
         (struct-out departed)
         (struct-out says)

         ;; Generic utility messages
         (struct-out credit)
         (struct-out rpc-request)
         (struct-out rpc-reply) ;; error is a kind of reply; crashes are detected via disconnection
         )

;;---------------------------------------------------------------------------
;; Message structures (prefab, so they can be matched structurally)

(struct arrived (who) #:prefab) ;; someone arrived
(struct departed (who why) #:prefab) ;; someone departed with a reason
(struct says (who what topic) #:prefab) ;; someone said something with a given topic

(struct credit (who amount) #:prefab) ;; give someone an amount of credit

(struct rpc-request (reply-to id body) #:prefab)
(struct rpc-reply (id body) #:prefab)

;;---------------------------------------------------------------------------
;; Room representation

;; Public handle on a room: its name plus the channel on which the room
;; thread accepts join requests.
(struct room (name ch))

;; State threaded through `room-main`: name, join channel, and the list of
;; current `binding`s (most recently joined first; see `add-binding`).
(struct room-state (name ch members) #:transparent)

;; One record per joined member, as seen from the room's side.
(struct binding (name ;; any
                 break-on-departure? ;; boolean
                 invisible? ;; boolean
                 in-ch ;; sync channel
                 out-ch ;; sync channel
                 disconnect-box ;; blocking-box
                 queue-box ;; box of functional queue
                 thread ;; thread
                 exit-status) ;; maybe exit-status
        #:transparent)

;;---------------------------------------------------------------------------
;; Public API

;; make-room : [any] -> room
;; Starts a fresh room thread with no members and returns a handle on it.
(define (make-room [name (gensym 'room)])
  (define ch (make-channel))
  (thread (lambda () (room-main (room-state name ch '()))))
  (room name ch))

;; join-room : room [any] #:break-on-departure? boolean #:invisible? boolean
;;             -> membership%
;; Joins the calling thread to `room` under `name`.
;;  - break-on-departure?: instead of being sent `departed` messages, the
;;    joining thread is sent a break (see `enqueue-message!`).
;;  - invisible?: the member's own arrival and departure are not broadcast
;;    (see `join` and `part`).
(define (join-room room [name (gensym 'peer)]
                   #:break-on-departure? [break-on-departure? #f]
                   #:invisible? [invisible? #f])
  (make-object membership% (room-ch room) name break-on-departure? invisible?))

;; wait-for-members : room (listof any) -> any
;; Blocks until an `arrived` message has been seen for every name in
;; `members`, then departs and returns the result of departing.  Raises an
;; error if one of the waited-for members departs while we are waiting.
(define (wait-for-members room members)
  (define handle (join-room room (gensym 'wait-for-members) #:invisible? #t))
  (let loop ((remaining-members members))
    (if (null? remaining-members)
        #t
        (match (send handle listen)
          ((arrived who) (loop (remove who remaining-members)))
          ((departed who _)
           (if (member who members)
               (begin
                 ;; Leave the room before signalling the error; otherwise our
                 ;; binding stays registered and keeps accumulating queued
                 ;; messages until the calling thread terminates.
                 (send handle depart)
                 (error 'wait-for-members "Waited-for member exited"))
               (loop remaining-members)))
          (_ (loop remaining-members)))))
  (send handle depart))

;; spy-on : room -> thread
;; Starts a thread that joins `room` invisibly and logs (at info level) every
;; message it receives.  The loop has no exit of its own; it ends only when
;; `listen` raises, which it does on disconnection.
(define (spy-on room)
  (thread
   (lambda ()
     (define spy-name (gensym 'spy))
     (define handle (join-room room spy-name #:invisible? #t))
     (let loop ()
       (define message (send handle listen))
       (log-info (format "~s/~s: ~s" spy-name (room-name room) message))
       (loop)))))

;;---------------------------------------------------------------------------
;; Member-side handle

;; membership% : the object a peer uses to interact with a room.
;; Constructing one sends a `join` request to the room's channel on behalf of
;; the current thread.
(define membership%
  (class* object% ()
    (init room-init) ;; the room's join channel (not the `room` struct)
    (init name-init)
    (init break-on-departure?-init)
    (init invisible?-init)

    (super-new)

    (define room room-init)
    (define name name-init)
    (define in-ch (make-channel)) ;; room -> member
    (define out-ch (make-channel)) ;; member -> room
    (define disconnect-box (make-blocking-box))
    (define connected #t)
    (define reason #f)

    ;; Becomes ready once the room fills `disconnect-box` (see `part`);
    ;; records the reason locally and yields it as the event's value.
    (define the-disconnected-evt
      (wrap-evt (blocking-box-evt disconnect-box)
                (lambda (v)
                  (set! connected #f)
                  (set! reason v)
                  v)))

    ;; Note the swap: our out-ch is the room's in-ch and vice versa, matching
    ;; the field order the room expects in its `join` pattern.
    (channel-put room `(join ,name
                             ,break-on-departure?-init
                             ,invisible?-init
                             ,out-ch
                             ,in-ch
                             ,disconnect-box
                             ,(current-thread)
                             ,(current-thread-exit-status)))

    (define/public (reply-name)
      name)

    (define/public (connected?)
      connected)

    (define/public (disconnect-reason)
      reason)

    (define/public (disconnected-evt)
      the-disconnected-evt)

    ;; Event that either delivers a `says` message to the room or becomes
    ;; ready with the disconnection reason, whichever happens first.
    (define/public (say-evt what [topic #f])
      (choice-evt the-disconnected-evt
                  (channel-put-evt out-ch (says name what topic))))

    (define/public (say what [topic #f])
      (sync (say-evt what topic)))

    ;; Event that asks the room to remove us, recording `why` as our
    ;; disconnection reason once the request has been accepted.
    (define/public (depart-evt [why #f])
      (choice-evt the-disconnected-evt
                  (wrap-evt (channel-put-evt out-ch (departed name why))
                            (lambda (v)
                              (set! connected #f)
                              (set! reason why)))))

    (define/public (depart [why #f])
      (sync (depart-evt why)))

    (define/public (listen-evt)
      ;; we wrap this event because otherwise we leak authority
      (wrap-evt in-ch values))

    ;; Blocks for the next message from the room; raises an error if the
    ;; membership is disconnected instead.
    (define/public (listen)
      (sync (wrap-evt the-disconnected-evt
                      (lambda (reason)
                        (error 'listen
                               "~v: Disconnected with reason ~v while listening"
                               name
                               reason)))
            (listen-evt)))))

;;---------------------------------------------------------------------------
;; Room-side implementation

;; room-main : room-state -> (does not return normally)
;; The room's event loop.  On each iteration it waits for exactly one of:
;;  - a join request on the room channel,
;;  - for each member: delivery of the head of its outgoing queue (only when
;;    the queue is non-empty), an incoming message from it, or the death of
;;    its thread,
;; then loops with the updated state.
(define (room-main state)
  ;;(write `(room-main ,state)) (newline)
  (sync
   (foldl
    (lambda (b acc)
      (choice-evt
       (let ((qb (binding-queue-box b)))
         (if (queue-empty? (unbox qb))
             acc
             (choice-evt
              acc
              (let-values (((first rest) (dequeue (unbox qb))))
                (handle-evt (channel-put-evt (binding-out-ch b) first)
                            (lambda (dummy)
                              ;; Only drop the head once it was actually
                              ;; accepted by the member.
                              (set-box! qb rest)
                              (room-main state)))))))
       (handle-evt (binding-in-ch b)
                   (lambda (message)
                     (room-main (handle-binding-message state b message))))
       (handle-evt (thread-dead-evt (binding-thread b))
                   (lambda (dummy)
                     (room-main (part state b (binding-death-reason b)))))))
    (handle-evt (room-state-ch state)
                (match-lambda
                  (`(join ,name
                          ,break-on-departure?
                          ,invisible?
                          ,in-ch
                          ,out-ch
                          ,disconnect-box
                          ,thread
                          ,exit-status)
                   (room-main (join state
                                    name
                                    break-on-departure?
                                    invisible?
                                    in-ch
                                    out-ch
                                    disconnect-box
                                    thread
                                    exit-status)))
                  (unexpected
                   (log-warning (format "room-main: unexpected message ~v" unexpected))
                   (room-main state))))
    (room-state-members state))))

;; binding-death-reason : binding -> any
;; The exception recorded in the member's exit-status, or #f when there is no
;; exit-status.
(define (binding-death-reason b)
  (define es (binding-exit-status b))
  (and es (exit-status-exception es)))

;; handle-binding-message : room-state binding any -> room-state
;; Processes one message received from member `b`.  `says` is rebroadcast
;; with the sender rewritten to the binding's registered name, so members
;; cannot speak under another name.
(define (handle-binding-message state b message)
  (match message
    ((departed _ why)
     (part state b why))
    ((says _ what topic)
     (broadcast state b (says (binding-name b) what topic)))
    ;; `_` rather than `else`: in `match`, `else` is just an identifier
    ;; pattern, not a keyword.
    (_
     (log-warning (format "handle-binding-message: unexpected message ~v" message))
     state)))

;; join : room-state any boolean boolean channel channel blocking-box thread
;;        exit-status -> room-state
;; Registers a new member.  Its queue is pre-loaded with an `arrived` message
;; for each member reported by `membership-summary`; unless the newcomer is
;; invisible, everyone else is told about its arrival.
(define (join state name break-on-departure? invisible? in-ch out-ch disconnect-box
              thread exit-status)
  (define b (binding name
                     break-on-departure?
                     invisible?
                     in-ch
                     out-ch
                     disconnect-box
                     (box (list->queue (membership-summary state)))
                     thread
                     exit-status))
  (if invisible?
      (add-binding state b)
      (broadcast (add-binding state b) b (arrived name))))

;; part : room-state binding any -> room-state
;; Removes member `b`, publishing `why` through its disconnect box and, for
;; visible members, announcing the departure to the remaining members.
(define (part state b why)
  (set-blocking-box! (binding-disconnect-box b) why)
  (if (binding-invisible? b)
      (remove-binding state b)
      (broadcast (remove-binding state b) b (departed (binding-name b) why))))

;; membership-summary : room-state -> (listof arrived)
;; One `arrived` message per *visible* current member, in member-list order.
;; Invisible members are skipped: their arrival and departure are never
;; broadcast (see `join` and `part`), so listing them here would reveal them
;; to newcomers and leave those newcomers with members that never depart.
(define (membership-summary state)
  (for/list ((m (in-list (room-state-members state)))
             #:unless (binding-invisible? m))
    (arrived (binding-name m))))

;; add-binding : room-state binding -> room-state
(define (add-binding state b)
  (struct-copy room-state state
               [members (cons b (room-state-members state))]))

;; remove-binding : room-state binding -> room-state
;; Removes `b` by identity (eq?), not structural equality.
(define (remove-binding state b)
  (struct-copy room-state state
               [members (remove b (room-state-members state) eq?)]))

;; broadcast : room-state binding any -> room-state
;; Queues `message` for every member except `b` (the originator) and returns
;; `state` unchanged; the queues live in mutable boxes.
(define (broadcast state b message)
  (for-each (lambda (member)
              (when (not (eq? member b))
                (enqueue-message! member message)))
            (room-state-members state))
  state)

;; enqueue-message! : binding any -> void
;; Appends `message` to the member's outgoing queue.  For members that asked
;; for break-on-departure, a `departed` message is not queued; the member's
;; thread is sent a break instead.
(define (enqueue-message! b message)
  (define qb (binding-queue-box b))
  (if (and (departed? message) (binding-break-on-departure? b))
      (break-thread (binding-thread b))
      (set-box! qb (enqueue (unbox qb) message))))