; SPDX-License-Identifier: LGPL-3.0-or-later
; Copyright (C) 2010-2021 Tony Garnock-Jones
#lang syndicate
;; TCP/IP driver interface.
;;
;; TODO: This protocol is overly simplified.
;; a) no facility for separate shutdown of inbound/outbound streams

(provide (struct-out tcp-connection)
         (struct-out tcp-connection-peer)
         (struct-out tcp-accepted)
         (struct-out tcp-rejected)
         (struct-out tcp-out)
         (struct-out tcp-in)
         (struct-out tcp-in-line)
         (struct-out tcp-address)
         (struct-out tcp-listener)
         (all-from-out "../protocol/credit.rkt"))

(define-logger syndicate/tcp)

(require racket/exn)
(require (prefix-in tcp: racket/tcp))
(require (only-in racket/port read-bytes-avail!-evt read-bytes-line-evt))
(require racket/unit)
(require net/tcp-sig)
(require net/tcp-unit)

(require "../support/bytes.rkt")
(require "../protocol/credit.rkt")

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Protocol messages

(assertion-struct tcp-connection (id spec))
(assertion-struct tcp-connection-peer (id addr))
(assertion-struct tcp-accepted (id))
(assertion-struct tcp-rejected (id exn))
(message-struct tcp-out (id bytes))
(message-struct tcp-in (id bytes))
(message-struct tcp-in-line (id bytes))

(assertion-struct tcp-address (host port))
(assertion-struct tcp-listener (port))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Ground-level communication messages

;; Sent by a listener's accept thread (via `ground-send!`) for each
;; accepted socket; consumed by `run-listener`.
(message-struct raw-tcp-accepted (local-addr remote-addr cin cout))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Driver

(spawn #:name 'drivers/tcp

       ;; One listener actor per distinct port for which somebody observes
       ;; `(tcp-connection _ (tcp-listener port))`.
       (during/spawn (observe (tcp-connection _ (tcp-listener $port)))
         #:name (list 'drivers/tcp 'listener port)
         (run-listener port))

       ;; One actor per asserted outbound connection request. The connect
       ;; result is encoded as a one-element list (the exception) on
       ;; failure or a two-element list (the ports) on success.
       (during/spawn (tcp-connection $id (tcp-address $host $port))
         #:name (list 'drivers/tcp 'outbound id host port)
         (match (with-handlers ([exn:fail? (lambda (e) (list e))])
                  (define-values (cin cout) (tcp:tcp-connect host port))
                  (list cin cout))
           [(list e) (assert (tcp-rejected id e))]
           [(list cin cout)
            (assert (tcp-accepted id))
            (run-connection id cin cout)])))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Listener

;; run-listener : port-number -> void
;;
;; Opens a listening socket on `port` and starts a background thread that
;; accepts connections while it holds accept credit. The thread is driven
;; through `control-ch`, which carries `(list 'credit 'reset)`,
;; `(list 'credit n)` and `'quit`. Each accepted socket is forwarded to
;; this actor as a `raw-tcp-accepted` ground message, for which an
;; inbound connection actor is spawned.
(define (run-listener port)
  (define server-addr (tcp-listener port))
  ;; Backlog of 128 pending connections; address reuse enabled.
  (define listener (tcp:tcp-listen port 128 #t))
  (define control-ch (make-channel))

  (thread (lambda ()
            (let loop ((credit 1)) ;; NB. not zero initially!
              (sync (handle-evt control-ch
                                (match-lambda
                                  [(list 'credit 'reset) (loop 0)]
                                  [(list 'credit (? number? amount)) (loop (+ credit amount))]
                                  ['quit (void)]))
                    (if (zero? credit)
                        never-evt
                        (handle-evt (tcp:tcp-accept-evt listener)
                                    (lambda (cin+cout)
                                      (match-define (list cin cout) cin+cout)
                                      ;; Looking up the peer address can fail (e.g. the peer
                                      ;; already went away). An escaping exception would kill
                                      ;; this thread, after which every `channel-put` on
                                      ;; `control-ch` from the actor would block forever.
                                      ;; Drop the broken connection instead and carry on.
                                      (define remote-addr
                                        (with-handlers ([exn:fail?
                                                         (lambda (e)
                                                           (log-syndicate/tcp-warning
                                                            "listener ~a: dropping accepted connection: ~a"
                                                            port
                                                            (exn->string e))
                                                           (close-input-port cin)
                                                           (close-output-port cout)
                                                           #f)])
                                          (define-values (local-hostname
                                                          local-port
                                                          remote-hostname
                                                          remote-port)
                                            (tcp:tcp-addresses cin #t))
                                          (tcp-address remote-hostname remote-port)))
                                      (cond
                                        [remote-addr
                                         (ground-send!
                                          (inbound
                                           (raw-tcp-accepted server-addr remote-addr cin cout)))
                                         (loop (- credit 1))]
                                        [else
                                         ;; Nothing was delivered, so no credit is consumed.
                                         (loop credit)]))))))
            ;; Reached only after 'quit.
            (tcp:tcp-close listener)
            (signal-background-activity! -1)))
  ;; Paired with the -1 at the end of the thread above.
  (signal-background-activity! +1)

  (on-stop (channel-put control-ch 'quit))

  (on (message (credit* (list server-addr) $amount))
      (channel-put control-ch (list 'credit amount)))

  (on (message (inbound (raw-tcp-accepted server-addr $remote-addr $cin $cout)))
      (define id (seal (list port remote-addr)))
      (spawn #:name (list 'drivers/tcp 'inbound id)
             (assert (tcp-connection id server-addr))
             (assert (tcp-connection-peer id remote-addr))
             (run-connection id cin cout)
             ;; The connection ends when it is rejected, or when a previously
             ;; asserted acceptance is withdrawn.
             (stop-when (asserted (tcp-rejected id _)))
             (stop-when (retracted (tcp-accepted id))))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Connection

;; run-connection : id input-port output-port -> void
;;
;; Installs the endpoints that relay data for connection `id` in the
;; current facet:
;;  - a background reader thread (`connection-thread`) pulls input from
;;    `cin` under credit control and reports it as ground messages;
;;  - `tcp-out` messages are written to `cout` and flushed;
;;  - inbound data is republished as `tcp-in` or `tcp-in-line`, depending
;;    on whether anybody observes `tcp-in-line` for this connection.
;; When the facet stops, the reader thread is told to quit and `cout` is
;; closed.
(define (run-connection id cin cout)
  (define control-ch (make-channel))
  (thread (lambda () (connection-thread control-ch id cin)))
  ;; Paired with the -1 at the end of `connection-thread`.
  (signal-background-activity! +1)

  ;; Idempotent: `control-ch` and `cout` are set to #f once released.
  (define (shutdown-connection!)
    (when control-ch
      (channel-put control-ch 'quit)
      (set! control-ch #f))
    (when cout
      (close-output-port cout)
      (set! cout #f)))

  (on-stop (shutdown-connection!))

  ;; Writers asking for credit are granted an unlimited amount.
  (on (asserted (observe (credit* (list tcp-out id) _)))
      (send! (credit tcp-out id +inf.0)))

  (on (message (credit* (list tcp-in id) $amount))
      (when control-ch (channel-put control-ch (list 'credit amount))))

  ;; Current framing mode, 'bytes or 'lines; every change is forwarded to
  ;; the reader thread.
  (field [mode 'bytes])
  (begin/dataflow (when control-ch (channel-put control-ch (mode))))

  (on (message (inbound (tcp-in id $eof-or-bs)))
      (if (eof-object? eof-or-bs)
          (stop-current-facet)
          (send! (match (mode)
                   ['bytes (tcp-in id eof-or-bs)]
                   ['lines (tcp-in-line id eof-or-bs)]))))

  (during (observe (tcp-in-line id _))
    (on-start (mode 'lines))
    (on-stop (mode 'bytes)))

  ;; Runs `body ...`; on any non-break exception, releases the connection's
  ;; resources and re-raises.
  (define-syntax-rule (trap-exns body ...)
    (with-handlers ([(lambda (e) (not (exn:break? e)))
                     (lambda (e)
                       (shutdown-connection!)
                       (raise e))])
      body ...))

  (on (message (tcp-out id $bs))
      (trap-exns
       (if (string? bs)
           (write-string bs cout)
           (write-bytes bs cout))
       (flush-output cout))))

;; connection-thread : channel id input-port -> void
;;
;; Body of the reader thread for connection `id`. Loop state is the
;; remaining read `credit` and the framing `mode` ('bytes or 'lines).
;; Control messages on `control-ch`:
;;   (list 'credit 'reset)  -- drop all credit
;;   (list 'credit n)       -- add n to the credit
;;   'lines / 'bytes        -- switch framing mode
;;   'quit                  -- leave the loop
;; While credit is non-zero, input is read from `cin` and forwarded as
;; `(inbound (tcp-in id data))`; EOF is forwarded as well and zeroes the
;; credit. In 'bytes mode a read costs its byte count, in 'lines mode it
;; costs 1.
;;
;; Any `exn:fail` raised while waiting or reading is logged and reported
;; to the facet as EOF, and the loop continues with zero credit. The
;; thread therefore only exits via 'quit, so `channel-put` calls made by
;; `run-connection` cannot block on a dead thread, and the port close and
;; background-activity decrement below always run.
(define (connection-thread control-ch id cin)
  (let loop ((credit 0) (mode 'bytes))
    ;; `next-state` is #f to stop, or (list credit mode) to continue.
    ;; Returning the state rather than looping inside the event handlers
    ;; keeps the recursive call outside the exception handler's extent.
    (define next-state
      (with-handlers ([exn:fail?
                       (lambda (e)
                         (log-syndicate/tcp-warning
                          "connection ~a: reader thread failed: ~a"
                          id
                          (exn->string e))
                         (ground-send! (inbound (tcp-in id eof)))
                         (list 0 mode))])
        (sync (handle-evt control-ch
                          (match-lambda
                            [(list 'credit 'reset) (list 0 mode)]
                            [(list 'credit (? number? amount)) (list (+ credit amount) mode)]
                            ['lines (list credit 'lines)]
                            ['bytes (list credit 'bytes)]
                            ['quit #f]))
              (if (zero? credit)
                  never-evt
                  (handle-evt (match mode
                                ;; At most 32768 bytes per read; credit may be
                                ;; inexact (e.g. +inf.0), hence the conversion.
                                ['bytes (read-bytes-avail-evt
                                         (inexact->exact (truncate (min credit 32768)))
                                         cin)]
                                ['lines (read-bytes-line-evt cin 'any)])
                              (lambda (eof-or-bs)
                                (ground-send! (inbound (tcp-in id eof-or-bs)))
                                (list (if (eof-object? eof-or-bs)
                                          0
                                          (- credit
                                             (match mode
                                               ['bytes (bytes-length eof-or-bs)]
                                               ['lines 1])))
                                      mode)))))))
    (match next-state
      [#f (void)]
      [(list next-credit next-mode) (loop next-credit next-mode)]))
  (close-input-port cin)
  (signal-background-activity! -1))

;; read-bytes-avail-evt : exact-nonnegative-integer input-port -> evt
;;
;; Event that reads up to `len` available bytes from `input-port` into a
;; buffer allocated afresh each time the event is synchronised on. When
;; the underlying result is a number (a byte count), the event's value is
;; the buffer trimmed to that many bytes; any other result is passed
;; through unchanged.
(define (read-bytes-avail-evt len input-port)
  (guard-evt
   (lambda ()
     (let ([bstr (make-bytes len)])
       (handle-evt
        (read-bytes-avail!-evt bstr input-port)
        (lambda (v)
          (if (number? v)
              (if (= v len) bstr (subbytes bstr 0 v))
              v)))))))