Simplify TCP driver state management by exploiting match-interest-type.
This commit is contained in:
parent
82aaa12c4e
commit
f6edad972f
|
@ -0,0 +1,178 @@
|
|||
#lang racket/base
|
||||
;; TCP drivers, ported from os2.rkt directly, with flow-control and line discipline removed
|
||||
|
||||
(require racket/set)
|
||||
(require racket/match)
|
||||
(require (prefix-in tcp: racket/tcp))
|
||||
(require racket/port)
|
||||
(require "../sugar-untyped.rkt")
|
||||
(require "../support/dump-bytes.rkt")
|
||||
|
||||
(provide (struct-out tcp-address)
|
||||
(struct-out tcp-handle)
|
||||
(struct-out tcp-listener)
|
||||
(struct-out tcp-channel)
|
||||
tcp
|
||||
tcp-driver)
|
||||
|
||||
(struct tcp-address (host port) #:prefab)
|
||||
(struct tcp-handle (id) #:prefab)
|
||||
(struct tcp-listener (port) #:prefab)
|
||||
|
||||
(struct tcp-channel (source destination subpacket) #:prefab)
|
||||
|
||||
(define any-remote (tcp-address (wild) (wild)))
|
||||
(define any-handle (tcp-handle (wild)))
|
||||
(define any-listener (tcp-listener (wild)))
|
||||
|
||||
(define (tcp-driver)
|
||||
(name-process 'tcp-driver
|
||||
(spawn
|
||||
(transition (set)
|
||||
(observe-publishers/everything (tcp-channel any-listener any-remote (wild))
|
||||
(match-state active-handles
|
||||
(match-conversation c
|
||||
(on-presence (maybe-spawn-socket 'publisher c active-handles #f tcp-listener-manager))
|
||||
(on-absence (maybe-forget-socket 'publisher c active-handles)))))
|
||||
(observe-subscribers/everything (tcp-channel any-remote any-listener (wild))
|
||||
(match-state active-handles
|
||||
(match-conversation c
|
||||
(on-presence (maybe-spawn-socket 'subscriber c active-handles #f tcp-listener-manager))
|
||||
(on-absence (maybe-forget-socket 'subscriber c active-handles)))))
|
||||
(observe-publishers (tcp-channel any-handle any-remote (wild))
|
||||
(match-state active-handles
|
||||
(match-conversation c
|
||||
(on-presence
|
||||
(maybe-spawn-socket 'publisher c active-handles #t tcp-connection-manager))
|
||||
(on-absence (maybe-forget-socket 'publisher c active-handles)))))
|
||||
(observe-subscribers (tcp-channel any-handle any-remote (wild))
|
||||
(match-state active-handles
|
||||
(match-conversation c
|
||||
(on-presence
|
||||
(maybe-spawn-socket 'subscriber c active-handles #t tcp-connection-manager))
|
||||
(on-absence (maybe-forget-socket 'subscriber c active-handles)))))))))
|
||||
|
||||
(define tcp (tcp-driver)) ;; pre-instantiated!
|
||||
|
||||
(define (maybe-spawn-socket orientation c active-handles remote-should-be-ground driver-fun)
|
||||
(match (list orientation c)
|
||||
[(or (list 'publisher (tcp-channel local-addr remote-addr _))
|
||||
(list 'subscriber (tcp-channel remote-addr local-addr _)))
|
||||
(cond
|
||||
[(not (eqv? remote-should-be-ground (ground? remote-addr))) (transition active-handles)]
|
||||
[(not (ground? local-addr)) (transition active-handles)]
|
||||
[(set-member? active-handles (cons local-addr remote-addr)) (transition active-handles)]
|
||||
[else
|
||||
(transition (set-add active-handles (cons local-addr remote-addr))
|
||||
(name-process (cons local-addr remote-addr)
|
||||
(spawn (driver-fun local-addr remote-addr))))])]))
|
||||
|
||||
;; Orientation Topic Set<HandleMapping> -> Transition
|
||||
(define (maybe-forget-socket orientation c active-handles)
|
||||
(match (list orientation c)
|
||||
[(or (list 'publisher (tcp-channel local-addr remote-addr _))
|
||||
(list 'subscriber (tcp-channel remote-addr local-addr _)))
|
||||
(cond
|
||||
[(ground? remote-addr) (transition active-handles)]
|
||||
[(not (ground? local-addr)) (transition active-handles)]
|
||||
[else (transition (set-remove active-handles local-addr))])]))
|
||||
|
||||
;; TcpAddress TcpAddress -> Transition
|
||||
(define (tcp-listener-manager local-addr dummy-remote-addr)
|
||||
(match-define (tcp-listener port) local-addr)
|
||||
(define listener (tcp:tcp-listen port 4 #t))
|
||||
|
||||
(define (handle-absence orientation c state)
|
||||
;; Hey, what if the presence we need went away between our manager
|
||||
;; spawning us, and us getting to this point? Presence being
|
||||
;; "edge-" rather than "level-triggered" means we'll hang around
|
||||
;; sadly forever, accepting connections to nowhere. TODO
|
||||
(match (list orientation c)
|
||||
[(or (list 'publisher (tcp-channel (== local-addr) remote-addr _))
|
||||
(list 'subscriber (tcp-channel remote-addr (== local-addr) _)))
|
||||
(if (ground? remote-addr)
|
||||
(transition state)
|
||||
(transition 'listener-is-closed
|
||||
(quit)
|
||||
(when (eq? state 'listener-is-running)
|
||||
(name-process (list 'tcp-listener-closer local-addr)
|
||||
(spawn (begin (tcp:tcp-close listener)
|
||||
(transition 'dummy (quit))))))))]))
|
||||
|
||||
(transition 'listener-is-running
|
||||
(observe-publishers/everything (tcp-channel local-addr any-remote (wild))
|
||||
(match-state state
|
||||
(match-conversation c
|
||||
(on-absence (handle-absence 'publisher c state)))))
|
||||
(observe-subscribers/everything (tcp-channel any-remote local-addr (wild))
|
||||
(match-state state
|
||||
(match-conversation c
|
||||
(on-absence (handle-absence 'subscriber c state)))))
|
||||
(subscriber (cons (tcp:tcp-accept-evt listener) (wild))
|
||||
(on-message
|
||||
[(cons _ (list cin cout))
|
||||
(let-values (((local-hostname local-port remote-hostname remote-port)
|
||||
(tcp:tcp-addresses cin #t)))
|
||||
(define remote-addr (tcp-address remote-hostname remote-port))
|
||||
(name-process (cons local-addr remote-addr)
|
||||
(spawn (tcp-connection-manager* local-addr remote-addr cin cout))))]))))
|
||||
|
||||
;; TcpAddress TcpAddress -> Transition
|
||||
(define (tcp-connection-manager local-addr remote-addr)
|
||||
(match-define (tcp-address remote-hostname remote-port) remote-addr)
|
||||
(define-values (cin cout) (tcp:tcp-connect remote-hostname remote-port))
|
||||
(tcp-connection-manager* local-addr remote-addr cin cout))
|
||||
|
||||
(define (read-bytes-avail-evt len input-port)
|
||||
(guard-evt
|
||||
(lambda ()
|
||||
(let ([bstr (make-bytes len)])
|
||||
(wrap-evt
|
||||
(read-bytes-avail!-evt bstr input-port)
|
||||
(lambda (v)
|
||||
(if (number? v)
|
||||
(if (= v len) bstr (subbytes bstr 0 v))
|
||||
v)))))))
|
||||
|
||||
;; TcpAddress TcpAddress InputPort OutputPort -> Transition
|
||||
;;
|
||||
;; Our process state here is a Maybe<TcpConnectionState>, representing
|
||||
;; a shutting-down state if #f.
|
||||
(define (tcp-connection-manager* local-addr remote-addr cin cout)
|
||||
(define (close-transition is-open send-eof?)
|
||||
(transition #f
|
||||
(when is-open
|
||||
(list (when send-eof?
|
||||
(send-message (tcp-channel remote-addr local-addr eof)))
|
||||
(name-process (list 'tcp-connection-closer local-addr remote-addr)
|
||||
(spawn (begin (tcp:tcp-abandon-port cin)
|
||||
(tcp:tcp-abandon-port cout)
|
||||
(transition/no-state (quit)))))))
|
||||
(quit)))
|
||||
|
||||
(transition #t ;; open
|
||||
(subscriber (cons (read-bytes-avail-evt 4096 cin) (wild))
|
||||
(match-state is-open
|
||||
(on-message
|
||||
[(cons _ (? eof-object?)) (close-transition is-open #t)]
|
||||
[(cons _ (? bytes? bs)) (transition is-open
|
||||
(send-message (tcp-channel remote-addr local-addr bs)))])))
|
||||
(subscriber (cons (eof-evt cin) (wild))
|
||||
(match-state is-open
|
||||
(on-message [(cons (? evt?) _) (close-transition is-open #t)])))
|
||||
(subscriber (tcp-channel local-addr remote-addr (wild))
|
||||
(match-state is-open
|
||||
(on-absence (close-transition is-open #f))
|
||||
(on-message
|
||||
[(tcp-channel (== local-addr) (== remote-addr) subpacket)
|
||||
(match subpacket
|
||||
[(? eof-object?) (close-transition is-open #f)]
|
||||
[(? string? s) (begin (write-string s cout)
|
||||
(flush-output cout)
|
||||
(transition is-open))]
|
||||
[(? bytes? bs) (begin (write-bytes bs cout)
|
||||
(flush-output cout)
|
||||
(transition is-open))])])))
|
||||
(publisher (tcp-channel remote-addr local-addr (wild))
|
||||
(match-state is-open
|
||||
(on-absence (close-transition is-open #f))))))
|
|
@ -1,5 +1,5 @@
|
|||
#lang racket/base
|
||||
;; TCP drivers, ported from os2.rkt directly, with flow-control and line discipline removed
|
||||
;; TCP driver, with flow-control and line discipline removed, sans reliance on (ground?)
|
||||
|
||||
(require racket/set)
|
||||
(require racket/match)
|
||||
|
@ -30,84 +30,66 @@
|
|||
(spawn
|
||||
(transition (set)
|
||||
(observe-publishers/everything (tcp-channel any-listener any-remote (wild))
|
||||
(match-state active-handles
|
||||
(match-conversation c
|
||||
(on-presence (maybe-spawn-socket 'publisher c active-handles #f tcp-listener-manager))
|
||||
(on-absence (maybe-forget-socket 'publisher c active-handles)))))
|
||||
(match-interest-type 'observer
|
||||
(match-state active-handles
|
||||
(match-conversation (tcp-channel L _ _)
|
||||
(on-presence (maybe-spawn-socket 'incoming L active-handles tcp-listener-manager))
|
||||
(on-absence (maybe-forget-socket 'incoming L active-handles))))))
|
||||
(observe-subscribers/everything (tcp-channel any-remote any-listener (wild))
|
||||
(match-state active-handles
|
||||
(match-conversation c
|
||||
(on-presence (maybe-spawn-socket 'subscriber c active-handles #f tcp-listener-manager))
|
||||
(on-absence (maybe-forget-socket 'subscriber c active-handles)))))
|
||||
(match-interest-type 'observer
|
||||
(match-state active-handles
|
||||
(match-conversation (tcp-channel _ L _)
|
||||
(on-presence (maybe-spawn-socket 'incoming L active-handles tcp-listener-manager))
|
||||
(on-absence (maybe-forget-socket 'incoming L active-handles))))))
|
||||
(observe-publishers (tcp-channel any-handle any-remote (wild))
|
||||
(match-state active-handles
|
||||
(match-conversation c
|
||||
(on-presence
|
||||
(maybe-spawn-socket 'publisher c active-handles #t tcp-connection-manager))
|
||||
(on-absence (maybe-forget-socket 'publisher c active-handles)))))
|
||||
(observe-subscribers (tcp-channel any-handle any-remote (wild))
|
||||
(match-conversation (tcp-channel L R _)
|
||||
(on-presence (maybe-spawn-socket R L active-handles tcp-connection-manager))
|
||||
(on-absence (maybe-forget-socket R L active-handles)))))
|
||||
(observe-subscribers (tcp-channel any-remote any-handle (wild))
|
||||
(match-state active-handles
|
||||
(match-conversation c
|
||||
(on-presence
|
||||
(maybe-spawn-socket 'subscriber c active-handles #t tcp-connection-manager))
|
||||
(on-absence (maybe-forget-socket 'subscriber c active-handles)))))))))
|
||||
(match-conversation (tcp-channel R L _)
|
||||
(on-presence (maybe-spawn-socket R L active-handles tcp-connection-manager))
|
||||
(on-absence (maybe-forget-socket R L active-handles)))))))))
|
||||
|
||||
(define tcp (tcp-driver)) ;; pre-instantiated!
|
||||
|
||||
(define (maybe-spawn-socket orientation c active-handles remote-should-be-ground driver-fun)
|
||||
(match (list orientation c)
|
||||
[(or (list 'publisher (tcp-channel local-addr remote-addr _))
|
||||
(list 'subscriber (tcp-channel remote-addr local-addr _)))
|
||||
(cond
|
||||
[(not (eqv? remote-should-be-ground (ground? remote-addr))) (transition active-handles)]
|
||||
[(not (ground? local-addr)) (transition active-handles)]
|
||||
[(set-member? active-handles (cons local-addr remote-addr)) (transition active-handles)]
|
||||
[else
|
||||
(transition (set-add active-handles (cons local-addr remote-addr))
|
||||
(name-process (cons local-addr remote-addr)
|
||||
(spawn (driver-fun local-addr remote-addr))))])]))
|
||||
(define (maybe-spawn-socket R L active-handles driver-fun)
|
||||
(define name (cons L R))
|
||||
(if (set-member? active-handles name)
|
||||
(transition active-handles)
|
||||
(transition (set-add active-handles name)
|
||||
(name-process name (spawn (driver-fun L R))))))
|
||||
|
||||
;; Orientation Topic Set<HandleMapping> -> Transition
|
||||
(define (maybe-forget-socket orientation c active-handles)
|
||||
(match (list orientation c)
|
||||
[(or (list 'publisher (tcp-channel local-addr remote-addr _))
|
||||
(list 'subscriber (tcp-channel remote-addr local-addr _)))
|
||||
(cond
|
||||
[(ground? remote-addr) (transition active-handles)]
|
||||
[(not (ground? local-addr)) (transition active-handles)]
|
||||
[else (transition (set-remove active-handles local-addr))])]))
|
||||
(define (maybe-forget-socket R L active-handles)
|
||||
(define name (cons L R))
|
||||
(transition (set-remove active-handles name)))
|
||||
|
||||
;; TcpAddress TcpAddress -> Transition
|
||||
(define (tcp-listener-manager local-addr dummy-remote-addr)
|
||||
;; TcpAddress 'incoming -> Transition
|
||||
(define (tcp-listener-manager local-addr dummy-incoming-marker)
|
||||
(match-define (tcp-listener port) local-addr)
|
||||
(define listener (tcp:tcp-listen port 4 #t))
|
||||
|
||||
(define (handle-absence orientation c state)
|
||||
(define (handle-absence)
|
||||
;; Hey, what if the presence we need went away between our manager
|
||||
;; spawning us, and us getting to this point? Presence being
|
||||
;; "edge-" rather than "level-triggered" means we'll hang around
|
||||
;; sadly forever, accepting connections to nowhere. TODO
|
||||
(match (list orientation c)
|
||||
[(or (list 'publisher (tcp-channel (== local-addr) remote-addr _))
|
||||
(list 'subscriber (tcp-channel remote-addr (== local-addr) _)))
|
||||
(if (ground? remote-addr)
|
||||
(transition state)
|
||||
(transition 'listener-is-closed
|
||||
(quit)
|
||||
(when (eq? state 'listener-is-running)
|
||||
(name-process (list 'tcp-listener-closer local-addr)
|
||||
(spawn (begin (tcp:tcp-close listener)
|
||||
(transition 'dummy (quit))))))))]))
|
||||
(transition 'listener-is-closed
|
||||
(name-process (list 'tcp-listener-closer local-addr)
|
||||
(spawn (begin (tcp:tcp-close listener)
|
||||
(transition 'dummy (quit)))))
|
||||
(quit)))
|
||||
|
||||
(transition 'listener-is-running
|
||||
(observe-publishers/everything (tcp-channel local-addr any-remote (wild))
|
||||
(match-state state
|
||||
(match-conversation c
|
||||
(on-absence (handle-absence 'publisher c state)))))
|
||||
(match-interest-type 'observer
|
||||
(match-state 'listener-is-running
|
||||
(on-absence (handle-absence)))))
|
||||
(observe-subscribers/everything (tcp-channel any-remote local-addr (wild))
|
||||
(match-state state
|
||||
(match-conversation c
|
||||
(on-absence (handle-absence 'subscriber c state)))))
|
||||
(match-interest-type 'observer
|
||||
(match-state 'listener-is-running
|
||||
(on-absence (handle-absence)))))
|
||||
(subscriber (cons (tcp:tcp-accept-evt listener) (wild))
|
||||
(on-message
|
||||
[(cons _ (list cin cout))
|
||||
|
@ -136,43 +118,40 @@
|
|||
|
||||
;; TcpAddress TcpAddress InputPort OutputPort -> Transition
|
||||
;;
|
||||
;; Our process state here is a Maybe<TcpConnectionState>, representing
|
||||
;; a shutting-down state if #f.
|
||||
;; Our process state here is either 'open or 'closing.
|
||||
(define (tcp-connection-manager* local-addr remote-addr cin cout)
|
||||
(define (close-transition is-open send-eof?)
|
||||
(transition #f
|
||||
(when is-open
|
||||
(list (when send-eof?
|
||||
(send-message (tcp-channel remote-addr local-addr eof)))
|
||||
(name-process (list 'tcp-connection-closer local-addr remote-addr)
|
||||
(spawn (begin (tcp:tcp-abandon-port cin)
|
||||
(tcp:tcp-abandon-port cout)
|
||||
(transition/no-state (quit)))))))
|
||||
(define (close-transition send-eof?)
|
||||
(transition 'closing
|
||||
(when send-eof? (send-message (tcp-channel remote-addr local-addr eof)))
|
||||
(name-process (list 'tcp-connection-closer local-addr remote-addr)
|
||||
(spawn (begin (tcp:tcp-abandon-port cin)
|
||||
(tcp:tcp-abandon-port cout)
|
||||
(transition/no-state (quit)))))
|
||||
(quit)))
|
||||
|
||||
(transition #t ;; open
|
||||
(transition 'open
|
||||
(subscriber (cons (read-bytes-avail-evt 4096 cin) (wild))
|
||||
(match-state is-open
|
||||
(match-state 'open
|
||||
(on-message
|
||||
[(cons _ (? eof-object?)) (close-transition is-open #t)]
|
||||
[(cons _ (? bytes? bs)) (transition is-open
|
||||
[(cons _ (? eof-object?)) (close-transition #t)]
|
||||
[(cons _ (? bytes? bs)) (transition 'open
|
||||
(send-message (tcp-channel remote-addr local-addr bs)))])))
|
||||
(subscriber (cons (eof-evt cin) (wild))
|
||||
(match-state is-open
|
||||
(on-message [(cons (? evt?) _) (close-transition is-open #t)])))
|
||||
(match-state 'open
|
||||
(on-message [(cons (? evt?) _) (close-transition #t)])))
|
||||
(subscriber (tcp-channel local-addr remote-addr (wild))
|
||||
(match-state is-open
|
||||
(on-absence (close-transition is-open #f))
|
||||
(match-state 'open
|
||||
(on-absence (close-transition #f))
|
||||
(on-message
|
||||
[(tcp-channel (== local-addr) (== remote-addr) subpacket)
|
||||
(match subpacket
|
||||
[(? eof-object?) (close-transition is-open #f)]
|
||||
[(? eof-object?) (close-transition #f)]
|
||||
[(? string? s) (begin (write-string s cout)
|
||||
(flush-output cout)
|
||||
(transition is-open))]
|
||||
(transition 'open))]
|
||||
[(? bytes? bs) (begin (write-bytes bs cout)
|
||||
(flush-output cout)
|
||||
(transition is-open))])])))
|
||||
(transition 'open))])])))
|
||||
(publisher (tcp-channel remote-addr local-addr (wild))
|
||||
(match-state is-open
|
||||
(on-absence (close-transition is-open #f))))))
|
||||
(match-state 'open
|
||||
(on-absence (close-transition #f))))))
|
||||
|
|
Loading…
Reference in New Issue