From f6edad972fc2d62174350e469f8a8872ea4d632a Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 17 Jul 2013 12:27:03 -0400 Subject: [PATCH] Simplify TCP driver state management by exploiting match-interest-type. --- drivers/tcp-bare-with-ground.rkt | 178 +++++++++++++++++++++++++++++++ drivers/tcp-bare.rkt | 145 +++++++++++-------------- 2 files changed, 240 insertions(+), 83 deletions(-) create mode 100644 drivers/tcp-bare-with-ground.rkt diff --git a/drivers/tcp-bare-with-ground.rkt b/drivers/tcp-bare-with-ground.rkt new file mode 100644 index 0000000..1813246 --- /dev/null +++ b/drivers/tcp-bare-with-ground.rkt @@ -0,0 +1,178 @@ +#lang racket/base +;; TCP drivers, ported from os2.rkt directly, with flow-control and line discipline removed + +(require racket/set) +(require racket/match) +(require (prefix-in tcp: racket/tcp)) +(require racket/port) +(require "../sugar-untyped.rkt") +(require "../support/dump-bytes.rkt") + +(provide (struct-out tcp-address) + (struct-out tcp-handle) + (struct-out tcp-listener) + (struct-out tcp-channel) + tcp + tcp-driver) + +(struct tcp-address (host port) #:prefab) +(struct tcp-handle (id) #:prefab) +(struct tcp-listener (port) #:prefab) + +(struct tcp-channel (source destination subpacket) #:prefab) + +(define any-remote (tcp-address (wild) (wild))) +(define any-handle (tcp-handle (wild))) +(define any-listener (tcp-listener (wild))) + +(define (tcp-driver) + (name-process 'tcp-driver + (spawn + (transition (set) + (observe-publishers/everything (tcp-channel any-listener any-remote (wild)) + (match-state active-handles + (match-conversation c + (on-presence (maybe-spawn-socket 'publisher c active-handles #f tcp-listener-manager)) + (on-absence (maybe-forget-socket 'publisher c active-handles))))) + (observe-subscribers/everything (tcp-channel any-remote any-listener (wild)) + (match-state active-handles + (match-conversation c + (on-presence (maybe-spawn-socket 'subscriber c active-handles #f tcp-listener-manager)) + (on-absence (maybe-forget-socket 'subscriber c active-handles))))) + (observe-publishers (tcp-channel any-handle any-remote (wild)) + (match-state active-handles + (match-conversation c + (on-presence + (maybe-spawn-socket 'publisher c active-handles #t tcp-connection-manager)) + (on-absence (maybe-forget-socket 'publisher c active-handles))))) + (observe-subscribers (tcp-channel any-handle any-remote (wild)) + (match-state active-handles + (match-conversation c + (on-presence + (maybe-spawn-socket 'subscriber c active-handles #t tcp-connection-manager)) + (on-absence (maybe-forget-socket 'subscriber c active-handles))))))))) + +(define tcp (tcp-driver)) ;; pre-instantiated! + +(define (maybe-spawn-socket orientation c active-handles remote-should-be-ground driver-fun) + (match (list orientation c) + [(or (list 'publisher (tcp-channel local-addr remote-addr _)) + (list 'subscriber (tcp-channel remote-addr local-addr _))) + (cond + [(not (eqv? remote-should-be-ground (ground? remote-addr))) (transition active-handles)] + [(not (ground? local-addr)) (transition active-handles)] + [(set-member? active-handles (cons local-addr remote-addr)) (transition active-handles)] + [else + (transition (set-add active-handles (cons local-addr remote-addr)) + (name-process (cons local-addr remote-addr) + (spawn (driver-fun local-addr remote-addr))))])])) + +;; Orientation Topic Set -> Transition +(define (maybe-forget-socket orientation c active-handles) + (match (list orientation c) + [(or (list 'publisher (tcp-channel local-addr remote-addr _)) + (list 'subscriber (tcp-channel remote-addr local-addr _))) + (cond + [(ground? remote-addr) (transition active-handles)] + [(not (ground? local-addr)) (transition active-handles)] + [else (transition (set-remove active-handles local-addr))])])) + +;; TcpAddress TcpAddress -> Transition +(define (tcp-listener-manager local-addr dummy-remote-addr) + (match-define (tcp-listener port) local-addr) + (define listener (tcp:tcp-listen port 4 #t)) + + (define (handle-absence orientation c state) + ;; Hey, what if the presence we need went away between our manager + ;; spawning us, and us getting to this point? Presence being + ;; "edge-" rather than "level-triggered" means we'll hang around + ;; sadly forever, accepting connections to nowhere. TODO + (match (list orientation c) + [(or (list 'publisher (tcp-channel (== local-addr) remote-addr _)) + (list 'subscriber (tcp-channel remote-addr (== local-addr) _))) + (if (ground? remote-addr) + (transition state) + (transition 'listener-is-closed + (quit) + (when (eq? state 'listener-is-running) + (name-process (list 'tcp-listener-closer local-addr) + (spawn (begin (tcp:tcp-close listener) + (transition 'dummy (quit))))))))])) + + (transition 'listener-is-running + (observe-publishers/everything (tcp-channel local-addr any-remote (wild)) + (match-state state + (match-conversation c + (on-absence (handle-absence 'publisher c state))))) + (observe-subscribers/everything (tcp-channel any-remote local-addr (wild)) + (match-state state + (match-conversation c + (on-absence (handle-absence 'subscriber c state))))) + (subscriber (cons (tcp:tcp-accept-evt listener) (wild)) + (on-message + [(cons _ (list cin cout)) + (let-values (((local-hostname local-port remote-hostname remote-port) + (tcp:tcp-addresses cin #t))) + (define remote-addr (tcp-address remote-hostname remote-port)) + (name-process (cons local-addr remote-addr) + (spawn (tcp-connection-manager* local-addr remote-addr cin cout))))])))) + +;; TcpAddress TcpAddress -> Transition +(define (tcp-connection-manager local-addr remote-addr) + (match-define (tcp-address remote-hostname remote-port) remote-addr) + (define-values (cin cout) (tcp:tcp-connect remote-hostname remote-port)) + (tcp-connection-manager* local-addr remote-addr cin cout)) + +(define (read-bytes-avail-evt len input-port) + (guard-evt + (lambda () + (let ([bstr (make-bytes len)]) + (wrap-evt + (read-bytes-avail!-evt bstr input-port) + (lambda (v) + (if (number? v) + (if (= v len) bstr (subbytes bstr 0 v)) + v))))))) + +;; TcpAddress TcpAddress InputPort OutputPort -> Transition +;; +;; Our process state here is a Maybe, representing +;; a shutting-down state if #f. +(define (tcp-connection-manager* local-addr remote-addr cin cout) + (define (close-transition is-open send-eof?) + (transition #f + (when is-open + (list (when send-eof? + (send-message (tcp-channel remote-addr local-addr eof))) + (name-process (list 'tcp-connection-closer local-addr remote-addr) + (spawn (begin (tcp:tcp-abandon-port cin) + (tcp:tcp-abandon-port cout) + (transition/no-state (quit))))))) + (quit))) + + (transition #t ;; open + (subscriber (cons (read-bytes-avail-evt 4096 cin) (wild)) + (match-state is-open + (on-message + [(cons _ (? eof-object?)) (close-transition is-open #t)] + [(cons _ (? bytes? bs)) (transition is-open + (send-message (tcp-channel remote-addr local-addr bs)))]))) + (subscriber (cons (eof-evt cin) (wild)) + (match-state is-open + (on-message [(cons (? evt?) _) (close-transition is-open #t)]))) + (subscriber (tcp-channel local-addr remote-addr (wild)) + (match-state is-open + (on-absence (close-transition is-open #f)) + (on-message + [(tcp-channel (== local-addr) (== remote-addr) subpacket) + (match subpacket + [(? eof-object?) (close-transition is-open #f)] + [(? string? s) (begin (write-string s cout) + (flush-output cout) + (transition is-open))] + [(? bytes? bs) (begin (write-bytes bs cout) + (flush-output cout) + (transition is-open))])]))) + (publisher (tcp-channel remote-addr local-addr (wild)) + (match-state is-open + (on-absence (close-transition is-open #f)))))) diff --git a/drivers/tcp-bare.rkt b/drivers/tcp-bare.rkt index 1813246..656ba27 100644 --- a/drivers/tcp-bare.rkt +++ b/drivers/tcp-bare.rkt @@ -1,5 +1,5 @@ #lang racket/base -;; TCP drivers, ported from os2.rkt directly, with flow-control and line discipline removed +;; TCP driver, with flow-control and line discipline removed, sans reliance on (ground?) (require racket/set) (require racket/match) @@ -30,84 +30,66 @@ (spawn (transition (set) (observe-publishers/everything (tcp-channel any-listener any-remote (wild)) - (match-state active-handles - (match-conversation c - (on-presence (maybe-spawn-socket 'publisher c active-handles #f tcp-listener-manager)) - (on-absence (maybe-forget-socket 'publisher c active-handles))))) + (match-interest-type 'observer + (match-state active-handles + (match-conversation (tcp-channel L _ _) + (on-presence (maybe-spawn-socket 'incoming L active-handles tcp-listener-manager)) + (on-absence (maybe-forget-socket 'incoming L active-handles)))))) (observe-subscribers/everything (tcp-channel any-remote any-listener (wild)) - (match-state active-handles - (match-conversation c - (on-presence (maybe-spawn-socket 'subscriber c active-handles #f tcp-listener-manager)) - (on-absence (maybe-forget-socket 'subscriber c active-handles))))) + (match-interest-type 'observer + (match-state active-handles + (match-conversation (tcp-channel _ L _) + (on-presence (maybe-spawn-socket 'incoming L active-handles tcp-listener-manager)) + (on-absence (maybe-forget-socket 'incoming L active-handles)))))) (observe-publishers (tcp-channel any-handle any-remote (wild)) (match-state active-handles - (match-conversation c - (on-presence - (maybe-spawn-socket 'publisher c active-handles #t tcp-connection-manager)) - (on-absence (maybe-forget-socket 'publisher c active-handles))))) - (observe-subscribers (tcp-channel any-handle any-remote (wild)) + (match-conversation (tcp-channel L R _) + (on-presence (maybe-spawn-socket R L active-handles tcp-connection-manager)) + (on-absence (maybe-forget-socket R L active-handles))))) + (observe-subscribers (tcp-channel any-remote any-handle (wild)) (match-state active-handles - (match-conversation c - (on-presence - (maybe-spawn-socket 'subscriber c active-handles #t tcp-connection-manager)) - (on-absence (maybe-forget-socket 'subscriber c active-handles))))))))) + (match-conversation (tcp-channel R L _) + (on-presence (maybe-spawn-socket R L active-handles tcp-connection-manager)) + (on-absence (maybe-forget-socket R L active-handles))))))))) (define tcp (tcp-driver)) ;; pre-instantiated! -(define (maybe-spawn-socket orientation c active-handles remote-should-be-ground driver-fun) - (match (list orientation c) - [(or (list 'publisher (tcp-channel local-addr remote-addr _)) - (list 'subscriber (tcp-channel remote-addr local-addr _))) - (cond - [(not (eqv? remote-should-be-ground (ground? remote-addr))) (transition active-handles)] - [(not (ground? local-addr)) (transition active-handles)] - [(set-member? active-handles (cons local-addr remote-addr)) (transition active-handles)] - [else - (transition (set-add active-handles (cons local-addr remote-addr)) - (name-process (cons local-addr remote-addr) - (spawn (driver-fun local-addr remote-addr))))])])) +(define (maybe-spawn-socket R L active-handles driver-fun) + (define name (cons L R)) + (if (set-member? active-handles name) + (transition active-handles) + (transition (set-add active-handles name) + (name-process name (spawn (driver-fun L R)))))) -;; Orientation Topic Set -> Transition -(define (maybe-forget-socket orientation c active-handles) - (match (list orientation c) - [(or (list 'publisher (tcp-channel local-addr remote-addr _)) - (list 'subscriber (tcp-channel remote-addr local-addr _))) - (cond - [(ground? remote-addr) (transition active-handles)] - [(not (ground? local-addr)) (transition active-handles)] - [else (transition (set-remove active-handles local-addr))])])) +(define (maybe-forget-socket R L active-handles) + (define name (cons L R)) + (transition (set-remove active-handles name))) -;; TcpAddress TcpAddress -> Transition -(define (tcp-listener-manager local-addr dummy-remote-addr) +;; TcpAddress 'incoming -> Transition +(define (tcp-listener-manager local-addr dummy-incoming-marker) (match-define (tcp-listener port) local-addr) (define listener (tcp:tcp-listen port 4 #t)) - (define (handle-absence orientation c state) + (define (handle-absence) ;; Hey, what if the presence we need went away between our manager ;; spawning us, and us getting to this point? Presence being ;; "edge-" rather than "level-triggered" means we'll hang around ;; sadly forever, accepting connections to nowhere. TODO - (match (list orientation c) - [(or (list 'publisher (tcp-channel (== local-addr) remote-addr _)) - (list 'subscriber (tcp-channel remote-addr (== local-addr) _))) - (if (ground? remote-addr) - (transition state) - (transition 'listener-is-closed - (quit) - (when (eq? state 'listener-is-running) - (name-process (list 'tcp-listener-closer local-addr) - (spawn (begin (tcp:tcp-close listener) - (transition 'dummy (quit))))))))])) + (transition 'listener-is-closed + (name-process (list 'tcp-listener-closer local-addr) + (spawn (begin (tcp:tcp-close listener) + (transition 'dummy (quit))))) + (quit))) (transition 'listener-is-running (observe-publishers/everything (tcp-channel local-addr any-remote (wild)) - (match-state state - (match-conversation c - (on-absence (handle-absence 'publisher c state))))) + (match-interest-type 'observer + (match-state 'listener-is-running + (on-absence (handle-absence))))) (observe-subscribers/everything (tcp-channel any-remote local-addr (wild)) - (match-state state - (match-conversation c - (on-absence (handle-absence 'subscriber c state))))) + (match-interest-type 'observer + (match-state 'listener-is-running + (on-absence (handle-absence))))) (subscriber (cons (tcp:tcp-accept-evt listener) (wild)) (on-message [(cons _ (list cin cout)) @@ -136,43 +118,40 @@ ;; TcpAddress TcpAddress InputPort OutputPort -> Transition ;; -;; Our process state here is a Maybe, representing -;; a shutting-down state if #f. +;; Our process state here is either 'open or 'closing. (define (tcp-connection-manager* local-addr remote-addr cin cout) - (define (close-transition is-open send-eof?) - (transition #f - (when is-open - (list (when send-eof? - (send-message (tcp-channel remote-addr local-addr eof))) - (name-process (list 'tcp-connection-closer local-addr remote-addr) - (spawn (begin (tcp:tcp-abandon-port cin) - (tcp:tcp-abandon-port cout) - (transition/no-state (quit))))))) + (define (close-transition send-eof?) + (transition 'closing + (when send-eof? (send-message (tcp-channel remote-addr local-addr eof))) + (name-process (list 'tcp-connection-closer local-addr remote-addr) + (spawn (begin (tcp:tcp-abandon-port cin) + (tcp:tcp-abandon-port cout) + (transition/no-state (quit))))) (quit))) - (transition #t ;; open + (transition 'open (subscriber (cons (read-bytes-avail-evt 4096 cin) (wild)) - (match-state is-open + (match-state 'open (on-message - [(cons _ (? eof-object?)) (close-transition is-open #t)] - [(cons _ (? bytes? bs)) (transition is-open + [(cons _ (? eof-object?)) (close-transition #t)] + [(cons _ (? bytes? bs)) (transition 'open (send-message (tcp-channel remote-addr local-addr bs)))]))) (subscriber (cons (eof-evt cin) (wild)) - (match-state is-open - (on-message [(cons (? evt?) _) (close-transition is-open #t)]))) + (match-state 'open + (on-message [(cons (? evt?) _) (close-transition #t)]))) (subscriber (tcp-channel local-addr remote-addr (wild)) - (match-state is-open - (on-absence (close-transition is-open #f)) + (match-state 'open + (on-absence (close-transition #f)) (on-message [(tcp-channel (== local-addr) (== remote-addr) subpacket) (match subpacket - [(? eof-object?) (close-transition is-open #f)] + [(? eof-object?) (close-transition #f)] [(? string? s) (begin (write-string s cout) (flush-output cout) - (transition is-open))] + (transition 'open))] [(? bytes? bs) (begin (write-bytes bs cout) (flush-output cout) - (transition is-open))])]))) + (transition 'open))])]))) (publisher (tcp-channel remote-addr local-addr (wild)) - (match-state is-open - (on-absence (close-transition is-open #f)))))) + (match-state 'open + (on-absence (close-transition #f))))))