249 lines
7.1 KiB
Racket
249 lines
7.1 KiB
Racket
#lang racket/base
|
|
|
|
(require racket/bool)
|
|
(require racket/match)
|
|
(require racket/class)
|
|
(require racket/list)
|
|
|
|
(require "functional-queue.rkt")
|
|
(require "blocking-box.rkt")
|
|
(require "standard-thread.rkt")
|
|
|
|
;; Public interface of the room module.
(provide
 ;; Room lifecycle and access
 make-room
 room?
 room-name
 join-room
 wait-for-members
 spy-on

 ;; Management and communication
 (struct-out arrived)
 (struct-out departed)
 (struct-out says)

 ;; Generic utility messages
 (struct-out credit)

 (struct-out rpc-request)
 ;; error is a kind of reply; crashes are detected via disconnection
 (struct-out rpc-reply))
|
|
|
|
;; Prefab message announcing that the member named `who` is present.
;; `join` broadcasts one to the other members when a visible member joins,
;; and `membership-summary` builds one per visible member for a newcomer.
(struct arrived (who) #:prefab) ;; someone arrived
|
|
;; Prefab message announcing that the member named `who` has left, with
;; `why` as the reason. A member sends one to the room to leave
;; (see `depart-evt`), and `part` broadcasts one to the remaining members
;; when a visible member leaves.
(struct departed (who why) #:prefab) ;; someone departed with a reason
|
|
;; Prefab message carrying the payload `what` under `topic` from the member
;; named `who`. When relaying, `handle-binding-message` ignores the sender's
;; `who` and substitutes the name recorded in the sender's binding.
(struct says (who what topic) #:prefab) ;; someone said something with a given topic
|
|
|
|
;; Prefab utility message. It is exported but not otherwise used in this
;; file; presumably it is carried as the payload of a `says` by client code
;; (TODO confirm against callers).
(struct credit (who amount) #:prefab) ;; give someone an amount of credit
|
|
|
|
;; Prefab utility message for a request. It is exported but not otherwise
;; used in this file. Presumably `reply-to` names the requester and `id`
;; correlates the request with its `rpc-reply` (TODO confirm against callers).
(struct rpc-request (reply-to id body) #:prefab)
|
|
;; Prefab utility message for a reply. It is exported but not otherwise used
;; in this file. Presumably `id` echoes the `id` of the `rpc-request` being
;; answered (TODO confirm against callers).
(struct rpc-reply (id body) #:prefab)
|
|
|
|
;; Client-side handle on a room, as returned by `make-room`.
;;   name : the room's name
;;   ch   : channel on which join requests are sent to the room thread
;; Only `room?` and `room-name` are exported from this module.
(struct room (name ch))
|
|
|
|
;; State threaded through the `room-main` loop.
;;   name    : the room's name
;;   ch      : channel on which join requests arrive
;;   members : list of `binding`s, most recently joined first
(struct room-state (name ch members) #:transparent)
|
|
;; The room's record of one joined member.
(struct binding
  (name            ; any: the name the member joined under
   invisible?      ; boolean: #t suppresses arrived/departed announcements
   in-ch           ; sync channel: the room receives the member's messages here
   out-ch          ; sync channel: the room delivers queued messages here
   disconnect-box  ; blocking-box: filled with the reason when the member parts
   queue-box       ; box of functional queue: messages awaiting delivery
   thread          ; thread: the thread that performed the join
   exit-status)    ; maybe exit-status: consulted when that thread dies
  #:transparent)
|
|
|
|
;; make-room : [any] -> room
;; Creates a room called `name` (a fresh gensym by default), starts the
;; thread that runs its main loop with no members, and returns the handle
;; that clients pass to `join-room`.
(define (make-room [name (gensym 'room)])
  (let* ([ch (make-channel)]
         [initial-state (room-state name ch '())])
    (thread (lambda () (room-main initial-state)))
    (room name ch)))
|
|
|
|
;; join-room : room [any] #:invisible? [boolean] -> membership%
;; Joins `room` under `name` (a fresh gensym by default) and returns the
;; membership object used to talk to the room. An invisible member is not
;; announced to the other members when it arrives or departs.
(define (join-room room [name (gensym 'peer)] #:invisible? [invisible? #f])
  (new membership%
       [room-init (room-ch room)]
       [name-init name]
       [invisible?-init invisible?]))
|
|
|
|
;; wait-for-members : room (listof any) -> any
;; Blocks until every name in `members` has been seen arriving in `room`.
;; It joins as an invisible member, so the wait itself is not announced.
;;
;; Raises an error if any name in `members` departs while waiting, and
;; propagates the error raised by `listen` if this waiter is disconnected.
;; In both cases the temporary membership is released before the exception
;; escapes. Otherwise a caller that catches the exception would leave a
;; binding in the room that keeps accumulating queued broadcasts.
;;
;; Returns the result of departing, as before.
(define (wait-for-members room members)
  (define handle (join-room room (gensym 'wait-for-members) #:invisible? #t))

  ;; Consume room traffic until no awaited name remains.
  (define (await remaining-members)
    (unless (null? remaining-members)
      (match (send handle listen)
        [(arrived who)
         (await (remove who remaining-members))]
        [(departed who _)
         ;; Checked against the full list: a member that already arrived
         ;; and then leaves is still a failure.
         (if (member who members)
             (error 'wait-for-members "Waited-for member exited")
             (await remaining-members))]
        [_ (await remaining-members)])))

  (with-handlers ([exn:fail?
                   (lambda (e)
                     ;; If `listen` failed because we were disconnected,
                     ;; the room has already dropped us; only depart while
                     ;; the membership still believes it is connected.
                     (when (send handle connected?)
                       (send handle depart))
                     (raise e))])
    (await members))

  (send handle depart))
|
|
|
|
;; spy-on : room -> thread
;; Starts a thread that joins `room` invisibly under a fresh name and logs,
;; at info level, every message it receives. Returns the spy thread.
(define (spy-on room)
  (define (spy)
    (define spy-name (gensym 'spy))
    (define handle (join-room room spy-name #:invisible? #t))
    ;; The listen call stays outside log-info so that it always runs,
    ;; whether or not anything is receiving info-level log events.
    (let next ([message (send handle listen)])
      (log-info (format "~s/~s: ~s" spy-name (room-name room) message))
      (next (send handle listen))))
  (thread spy))
|
|
|
|
;; membership% : class
;; A member's handle on a room. Constructing an instance sends the join
;; request on the supplied channel; the calling thread and its exit status
;; are included in that request.
;;
;; Positional init arguments:
;;   room-init       : channel on which the room accepts join requests
;;   name-init       : the name to join under
;;   invisible?-init : forwarded to the room as the member's invisibility flag
(define membership%
  (class object%
    (init room-init name-init invisible?-init)

    (super-new)

    (define control-ch room-init)
    (define my-name name-init)
    (define inbound (make-channel))    ; room -> member
    (define outbound (make-channel))   ; member -> room
    (define disconnect-box (make-blocking-box))

    ;; These two are only updated when one of the events below is actually
    ;; selected by a sync, so they describe what this object has observed.
    (define connected #t)
    (define reason #f)

    ;; Ready once the disconnect box has a value; selecting it records the
    ;; disconnection and yields the reason.
    (define the-disconnected-evt
      (wrap-evt (blocking-box-evt disconnect-box)
                (lambda (v)
                  (set! connected #f)
                  (set! reason v)
                  v)))

    ;; The room reads from our outbound channel and writes to our inbound
    ;; one, hence the order in which the two channels are listed.
    (channel-put control-ch
                 (list 'join
                       my-name
                       invisible?-init
                       outbound
                       inbound
                       disconnect-box
                       (current-thread)
                       (current-thread-exit-status)))

    ;; The name this member joined under.
    (define/public (reply-name)
      my-name)

    ;; #f once a disconnection or departure has been observed.
    (define/public (connected?)
      connected)

    ;; The recorded reason for the disconnection or departure, else #f.
    (define/public (disconnect-reason)
      reason)

    ;; Event that becomes ready when the room disconnects this member.
    (define/public (disconnected-evt)
      the-disconnected-evt)

    ;; Event that either hands `what` (under `topic`) to the room or
    ;; reports disconnection, whichever is selected.
    (define/public (say-evt what [topic #f])
      (let ([utterance (says my-name what topic)])
        (choice-evt the-disconnected-evt
                    (channel-put-evt outbound utterance))))

    ;; Blocking form of say-evt.
    (define/public (say what [topic #f])
      (sync (say-evt what topic)))

    ;; Event that either tells the room we are leaving because of `why`,
    ;; recording that locally once accepted, or reports disconnection.
    (define/public (depart-evt [why #f])
      (define farewell-evt
        (wrap-evt (channel-put-evt outbound (departed my-name why))
                  (lambda (v)
                    (set! connected #f)
                    (set! reason why))))
      (choice-evt the-disconnected-evt farewell-evt))

    ;; Blocking form of depart-evt.
    (define/public (depart [why #f])
      (sync (depart-evt why)))

    ;; Event yielding the next message delivered by the room.
    (define/public (listen-evt)
      ;; we wrap this event because otherwise we leak authority
      (wrap-evt inbound values))

    ;; Non-blocking receive: the next message if one is on offer, else #f.
    (define/public (try-listen)
      (channel-try-get inbound))

    ;; Blocks for the next message; raises an error if the member is
    ;; disconnected instead.
    (define/public (listen)
      (define failure-evt
        (wrap-evt the-disconnected-evt
                  (lambda (why)
                    (error 'listen "~v: Disconnected with reason ~v while listening"
                           my-name why))))
      (sync failure-evt (listen-evt)))))
|
|
|
|
;; room-main : room-state -> (does not return)
;; The room's event loop. Each iteration waits on a single combined event:
;;   - a join request on the room's channel,
;;   - for every member, a message from that member,
;;   - for every member, the death of that member's thread,
;;   - for every member with a non-empty queue, delivery of the queue head.
;; The selected event yields a state -> state procedure, which is applied
;; to produce the state for the next iteration.
(define (room-main state)
  ;;(write `(room-main ,state)) (newline)

  ;; Event that hands the oldest queued message to member b. The queue is
  ;; only advanced once the member has actually accepted the message.
  (define (delivery-evt b)
    (define qb (binding-queue-box b))
    (define-values (head tail) (dequeue (unbox qb)))
    (handle-evt (channel-put-evt (binding-out-ch b) head)
                (lambda (ignored)
                  (lambda (st)
                    (set-box! qb tail)
                    st))))

  ;; Extends the accumulated event with everything that concerns member b.
  (define (with-member-evts b acc)
    (choice-evt (if (queue-empty? (unbox (binding-queue-box b)))
                    acc
                    (choice-evt acc (delivery-evt b)))
                (handle-evt (binding-in-ch b)
                            (thread-message-handler b))
                (handle-evt (thread-dead-evt (binding-thread b))
                            (thread-death-handler b))))

  (define handler
    (sync (for/fold ([evt (handle-evt (room-state-ch state) join-message-handler)])
                    ([b (in-list (room-state-members state))])
            (with-member-evts b evt))))

  (room-main (handler state)))
|
|
|
|
;; thread-message-handler : binding -> (message -> (room-state -> room-state))
;; Builds the handler for messages received from member b: given a message,
;; it yields the state transition that processes that message.
(define (((thread-message-handler b) message) state)
  (handle-binding-message state b message))
|
|
|
|
;; thread-death-handler : binding -> (any -> (room-state -> room-state))
;; Builds the handler for the death of member b's thread. The event value
;; is ignored; the resulting state transition parts b from the room, using
;; whatever `binding-death-reason` reports as the reason.
(define (((thread-death-handler b) dummy) state)
  (part state b (binding-death-reason b)))
|
|
|
|
;; join-message-handler : message -> (room-state -> room-state)
;; Handler for messages on the room's own channel. A well-formed join
;; request adds the member; anything else is logged at warning level and
;; leaves the state untouched.
(define ((join-message-handler message) state)
  (match message
    [(list 'join who hidden? from-member to-member disconnect-box owner status)
     (join state who hidden? from-member to-member disconnect-box owner status)]
    [unexpected
     (log-warning (format "room-main: unexpected message ~v" unexpected))
     state]))
|
|
|
|
;; binding-death-reason : binding -> any
;; The exception recorded in b's exit status, or #f when the binding has no
;; exit status.
(define (binding-death-reason b)
  (cond
    [(binding-exit-status b) => exit-status-exception]
    [else #f]))
|
|
|
|
;; handle-binding-message : room-state binding any -> room-state
;; Processes one message received from member b:
;;   departed -> b leaves the room with the stated reason;
;;   says     -> the utterance is relayed to the other members, stamped
;;               with the name from b's binding rather than the one in the
;;               message;
;;   other    -> logged at warning level, state unchanged.
(define (handle-binding-message state b message)
  (cond
    [(departed? message)
     (part state b (departed-why message))]
    [(says? message)
     (broadcast state
                b
                (says (binding-name b) (says-what message) (says-topic message)))]
    [else
     (log-warning (format "handle-binding-message: unexpected message ~v"
                          message))
     state]))
|
|
|
|
;; join : room-state any boolean channel channel blocking-box thread any
;;        -> room-state
;; Adds a new member to the room. The newcomer's queue starts out holding
;; an `arrived` notice for each visible member already present. A visible
;; newcomer is announced to the other members; an invisible one is not.
(define (join state name invisible?
              in-ch out-ch disconnect-box
              thread exit-status)
  (let* ([backlog (box (list->queue (membership-summary state)))]
         [b (binding name invisible?
                     in-ch out-ch disconnect-box
                     backlog
                     thread exit-status)]
         [joined (add-binding state b)])
    (cond
      [invisible? joined]
      [else (broadcast joined b (arrived name))])))
|
|
|
|
;; part : room-state binding any -> room-state
;; Removes member b from the room. The member's disconnect box is filled
;; with `why` first; a visible member's departure is then announced to the
;; members that remain.
(define (part state b why)
  (set-blocking-box! (binding-disconnect-box b) why)
  (define remaining (remove-binding state b))
  (cond
    [(binding-invisible? b) remaining]
    [else (broadcast remaining b (departed (binding-name b) why))]))
|
|
|
|
;; membership-summary : room-state -> (listof arrived)
;; One `arrived` notice per visible member, in member-list order.
(define (membership-summary state)
  (for/list ([m (in-list (room-state-members state))]
             #:unless (binding-invisible? m))
    (arrived (binding-name m))))
|
|
|
|
;; add-binding : room-state binding -> room-state
;; A copy of `state` with b placed at the front of the member list.
(define (add-binding state b)
  (room-state (room-state-name state)
              (room-state-ch state)
              (cons b (room-state-members state))))
|
|
|
|
;; remove-binding : room-state binding -> room-state
;; A copy of `state` without b, which is identified by eq?.
(define (remove-binding state b)
  (define survivors (remq b (room-state-members state)))
  (struct-copy room-state state [members survivors]))
|
|
|
|
;; broadcast : room-state binding any -> room-state
;; Queues `message` for every member other than b, invisible members
;; included, and returns `state` itself.
(define (broadcast state b message)
  (for ([recipient (in-list (room-state-members state))]
        #:unless (eq? recipient b))
    (enqueue-message! recipient message))
  state)
|
|
|
|
;; enqueue-message! : binding any -> void
;; Appends `message` to the queue of messages awaiting delivery to b by
;; replacing the contents of b's queue box.
(define (enqueue-message! b message)
  (let* ([pending-box (binding-queue-box b)]
         [extended (enqueue (unbox pending-box) message)])
    (set-box! pending-box extended)))
|