TCP in a pub/sub style

This commit is contained in:
Tony Garnock-Jones 2012-03-11 13:08:04 -04:00
parent ca2e856660
commit 0820c7e572
3 changed files with 238 additions and 182 deletions

11
presence/NOTES-tcp.txt Normal file
View File

@ -0,0 +1,11 @@
15:01 < tonyg> samth: even tcp has this automatic-resource-allocation thing
that we described yesterday for rudybot-big-bang! to open a
connection, advertise (you your-port target-ip target-port) and
subscribe (target-ip target-port you your-port)
15:01 < tonyg> and to listen, advertise (you your-port * *) and subscribe (* *
you your-port)
15:02 < tonyg> note that where the wildcards are permitted varies between the
languages used for ad patterns and sub patterns!
15:02 < tonyg> (uh, and the driver uses presence/directory to auto-open and
-close the underlying sockets)
15:35 < tonyg> Oh UDP has the same pattern (though less restrictive)

View File

@ -5,139 +5,162 @@
(require racket/match)
(require racket/tcp)
(require "conversation.rkt")
(require "standard-thread.rkt")
(provide (struct-out set-options)
tcp-server-actor
tcp-client-actor)
(provide (struct-out tcp-option)
(struct-out tcp-credit)
(struct-out tcp-data)
(struct-out tcp-eof)
(struct-out tcp-address)
(struct-out tcp-stream)
flip-stream
wild-address
wild-stream
tcp-driver)
(struct set-options (new-values) #:prefab)
(struct tcp-option (name value) #:prefab)
(struct tcp-credit (amount) #:prefab)
(struct tcp-data (inbound? chunk) #:prefab)
(struct tcp-eof () #:prefab)
(define (socket-name role s)
(define-values (local-ip local-port remote-ip remote-port)
(tcp-addresses s #t))
(list role local-ip local-port remote-ip remote-port))
;; The address of a TCP endpoint.
;;
;; The field host may be set to #f, in which case it means "some (or
;; all) local interface(s)". If port is a number, it's treated as a
;; TCP port directly; otherwise, it's treated as a local handle for an
;; automatically-allocated port.
(struct tcp-address (host port) #:prefab)
(define (option-value options key [missing-value #f])
(cond
((assq key options) => cadr)
(else missing-value)))
;; Identifies a (unidirectional) TCP stream. A single TCP connection
;; has two of these, one in each direction.
(struct tcp-stream (source sink) #:prefab)
(define (tcp-server-actor room options . tcp-listener-args)
(define listener (apply tcp-listen tcp-listener-args))
(define name (socket-name 'listener listener))
(thread (lambda ()
(define handle (join-room room name))
(log-info (format "Listening on ~v" name))
(let loop ((owner #f)
(remaining-credit (option-value options 'initial-accept-credit 0)))
(sync (handle-evt (send handle disconnected-evt)
(lambda (reason)
(log-error (format "~v: conversation closed: ~v" name reason))
(tcp-close listener)))
(handle-evt (send handle listen-evt)
(match-lambda
((arrived who)
(log-info (format "~v: New owner: ~v" name who))
(loop who remaining-credit))
((departed who why)
(if (equal? owner who)
(begin (log-info (format "~v: Owner departed, closing" name))
(tcp-close listener))
(loop owner remaining-credit)))
((says _ (credit _ amount) _)
(define new-credit (+ remaining-credit amount))
(log-info (format "~v: Credit now ~v" name new-credit))
(loop owner new-credit))
(unexpected
(log-warning (format "~v: Ignoring message: ~v" name unexpected))
(loop owner remaining-credit))))
(if (positive? remaining-credit)
(handle-evt (tcp-accept-evt listener)
(match-lambda
((list i o)
(send handle say
(tcp-socket-actor 'inbound-connection options i o)
'accepted)
(loop owner (- remaining-credit 1)))))
never-evt)))))
room)
(define (socket-name s)
(call-with-values (lambda () (tcp-addresses s #t)) list))
(define (tcp-client-actor room options . tcp-connect-args)
(define-values (i o) (apply tcp-connect tcp-connect-args))
(tcp-socket-actor 'outbound-connection options i o))
(define (flip-stream s)
(tcp-stream (tcp-stream-sink s) (tcp-stream-source s)))
(define (tcp-socket-actor role options i o [room (make-room)])
(define name (socket-name role i))
(define (close-ports)
(close-input-port i)
(close-output-port o))
(define (compute-terminator options)
;; See read-line-evt and friends.
(option-value options 'line-terminator 'any))
(define (compute-read-evt options)
(define read-mode (option-value options 'read-mode 'bytes))
(case read-mode
((bytes) (values (lambda (credit) (read-bytes-evt credit i))
bytes-length))
((lines) (values (lambda (credit) (read-line-evt i (compute-terminator options)))
(lambda (v) 1)))
((bytes-lines) (values (lambda (credit) (read-bytes-line-evt i (compute-terminator options)))
(lambda (v) 1)))
(else (error 'tcp-socket-actor "Illegal read-evt mode ~v" read-mode))))
(thread (lambda ()
(define handle (join-room room name))
(log-info (format "~v: New connection" name))
(with-handlers
((exn? (lambda (e)
(close-ports)
(raise e))))
(let loop ((options options)
(peer-count 0)
(remaining-credit (option-value options 'initial-read-credit 0)))
;;(write `(connection-loop ,options ,peer-count ,remaining-credit)) (newline)
(sync (handle-evt (send handle disconnected-evt)
(lambda (reason)
(log-error (format "~v: conversation closed: ~v" name reason))))
(handle-evt (send handle listen-evt)
(match-lambda
((arrived _)
(loop options (+ peer-count 1) remaining-credit))
((departed _ _)
(if (= peer-count 1)
(log-info (format "~v: Last peer departed" name))
(loop options (- peer-count 1) remaining-credit)))
((says _ (credit _ amount) _)
(loop options peer-count (+ remaining-credit amount)))
((says _ (? eof-object?) _)
(close-output-port o)
(loop options peer-count remaining-credit))
((says _ (? bytes? bs) _)
;; TODO: credit flow the other way?
(write-bytes bs o)
(flush-output o)
(loop options peer-count remaining-credit))
((says _ (? string? s) _)
;; TODO: credit flow the other way?
(write-string s o)
(flush-output o)
(loop options peer-count remaining-credit))
((says _ (set-options new-values) _)
(loop new-values peer-count remaining-credit))
(unexpected
(log-warning (format "~v: Ignoring message: ~v"
name unexpected))
(loop options peer-count remaining-credit))))
(if (positive? remaining-credit)
(let-values (((e-maker credit-adjuster) (compute-read-evt options)))
(handle-evt (e-maker remaining-credit)
(lambda (v)
(if (eof-object? v)
(begin (send handle say v 'eof)
(loop options peer-count 0))
(begin (send handle say v 'data)
(loop options peer-count
(- remaining-credit
(credit-adjuster v))))))))
never-evt)))
(close-ports))))
room)
(define (wild-address) (tcp-address (wild) (wild)))
(define (wild-stream) (tcp-stream (wild-address) (wild-address)))
(define (tcp-driver room)
(standard-thread
(lambda ()
(define handle (join-room room 'DRIVER))
(send handle assert! (topic-publisher (wild-stream) #:virtual? #t))
(let loop ()
(match (send handle listen)
[(arrived (topic 'subscriber
(tcp-stream (tcp-address (? wild?) (? wild?))
(and local-address (tcp-address #f local-handle)))
_))
;;(write `(listening on ,local-handle)) (newline)
(define listener (tcp-listen (if (number? local-handle) local-handle 0) 4 #t))
(standard-thread (lambda () (tcp-listen-driver room local-address listener)))
(loop)]
[(arrived (topic 'subscriber
(and name (tcp-stream (tcp-address remote-host remote-port)
(tcp-address #f local-handle)))
_))
(when (not (number? local-handle))
;;(write `(connecting to ,remote-host ,remote-port with handle ,local-handle)) (newline)
(define-values (i o)
(tcp-connect remote-host remote-port
#f (if (number? local-handle) local-handle #f)))
(standard-thread (lambda () (tcp-read-driver room name i)))
(standard-thread (lambda () (tcp-write-driver room (flip-stream name) o))))
(loop)]
[_
(loop)])))))
(define (tcp-read-driver room name i)
(define handle (join-room room 'READER))
(send handle assert! (topic-publisher name))
(let loop ((credit 0) (mode 'line))
(sync (wrap-evt (send handle listen-evt)
(lambda (m)
(match m
[(departed (topic 'subscriber (== name) _) _)
;;(write `(closing reader ,name)) (newline)
(tcp-abandon-port i)]
[(says (topic 'subscriber (== name) _) (tcp-option 'mode new-mode))
(loop credit new-mode)]
[(says (topic 'subscriber (== name) _) (tcp-credit amount))
(loop (+ credit amount) mode)]
[_
(loop credit mode)])))
(if (positive? credit)
(match mode
['line (handle-evt (read-line-evt i 'any)
(lambda (v)
(cond
[(eof-object? v)
(send handle say (topic-publisher name) (tcp-eof))
(loop 0 mode)]
[else
;;(write `(relaying line ,v from ,name)) (newline)
(send handle say (topic-publisher name) (tcp-data #t v))
(loop (- credit 1) mode)])))]
['bytes (handle-evt (read-bytes-evt credit i)
(lambda (v)
(cond
[(eof-object? v)
(send handle say (topic-publisher name) (tcp-eof))
(loop 0 mode)]
[else
;;(write `(relaying bytes ,v from ,name)) (newline)
(send handle say (topic-publisher name) (tcp-data #t v))
(loop (- credit (bytes-length v)) mode)])))])
never-evt))))
(define (tcp-write-driver room name o)
(define handle (join-room room 'WRITER))
(send handle assert! (topic-subscriber name))
(let loop ((mode 'string))
(sync (wrap-evt (send handle listen-evt)
(lambda (m)
(match m
[(departed (topic 'publisher (== name) _) _)
;;(write `(closing writer ,name)) (newline)
(tcp-abandon-port o)]
[(says (topic 'publisher (== name) _) (tcp-option 'mode new-mode))
(loop new-mode)]
[(says (topic 'publisher (== name) _) (tcp-data #f v))
;;(write `(writing ,v to ,name)) (newline)
(define credit-amount
(match mode
['string (write-string v o)
1]
['bytes (write-bytes v o)]))
(flush-output o)
(send handle say (topic-subscriber name) (tcp-credit credit-amount))
(loop mode)]
[_
(loop mode)]))))))
(define (tcp-listen-driver room local-address listener)
(define handle (join-room room 'LISTENER))
(define name (tcp-stream (wild-address) local-address))
(send handle assert! (topic-publisher name #:virtual? #t))
(let loop ()
(sync (wrap-evt (send handle listen-evt)
(lambda (m)
(match m
[(departed (topic 'subscriber (== name) _) _)
;;(write `(closing listener ,name)) (newline)
(tcp-close listener)]
[_
;;(write `(listener heard ,m)) (newline)
(loop)])))
(wrap-evt (tcp-accept-evt listener)
(lambda (v)
(match-define (list i o) v)
(match-define (list _ _ remote-host remote-port) (socket-name i))
(define new-stream
(tcp-stream (tcp-address remote-host remote-port) local-address))
(standard-thread
(lambda () (tcp-read-driver room new-stream i)))
(standard-thread
(lambda () (tcp-write-driver room (flip-stream new-stream) o)))
(loop))))))

View File

@ -5,69 +5,91 @@
(require "conversation.rkt")
(require "conversation-socket.rkt")
(require "standard-thread.rkt")
(define pool (make-room 'everybody))
(define (handle-connection sock quit-proc)
(join-room pool)
(define h (join-room sock))
(match (send h listen)
((arrived peer-name)
(let loop ()
(send h say "Ready>> ")
(sync (handle-evt (send h listen-evt)
(match-lambda
((says _ _ 'eof)
(send h say "OK, bye\n"))
((says _ "quit" 'data)
(send h say (credit peer-name 1))
(quit-proc)
(send h say "OK, will quit accepting\n")
(loop))
((says _ what 'data)
(write what)
(newline)
(send h say (credit #f 1))
(send h say "Carry on\n")
(loop))
((departed _ _) (void))
(else (loop))))
(handle-evt (send h disconnected-evt) void))))))
(struct groupchat (utterance) #:prefab)
(define (session inbound-stream quit-proc)
(define h (join-room pool 'SESSION))
(define outbound-stream (flip-stream inbound-stream))
(send h assert! (topic-publisher outbound-stream))
(send h assert! (topic-subscriber inbound-stream))
(send h assert! (topic-publisher groupchat))
(send h assert! (topic-subscriber groupchat))
(define (send-text s) (send h say (topic-publisher outbound-stream) (tcp-data #f s)))
(define (issue-credit) (send h say (topic-subscriber inbound-stream) (tcp-credit 1)))
(issue-credit)
(let loop ((prompt? #t))
(when prompt? (send-text "Ready>> "))
(match (send h listen)
[(says (topic 'publisher (== inbound-stream) _) (tcp-eof))
(send-text "OK, bye\n")]
[(says (topic 'publisher (== inbound-stream) _) (tcp-data #t "quit"))
(issue-credit)
(quit-proc)
(send-text "OK, will quit accepting\n")
(loop #t)]
[(says (topic 'publisher (== inbound-stream) _) (tcp-data #t what))
(write `(someone said ,what))
(newline)
(issue-credit)
(send-text "Carry on\n")
(send h say (topic-publisher groupchat) (groupchat what))
(loop #t)]
[(says (topic 'publisher (== groupchat) _) (groupchat what))
(send-text (string-append "CHAT: " what "\n"))
(loop #t)]
[(departed _ _)
(void)]
[_
(loop #f)])))
(define (listen port-no)
(define r (make-room))
(tcp-server-actor r
`((initial-accept-credit 1)
(read-mode lines)
(initial-read-credit 1))
port-no)
(define h (join-room r 'main))
(match (send h listen)
((arrived listener-name)
(standard-thread
(lambda ()
(define h (join-room pool 'LISTEN-THREAD))
(define server-address (tcp-address #f port-no))
(send h assert! (topic-subscriber (tcp-stream (wild-address) server-address)))
(define (quit-proc) (send h depart))
(let loop ()
(match (send h listen)
((says _ sock 'accepted)
(thread (lambda ()
(handle-connection sock
(lambda ()
(send h depart 'told-to-quit)))))
(send h say (credit listener-name 1)))
(unexpected
(write `(unexpected ,unexpected))
(newline)))
(loop)))))
[(arrived
(topic 'publisher
(and inbound-stream (tcp-stream (tcp-address (? non-wild?) (? non-wild?))
(== server-address)))
_))
(write `(starting session for ,inbound-stream)) (newline)
(standard-thread (lambda () (session inbound-stream quit-proc)))
(loop)]
[_
(loop)])))))
(thread (lambda ()
(join-room pool)
(listen 5001)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(tcp-driver pool)
(define port-number 5001)
(display "Listening on port ")
(display port-number)
(newline)
(listen port-number)
(define (wait-until-pool-empty)
(define h (join-room pool))
(let loop ((count 0))
(define h (join-room pool 'WAITER))
(send h assert! (topic-publisher (wild) #:virtual? #t))
(send h assert! (topic-subscriber (wild) #:virtual? #t))
(let loop ((show-count #t) (count 0))
(when show-count
(write `(pool has ,count members)) (newline))
(match (send h listen)
((arrived _) (loop (+ count 1)))
((departed _ _) (if (= count 1)
[(arrived x)
(write `(,x arrived in pool)) (newline)
(loop #t (+ count 1))]
[(departed _ _) (if (= count 1)
'done
(loop (- count 1))))
(_ (loop count)))))
(loop #t (- count 1)))]
[_ (loop #f count)])))
(wait-until-pool-empty)