2021-06-10 08:00:43 +00:00
|
|
|
#lang syndicate
|
|
|
|
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
|
|
|
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
|
|
|
|
2021-06-10 08:42:59 +00:00
|
|
|
(provide (all-from-out syndicate/schemas/gen/tcp)
|
|
|
|
spawn-tcp-driver
|
|
|
|
accept-connection
|
|
|
|
establish-connection
|
2021-06-11 12:18:53 +00:00
|
|
|
send-credit
|
2021-06-11 13:29:12 +00:00
|
|
|
send-lines-credit
|
|
|
|
send-bytes-credit
|
2021-06-11 12:18:53 +00:00
|
|
|
send-line
|
2021-06-10 08:42:59 +00:00
|
|
|
send-data
|
2021-06-11 12:18:53 +00:00
|
|
|
send-eof)
|
2021-06-10 08:42:59 +00:00
|
|
|
|
2021-06-11 12:18:53 +00:00
|
|
|
(require racket/async-channel)
|
2021-06-10 08:00:43 +00:00
|
|
|
(require racket/tcp)
|
2021-06-11 12:18:53 +00:00
|
|
|
(require racket/port)
|
2021-06-10 11:33:16 +00:00
|
|
|
(require (only-in racket/exn exn->string))
|
2021-06-10 08:00:43 +00:00
|
|
|
(require syndicate/driver-support)
|
2021-06-11 12:18:53 +00:00
|
|
|
(require syndicate/functional-queue)
|
2021-06-10 08:00:43 +00:00
|
|
|
(require syndicate/schemas/gen/tcp)
|
|
|
|
(require syndicate/schemas/gen/dataspace-patterns)
|
|
|
|
|
2021-06-10 14:46:20 +00:00
|
|
|
(require (for-syntax racket/base))
|
|
|
|
|
2021-06-10 08:00:43 +00:00
|
|
|
;; Declares the `syndicate/drivers/tcp` logger; the
;; `log-syndicate/drivers/tcp-info`, `-debug` and `-warning` forms used by
;; the definitions in this module come from this declaration.
(define-logger syndicate/drivers/tcp)
|
|
|
|
|
2021-06-10 09:42:07 +00:00
|
|
|
;; spawn-tcp-driver: spawns the daemon actor named 'tcp-driver.
;;
;; The actor watches `ds` for two kinds of assertion:
;;  - an `Observe` of `Connection` records whose spec is a literal
;;    `(TcpInbound host port)`; each one gets a child named after that spec
;;    which runs `run-listener`;
;;  - a `(Connection peer (TcpOutbound host port))` record; each one gets a
;;    child named after that spec which runs `run-outbound` on behalf of `peer`.
(define (spawn-tcp-driver ds)
  (spawn #:name 'tcp-driver
         #:daemon? #t
         (at ds
           ;; Somebody is interested in inbound connections on a host/port.
           (during/spawn (Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $listen-host)
                                                                      ,(DLit $listen-port))))
                                  _)
             #:name (TcpInbound listen-host listen-port)
             (run-listener ds listen-host listen-port))
           ;; Somebody asked for an outbound connection.
           (during/spawn (Connection $peer (TcpOutbound $remote-host $remote-port))
             #:name (TcpOutbound remote-host remote-port)
             (run-outbound ds peer remote-host remote-port)))))
|
|
|
|
|
|
|
|
;; run-listener: accept loop for one `(TcpInbound host port)` listener.
;;
;; Logs the start and stop of the listener, then runs a linked thread that
;; opens a listening socket (backlog 512, address reuse enabled) and accepts
;; connections forever. Each accepted connection's ports are created under
;; their own fresh custodian and handed to `spawn-inbound` in a turn on the
;; thread's facet.
(define (run-listener ds host port)
  (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port))
  (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port))
  (linked-thread
   #:name (list (TcpInbound host port) 'thread)
   (lambda (facet)
     (define listener (tcp-listen port 512 #t host))
     ;; Blocks until one connection arrives, then hands it to the actor side.
     (define (accept-one!)
       (define cust (make-custodian))
       (define-values (in out)
         (parameterize ([current-custodian cust])
           (tcp-accept listener)))
       (turn! facet
              (lambda () (spawn-inbound ds cust in out (TcpInbound host port)))))
     (let forever ()
       (accept-one!)
       (forever)))))
|
|
|
|
|
|
|
|
;; run-outbound: attempts a TCP connection to host:port for `local-peer`.
;;
;; The connection attempt runs under a fresh custodian and inside an
;; `exn:fail:network?` handler. Both outcomes produce a thunk that is invoked
;; only after the handler's extent has ended:
;;  - on failure, the thunk asserts `(ActiveSocket-close <exn text>)` at
;;    `local-peer`;
;;  - on success, the thunk registers an actor exit hook closing both ports,
;;    starts the inbound relay and the outbound relay, and asserts an
;;    `ActiveSocket-controller` at `local-peer` whose object routes credit to
;;    the inbound relay, data to the outbound relay, and EOF to closing the
;;    output port.
(define (run-outbound ds local-peer host port)
  (define connection-custodian (make-custodian))

  ;; Continuation for a failed connection attempt.
  (define (rejection e)
    (lambda () (at local-peer (assert (ActiveSocket-close (exn->string e))))))

  ;; Continuation for an established connection.
  (define (connected in out)
    (lambda ()
      (define-values (local-addr local-port remote-addr remote-port)
        (tcp-addresses in #t))
      (define name (list local-addr local-port remote-addr remote-port))
      (actor-add-exit-hook! this-actor
                            (lambda ()
                              (close-input-port in)
                              (close-output-port out)))
      (define issue-credit
        (start-inbound-relay connection-custodian name (lambda () local-peer) in))
      (define relay (outbound-relay name out))
      (at local-peer
        (assert (ActiveSocket-controller
                 (object #:name (list name 'socket)
                         [#:asserted (Socket-credit amount mode) (issue-credit amount mode)]
                         [#:asserted (Socket-data data mode) (relay data mode)]
                         [#:asserted (Socket-eof) (close-output-port out)]))))))

  (define next-step
    (with-handlers ([exn:fail:network? rejection])
      (let-values ([(in out) (parameterize ([current-custodian connection-custodian])
                               (tcp-connect host port))])
        (connected in out))))
  (next-step))
|
2021-06-10 09:42:07 +00:00
|
|
|
|
|
|
|
;; spawn-inbound: spawns the actor representing one accepted connection.
;;
;; `i`/`o` are the connection's ports, `custodian` the custodian they were
;; created under, and `spec` the listener spec the connection arrived on.
;; The actor is named after the connection's address list, closes both ports
;; when it exits, and asserts `(Connection <object> spec)` at `ds`.
;;
;; The asserted object reacts as follows:
;;  - `ActiveSocket-controller`: the first controller starts the inbound
;;    relay; the most recently asserted controller is remembered, and the
;;    facet stops when the remembered controller is retracted;
;;  - `ActiveSocket-close`: logs the message and stops the facet;
;;  - `Socket-credit`: forwarded to the inbound relay, or logged as ignored
;;    when no relay has been started yet;
;;  - `Socket-data`: written via the outbound relay;
;;  - `Socket-eof`: closes the output port.
(define (spawn-inbound ds custodian i o spec)
  (define-values (local-addr local-port remote-addr remote-port)
    (tcp-addresses i #t))
  (define name (list local-addr local-port remote-addr remote-port))
  (spawn
   #:name name
   (actor-add-exit-hook! this-actor
                         (lambda ()
                           (close-input-port i)
                           (close-output-port o)))
   (define grant-credit #f)      ;; set once the inbound relay is running
   (define active-controller #f) ;; most recently asserted controller
   (define write-out (outbound-relay name o))
   (at ds
     (assert (Connection
              (object
               #:name (list name 'active-socket)
               [#:asserted (ActiveSocket-controller ctl)
                (log-syndicate/drivers/tcp-debug "~v controller for ~v" ctl this-actor)
                (unless active-controller
                  (set! grant-credit
                        (start-inbound-relay custodian name (lambda () active-controller) i)))
                (set! active-controller ctl)
                #:retracted
                (when (eq? ctl active-controller)
                  (log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor)
                  (stop-current-facet))]
               [#:asserted (ActiveSocket-close reason)
                (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor reason)
                (stop-current-facet)]
               [#:asserted (ActiveSocket-Socket (Socket-credit amount mode))
                (cond
                  [grant-credit (grant-credit amount mode)]
                  [else
                   (log-syndicate/drivers/tcp-warning
                    "Socket-credit ~v/~v ignored because no controller present" amount mode)])]
               [#:asserted (ActiveSocket-Socket (Socket-data payload mode))
                (write-out payload mode)]
               [#:asserted (ActiveSocket-Socket (Socket-eof))
                (close-output-port o)])
              spec)))))
|
|
|
|
|
2021-06-11 12:18:53 +00:00
|
|
|
;; start-inbound-relay: starts the reader thread for input port `i` and
;; returns a procedure `(issue-credit amount mode)` that grants it credit.
;;
;; The thread runs under `custodian`, linked to the calling facet. It keeps a
;; queue of `(count . mode)` credit entries and only reads from `i` while the
;; queue is non-empty:
;;  - credit whose mode equals that of the newest queued entry is merged into
;;    that entry; otherwise a new entry is appended. Unbounded credit is
;;    represented by a count of +inf.0; zero credit is ignored;
;;  - in bytes mode, up to min(count, 131072) bytes are read per step and
;;    delivered with `send-data`; in lines mode, one line is read per step
;;    and delivered with `send-line`;
;;  - deliveries go to whatever `(target-proc)` returns at delivery time;
;;  - on end-of-file, `send-eof` is delivered and the thread finishes.
;;
;; The #:peer monitor closes `i` when the thread's assertion is retracted
;; and stops the current facet unless end-of-file was the reason.
;;
;; Fix: `eof-received?` used to be read by the monitor but never written, so
;; a perfectly ordinary inbound EOF always stopped the facet. It is now set
;; when EOF is seen.
(define (start-inbound-relay custodian name target-proc i)
  (define eof-received? #f)
  (define control-ch (make-async-channel))
  (linked-thread
   #:name (list name 'input-thread)
   #:custodian custodian
   #:peer (object #:name 'inbound-relay-monitor
                  [#:asserted _
                   #:retracted
                   (close-input-port i)
                   (when (not eof-received?) (stop-current-facet))])
   (lambda (facet)
     ;; Put the unused part of a credit entry back at the head of the queue.
     (define (update-count remaining-count mode q)
       (if (zero? remaining-count)
           q
           (undequeue (cons remaining-count mode) q)))
     (define (eof-and-finish)
       (log-syndicate/drivers/tcp-debug "inbound eof for ~v" name)
       ;; Record the EOF before this thread can terminate, so that the
       ;; monitor's #:retracted handler sees it and leaves the facet running.
       (set! eof-received? #t)
       (turn! facet (lambda () (send-eof (target-proc)))))
     (let loop ((credits (make-queue)))
       (sync (handle-evt control-ch
                         (match-lambda
                           [(list 'credit (CreditAmount-count 0) _mode) (loop credits)]
                           [(list 'credit (CreditAmount-count n) mode)
                            (loop (match (unenqueue* credits)
                                    [(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))]
                                    [_ (enqueue credits (cons n mode))]))]
                           [(list 'credit (CreditAmount-unbounded) mode)
                            (loop (match (unenqueue* credits)
                                    [(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))]
                                    [_ (enqueue credits (cons +inf.0 mode))]))]))
             (match (dequeue* credits)
               [(list)
                ;; No credit: wait for control messages only.
                never-evt]
               [(list (cons count (and mode (Mode-bytes))) q)
                ;; `count` may be +inf.0, hence the inexact->exact after min.
                (define buffer (make-bytes (inexact->exact (min count 131072))))
                (handle-evt (read-bytes-avail!-evt buffer i)
                            (match-lambda
                              [(? number? read-count)
                               (define bs (subbytes buffer 0 read-count))
                               (log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name)
                               (turn! facet (lambda () (send-data (target-proc) bs)))
                               (loop (update-count (- count read-count) mode q))]
                              [(? eof-object?) (eof-and-finish)]))]
               [(list (cons count (and mode (Mode-lines line-mode))) q)
                (handle-evt (read-bytes-line-evt i (match line-mode
                                                     [(LineMode-lf) 'linefeed]
                                                     [(LineMode-crlf) 'return-linefeed]))
                            (match-lambda
                              [(? bytes? line)
                               (log-syndicate/drivers/tcp-debug "inbound line ~v for ~v" line name)
                               (turn! facet (lambda () (send-line (target-proc) line line-mode)))
                               (loop (update-count (- count 1) mode q))]
                              [(? eof-object?) (eof-and-finish)]))])))))
  (define (issue-credit amount mode)
    (async-channel-put control-ch (list 'credit amount mode)))
  issue-credit)
|
2021-06-10 08:42:59 +00:00
|
|
|
|
2021-06-10 14:46:20 +00:00
|
|
|
;; (EPIPE) expands into a quoted pair whose car is the result of
;; `(lookup-errno 'EPIPE)` and whose cdr is the symbol `posix`. The lookup
;; runs when the macro is expanded, so the use site sees a literal constant.
(define-syntax EPIPE
  (lambda (stx)
    (local-require ffi/unsafe)
    (define errno-value (cons (lookup-errno 'EPIPE) 'posix))
    (syntax-case stx ()
      [(_) (quasisyntax (quote (unsyntax errno-value)))])))
|
|
|
|
|
|
|
|
;; with-stop-current-facet-on-epipe: calls `thunk`, treating EPIPE specially.
;;
;; If `thunk` raises an `exn:fail:network:errno` whose errno equals `(EPIPE)`,
;; the failure is logged at debug level (naming `operation`) and the current
;; facet is stopped. Any other exception propagates unchanged.
(define (with-stop-current-facet-on-epipe operation thunk)
  (define (epipe? e)
    (and (exn:fail:network:errno? e)
         (equal? (exn:fail:network:errno-errno e) (EPIPE))))
  (define (give-up e)
    (log-syndicate/drivers/tcp-debug "epipe while ~a" operation)
    (stop-current-facet))
  (with-handlers ([epipe? give-up])
    (thunk)))
|
|
|
|
|
2021-06-10 08:42:59 +00:00
|
|
|
;; outbound-relay: returns a procedure `(relay payload mode)` writing to `o`.
;;
;; Each call logs the payload, writes it to `o`, and in lines mode appends
;; the terminator for the line mode ("\n" or "\r\n"). Output is not flushed
;; per write: the first write of a turn registers a single end-of-turn
;; callback on the current facet which flushes `o`. Both the write and the
;; flush run under `with-stop-current-facet-on-epipe`.
(define (outbound-relay name o)
  (define flush-pending #f) ;; #t while an end-of-turn flush is registered
  (define (relay payload mode)
    (log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name)
    (with-stop-current-facet-on-epipe 'writing
      (lambda ()
        (write-bytes payload o)
        ;; The mode is only inspected after the payload has been written.
        (define terminator
          (match mode
            [(Mode-bytes) #f]
            [(Mode-lines (LineMode-lf)) #"\n"]
            [(Mode-lines (LineMode-crlf)) #"\r\n"]))
        (when terminator (write-bytes terminator o))))
    (unless flush-pending
      (set! flush-pending #t)
      (facet-on-end-of-turn! this-facet
                             (lambda ()
                               (set! flush-pending #f)
                               (with-stop-current-facet-on-epipe 'flushing
                                 (lambda () (flush-output o)))))))
  relay)
|
2021-06-10 08:00:43 +00:00
|
|
|
|
2021-06-11 12:18:53 +00:00
|
|
|
;; accept-connection: takes control of the connection `conn`.
;;
;; Asserts an `ActiveSocket-controller` at `conn` whose object dispatches
;; `Socket-credit` to `on-credit`, `Socket-data` to `on-data` and
;; `Socket-eof` to `on-eof`. Unless `initial-credit` is #f, that credit is
;; then sent to `conn` in `initial-mode`.
;;
;; Returns a procedure accepting #:on-data, #:on-eof and #:on-credit; each
;; keyword supplied with a non-#f value replaces the matching handler.
(define (accept-connection conn
                           #:initial-credit [initial-credit (CreditAmount-unbounded)]
                           #:initial-mode [initial-mode (Mode-bytes)]
                           #:on-data on-data
                           #:on-eof [on-eof void]
                           #:on-credit [on-credit void])
  ;; The clauses read the handler variables on every event, so the later
  ;; `set!`s in `reconfigure` take effect immediately.
  (define controller
    (object #:name 'inbound-socket-controller
            [#:asserted (Socket-credit amount mode) (on-credit amount mode)]
            [#:asserted (Socket-data data mode) (on-data data mode)]
            [#:asserted (Socket-eof) (on-eof)]))
  (define (reconfigure #:on-data [replacement-on-data #f]
                       #:on-eof [replacement-on-eof #f]
                       #:on-credit [replacement-on-credit #f])
    (when replacement-on-data (set! on-data replacement-on-data))
    (when replacement-on-eof (set! on-eof replacement-on-eof))
    (when replacement-on-credit (set! on-credit replacement-on-credit)))
  (at conn
    (assert (ActiveSocket-controller controller)))
  (when initial-credit
    (send-credit conn initial-credit initial-mode))
  reconfigure)
|
2021-06-10 09:42:07 +00:00
|
|
|
|
|
|
|
;; establish-connection: requests a connection described by `spec` via `ds`.
;;
;; Asserts `(Connection s spec)` at `ds`, where `s` is an object reacting to:
;;  - `ActiveSocket-controller peer`: calls `(on-connected peer)`, then,
;;    unless `initial-credit` is #f, sends that credit to `peer` in
;;    `initial-mode`; on retraction calls `(on-disconnected)`;
;;  - `ActiveSocket-close message`: calls `on-rejected` with `message`;
;;  - `Socket-credit` / `Socket-data` / `Socket-eof`: call `on-credit`,
;;    `on-data` and `on-eof` respectively.
;;
;; `on-disconnected` and `on-rejected` default to stopping the current facet.
;;
;; Fix: the default `on-rejected` used to take no arguments although it is
;; invoked with the rejection message, so a rejected connection raised an
;; arity error instead of stopping the facet. The default now accepts the
;; message, and handlers that take no arguments are called without it.
(define (establish-connection ds spec
                              #:initial-credit [initial-credit (CreditAmount-unbounded)]
                              #:initial-mode [initial-mode (Mode-bytes)]
                              #:on-connected on-connected
                              #:on-data on-data
                              #:on-eof [on-eof void]
                              #:on-credit [on-credit void]
                              #:on-disconnected [on-disconnected (lambda () (stop-current-facet))]
                              #:on-rejected [on-rejected (lambda (_message) (stop-current-facet))])
  ;; Invoke the rejection handler, tolerating zero-argument handlers.
  (define (notify-rejected message)
    (if (procedure-arity-includes? on-rejected 1)
        (on-rejected message)
        (on-rejected)))
  (define s
    (object #:name 'outbound-socket
            [#:asserted (ActiveSocket-controller peer)
             (on-connected peer)
             (when initial-credit (send-credit peer initial-credit initial-mode))
             #:retracted
             (on-disconnected)]
            [#:asserted (ActiveSocket-close message) (notify-rejected message)]
            [#:asserted (ActiveSocket-Socket (Socket-credit amount mode)) (on-credit amount mode)]
            [#:asserted (ActiveSocket-Socket (Socket-data data mode)) (on-data data mode)]
            [#:asserted (ActiveSocket-Socket (Socket-eof)) (on-eof)]))
  (at ds (assert (Connection s spec))))
|
2021-06-10 09:42:07 +00:00
|
|
|
|
2021-06-11 13:29:12 +00:00
|
|
|
;; send-credit: sends `conn` a `Socket-credit` record carrying `amount`
;; and `mode`.
(define (send-credit conn amount mode)
  (let ([grant (Socket-credit amount mode)])
    (send! conn grant)))
|
|
|
|
|
|
|
|
;; send-lines-credit: grants `conn` credit for `amount` lines, read using
;; line mode `mode` (defaults to `(LineMode-lf)`).
(define (send-lines-credit conn amount [mode (LineMode-lf)])
  (let ([line-count (CreditAmount-count amount)]
        [lines-mode (Mode-lines mode)])
    (send-credit conn line-count lines-mode)))
|
|
|
|
|
|
|
|
;; send-bytes-credit: grants `conn` credit for `amount` bytes in bytes mode.
(define (send-bytes-credit conn amount)
  (let ([byte-count (CreditAmount-count amount)]
        [bytes-mode (Mode-bytes)])
    (send-credit conn byte-count bytes-mode)))
|
|
|
|
|
|
|
|
;; ->bytes: returns `data` itself when it is already a byte string;
;; anything else is passed to `string->bytes/utf-8`.
(define (->bytes data)
  (cond
    [(bytes? data) data]
    [else (string->bytes/utf-8 data)]))
|
2021-06-11 12:18:53 +00:00
|
|
|
|
2021-06-11 13:29:12 +00:00
|
|
|
;; send-line: sends `line` (converted with `->bytes`) to `conn` as a
;; `Socket-data` record in lines mode; `line-mode` defaults to `(LineMode-lf)`.
(define (send-line conn line [line-mode (LineMode-lf)])
  (let ([payload (->bytes line)])
    (send! conn (Socket-data payload (Mode-lines line-mode)))))
|
2021-06-11 12:18:53 +00:00
|
|
|
|
2021-06-11 13:29:12 +00:00
|
|
|
;; send-data: sends `data` (converted with `->bytes`) to `conn` as a
;; `Socket-data` record; `mode` defaults to `(Mode-bytes)`.
(define (send-data conn data [mode (Mode-bytes)])
  (let ([payload (->bytes data)])
    (send! conn (Socket-data payload mode))))
|
2021-06-11 12:18:53 +00:00
|
|
|
|
|
|
|
;; send-eof: sends `conn` a `Socket-eof` record.
(define (send-eof conn)
  (let ([eof-marker (Socket-eof)])
    (send! conn eof-marker)))
|