racket-ssh-2012/conversation.rkt

194 lines
5.5 KiB
Racket

#lang racket/base
(require racket/bool)
(require racket/match)
(require racket/class)
(require "functional-queue.rkt")
(provide make-room
join-room
make-blocking-box
blocking-box-evt
blocking-box-value
set-blocking-box!
;; Management and communication
(struct-out arrived)
(struct-out departed)
(struct-out says)
;; Generic utility messages
(struct-out credit)
)
;; ---------------------------------------------------------------------
;; Message structs. All four are declared #:prefab. The first three are
;; the messages the room logic in this file creates or consumes; `credit`
;; is only declared and exported here (no use of it is visible in this
;; file section).
;; ---------------------------------------------------------------------
(struct arrived (who) #:prefab) ;; someone arrived
(struct departed (who why) #:prefab) ;; someone departed with a reason
(struct says (who what topic) #:prefab) ;; someone said something with a given topic
(struct credit (who amount) #:prefab) ;; give someone an amount of credit
;; Handle on a room: wraps the channel on which the room's thread
;; receives `join` requests.
(struct room (ch))
;; A blocking box: wraps the channel through which its value is handed
;; out once it has been set with set-blocking-box!.
(struct blocking-box (ch))
;; State threaded through room-main:
;;   name    - the room's name
;;   ch      - the channel on which join requests arrive
;;   members - list of `binding`s, one per current member
(struct room-state (name ch members) #:transparent)
;; The room's record of one member:
;;   name           - the member's name
;;   in-ch          - channel on which the room receives the member's messages
;;   out-ch         - channel on which the room delivers messages to the member
;;   disconnect-box - blocking box set with the reason when the member is parted
;;   queue-box      - box holding the queue of messages awaiting delivery
;;   thread         - the member's thread; its death parts the member
(struct binding (name in-ch out-ch disconnect-box queue-box thread) #:transparent)
;; make-room : [name] -> room
;; Creates a new, empty room and starts the thread that runs its main
;; loop. `name` defaults to a fresh gensym. The returned `room` handle
;; carries only the channel used to send join requests to that thread.
(define (make-room [name (gensym 'room)])
  (let ([control-ch (make-channel)])
    ;; The room starts out with no members.
    (thread (lambda ()
              (room-main (room-state name control-ch null))))
    (room control-ch)))
;; join-room : room [name] -> membership% instance
;; Joins `room` under `name` (a fresh gensym by default) and returns the
;; membership object through which the caller talks to the room.
(define (join-room room [name (gensym 'peer)])
  (new membership%
       [room-init (room-ch room)]
       [name-init name]))
;; make-blocking-box : -> blocking-box
;; Creates an unset blocking box backed by a fresh channel.
(define (make-blocking-box)
  (let ([ch (make-channel)])
    (blocking-box ch)))
;; blocking-box-evt : blocking-box -> evt
;; Returns an event whose synchronization result is a value received
;; from the box's channel. The channel is wrapped rather than returned
;; directly so that callers are not handed the raw channel (which would
;; expose too much authority).
(define (blocking-box-evt b)
  (wrap-evt (blocking-box-ch b)
            (lambda (v) v)))
;; blocking-box-value : blocking-box -> any
;; Blocks until a value can be read from the box, then returns it.
(define (blocking-box-value b)
  (let ([ready (blocking-box-evt b)])
    (sync ready)))
;; set-blocking-box! : blocking-box any -> void
;; Makes `v` available to readers of `b` by spawning a thread that
;; repeatedly offers `v` on the box's channel. Returns (void) right away.
(define (set-blocking-box! b v)
  (let ([ch (blocking-box-ch b)])
    ;; Each round either hands `v` to a reader and goes around again, or
    ;; receives from the channel -- which means another setter is racing
    ;; with us -- in which case this server thread simply finishes.
    (define (serve)
      (sync (handle-evt (channel-put-evt ch v)
                        (lambda (ignored) (serve)))
            ch))
    (thread serve))
  (void))
;; membership% : one participant's connection to a room.
;;
;; Init arguments (in positional order):
;;   room-init - the room's join-request channel
;;   name-init - the name this participant is known by
;;
;; Constructing an instance sends a `join` request to the room, naming
;; the constructing thread as the participant's thread.
(define membership%
  (class object%
    (init room-init name-init)
    (super-new)

    (define room room-init)
    (define name name-init)
    (define in-ch (make-channel))   ; messages arriving from the room
    (define out-ch (make-channel))  ; messages going to the room
    (define disconnect-box (make-blocking-box))
    (define connected #t)
    (define reason #f)

    ;; Record locally that this membership has ended, and why.
    (define/private (note-disconnected! why)
      (set! connected #f)
      (set! reason why))

    ;; Becomes ready once the disconnect box has been set; synchronizing
    ;; on it updates the local state and yields the disconnect reason.
    (define the-disconnected-evt
      (wrap-evt (blocking-box-evt disconnect-box)
                (lambda (why)
                  (note-disconnected! why)
                  why)))

    ;; Register with the room. This side's out-ch is listed first and
    ;; its in-ch second, so they line up with the room's own in/out view.
    (channel-put room
                 (list 'join name out-ch in-ch disconnect-box (current-thread)))

    ;; Local view of the connection state; only updated when one of the
    ;; events below is synchronized on.
    (define/public (connected?) connected)
    (define/public (disconnect-reason) reason)
    (define/public (disconnected-evt) the-disconnected-evt)

    ;; Event that either sends a `says` message to the room or notices
    ;; that we have been disconnected.
    (define/public (say-evt what [topic #f])
      (let ([utterance (says name what topic)])
        (choice-evt the-disconnected-evt
                    (channel-put-evt out-ch utterance))))

    (define/public (say what [topic #f])
      (sync (say-evt what topic)))

    ;; Event that either sends a `departed` message to the room (marking
    ;; us as disconnected with reason `why`) or notices that we already
    ;; have been disconnected.
    (define/public (depart-evt [why #f])
      (let ([farewell (departed name why)])
        (choice-evt the-disconnected-evt
                    (wrap-evt (channel-put-evt out-ch farewell)
                              (lambda (ignored)
                                (note-disconnected! why))))))

    (define/public (depart [why #f])
      (sync (depart-evt why)))

    ;; Event yielding the next message from the room. The channel is
    ;; wrapped because handing it out directly would leak authority.
    (define/public (listen-evt)
      (wrap-evt in-ch (lambda (message) message)))

    ;; Blocks for the next message from the room; raises an error if the
    ;; disconnection event fires instead.
    (define/public (listen)
      (define (fail why)
        (error 'listen "~v: Disconnected with reason ~v while listening"
               name why))
      (sync (wrap-evt the-disconnected-evt fail)
            (listen-evt)))))
;; room-main : room-state -> (does not return normally)
;; The room's event loop. Each iteration waits for exactly one of:
;;   * a join request on the room's control channel,
;;   * a message from some member,
;;   * the death of some member's thread,
;;   * a member accepting the message at the head of its delivery queue,
;; and then loops (in tail position) with the resulting state.
(define (room-main state)
  ;;(write `(room-main ,state)) (newline)

  ;; Adds to `acc` the event that delivers b's next queued message, if
  ;; b has one; otherwise returns `acc` unchanged. The queue is advanced
  ;; only once the put has actually been accepted.
  (define (with-delivery b acc)
    (let ([qb (binding-queue-box b)])
      (if (queue-empty? (unbox qb))
          acc
          (let-values ([(next remaining) (dequeue (unbox qb))])
            (choice-evt acc
                        (handle-evt (channel-put-evt (binding-out-ch b) next)
                                    (lambda (ignored)
                                      (set-box! qb remaining)
                                      (room-main state))))))))

  ;; A message arriving from member b.
  (define (inbound b)
    (handle-evt (binding-in-ch b)
                (lambda (message)
                  (room-main (handle-binding-message state b message)))))

  ;; Member b's thread has died: part it with reason 'EXIT.
  (define (death b)
    (handle-evt (thread-dead-evt (binding-thread b))
                (lambda (ignored)
                  (room-main (part state b 'EXIT)))))

  ;; Requests arriving on the room's own channel.
  (define control
    (handle-evt (room-state-ch state)
                (lambda (request)
                  (match request
                    [`(join ,name ,in-ch ,out-ch ,disconnect-box ,owner)
                     (room-main (join state name in-ch out-ch disconnect-box owner))]
                    [unexpected
                     (log-warning (format "room-main: unexpected message ~v"
                                          unexpected))
                     (room-main state)]))))

  (sync (for/fold ([acc control])
                  ([b (in-list (room-state-members state))])
          (choice-evt (with-delivery b acc)
                      (inbound b)
                      (death b)))))
;; handle-binding-message : room-state binding any -> room-state
;; Processes one message received from member `b`:
;;   * `departed` -> the member is parted with the given reason;
;;   * `says`     -> the utterance is broadcast to the other members,
;;                   re-stamped with the name the room has on record for
;;                   `b` (the sender-supplied `who` is ignored);
;;   * otherwise  -> a warning is logged and the state is unchanged.
(define (handle-binding-message state b message)
  (cond
    [(departed? message)
     (part state b (departed-why message))]
    [(says? message)
     (broadcast state
                b
                (says (binding-name b)
                      (says-what message)
                      (says-topic message)))]
    [else
     (log-warning (format "handle-binding-message: unexpected message ~v"
                          message))
     state]))
;; join : room-state name channel channel blocking-box thread -> room-state
;; Adds a new member to the room. The newcomer's delivery queue starts
;; out holding one `arrived` notice per existing member, and every
;; existing member is queued an `arrived` notice for the newcomer.
(define (join state name in-ch out-ch disconnect-box thread)
  (let* ([backlog (list->queue (membership-summary state))]
         [newcomer (binding name in-ch out-ch disconnect-box
                            (box backlog)
                            thread)])
    (broadcast (add-binding state newcomer)
               newcomer
               (arrived name))))
;; part : room-state binding any -> room-state
;; Removes member `b` from the room: its disconnect box is set to `why`,
;; and the remaining members are queued a `departed` notice.
(define (part state b why)
  (set-blocking-box! (binding-disconnect-box b) why)
  (let ([remaining (remove-binding state b)]
        [notice (departed (binding-name b) why)])
    (broadcast remaining b notice)))
;; membership-summary : room-state -> (listof arrived)
;; Builds one `arrived` notice per current member, in member-list order.
(define (membership-summary state)
  (for/list ([member (in-list (room-state-members state))])
    (arrived (binding-name member))))
;; add-binding : room-state binding -> room-state
;; Returns a new state with `b` added at the front of the member list;
;; the name and channel are carried over unchanged.
(define (add-binding state b)
  (room-state (room-state-name state)
              (room-state-ch state)
              (cons b (room-state-members state))))
;; remove-binding : room-state binding -> room-state
;; Returns a new state whose member list no longer contains `b`.
;; Members are compared by identity (eq?), and only the first
;; occurrence is dropped.
(define (remove-binding state b)
  (room-state (room-state-name state)
              (room-state-ch state)
              (remq b (room-state-members state))))
;; broadcast : room-state binding any -> room-state
;; Queues `message` for every member of `state` except `b` (compared by
;; identity). The state value is returned as-is; only the members'
;; queue boxes are mutated.
(define (broadcast state b message)
  (for ([member (in-list (room-state-members state))]
        #:unless (eq? member b))
    (enqueue-message! member message))
  state)
;; enqueue-message! : binding any -> void
;; Appends `message` to member `b`'s pending-delivery queue by replacing
;; the contents of its queue box with the extended queue.
(define (enqueue-message! b message)
  (let* ([qb (binding-queue-box b)]
         [extended (enqueue (unbox qb) message)])
    (set-box! qb extended)))