From 0820c7e57288ddb705f84a02f5c53bda9ecd4be8 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 11 Mar 2012 13:08:04 -0400 Subject: [PATCH] TCP in a pub/sub style --- presence/NOTES-tcp.txt | 11 + presence/conversation-socket.rkt | 281 ++++++++++++++------------ presence/test-conversation-socket.rkt | 128 +++++++----- 3 files changed, 238 insertions(+), 182 deletions(-) create mode 100644 presence/NOTES-tcp.txt diff --git a/presence/NOTES-tcp.txt b/presence/NOTES-tcp.txt new file mode 100644 index 0000000..e8fa72d --- /dev/null +++ b/presence/NOTES-tcp.txt @@ -0,0 +1,11 @@ +15:01 < tonyg> samth: even tcp has this automatic-resource-allocation thing + that we described yesterday for rudybot-big-bang! to open a + connection, advertise (you your-port target-ip target-port) and + subscribe (target-ip target-port you your-port) +15:01 < tonyg> and to listen, advertise (you your-port * *) and subscribe (* * + you your-port) +15:02 < tonyg> note that where the wildcards are permitted varies between the + languages used for ad patterns and sub patterns! +15:02 < tonyg> (uh, and the driver uses presence/directory to auto-open and + -close the underlying sockets) +15:35 < tonyg> Oh UDP has the same pattern (though less restrictive) diff --git a/presence/conversation-socket.rkt b/presence/conversation-socket.rkt index 49a0197..11342db 100644 --- a/presence/conversation-socket.rkt +++ b/presence/conversation-socket.rkt @@ -5,139 +5,162 @@ (require racket/match) (require racket/tcp) (require "conversation.rkt") +(require "standard-thread.rkt") -(provide (struct-out set-options) - tcp-server-actor - tcp-client-actor) +(provide (struct-out tcp-option) + (struct-out tcp-credit) + (struct-out tcp-data) + (struct-out tcp-eof) + (struct-out tcp-address) + (struct-out tcp-stream) + flip-stream + wild-address + wild-stream + tcp-driver) -(struct set-options (new-values) #:prefab) +(struct tcp-option (name value) #:prefab) +(struct tcp-credit (amount) #:prefab) +(struct tcp-data (inbound? chunk) #:prefab) +(struct tcp-eof () #:prefab) -(define (socket-name role s) - (define-values (local-ip local-port remote-ip remote-port) - (tcp-addresses s #t)) - (list role local-ip local-port remote-ip remote-port)) +;; The address of a TCP endpoint. +;; +;; The field host may be set to #f, in which case it means "some (or +;; all) local interface(s)". If port is a number, it's treated as a +;; TCP port directly; otherwise, it's treated as a local handle for an +;; automatically-allocated port. +(struct tcp-address (host port) #:prefab) -(define (option-value options key [missing-value #f]) - (cond - ((assq key options) => cadr) - (else missing-value))) +;; Identifies a (unidirectional) TCP stream. A single TCP connection +;; has two of these, one in each direction. +(struct tcp-stream (source sink) #:prefab) -(define (tcp-server-actor room options . tcp-listener-args) - (define listener (apply tcp-listen tcp-listener-args)) - (define name (socket-name 'listener listener)) - (thread (lambda () - (define handle (join-room room name)) - (log-info (format "Listening on ~v" name)) - (let loop ((owner #f) - (remaining-credit (option-value options 'initial-accept-credit 0))) - (sync (handle-evt (send handle disconnected-evt) - (lambda (reason) - (log-error (format "~v: conversation closed: ~v" name reason)) - (tcp-close listener))) - (handle-evt (send handle listen-evt) - (match-lambda - ((arrived who) - (log-info (format "~v: New owner: ~v" name who)) - (loop who remaining-credit)) - ((departed who why) - (if (equal? owner who) - (begin (log-info (format "~v: Owner departed, closing" name)) - (tcp-close listener)) - (loop owner remaining-credit))) - ((says _ (credit _ amount) _) - (define new-credit (+ remaining-credit amount)) - (log-info (format "~v: Credit now ~v" name new-credit)) - (loop owner new-credit)) - (unexpected - (log-warning (format "~v: Ignoring message: ~v" name unexpected)) - (loop owner remaining-credit)))) - (if (positive? remaining-credit) - (handle-evt (tcp-accept-evt listener) - (match-lambda - ((list i o) - (send handle say - (tcp-socket-actor 'inbound-connection options i o) - 'accepted) - (loop owner (- remaining-credit 1))))) - never-evt))))) - room) +(define (socket-name s) + (call-with-values (lambda () (tcp-addresses s #t)) list)) -(define (tcp-client-actor room options . tcp-connect-args) - (define-values (i o) (apply tcp-connect tcp-connect-args)) - (tcp-socket-actor 'outbound-connection options i o)) +(define (flip-stream s) + (tcp-stream (tcp-stream-sink s) (tcp-stream-source s))) -(define (tcp-socket-actor role options i o [room (make-room)]) - (define name (socket-name role i)) - (define (close-ports) - (close-input-port i) - (close-output-port o)) - (define (compute-terminator options) - ;; See read-line-evt and friends. - (option-value options 'line-terminator 'any)) - (define (compute-read-evt options) - (define read-mode (option-value options 'read-mode 'bytes)) - (case read-mode - ((bytes) (values (lambda (credit) (read-bytes-evt credit i)) - bytes-length)) - ((lines) (values (lambda (credit) (read-line-evt i (compute-terminator options))) - (lambda (v) 1))) - ((bytes-lines) (values (lambda (credit) (read-bytes-line-evt i (compute-terminator options))) - (lambda (v) 1))) - (else (error 'tcp-socket-actor "Illegal read-evt mode ~v" read-mode)))) - (thread (lambda () - (define handle (join-room room name)) - (log-info (format "~v: New connection" name)) - (with-handlers - ((exn? (lambda (e) - (close-ports) - (raise e)))) - (let loop ((options options) - (peer-count 0) - (remaining-credit (option-value options 'initial-read-credit 0))) - ;;(write `(connection-loop ,options ,peer-count ,remaining-credit)) (newline) - (sync (handle-evt (send handle disconnected-evt) - (lambda (reason) - (log-error (format "~v: conversation closed: ~v" name reason)))) - (handle-evt (send handle listen-evt) - (match-lambda - ((arrived _) - (loop options (+ peer-count 1) remaining-credit)) - ((departed _ _) - (if (= peer-count 1) - (log-info (format "~v: Last peer departed" name)) - (loop options (- peer-count 1) remaining-credit))) - ((says _ (credit _ amount) _) - (loop options peer-count (+ remaining-credit amount))) - ((says _ (? eof-object?) _) - (close-output-port o) - (loop options peer-count remaining-credit)) - ((says _ (? bytes? bs) _) - ;; TODO: credit flow the other way? - (write-bytes bs o) - (flush-output o) - (loop options peer-count remaining-credit)) - ((says _ (? string? s) _) - ;; TODO: credit flow the other way? - (write-string s o) - (flush-output o) - (loop options peer-count remaining-credit)) - ((says _ (set-options new-values) _) - (loop new-values peer-count remaining-credit)) - (unexpected - (log-warning (format "~v: Ignoring message: ~v" - name unexpected)) - (loop options peer-count remaining-credit)))) - (if (positive? remaining-credit) - (let-values (((e-maker credit-adjuster) (compute-read-evt options))) - (handle-evt (e-maker remaining-credit) - (lambda (v) - (if (eof-object? v) - (begin (send handle say v 'eof) - (loop options peer-count 0)) - (begin (send handle say v 'data) - (loop options peer-count - (- remaining-credit - (credit-adjuster v)))))))) - never-evt))) - (close-ports)))) - room) +(define (wild-address) (tcp-address (wild) (wild))) +(define (wild-stream) (tcp-stream (wild-address) (wild-address))) + +(define (tcp-driver room) + (standard-thread + (lambda () + (define handle (join-room room 'DRIVER)) + (send handle assert! (topic-publisher (wild-stream) #:virtual? #t)) + (let loop () + (match (send handle listen) + [(arrived (topic 'subscriber + (tcp-stream (tcp-address (? wild?) (? wild?)) + (and local-address (tcp-address #f local-handle))) + _)) + ;;(write `(listening on ,local-handle)) (newline) + (define listener (tcp-listen (if (number? local-handle) local-handle 0) 4 #t)) + (standard-thread (lambda () (tcp-listen-driver room local-address listener))) + (loop)] + [(arrived (topic 'subscriber + (and name (tcp-stream (tcp-address remote-host remote-port) + (tcp-address #f local-handle))) + _)) + (when (not (number? local-handle)) + ;;(write `(connecting to ,remote-host ,remote-port with handle ,local-handle)) (newline) + (define-values (i o) + (tcp-connect remote-host remote-port + #f (if (number? local-handle) local-handle #f))) + (standard-thread (lambda () (tcp-read-driver room name i))) + (standard-thread (lambda () (tcp-write-driver room (flip-stream name) o)))) + (loop)] + [_ + (loop)]))))) + +(define (tcp-read-driver room name i) + (define handle (join-room room 'READER)) + (send handle assert! (topic-publisher name)) + (let loop ((credit 0) (mode 'line)) + (sync (wrap-evt (send handle listen-evt) + (lambda (m) + (match m + [(departed (topic 'subscriber (== name) _) _) + ;;(write `(closing reader ,name)) (newline) + (tcp-abandon-port i)] + [(says (topic 'subscriber (== name) _) (tcp-option 'mode new-mode)) + (loop credit new-mode)] + [(says (topic 'subscriber (== name) _) (tcp-credit amount)) + (loop (+ credit amount) mode)] + [_ + (loop credit mode)]))) + (if (positive? credit) + (match mode + ['line (handle-evt (read-line-evt i 'any) + (lambda (v) + (cond + [(eof-object? v) + (send handle say (topic-publisher name) (tcp-eof)) + (loop 0 mode)] + [else + ;;(write `(relaying line ,v from ,name)) (newline) + (send handle say (topic-publisher name) (tcp-data #t v)) + (loop (- credit 1) mode)])))] + ['bytes (handle-evt (read-bytes-evt credit i) + (lambda (v) + (cond + [(eof-object? v) + (send handle say (topic-publisher name) (tcp-eof)) + (loop 0 mode)] + [else + ;;(write `(relaying bytes ,v from ,name)) (newline) + (send handle say (topic-publisher name) (tcp-data #t v)) + (loop (- credit (bytes-length v)) mode)])))]) + never-evt)))) + +(define (tcp-write-driver room name o) + (define handle (join-room room 'WRITER)) + (send handle assert! (topic-subscriber name)) + (let loop ((mode 'string)) + (sync (wrap-evt (send handle listen-evt) + (lambda (m) + (match m + [(departed (topic 'publisher (== name) _) _) + ;;(write `(closing writer ,name)) (newline) + (tcp-abandon-port o)] + [(says (topic 'publisher (== name) _) (tcp-option 'mode new-mode)) + (loop new-mode)] + [(says (topic 'publisher (== name) _) (tcp-data #f v)) + ;;(write `(writing ,v to ,name)) (newline) + (define credit-amount + (match mode + ['string (write-string v o) + 1] + ['bytes (write-bytes v o)])) + (flush-output o) + (send handle say (topic-subscriber name) (tcp-credit credit-amount)) + (loop mode)] + [_ + (loop mode)])))))) + +(define (tcp-listen-driver room local-address listener) + (define handle (join-room room 'LISTENER)) + (define name (tcp-stream (wild-address) local-address)) + (send handle assert! (topic-publisher name #:virtual? #t)) + (let loop () + (sync (wrap-evt (send handle listen-evt) + (lambda (m) + (match m + [(departed (topic 'subscriber (== name) _) _) + ;;(write `(closing listener ,name)) (newline) + (tcp-close listener)] + [_ + ;;(write `(listener heard ,m)) (newline) + (loop)]))) + (wrap-evt (tcp-accept-evt listener) + (lambda (v) + (match-define (list i o) v) + (match-define (list _ _ remote-host remote-port) (socket-name i)) + (define new-stream + (tcp-stream (tcp-address remote-host remote-port) local-address)) + (standard-thread + (lambda () (tcp-read-driver room new-stream i))) + (standard-thread + (lambda () (tcp-write-driver room (flip-stream new-stream) o))) + (loop)))))) diff --git a/presence/test-conversation-socket.rkt b/presence/test-conversation-socket.rkt index b267d2e..2f9e617 100644 --- a/presence/test-conversation-socket.rkt +++ b/presence/test-conversation-socket.rkt @@ -5,69 +5,91 @@ (require "conversation.rkt") (require "conversation-socket.rkt") +(require "standard-thread.rkt") (define pool (make-room 'everybody)) -(define (handle-connection sock quit-proc) - (join-room pool) - (define h (join-room sock)) - (match (send h listen) - ((arrived peer-name) - (let loop () - (send h say "Ready>> ") - (sync (handle-evt (send h listen-evt) - (match-lambda - ((says _ _ 'eof) - (send h say "OK, bye\n")) - ((says _ "quit" 'data) - (send h say (credit peer-name 1)) - (quit-proc) - (send h say "OK, will quit accepting\n") - (loop)) - ((says _ what 'data) - (write what) - (newline) - (send h say (credit #f 1)) - (send h say "Carry on\n") - (loop)) - ((departed _ _) (void)) - (else (loop)))) - (handle-evt (send h disconnected-evt) void)))))) +(struct groupchat (utterance) #:prefab) + +(define (session inbound-stream quit-proc) + (define h (join-room pool 'SESSION)) + (define outbound-stream (flip-stream inbound-stream)) + (send h assert! (topic-publisher outbound-stream)) + (send h assert! (topic-subscriber inbound-stream)) + (send h assert! (topic-publisher groupchat)) + (send h assert! (topic-subscriber groupchat)) + (define (send-text s) (send h say (topic-publisher outbound-stream) (tcp-data #f s))) + (define (issue-credit) (send h say (topic-subscriber inbound-stream) (tcp-credit 1))) + (issue-credit) + (let loop ((prompt? #t)) + (when prompt? (send-text "Ready>> ")) + (match (send h listen) + [(says (topic 'publisher (== inbound-stream) _) (tcp-eof)) + (send-text "OK, bye\n")] + [(says (topic 'publisher (== inbound-stream) _) (tcp-data #t "quit")) + (issue-credit) + (quit-proc) + (send-text "OK, will quit accepting\n") + (loop #t)] + [(says (topic 'publisher (== inbound-stream) _) (tcp-data #t what)) + (write `(someone said ,what)) + (newline) + (issue-credit) + (send-text "Carry on\n") + (send h say (topic-publisher groupchat) (groupchat what)) + (loop #t)] + [(says (topic 'publisher (== groupchat) _) (groupchat what)) + (send-text (string-append "CHAT: " what "\n")) + (loop #t)] + [(departed _ _) + (void)] + [_ + (loop #f)]))) (define (listen port-no) - (define r (make-room)) - (tcp-server-actor r - `((initial-accept-credit 1) - (read-mode lines) - (initial-read-credit 1)) - port-no) - (define h (join-room r 'main)) - (match (send h listen) - ((arrived listener-name) + (standard-thread + (lambda () + (define h (join-room pool 'LISTEN-THREAD)) + (define server-address (tcp-address #f port-no)) + (send h assert! (topic-subscriber (tcp-stream (wild-address) server-address))) + (define (quit-proc) (send h depart)) (let loop () (match (send h listen) - ((says _ sock 'accepted) - (thread (lambda () - (handle-connection sock - (lambda () - (send h depart 'told-to-quit))))) - (send h say (credit listener-name 1))) - (unexpected - (write `(unexpected ,unexpected)) - (newline))) - (loop))))) + [(arrived + (topic 'publisher + (and inbound-stream (tcp-stream (tcp-address (? non-wild?) (? non-wild?)) + (== server-address))) + _)) + (write `(starting session for ,inbound-stream)) (newline) + (standard-thread (lambda () (session inbound-stream quit-proc))) + (loop)] + [_ + (loop)]))))) -(thread (lambda () - (join-room pool) - (listen 5001))) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(tcp-driver pool) + +(define port-number 5001) +(display "Listening on port ") +(display port-number) +(newline) + +(listen port-number) (define (wait-until-pool-empty) - (define h (join-room pool)) - (let loop ((count 0)) + (define h (join-room pool 'WAITER)) + (send h assert! (topic-publisher (wild) #:virtual? #t)) + (send h assert! (topic-subscriber (wild) #:virtual? #t)) + (let loop ((show-count #t) (count 0)) + (when show-count + (write `(pool has ,count members)) (newline)) (match (send h listen) - ((arrived _) (loop (+ count 1))) - ((departed _ _) (if (= count 1) + [(arrived x) + (write `(,x arrived in pool)) (newline) + (loop #t (+ count 1))] + [(departed _ _) (if (= count 1) 'done - (loop (- count 1)))) - (_ (loop count))))) + (loop #t (- count 1)))] + [_ (loop #f count)]))) (wait-until-pool-empty)