diff --git a/minimart/drivers/tcp.rkt b/minimart/drivers/tcp.rkt new file mode 100644 index 0000000..2fb42e6 --- /dev/null +++ b/minimart/drivers/tcp.rkt @@ -0,0 +1,139 @@ +#lang racket/base + +(require racket/match) +(require (prefix-in tcp: racket/tcp)) +(require (only-in racket/port read-bytes-avail!-evt)) +(require "../main.rkt") +(require "../demand-matcher.rkt") + +(require racket/unit) +(require net/tcp-sig) +(require net/tcp-unit) + +(provide (struct-out tcp-address) + (struct-out tcp-handle) + (struct-out tcp-listener) + (struct-out tcp-channel) + spawn-tcp-driver) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(struct tcp-address (host port) #:prefab) +(struct tcp-handle (id) #:prefab) +(struct tcp-listener (port) #:prefab) + +(struct tcp-channel (source destination subpacket) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Driver + +(define (spawn-tcp-driver) + (list (spawn-demand-matcher (tcp-channel ? (?! (tcp-listener ?)) ?) + #:demand-level 1 + #:supply-level 2 + spawn-tcp-listener) + (spawn-demand-matcher (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?) + #:demand-is-subscription? #f + spawn-tcp-connection))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Listener + +(struct listener-state (listener server-addr) #:transparent) + +(define (tcp-listener-behavior e state) + (match e + [(routing-update g) + (match-define (listener-state listener server-addr) state) + (if (gestalt-empty? (gestalt-filter g (pub (tcp-channel ? server-addr ?) #:level 2))) + (begin (when listener (tcp:tcp-close listener)) + (transition (struct-copy listener-state state [listener #f]) (quit))) + #f)] + [(message (event _ (list (list cin cout))) 1 #f) + (define-values (local-hostname local-port remote-hostname remote-port) (tcp:tcp-addresses cin #t)) + (transition state + (spawn-connection (listener-state-server-addr state) + (tcp-address remote-hostname remote-port) + cin + cout))] + [_ #f])) + +(define (spawn-tcp-listener server-addr) + (match-define (tcp-listener port) server-addr) + (define listener (tcp:tcp-listen port 4 #t)) + (spawn tcp-listener-behavior + (listener-state listener server-addr) + (gestalt-union (pub (tcp-channel ? server-addr ?) #:level 2) + (sub (event (tcp:tcp-accept-evt listener) ?) #:meta-level 1)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Outbound Connection + +(define (spawn-tcp-connection local-addr remote-addr) + (match-define (tcp-address remote-hostname remote-port) remote-addr) + (define-values (cin cout) (tcp:tcp-connect remote-hostname remote-port)) + (spawn-connection local-addr remote-addr cin cout)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Connection + +(struct connection-state (seen-peer? local-addr remote-addr cin cout) #:transparent) + +(define (shutdown-connection state) + (define cin (connection-state-cin state)) + (define cout (connection-state-cout state)) + (when cin (close-input-port cin)) + (when cout (close-output-port cout)) + (transition (struct-copy connection-state state [cin #f] [cout #f]) (quit))) + +(define (read-bytes-avail-evt len input-port) + (guard-evt + (lambda () + (let ([bstr (make-bytes len)]) + (handle-evt + (read-bytes-avail!-evt bstr input-port) + (lambda (v) + (if (number? v) + (if (= v len) bstr (subbytes bstr 0 v)) + v))))))) + +(define (tcp-connection e state) + (with-handlers [((lambda (exn) #t) + (lambda (exn) + (shutdown-connection state) + (raise exn)))] + (match e + [(message (event _ (list (? eof-object?))) 1 #f) + (shutdown-connection state)] + [(message (event _ (list (? bytes? bs))) 1 #f) + (transition state (send (tcp-channel (connection-state-remote-addr state) + (connection-state-local-addr state) + bs)))] + [(message (tcp-channel _ _ bs) 0 #f) + (write-bytes bs (connection-state-cout state)) + (flush-output (connection-state-cout state)) + #f] + [(routing-update g) + (cond + [(and (connection-state-seen-peer? state) (gestalt-empty? g)) + (shutdown-connection state)] + [(and (not (connection-state-seen-peer? state)) (not (gestalt-empty? g))) + (define new-state (struct-copy connection-state state [seen-peer? #t])) + (transition new-state (routing-update (connection-gestalt new-state)))] + [else + #f])] + [#f #f]))) + +(define (connection-gestalt state) + (match-define (connection-state seen-peer? local-addr remote-addr cin _) state) + (gestalt-union (pub (tcp-channel remote-addr local-addr ?)) + (sub (tcp-channel local-addr remote-addr ?)) + (pub (tcp-channel remote-addr local-addr ?) #:level 1) + (if seen-peer? + (sub (event (read-bytes-avail-evt 32768 cin) ?) #:meta-level 1) + (gestalt-empty)))) + +(define (spawn-connection local-addr remote-addr cin cout) + (define state (connection-state #f local-addr remote-addr cin cout)) + (spawn tcp-connection state (connection-gestalt state))) diff --git a/minimart/examples/chat-client.rkt b/minimart/examples/chat-client.rkt new file mode 100644 index 0000000..a44fe3b --- /dev/null +++ b/minimart/examples/chat-client.rkt @@ -0,0 +1,29 @@ +#lang minimart + +(require (only-in racket/port read-bytes-line-evt)) +(require "../drivers/tcp.rkt") + +(define local-handle (tcp-handle 'chat)) +(define remote-handle (tcp-address "localhost" 5999)) + +(spawn-tcp-driver) + +(spawn (lambda (e seen-remote?) + (match e + [(message (event _ (list (? eof-object?))) 1 #f) + (transition seen-remote? (quit))] + [(message (event _ (list line)) 1 #f) + (transition seen-remote? (send (tcp-channel local-handle remote-handle line)))] + [(message (tcp-channel _ _ bs) 0 #f) + (write-bytes bs) + #f] + [(routing-update g) + (define remote-present? (not (gestalt-empty? g))) + (transition (or seen-remote? remote-present?) + (when (and (not remote-present?) seen-remote?) (quit)))] + [#f #f])) + #f + (gestalt-union (sub (event (read-bytes-line-evt (current-input-port) 'any) ?) #:meta-level 1) + (sub (tcp-channel remote-handle local-handle ?)) + (pub (tcp-channel local-handle remote-handle ?)) + (pub (tcp-channel local-handle remote-handle ?) #:level 1))) diff --git a/minimart/examples/chat.rkt b/minimart/examples/chat.rkt new file mode 100644 index 0000000..4bd08cf --- /dev/null +++ b/minimart/examples/chat.rkt @@ -0,0 +1,51 @@ +#lang minimart + +(require racket/set) +(require (only-in racket/string string-trim)) +(require "../drivers/tcp.rkt") +(require "../demand-matcher.rkt") + +(struct session (seen-remote? peers) #:transparent) + +(define (spawn-session them us) + (define user (gensym 'user)) + (define remote-detector (compile-gestalt-projection (?!))) + (define peer-detector (compile-gestalt-projection `(,(?!) says ,?))) + (define (send-to-remote fmt . vs) + (send #:meta-level 1 (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))) + (define (say who fmt . vs) + (unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs)))) + (list (send-to-remote "Welcome, ~a.\n" user) + (spawn (lambda (e state) + (match e + [(message (tcp-channel _ _ bs) 1 #f) + (transition state (send `(,user says ,(string-trim (bytes->string/utf-8 bs)))))] + [(message `(,who says ,what) 0 #f) + (transition state (say who "says: ~a" what))] + [(routing-update g) + (match-define (session seen-remote? old-peers) state) + (define remote-present? + (not (matcher-empty? (gestalt-project g 1 0 #f remote-detector)))) + (define new-peers (matcher-key-set/single (gestalt-project g 0 0 #t peer-detector))) + (transition + (struct-copy session state + [seen-remote? (or remote-present? seen-remote?)] + [peers new-peers]) + (list (when (and seen-remote? (not remote-present?)) (quit)) + (for/list [(who (set-subtract new-peers old-peers))] (say who "arrived.")) + (for/list [(who (set-subtract old-peers new-peers))] (say who "departed."))))] + [#f #f])) + (session #f (set)) + (gestalt-union (sub `(,? says ,?)) + (sub `(,? says ,?) #:level 1) + (pub `(,user says ,?)) + (sub (tcp-channel them us ?) #:meta-level 1) + (pub (tcp-channel us them ?) #:meta-level 1) + (pub (tcp-channel us them ?) #:meta-level 1 #:level 1))))) + +(spawn-tcp-driver) +(spawn-world + (spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?) + #:meta-level 1 + #:demand-is-subscription? #f + spawn-session)) diff --git a/minimart/examples/tcp-hello.rkt b/minimart/examples/tcp-hello.rkt new file mode 100644 index 0000000..51e6468 --- /dev/null +++ b/minimart/examples/tcp-hello.rkt @@ -0,0 +1,31 @@ +#lang minimart + +(require "../drivers/tcp.rkt") +(require "../demand-matcher.rkt") + +(spawn-tcp-driver) + +(define server-id (tcp-listener 5999)) + +(define (spawn-connection-handler c) + (log-info "spawn-connection-handler ~v" c) + (define (connection-handler e n) + (when e (log-info "connection-handler ~v: ~v /// ~v" c e n)) + (match e + [(routing-update (? gestalt-empty?)) (transition n (quit))] + [_ + (if (< n 20) + (transition (+ n 1) (send (tcp-channel server-id c (string->bytes/utf-8 (format "msg ~v\n" n))))) + #f)])) + (spawn connection-handler + 0 + (gestalt-union (sub (tcp-channel c server-id ?)) + (pub (tcp-channel server-id c ?)) + (pub (tcp-channel server-id c ?) #:level 1)))) + +(spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) server-id ?) + #:demand-is-subscription? #f + spawn-connection-handler + (lambda (c) + (log-info "Connection handler ~v decided to exit" c) + '()))