2018-04-29 17:43:39 +00:00
|
|
|
#lang imperative-syndicate
|
|
|
|
;; TCP/IP driver interface.
|
|
|
|
;;
|
2018-05-06 10:24:28 +00:00
|
|
|
;; TODO: This protocol is overly simplified.
|
2019-05-12 12:07:38 +00:00
|
|
|
;; a) no facility for separate shutdown of inbound/outbound streams
|
2018-04-29 17:43:39 +00:00
|
|
|
|
|
|
|
(provide (struct-out tcp-connection)
|
2019-01-28 01:14:58 +00:00
|
|
|
(struct-out tcp-connection-peer)
|
2018-04-29 17:43:39 +00:00
|
|
|
(struct-out tcp-accepted)
|
2019-03-18 23:27:59 +00:00
|
|
|
(struct-out tcp-rejected)
|
2018-04-29 17:43:39 +00:00
|
|
|
(struct-out tcp-out)
|
|
|
|
(struct-out tcp-in)
|
|
|
|
(struct-out tcp-in-line)
|
|
|
|
|
|
|
|
(struct-out tcp-address)
|
2019-05-12 12:07:38 +00:00
|
|
|
(struct-out tcp-listener)
|
|
|
|
|
|
|
|
(all-from-out imperative-syndicate/protocol/credit))
|
2018-04-29 17:43:39 +00:00
|
|
|
|
|
|
|
;; Logger for this driver. `define-logger` binds `syndicate/tcp-logger`
;; together with the `log-syndicate/tcp-<level>` forms (fatal, error,
;; warning, info, debug).
(define-logger syndicate/tcp)
|
|
|
|
|
|
|
|
(require racket/exn)
|
|
|
|
(require (prefix-in tcp: racket/tcp))
|
2019-05-12 12:07:38 +00:00
|
|
|
(require (only-in racket/port read-bytes-avail!-evt read-bytes-line-evt))
|
2018-04-29 17:43:39 +00:00
|
|
|
|
|
|
|
(require racket/unit)
|
|
|
|
(require net/tcp-sig)
|
|
|
|
(require net/tcp-unit)
|
|
|
|
|
|
|
|
(require syndicate/support/bytes)
|
|
|
|
|
2019-05-12 12:07:38 +00:00
|
|
|
(require imperative-syndicate/protocol/credit)
|
|
|
|
|
2018-04-29 17:43:39 +00:00
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Protocol messages
|
|
|
|
|
|
|
|
;; Assertion describing connection `id`. In this file `spec` is either a
;; `tcp-address` (an outbound connection the driver should open) or a
;; `tcp-listener` (a connection accepted on that listener's port).
(assertion-struct tcp-connection (id spec))
|
2019-01-28 01:14:58 +00:00
|
|
|
;; Assertion published by each inbound-connection actor: `addr` is the
;; `tcp-address` of the remote end of connection `id`.
(assertion-struct tcp-connection-peer (id addr))
|
2018-04-29 17:43:39 +00:00
|
|
|
;; Assertion marking connection `id` as accepted. The driver asserts it once
;; an outbound connect succeeds; an inbound-connection actor stops when it is
;; retracted.
(assertion-struct tcp-accepted (id))
|
2019-03-18 23:27:59 +00:00
|
|
|
;; Assertion marking connection `id` as rejected. The driver asserts it with
;; the caught exception in `exn` when an outbound connect fails; an
;; inbound-connection actor stops when it is asserted.
(assertion-struct tcp-rejected (id exn))
|
2018-04-29 17:43:39 +00:00
|
|
|
;; Message asking the driver to transmit on connection `id`. The connection
;; actor writes `bytes` (a string or a byte string) to the socket's output
;; port and flushes it.
(message-struct tcp-out (id bytes))
|
|
|
|
;; Message carrying data received on connection `id` while the connection is
;; in 'bytes mode. The reader thread also uses this struct, wrapped in
;; `inbound`, to hand data (or the eof object) to the connection actor.
(message-struct tcp-in (id bytes))
|
|
|
|
;; Message carrying one received line for connection `id`. The connection
;; switches to 'lines mode for as long as some actor observes `tcp-in-line`
;; for that id.
(message-struct tcp-in-line (id bytes))
|
|
|
|
|
|
|
|
;; A remote endpoint: the target of an outbound connection, or the peer
;; address reported for an accepted one.
(assertion-struct tcp-address (host port))
|
|
|
|
;; A local listening endpoint, identified by its TCP port number.
(assertion-struct tcp-listener (port))
|
|
|
|
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Ground-level communication messages
|
|
|
|
|
|
|
|
;; Sent (via `ground-send!`, wrapped in `inbound`) by a listener's accept
;; thread for each accepted socket: the listener's own address, the peer's
;; `tcp-address`, and the socket's input and output ports.
(message-struct raw-tcp-accepted (local-addr remote-addr cin cout))
|
|
|
|
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Driver
|
|
|
|
|
|
|
|
;; Root driver actor.
;;
;;  - While anyone observes `tcp-connection` records for a `tcp-listener`
;;    port, a listener actor for that port is kept running.
;;  - While a `tcp-connection` to a `tcp-address` is asserted, an outbound
;;    actor attempts the connect and then either asserts `tcp-rejected`
;;    (carrying the exception) or asserts `tcp-accepted` and services the
;;    socket.
(spawn #:name 'drivers/tcp

       (during/spawn (observe (tcp-connection _ (tcp-listener $port)))
         #:name (list 'drivers/tcp 'listener port)
         (run-listener port))

       (during/spawn (tcp-connection $id (tcp-address $host $port))
         #:name (list 'drivers/tcp 'outbound id host port)
         ;; The connect attempt yields either the exn:fail it raised or a
         ;; pair of (input-port . output-port).
         (match (with-handlers ([exn:fail? values])
                  (call-with-values (lambda () (tcp:tcp-connect host port)) cons))
           [(? exn:fail? failure)
            (assert (tcp-rejected id failure))]
           [(cons cin cout)
            (assert (tcp-accepted id))
            (run-connection id cin cout)])))
|
2018-04-29 17:43:39 +00:00
|
|
|
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Listener
|
|
|
|
|
|
|
|
;; run-listener : port-number -> void
;;
;; Facet body for a listener actor. Opens a listening socket on `port`
;; (backlog 128, address reuse enabled) and starts a background thread that
;; accepts connections while it holds credit. Each accepted socket is
;; reported to the dataspace as an inbound `raw-tcp-accepted` message, for
;; which a dedicated connection actor is spawned.
;;
;; Control messages sent to the accept thread over `control-ch`:
;;   (list 'credit 'reset)   -- drop all accept credit
;;   (list 'credit <number>) -- add accept credit
;;   'quit                   -- leave the accept loop
(define (run-listener port)
  (define server-addr (tcp-listener port))
  (define listener (tcp:tcp-listen port 128 #t))
  (define control-ch (make-channel))

  ;; Runs on the accept thread until 'quit arrives (or an exception escapes).
  (define (accept-loop)
    (let loop ((credit 1)) ;; NB. not zero initially!
      (sync (handle-evt control-ch
                        (match-lambda
                          [(list 'credit 'reset) (loop 0)]
                          [(list 'credit (? number? amount)) (loop (+ credit amount))]
                          ['quit (void)]))
            (if (zero? credit)
                never-evt ;; out of credit: only control messages can wake us
                (handle-evt (tcp:tcp-accept-evt listener)
                            (lambda (cin+cout)
                              (match-define (list cin cout) cin+cout)
                              (define-values
                                (local-hostname local-port remote-hostname remote-port)
                                (tcp:tcp-addresses cin #t))
                              (ground-send!
                               (inbound
                                (raw-tcp-accepted server-addr
                                                  (tcp-address remote-hostname remote-port)
                                                  cin
                                                  cout)))
                              (loop (- credit 1))))))))

  ;; The cleanup is a dynamic-wind post-thunk so that the socket is closed
  ;; and the background-activity count is released even when the loop is
  ;; left through an exception rather than through 'quit.
  (define acceptor
    (thread (lambda ()
              (dynamic-wind
                void
                accept-loop
                (lambda ()
                  (tcp:tcp-close listener)
                  (signal-background-activity! -1))))))
  (signal-background-activity! +1)

  ;; Hand `msg` to the accept thread. A plain `channel-put` would block
  ;; forever if that thread has already died, stalling this actor; waiting
  ;; on the thread's death as well avoids that.
  (define (tell-acceptor! msg)
    (void (sync (channel-put-evt control-ch msg)
                (thread-dead-evt acceptor))))

  (on-stop (tell-acceptor! 'quit))

  (on (message (credit* (list server-addr) $amount))
      (tell-acceptor! (list 'credit amount)))

  (on (message (inbound (raw-tcp-accepted server-addr $remote-addr $cin $cout)))
      (define id (seal (list port remote-addr)))
      (spawn #:name (list 'drivers/tcp 'inbound id)
             (assert (tcp-connection id server-addr))
             (assert (tcp-connection-peer id remote-addr))
             (run-connection id cin cout)
             (stop-when (asserted (tcp-rejected id _)))
             (stop-when (retracted (tcp-accepted id))))))
|
|
|
|
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Connection
|
|
|
|
|
|
|
|
;; run-connection : any input-port output-port -> void
;;
;; Facet body servicing one established socket identified by `id`.
;;
;;  - Starts a reader thread (`connection-thread`) on `cin`, controlled
;;    through `control-ch`.
;;  - Relays data the reader reports as `tcp-in` or `tcp-in-line` messages,
;;    depending on the current mode; eof stops the facet.
;;  - Writes the payload of `tcp-out` messages to `cout`, flushing after
;;    each one.
;;  - Forwards inbound credit and mode changes to the reader.
;;  - On stop (or on a write error) tells the reader to quit and closes
;;    the ports.
(define (run-connection id cin cout)
  (define control-ch (make-channel))
  (define reader (thread (lambda () (connection-thread control-ch id cin))))
  (signal-background-activity! +1)

  ;; Hand `msg` to the reader thread. A plain `channel-put` would block
  ;; forever if the reader has already died, stalling this actor; waiting on
  ;; the thread's death as well avoids that. Callers check `control-ch`
  ;; first, since it becomes #f after shutdown.
  (define (tell-reader! msg)
    (void (sync (channel-put-evt control-ch msg)
                (thread-dead-evt reader))))

  ;; Safe to call more than once: each resource is released at most once,
  ;; and closing an already-closed input port has no effect.
  (define (shutdown-connection!)
    (when control-ch
      (tell-reader! 'quit)
      (set! control-ch #f))
    ;; The reader closes `cin` itself when it leaves its loop normally;
    ;; closing it here as well covers a reader that died abnormally.
    (close-input-port cin)
    (when cout
      (close-output-port cout)
      (set! cout #f)))

  (on-stop (shutdown-connection!))

  ;; Outbound traffic is not flow-controlled: anyone asking for tcp-out
  ;; credit is granted an unlimited amount.
  (on (asserted (observe (credit* (list tcp-out id) _)))
      (send! (credit tcp-out id +inf.0)))

  (on (message (credit* (list tcp-in id) $amount))
      (when control-ch (tell-reader! (list 'credit amount))))

  ;; Current read mode, 'bytes or 'lines; every change is pushed to the
  ;; reader thread.
  (field [mode 'bytes])
  (begin/dataflow (when control-ch (tell-reader! (mode))))

  (on (message (inbound (tcp-in id $eof-or-bs)))
      (if (eof-object? eof-or-bs)
          (stop-current-facet)
          (send! (match (mode)
                   ['bytes (tcp-in id eof-or-bs)]
                   ['lines (tcp-in-line id eof-or-bs)]))))

  ;; Line mode lasts exactly as long as someone observes tcp-in-line.
  (during (observe (tcp-in-line id _))
    (on-start (mode 'lines))
    (on-stop (mode 'bytes)))

  ;; Run `body ...`; on any exception other than a break, shut the
  ;; connection down and re-raise.
  (define-syntax-rule (trap-exns body ...)
    (with-handlers ([(lambda (e) (not (exn:break? e)))
                     (lambda (e)
                       (shutdown-connection!)
                       (raise e))])
      body ...))

  (on (message (tcp-out id $bs))
      (trap-exns
       (if (string? bs)
           (write-string bs cout)
           (write-bytes bs cout))
       (flush-output cout))))
|
2018-04-29 17:43:39 +00:00
|
|
|
|
|
|
|
;; connection-thread : channel any input-port -> void
;;
;; Body of the reader thread for connection `id`. Reads from `cin` only
;; while it holds credit, and reports everything it reads (including eof)
;; to the dataspace as an inbound `tcp-in` message.
;;
;; Control messages accepted on `control-ch`:
;;   (list 'credit 'reset)   -- drop all read credit
;;   (list 'credit <number>) -- add read credit
;;   'lines / 'bytes         -- switch read mode
;;   'quit                   -- leave the loop
;;
;; Credit is counted in bytes in 'bytes mode and in lines in 'lines mode.
;; A failed read (e.g. the peer resetting the connection) is logged and
;; reported as end of stream rather than killing the thread silently.
;; `cin` is closed and the background-activity count released however the
;; loop is left.
(define (connection-thread control-ch id cin)
  ;; Largest chunk to request for `credit` bytes of credit, capped at 32768.
  ;; `credit` may be inexact (e.g. +inf.0), hence the conversion.
  (define (chunk-size credit)
    (inexact->exact (truncate (min credit 32768))))

  ;; Block until something happens; returns (list 'control msg) or
  ;; (list 'input eof-or-bs).
  (define (next-event credit mode)
    (define input-evt
      (if (zero? credit)
          never-evt ;; out of credit: only control messages can wake us
          (handle-evt (match mode
                        ['bytes (read-bytes-avail-evt (chunk-size credit) cin)]
                        ['lines (read-bytes-line-evt cin 'any)])
                      (lambda (eof-or-bs) (list 'input eof-or-bs)))))
    ;; Only the wait itself is guarded, so the loop's tail calls stay
    ;; outside the handler and the continuation does not grow.
    (with-handlers ([exn:fail?
                     (lambda (e)
                       (log-syndicate/tcp-warning
                        "read failed on connection ~v, treating as end of stream: ~a"
                        id
                        (exn-message e))
                       (list 'input eof))])
      (sync (handle-evt control-ch (lambda (msg) (list 'control msg)))
            input-evt)))

  (define (read-loop)
    (let loop ((credit 0) (mode 'bytes))
      (match (next-event credit mode)
        [(list 'control (list 'credit 'reset)) (loop 0 mode)]
        [(list 'control (list 'credit (? number? amount))) (loop (+ credit amount) mode)]
        [(list 'control 'lines) (loop credit 'lines)]
        [(list 'control 'bytes) (loop credit 'bytes)]
        [(list 'control 'quit) (void)]
        [(list 'input eof-or-bs)
         (ground-send! (inbound (tcp-in id eof-or-bs)))
         ;; After eof there is nothing left to read: drop to zero credit
         ;; and wait for 'quit.
         (loop (if (eof-object? eof-or-bs)
                   0
                   (- credit (match mode
                               ['bytes (bytes-length eof-or-bs)]
                               ['lines 1])))
               mode)])))

  (dynamic-wind
    void
    read-loop
    (lambda ()
      (close-input-port cin)
      (signal-background-activity! -1))))
|
|
|
|
|
|
|
|
;; read-bytes-avail-evt : exact-nonnegative-integer input-port -> evt
;;
;; An event that becomes ready when `input-port` has data available. Its
;; result is a byte string holding between 1 and `len` of the available
;; bytes; any non-numeric result of the underlying read (such as eof) is
;; passed through unchanged. A fresh buffer is allocated each time the
;; event is synchronized on.
(define (read-bytes-avail-evt len input-port)
  (define (fresh-read-evt)
    (define buffer (make-bytes len))
    (define (trim result)
      (cond
        [(not (number? result)) result]
        [(= result len) buffer] ;; buffer filled completely: no copy needed
        [else (subbytes buffer 0 result)]))
    (handle-evt (read-bytes-avail!-evt buffer input-port) trim))
  (guard-evt fresh-read-evt))
|