Remove to-be-redundant libraries from this branch
This commit is contained in:
parent
bdc4cff0f4
commit
2afedd91fc
|
@ -1,143 +0,0 @@
|
|||
#lang racket/base
|
||||
|
||||
(require racket/port)
|
||||
(require racket/class)
|
||||
(require racket/match)
|
||||
(require racket/tcp)
|
||||
(require "conversation.rkt")
|
||||
|
||||
(provide (struct-out set-options)
|
||||
tcp-server-actor
|
||||
tcp-client-actor)
|
||||
|
||||
(struct set-options (new-values) #:prefab)
|
||||
|
||||
(define (socket-name role s)
|
||||
(define-values (local-ip local-port remote-ip remote-port)
|
||||
(tcp-addresses s #t))
|
||||
(list role local-ip local-port remote-ip remote-port))
|
||||
|
||||
(define (option-value options key [missing-value #f])
|
||||
(cond
|
||||
((assq key options) => cadr)
|
||||
(else missing-value)))
|
||||
|
||||
(define (tcp-server-actor room options . tcp-listener-args)
|
||||
(define listener (apply tcp-listen tcp-listener-args))
|
||||
(define name (socket-name 'listener listener))
|
||||
(thread (lambda ()
|
||||
(define handle (join-room room name))
|
||||
(log-info (format "Listening on ~v" name))
|
||||
(let loop ((owner #f)
|
||||
(remaining-credit (option-value options 'initial-accept-credit 0)))
|
||||
(sync (handle-evt (send handle disconnected-evt)
|
||||
(lambda (reason)
|
||||
(log-error (format "~v: conversation closed: ~v" name reason))
|
||||
(tcp-close listener)))
|
||||
(handle-evt (send handle listen-evt)
|
||||
(match-lambda
|
||||
((arrived who)
|
||||
(log-info (format "~v: New owner: ~v" name who))
|
||||
(loop who remaining-credit))
|
||||
((departed who why)
|
||||
(if (equal? owner who)
|
||||
(begin (log-info (format "~v: Owner departed, closing" name))
|
||||
(tcp-close listener))
|
||||
(loop owner remaining-credit)))
|
||||
((says _ (credit _ amount) _)
|
||||
(define new-credit (+ remaining-credit amount))
|
||||
(log-info (format "~v: Credit now ~v" name new-credit))
|
||||
(loop owner new-credit))
|
||||
(unexpected
|
||||
(log-warning (format "~v: Ignoring message: ~v" name unexpected))
|
||||
(loop owner remaining-credit))))
|
||||
(if (positive? remaining-credit)
|
||||
(handle-evt (tcp-accept-evt listener)
|
||||
(match-lambda
|
||||
((list i o)
|
||||
(send handle say
|
||||
(tcp-socket-actor 'inbound-connection options i o)
|
||||
'accepted)
|
||||
(loop owner (- remaining-credit 1)))))
|
||||
never-evt)))))
|
||||
room)
|
||||
|
||||
(define (tcp-client-actor room options . tcp-connect-args)
|
||||
(define-values (i o) (apply tcp-connect tcp-connect-args))
|
||||
(tcp-socket-actor 'outbound-connection options i o))
|
||||
|
||||
(define (tcp-socket-actor role options i o [room (make-room)])
|
||||
(define name (socket-name role i))
|
||||
(define (close-ports)
|
||||
(close-input-port i)
|
||||
(close-output-port o))
|
||||
(define (compute-terminator options)
|
||||
;; See read-line-evt and friends.
|
||||
(option-value options 'line-terminator 'any))
|
||||
(define (compute-read-evt options)
|
||||
(define read-mode (option-value options 'read-mode 'bytes))
|
||||
(case read-mode
|
||||
((bytes) (values (lambda (credit) (read-bytes-evt credit i))
|
||||
bytes-length))
|
||||
((lines) (values (lambda (credit) (read-line-evt i (compute-terminator options)))
|
||||
(lambda (v) 1)))
|
||||
((bytes-lines) (values (lambda (credit) (read-bytes-line-evt i (compute-terminator options)))
|
||||
(lambda (v) 1)))
|
||||
(else (error 'tcp-socket-actor "Illegal read-evt mode ~v" read-mode))))
|
||||
(thread (lambda ()
|
||||
(define handle (join-room room name))
|
||||
(log-info (format "~v: New connection" name))
|
||||
(with-handlers
|
||||
((exn? (lambda (e)
|
||||
(close-ports)
|
||||
(raise e))))
|
||||
(let loop ((options options)
|
||||
(peer-count 0)
|
||||
(remaining-credit (option-value options 'initial-read-credit 0)))
|
||||
;;(write `(connection-loop ,options ,peer-count ,remaining-credit)) (newline)
|
||||
(sync (handle-evt (send handle disconnected-evt)
|
||||
(lambda (reason)
|
||||
(log-error (format "~v: conversation closed: ~v" name reason))))
|
||||
(handle-evt (send handle listen-evt)
|
||||
(match-lambda
|
||||
((arrived _)
|
||||
(loop options (+ peer-count 1) remaining-credit))
|
||||
((departed _ _)
|
||||
(if (= peer-count 1)
|
||||
(log-info (format "~v: Last peer departed" name))
|
||||
(loop options (- peer-count 1) remaining-credit)))
|
||||
((says _ (credit _ amount) _)
|
||||
(loop options peer-count (+ remaining-credit amount)))
|
||||
((says _ (? eof-object?) _)
|
||||
(close-output-port o)
|
||||
(loop options peer-count remaining-credit))
|
||||
((says _ (? bytes? bs) _)
|
||||
;; TODO: credit flow the other way?
|
||||
(write-bytes bs o)
|
||||
(flush-output o)
|
||||
(loop options peer-count remaining-credit))
|
||||
((says _ (? string? s) _)
|
||||
;; TODO: credit flow the other way?
|
||||
(write-string s o)
|
||||
(flush-output o)
|
||||
(loop options peer-count remaining-credit))
|
||||
((says _ (set-options new-values) _)
|
||||
(loop new-values peer-count remaining-credit))
|
||||
(unexpected
|
||||
(log-warning (format "~v: Ignoring message: ~v"
|
||||
name unexpected))
|
||||
(loop options peer-count remaining-credit))))
|
||||
(if (positive? remaining-credit)
|
||||
(let-values (((e-maker credit-adjuster) (compute-read-evt options)))
|
||||
(handle-evt (e-maker remaining-credit)
|
||||
(lambda (v)
|
||||
(if (eof-object? v)
|
||||
(begin (send handle say v 'eof)
|
||||
(loop options peer-count 0))
|
||||
(begin (send handle say v 'data)
|
||||
(loop options peer-count
|
||||
(- remaining-credit
|
||||
(credit-adjuster v))))))))
|
||||
never-evt)))
|
||||
(close-ports))))
|
||||
room)
|
248
conversation.rkt
248
conversation.rkt
|
@ -1,248 +0,0 @@
|
|||
#lang racket/base
|
||||
|
||||
(require racket/bool)
|
||||
(require racket/match)
|
||||
(require racket/class)
|
||||
(require racket/list)
|
||||
|
||||
(require "functional-queue.rkt")
|
||||
(require "blocking-box.rkt")
|
||||
(require "standard-thread.rkt")
|
||||
|
||||
(provide make-room
|
||||
room?
|
||||
room-name
|
||||
join-room
|
||||
wait-for-members
|
||||
spy-on
|
||||
|
||||
;; Management and communication
|
||||
(struct-out arrived)
|
||||
(struct-out departed)
|
||||
(struct-out says)
|
||||
|
||||
;; Generic utility messages
|
||||
(struct-out credit)
|
||||
|
||||
(struct-out rpc-request)
|
||||
(struct-out rpc-reply) ;; error is a kind of reply; crashes are detected via disconnection
|
||||
)
|
||||
|
||||
(struct arrived (who) #:prefab) ;; someone arrived
|
||||
(struct departed (who why) #:prefab) ;; someone departed with a reason
|
||||
(struct says (who what topic) #:prefab) ;; someone said something with a given topic
|
||||
|
||||
(struct credit (who amount) #:prefab) ;; give someone an amount of credit
|
||||
|
||||
(struct rpc-request (reply-to id body) #:prefab)
|
||||
(struct rpc-reply (id body) #:prefab)
|
||||
|
||||
(struct room (name ch))
|
||||
|
||||
(struct room-state (name ch members) #:transparent)
|
||||
(struct binding (name ;; any
|
||||
invisible? ;; boolean
|
||||
in-ch ;; sync channel
|
||||
out-ch ;; sync channel
|
||||
disconnect-box ;; blocking-box
|
||||
queue-box ;; box of functional queue
|
||||
thread ;; thread
|
||||
exit-status) ;; maybe exit-status
|
||||
#:transparent)
|
||||
|
||||
(define (make-room [name (gensym 'room)])
|
||||
(define ch (make-channel))
|
||||
(thread (lambda () (room-main (room-state name ch '()))))
|
||||
(room name ch))
|
||||
|
||||
(define (join-room room [name (gensym 'peer)]
|
||||
#:invisible? [invisible? #f])
|
||||
(make-object membership% (room-ch room) name invisible?))
|
||||
|
||||
(define (wait-for-members room members)
|
||||
(define handle (join-room room (gensym 'wait-for-members) #:invisible? #t))
|
||||
(let loop ((remaining-members members))
|
||||
(if (null? remaining-members)
|
||||
#t
|
||||
(match (send handle listen)
|
||||
((arrived who) (loop (remove who remaining-members)))
|
||||
((departed who _) (if (member who members)
|
||||
(error 'wait-for-members "Waited-for member exited")
|
||||
(loop remaining-members)))
|
||||
(_ (loop remaining-members)))))
|
||||
(send handle depart))
|
||||
|
||||
(define (spy-on room)
|
||||
(thread (lambda ()
|
||||
(define spy-name (gensym 'spy))
|
||||
(define handle (join-room room spy-name #:invisible? #t))
|
||||
(let loop ()
|
||||
(define message (send handle listen))
|
||||
(log-info (format "~s/~s: ~s" spy-name (room-name room) message))
|
||||
(loop)))))
|
||||
|
||||
(define membership%
|
||||
(class* object% ()
|
||||
(init room-init)
|
||||
(init name-init)
|
||||
(init invisible?-init)
|
||||
|
||||
(super-new)
|
||||
|
||||
(define room room-init)
|
||||
(define name name-init)
|
||||
(define in-ch (make-channel))
|
||||
(define out-ch (make-channel))
|
||||
(define disconnect-box (make-blocking-box))
|
||||
(define connected #t)
|
||||
(define reason #f)
|
||||
|
||||
(define the-disconnected-evt (wrap-evt (blocking-box-evt disconnect-box)
|
||||
(lambda (v)
|
||||
(set! connected #f)
|
||||
(set! reason v)
|
||||
v)))
|
||||
(channel-put room `(join ,name ,invisible?-init
|
||||
,out-ch ,in-ch ,disconnect-box
|
||||
,(current-thread) ,(current-thread-exit-status)))
|
||||
|
||||
(define/public (reply-name)
|
||||
name)
|
||||
|
||||
(define/public (connected?)
|
||||
connected)
|
||||
|
||||
(define/public (disconnect-reason)
|
||||
reason)
|
||||
|
||||
(define/public (disconnected-evt)
|
||||
the-disconnected-evt)
|
||||
|
||||
(define/public (say-evt what [topic #f])
|
||||
(choice-evt the-disconnected-evt
|
||||
(channel-put-evt out-ch (says name what topic))))
|
||||
|
||||
(define/public (say what [topic #f])
|
||||
(sync (say-evt what topic)))
|
||||
|
||||
(define/public (depart-evt [why #f])
|
||||
(choice-evt the-disconnected-evt
|
||||
(wrap-evt (channel-put-evt out-ch (departed name why))
|
||||
(lambda (v)
|
||||
(set! connected #f)
|
||||
(set! reason why)))))
|
||||
|
||||
(define/public (depart [why #f])
|
||||
(sync (depart-evt why)))
|
||||
|
||||
(define/public (listen-evt)
|
||||
;; we wrap this event because otherwise we leak authority
|
||||
(wrap-evt in-ch values))
|
||||
|
||||
(define/public (try-listen)
|
||||
(channel-try-get in-ch))
|
||||
|
||||
(define/public (listen)
|
||||
(sync (wrap-evt the-disconnected-evt
|
||||
(lambda (reason)
|
||||
(error 'listen "~v: Disconnected with reason ~v while listening"
|
||||
name reason)))
|
||||
(listen-evt)))))
|
||||
|
||||
(define (room-main state)
|
||||
;;(write `(room-main ,state)) (newline)
|
||||
(define handler
|
||||
(sync (foldl (lambda (b acc)
|
||||
(choice-evt (let ((qb (binding-queue-box b)))
|
||||
(if (queue-empty? (unbox qb))
|
||||
acc
|
||||
(choice-evt acc
|
||||
(let-values (((first rest) (dequeue (unbox qb))))
|
||||
(handle-evt (channel-put-evt (binding-out-ch b)
|
||||
first)
|
||||
(lambda (dummy)
|
||||
(lambda (state)
|
||||
(set-box! qb rest)
|
||||
state)))))))
|
||||
(handle-evt (binding-in-ch b)
|
||||
(thread-message-handler b))
|
||||
(handle-evt (thread-dead-evt (binding-thread b))
|
||||
(thread-death-handler b))))
|
||||
(handle-evt (room-state-ch state) join-message-handler)
|
||||
(room-state-members state))))
|
||||
(room-main (handler state)))
|
||||
|
||||
(define (thread-message-handler b)
|
||||
(lambda (message)
|
||||
(lambda (state)
|
||||
(handle-binding-message state b message))))
|
||||
|
||||
(define (thread-death-handler b)
|
||||
(lambda (dummy)
|
||||
(lambda (state)
|
||||
(part state b (binding-death-reason b)))))
|
||||
|
||||
(define join-message-handler
|
||||
(lambda (message)
|
||||
(lambda (state)
|
||||
(match message
|
||||
(`(join ,name ,invisible? ,in-ch ,out-ch ,disconnect-box
|
||||
,thread ,exit-status)
|
||||
(join state name invisible? in-ch out-ch disconnect-box
|
||||
thread exit-status))
|
||||
(unexpected (log-warning (format "room-main: unexpected message ~v" unexpected))
|
||||
state)))))
|
||||
|
||||
(define (binding-death-reason b)
|
||||
(define es (binding-exit-status b))
|
||||
(and es
|
||||
(exit-status-exception es)))
|
||||
|
||||
(define (handle-binding-message state b message)
|
||||
(match message
|
||||
((departed _ why) (part state b why))
|
||||
((says _ what topic) (broadcast state b (says (binding-name b) what topic)))
|
||||
(else (log-warning (format "handle-binding-message: unexpected message ~v"
|
||||
message))
|
||||
state)))
|
||||
|
||||
(define (join state name invisible?
|
||||
in-ch out-ch disconnect-box
|
||||
thread exit-status)
|
||||
(define b (binding name invisible?
|
||||
in-ch out-ch disconnect-box
|
||||
(box (list->queue (membership-summary state)))
|
||||
thread exit-status))
|
||||
(if invisible?
|
||||
(add-binding state b)
|
||||
(broadcast (add-binding state b) b (arrived name))))
|
||||
|
||||
(define (part state b why)
|
||||
(set-blocking-box! (binding-disconnect-box b) why)
|
||||
(if (binding-invisible? b)
|
||||
(remove-binding state b)
|
||||
(broadcast (remove-binding state b) b (departed (binding-name b) why))))
|
||||
|
||||
(define (membership-summary state)
|
||||
(filter-map (lambda (member) (and (not (binding-invisible? member))
|
||||
(arrived (binding-name member))))
|
||||
(room-state-members state)))
|
||||
|
||||
(define (add-binding state b)
|
||||
(struct-copy room-state state
|
||||
[members (cons b (room-state-members state))]))
|
||||
|
||||
(define (remove-binding state b)
|
||||
(struct-copy room-state state
|
||||
[members (remove b (room-state-members state) eq?)]))
|
||||
|
||||
(define (broadcast state b message)
|
||||
(for-each (lambda (member)
|
||||
(when (not (eq? member b))
|
||||
(enqueue-message! member message)))
|
||||
(room-state-members state))
|
||||
state)
|
||||
|
||||
(define (enqueue-message! b message)
|
||||
(define qb (binding-queue-box b))
|
||||
(set-box! qb (enqueue (unbox qb) message)))
|
|
@ -1,73 +0,0 @@
|
|||
#lang racket/base
|
||||
|
||||
(require racket/class)
|
||||
(require racket/match)
|
||||
|
||||
(require "conversation.rkt")
|
||||
(require "conversation-socket.rkt")
|
||||
|
||||
(define pool (make-room 'everybody))
|
||||
|
||||
(define (handle-connection sock quit-proc)
|
||||
(join-room pool)
|
||||
(define h (join-room sock))
|
||||
(match (send h listen)
|
||||
((arrived peer-name)
|
||||
(let loop ()
|
||||
(send h say "Ready>> ")
|
||||
(sync (handle-evt (send h listen-evt)
|
||||
(match-lambda
|
||||
((says _ _ 'eof)
|
||||
(send h say "OK, bye\n"))
|
||||
((says _ "quit" 'data)
|
||||
(send h say (credit peer-name 1))
|
||||
(quit-proc)
|
||||
(send h say "OK, will quit accepting\n")
|
||||
(loop))
|
||||
((says _ what 'data)
|
||||
(write what)
|
||||
(newline)
|
||||
(send h say (credit #f 1))
|
||||
(send h say "Carry on\n")
|
||||
(loop))
|
||||
((departed _ _) (void))
|
||||
(else (loop))))
|
||||
(handle-evt (send h disconnected-evt) void))))))
|
||||
|
||||
(define (listen port-no)
|
||||
(define r (make-room))
|
||||
(tcp-server-actor r
|
||||
`((initial-accept-credit 1)
|
||||
(read-mode lines)
|
||||
(initial-read-credit 1))
|
||||
port-no)
|
||||
(define h (join-room r 'main))
|
||||
(match (send h listen)
|
||||
((arrived listener-name)
|
||||
(let loop ()
|
||||
(match (send h listen)
|
||||
((says _ sock 'accepted)
|
||||
(thread (lambda ()
|
||||
(handle-connection sock
|
||||
(lambda ()
|
||||
(send h depart 'told-to-quit)))))
|
||||
(send h say (credit listener-name 1)))
|
||||
(unexpected
|
||||
(write `(unexpected ,unexpected))
|
||||
(newline)))
|
||||
(loop)))))
|
||||
|
||||
(thread (lambda ()
|
||||
(join-room pool)
|
||||
(listen 5001)))
|
||||
|
||||
(define (wait-until-pool-empty)
|
||||
(define h (join-room pool))
|
||||
(let loop ((count 0))
|
||||
(match (send h listen)
|
||||
((arrived _) (loop (+ count 1)))
|
||||
((departed _ _) (if (= count 1)
|
||||
'done
|
||||
(loop (- count 1))))
|
||||
(_ (loop count)))))
|
||||
(wait-until-pool-empty)
|
|
@ -1,61 +0,0 @@
|
|||
#lang racket/base
|
||||
|
||||
(require racket/tcp)
|
||||
(require racket/port)
|
||||
(require racket/class)
|
||||
(require racket/match)
|
||||
|
||||
(require "conversation.rkt")
|
||||
(require "standard-thread.rkt")
|
||||
|
||||
(define r (make-room))
|
||||
|
||||
(thread (lambda ()
|
||||
(define handle (join-room r 'robot))
|
||||
(let loop ()
|
||||
(match (send handle listen)
|
||||
((says _ "die" _)
|
||||
(error 'robot "Following orders!"))
|
||||
((and m (says _ _ _))
|
||||
(send handle say `(robot hears ,m) 'echo))
|
||||
(else (void)))
|
||||
(loop))))
|
||||
|
||||
(define (interaction i o)
|
||||
(display "What is your name? > " o)
|
||||
(flush-output o)
|
||||
(define name (read-line i))
|
||||
(if (eof-object? name)
|
||||
(begin (display "OK, bye then!" o)
|
||||
(flush-output o))
|
||||
(let ((handle (join-room r name)))
|
||||
(let loop ()
|
||||
(display name o)
|
||||
(display "@ROOM>> " o)
|
||||
(flush-output o)
|
||||
(sync (handle-evt (send handle listen-evt)
|
||||
(lambda (m)
|
||||
(write `(,name hears ,m) o)
|
||||
(newline o)
|
||||
(flush-output o)
|
||||
(loop)))
|
||||
(handle-evt (read-line-evt i 'any)
|
||||
(lambda (utterance)
|
||||
(when (equal? utterance "error")
|
||||
(error 'interaction "Following orders!"))
|
||||
(when (not (eof-object? utterance))
|
||||
(send handle say utterance 'speech)
|
||||
(loop)))))))))
|
||||
|
||||
(thread (lambda ()
|
||||
(interaction (current-input-port) (current-output-port))))
|
||||
|
||||
(let ((s (tcp-listen 5001 4 #t)))
|
||||
(let accept-loop ()
|
||||
(define-values (i o) (tcp-accept s))
|
||||
(thread (lambda ()
|
||||
(interaction i o)
|
||||
(close-input-port i)
|
||||
(close-output-port o)))
|
||||
(accept-loop)))
|
||||
|
Loading…
Reference in New Issue