From 96247daae351536b6f7726f95ceabf9cb91c22e1 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 6 Mar 2015 13:29:59 +0000 Subject: [PATCH] TCP driver; simple TCP example; chat room TCP example. --- prospect/drivers/tcp.rkt | 183 ++++++++++++++++++++++++++++++++ prospect/examples/chat.rkt | 43 ++++++++ prospect/examples/tcp-hello.rkt | 36 +++++++ 3 files changed, 262 insertions(+) create mode 100644 prospect/drivers/tcp.rkt create mode 100644 prospect/examples/chat.rkt create mode 100644 prospect/examples/tcp-hello.rkt diff --git a/prospect/drivers/tcp.rkt b/prospect/drivers/tcp.rkt new file mode 100644 index 0000000..d474a3c --- /dev/null +++ b/prospect/drivers/tcp.rkt @@ -0,0 +1,183 @@ +#lang racket/base + +(require racket/match) +(require (prefix-in tcp: racket/tcp)) +(require (only-in racket/port read-bytes-avail!-evt)) +(require (only-in web-server/private/util exn->string)) +(require "../main.rkt") +(require "../demand-matcher.rkt") + +(require racket/unit) +(require net/tcp-sig) +(require net/tcp-unit) + +(provide (struct-out tcp-address) + (struct-out tcp-handle) + (struct-out tcp-listener) + (struct-out tcp-channel) + spawn-tcp-driver) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(struct tcp-address (host port) #:prefab) +(struct tcp-handle (id) #:prefab) +(struct tcp-listener (port) #:prefab) + +(struct tcp-channel (source destination subpacket) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Ground-level communication messages + +(struct tcp-accepted (remote-addr local-addr cin cout) #:prefab) +;; tcp-channel does double-duty as a ground-level message as well + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Driver + +(define (spawn-tcp-driver) + (list (spawn-demand-matcher (advertise (observe (tcp-channel ? (?! (tcp-listener ?)) ?))) + (advertise (advertise (tcp-channel ? (?! (tcp-listener ?)) ?))) + spawn-tcp-listener) + (spawn-demand-matcher (observe (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)) + (advertise (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)) + spawn-tcp-connection))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Listener + +(struct listener-state (control-ch server-addr) #:transparent) + +(define (tcp-listener-thread control-ch listener server-addr) + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt (tcp:tcp-accept-evt listener) + (lambda (cin+cout) + (match-define (list cin cout) cin+cout) + (define-values (local-hostname local-port remote-hostname remote-port) + (tcp:tcp-addresses cin #t)) + (send-ground-message + (tcp-accepted (tcp-address remote-hostname remote-port) + server-addr + cin + cout)) + (loop blocked?)))))) + (tcp:tcp-close listener)) + +(define (tcp-listener-behavior e state) + (match e + [(? patch? p) + (define ch (listener-state-control-ch state)) + (cond [(patch/removed? p) (channel-put ch 'quit) (quit)] + [(patch/added? p) (channel-put ch 'unblock) #f] + [else #f])] + [(message (at-meta (tcp-accepted remote-addr _ cin cout))) + (transition state (spawn-connection (listener-state-server-addr state) + remote-addr + cin + cout))] + [_ #f])) + +(define (spawn-tcp-listener server-addr) + (match-define (tcp-listener port) server-addr) + (define listener (tcp:tcp-listen port 4 #t)) + (define control-ch (make-channel)) + (thread (lambda () (tcp-listener-thread control-ch listener server-addr))) + (spawn tcp-listener-behavior + (listener-state control-ch server-addr) + (sub (advertise (observe (tcp-channel ? server-addr ?)))) ;; monitor peer + (pub (advertise (tcp-channel ? server-addr ?))) ;; declare we might make connections + (sub (tcp-accepted ? server-addr ? ?) #:meta-level 1) ;; events from driver thread + )) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Outbound Connection + +(define (spawn-tcp-connection local-addr remote-addr) + (match-define (tcp-address remote-hostname remote-port) remote-addr) + (define-values (cin cout) + (with-handlers ([exn:fail:network? (lambda (e) + ;; TODO: it'd be nice to + ;; somehow communicate the + ;; actual error to the local + ;; peer. + (log-error "~a" (exn->string e)) + (define o (open-output-string)) + (close-output-port o) + (values (open-input-string "") + o))]) + (tcp:tcp-connect remote-hostname remote-port))) + (spawn-connection local-addr remote-addr cin cout)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Connection + +(struct connection-state (control-ch cout) #:transparent) + +(define (read-bytes-avail-evt len input-port) + (guard-evt + (lambda () + (let ([bstr (make-bytes len)]) + (handle-evt + (read-bytes-avail!-evt bstr input-port) + (lambda (v) + (if (number? v) + (if (= v len) bstr (subbytes bstr 0 v)) + v))))))) + +(define (tcp-connection-thread remote-addr local-addr control-ch cin) + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt (read-bytes-avail-evt 32768 cin) + (lambda (eof-or-bs) + (send-ground-message (tcp-channel remote-addr local-addr eof-or-bs)) + (loop (or blocked? (eof-object? eof-or-bs)))))))) + (close-input-port cin)) + +(define (shutdown-connection! state) + (match-define (connection-state control-ch cout) state) + (when control-ch (channel-put control-ch 'quit)) + (when cout (close-output-port cout))) + +(define (tcp-connection e state) + (with-handlers [((lambda (exn) #t) + (lambda (exn) + (shutdown-connection! state) + (raise exn)))] + (match e + [(message (at-meta (tcp-channel remote-addr local-addr (? eof-object?)))) + (shutdown-connection! state) + (quit)] + [(message (at-meta (tcp-channel remote-addr local-addr (? bytes? bs)))) + (transition state (message (tcp-channel remote-addr local-addr bs)))] + [(message (tcp-channel _ _ bs)) + (write-bytes bs (connection-state-cout state)) + (flush-output (connection-state-cout state)) + #f] + [(? patch? p) + (define ch (connection-state-control-ch state)) + (cond [(patch/removed? p) (shutdown-connection! state) (quit)] + [(patch/added? p) (channel-put ch 'unblock) #f] + [else #f])] + [#f #f]))) + +(define (spawn-connection local-addr remote-addr cin cout) + (define control-ch (make-channel)) + (thread (lambda () (tcp-connection-thread remote-addr local-addr control-ch cin))) + (spawn tcp-connection + (connection-state control-ch cout) + (sub (observe (tcp-channel remote-addr local-addr ?))) ;; monitor peer + (pub (tcp-channel remote-addr local-addr ?)) ;; may send segments to peer + (sub (tcp-channel local-addr remote-addr ?)) ;; want segments from peer + (sub (tcp-channel remote-addr local-addr ?) #:meta-level 1) ;; segments from driver thread + )) diff --git a/prospect/examples/chat.rkt b/prospect/examples/chat.rkt new file mode 100644 index 0000000..9faacab --- /dev/null +++ b/prospect/examples/chat.rkt @@ -0,0 +1,43 @@ +#lang prospect + +(require (only-in racket/string string-trim)) +(require "../drivers/tcp.rkt") +(require "../demand-matcher.rkt") + +(define (spawn-session them us) + (define user (gensym 'user)) + (define remote-detector (compile-projection (at-meta (?!)))) + (define peer-detector (compile-projection (advertise `(,(?!) says ,?)))) + (define (send-to-remote fmt . vs) + (message (at-meta (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))))) + (define (say who fmt . vs) + (unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs)))) + (list (send-to-remote "Welcome, ~a.\n" user) + (spawn/stateless + (lambda (e) + (match e + [(message (at-meta (tcp-channel _ _ bs))) + (message `(,user says ,(string-trim (bytes->string/utf-8 bs))))] + [(message `(,who says ,what)) + (say who "says: ~a" what)] + [(? patch? p) + (if (patch/removed? (patch-project p remote-detector)) + (quit (send-to-remote "Goodbye!\n")) + (let-values (((arrived departed) (patch-project/set/single p peer-detector))) + (list (for/list [(who arrived)] (say who "arrived.")) + (for/list [(who departed)] (say who "departed.")))))] + [#f #f])) + (sub `(,? says ,?)) ;; read actual chat messages + (sub (advertise `(,? says ,?))) ;; observe peer presence + (pub `(,user says ,?)) ;; advertise our presence + (sub (tcp-channel them us ?) #:meta-level 1) ;; read from remote client + (sub (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client + (pub (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client + ))) + +(spawn-tcp-driver) +(spawn-world + (spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?)) + (observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?)) + #:meta-level 1 + spawn-session)) diff --git a/prospect/examples/tcp-hello.rkt b/prospect/examples/tcp-hello.rkt new file mode 100644 index 0000000..64cb18d --- /dev/null +++ b/prospect/examples/tcp-hello.rkt @@ -0,0 +1,36 @@ +#lang prospect + +(require "../drivers/tcp.rkt") +(require "../demand-matcher.rkt") + +(spawn-tcp-driver) + +(define server-id (tcp-listener 5999)) + +(define (spawn-connection-handler c) + (log-info "spawn-connection-handler ~v" c) + (define (connection-handler e n) + (when e (log-info "connection-handler ~v: ~v /// ~v" c e n)) + (match e + [(? patch/removed?) (quit)] + [(message (tcp-channel src dst #"quit\n")) + (quit (message (tcp-channel dst src #"OK, then.\n")))] + [(message (tcp-channel src dst bs)) + (transition n (message (tcp-channel dst src (string->bytes/utf-8 + (format "You said: ~a" bs)))))] + [_ + (and (< n 5) + (transition (+ n 1) (message (tcp-channel server-id c (string->bytes/utf-8 + (format "msg ~v\n" n))))))])) + (spawn connection-handler + 0 + (sub (advertise (tcp-channel c server-id ?))) + (sub (tcp-channel c server-id ?)) + (pub (tcp-channel server-id c ?)))) + +(spawn-demand-matcher (advertise (tcp-channel (?!) server-id ?)) + (observe (tcp-channel (?!) server-id ?)) + spawn-connection-handler + (lambda (c) + (log-info "Connection handler ~v decided to exit" c) + '()))