TCP client driver, and provides
This commit is contained in:
parent
336811c51e
commit
45e8c29976
|
@ -2,6 +2,14 @@
|
|||
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
||||
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||
|
||||
(provide (all-from-out syndicate/schemas/gen/tcp)
|
||||
spawn-tcp-driver
|
||||
accept-connection
|
||||
establish-connection
|
||||
send-data
|
||||
|
||||
read-bytes-avail)
|
||||
|
||||
(require racket/tcp)
|
||||
(require syndicate/driver-support)
|
||||
(require syndicate/schemas/gen/tcp)
|
||||
|
@ -19,7 +27,12 @@
|
|||
(during/spawn
|
||||
(Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $host) ,(DLit $port)))) _)
|
||||
#:name (TcpInbound host port)
|
||||
(run-listener this-turn ds host port))))))
|
||||
(run-listener this-turn ds host port))
|
||||
|
||||
(during/spawn
|
||||
(Connection $local-peer (TcpOutbound $host $port))
|
||||
#:name (TcpOutbound host port)
|
||||
(run-outbound this-turn ds local-peer host port))))))
|
||||
|
||||
(define run-listener
|
||||
(action (ds host port)
|
||||
|
@ -35,76 +48,95 @@
|
|||
(define connection-custodian (make-custodian))
|
||||
(define-values (i o) (parameterize ((current-custodian connection-custodian))
|
||||
(tcp-accept listener)))
|
||||
(turn-freshen this-turn (action () (spawn-connection this-turn
|
||||
ds
|
||||
connection-custodian
|
||||
i
|
||||
o
|
||||
(TcpInbound host port))))
|
||||
(turn-freshen this-turn (action () (spawn-inbound this-turn
|
||||
ds
|
||||
connection-custodian
|
||||
i
|
||||
o
|
||||
(TcpInbound host port))))
|
||||
(loop))))))
|
||||
|
||||
(define spawn-connection
|
||||
(define run-outbound
|
||||
(action (ds local-peer host port)
|
||||
(define connection-custodian (make-custodian))
|
||||
((with-handlers ([exn:fail:network?
|
||||
(lambda (e)
|
||||
(lambda () (at local-peer (assert (ActiveSocket-close e)))))])
|
||||
(define-values (i o) (parameterize ((current-custodian connection-custodian))
|
||||
(tcp-connect host port)))
|
||||
(lambda ()
|
||||
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
|
||||
(on-stop (close-input-port i)
|
||||
(close-output-port o))
|
||||
(start-inbound-relay this-turn connection-custodian name local-peer i)
|
||||
(at local-peer
|
||||
(assert (ActiveSocket-controller
|
||||
(ref (entity #:name (list name 'socket)
|
||||
#:message (outbound-relay name o)))))))))))
|
||||
|
||||
(define spawn-inbound
|
||||
(action (ds custodian i o spec)
|
||||
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
|
||||
(spawn
|
||||
#:name name
|
||||
|
||||
(on-stop (close-input-port i)
|
||||
(close-output-port o))
|
||||
|
||||
(define active-controller #f)
|
||||
(define active-controller-handle #f)
|
||||
(define flush-pending #f)
|
||||
|
||||
(define start-thread
|
||||
(action ()
|
||||
(linked-thread
|
||||
#:name (list name 'input-thread)
|
||||
#:custodian custodian
|
||||
this-turn
|
||||
(ref (entity #:name (list name 'socket-monitor)
|
||||
#:retract (action (_handle) (stop-current-facet))))
|
||||
(lambda ()
|
||||
(let loop ()
|
||||
(define bs (read-bytes-avail i))
|
||||
(when (bytes? bs)
|
||||
(log-syndicate/drivers/tcp-info "inbound data ~v for ~v" bs this-actor)
|
||||
(turn-freshen this-turn (action () (send! active-controller (Socket bs))))
|
||||
(loop)))))))
|
||||
|
||||
(define active-socket
|
||||
(ref (entity #:name (list name 'active-socket)
|
||||
#:assert
|
||||
(action (m handle)
|
||||
(match (parse-ActiveSocket m)
|
||||
[(ActiveSocket-controller controller)
|
||||
(log-syndicate/drivers/tcp-info "~v controller for ~v" controller this-actor)
|
||||
(when (not active-controller) (start-thread this-turn))
|
||||
(set! active-controller controller)
|
||||
(set! active-controller-handle handle)]
|
||||
[(ActiveSocket-close _)
|
||||
(log-syndicate/drivers/tcp-info "closing ~v" this-actor)
|
||||
(stop-current-facet)]))
|
||||
#:retract
|
||||
(action (handle)
|
||||
(log-syndicate/drivers/tcp-info "peer withdrawn ~v" this-actor)
|
||||
(when (equal? handle active-controller-handle)
|
||||
(stop-current-facet)))
|
||||
#:message
|
||||
(action (m)
|
||||
(match (parse-ActiveSocket m)
|
||||
[(ActiveSocket-Socket (Socket payload))
|
||||
(log-syndicate/drivers/tcp-info "outbound data ~v for ~v" payload this-actor)
|
||||
(write-bytes payload o)
|
||||
(when (not flush-pending)
|
||||
(set! flush-pending #t)
|
||||
(facet-on-end-of-turn! this-facet
|
||||
(action ()
|
||||
(set! flush-pending #f)
|
||||
(flush-output o))))])))))
|
||||
|
||||
(at ds
|
||||
(assert (Connection active-socket spec))))))
|
||||
(assert (Connection
|
||||
(ref (entity #:name (list name 'active-socket)
|
||||
#:assert
|
||||
(action (m handle)
|
||||
(match (parse-ActiveSocket m)
|
||||
[(ActiveSocket-controller controller)
|
||||
(log-syndicate/drivers/tcp-info "~v controller for ~v" controller this-actor)
|
||||
(when (not active-controller)
|
||||
(start-inbound-relay this-turn custodian name controller i))
|
||||
(set! active-controller controller)
|
||||
(set! active-controller-handle handle)]
|
||||
[(ActiveSocket-close reason)
|
||||
(log-syndicate/drivers/tcp-info "closing ~v reason ~v" this-actor reason)
|
||||
(stop-current-facet)]))
|
||||
#:retract
|
||||
(action (handle)
|
||||
(log-syndicate/drivers/tcp-info "peer withdrawn ~v" this-actor)
|
||||
(when (equal? handle active-controller-handle)
|
||||
(stop-current-facet)))
|
||||
#:message
|
||||
(outbound-relay name o)))
|
||||
spec))))))
|
||||
|
||||
(define start-inbound-relay
|
||||
(action (custodian name target i)
|
||||
(linked-thread
|
||||
#:name (list name 'input-thread)
|
||||
#:custodian custodian
|
||||
this-turn
|
||||
(ref (entity #:name (list name 'socket-monitor)
|
||||
#:retract (action (_handle) (stop-current-facet))))
|
||||
(lambda ()
|
||||
(let loop ()
|
||||
(define bs (read-bytes-avail i))
|
||||
(when (bytes? bs)
|
||||
(log-syndicate/drivers/tcp-info "inbound data ~v for ~v" bs name)
|
||||
(turn-freshen this-turn (action () (send! target (Socket bs))))
|
||||
(loop)))))))
|
||||
|
||||
(define (outbound-relay name o)
|
||||
(define flush-pending #f)
|
||||
(action (m)
|
||||
(match (parse-ActiveSocket m)
|
||||
[(ActiveSocket-Socket (Socket payload))
|
||||
(log-syndicate/drivers/tcp-info "outbound data ~v for ~v" payload name)
|
||||
(write-bytes payload o)
|
||||
(when (not flush-pending)
|
||||
(set! flush-pending #t)
|
||||
(facet-on-end-of-turn! this-facet
|
||||
(action ()
|
||||
(set! flush-pending #f)
|
||||
(flush-output o))))])))
|
||||
|
||||
(define (read-bytes-avail input-port #:limit [limit 65536])
|
||||
(define buffer (make-bytes limit))
|
||||
|
@ -112,24 +144,99 @@
|
|||
[(? number? count) (subbytes buffer 0 count)]
|
||||
[other other]))
|
||||
|
||||
(define accept-connection
|
||||
(action (conn #:on-data on-data-action)
|
||||
(at conn
|
||||
(assert (ActiveSocket-controller
|
||||
(ref (entity #:name 'inbound-socket-controller
|
||||
#:message
|
||||
(action (m)
|
||||
(match-define (Socket data) (parse-Socket m))
|
||||
(on-data-action this-turn data)))))))))
|
||||
|
||||
(define establish-connection
|
||||
(action (ds spec
|
||||
#:on-connected on-connected-action
|
||||
#:on-data on-data-action
|
||||
#:on-disconnected [on-disconnected-action (action () (stop-current-facet))]
|
||||
#:on-rejected [on-rejected-action (action () (stop-current-facet))]
|
||||
)
|
||||
(at ds
|
||||
(assert (Connection (ref (entity #:name 'outbound-socket
|
||||
#:assert
|
||||
(action (m _handle)
|
||||
(match (parse-ActiveSocket m)
|
||||
[(ActiveSocket-controller peer)
|
||||
(on-connected-action this-turn peer)]
|
||||
[(ActiveSocket-close e)
|
||||
(on-rejected-action this-turn e)]))
|
||||
#:retract
|
||||
(action (_handle) (on-disconnected-action this-turn))
|
||||
#:message
|
||||
(action (m)
|
||||
(match (parse-ActiveSocket m)
|
||||
[(ActiveSocket-Socket (Socket data))
|
||||
(on-data-action this-turn data)]))))
|
||||
spec)))))
|
||||
|
||||
(define send-data
|
||||
(action (conn data)
|
||||
(send! conn (Socket data))))
|
||||
|
||||
(module+ main
|
||||
(require syndicate/drivers/timer)
|
||||
(require racket/cmdline)
|
||||
|
||||
(define run-echo-server
|
||||
(action (ds host port)
|
||||
(spawn
|
||||
(at ds
|
||||
(during/spawn (Connection $conn (TcpInbound host port))
|
||||
(accept-connection this-turn conn
|
||||
#:on-data (action (data)
|
||||
(match data
|
||||
[#"bye\n" (stop-current-facet)]
|
||||
[_ (send-data this-turn conn data)]))))))))
|
||||
|
||||
(define run-echo-client
|
||||
(action (ds host port)
|
||||
(spawn
|
||||
(define-field sink #f)
|
||||
(define-field counter 0)
|
||||
(begin/dataflow
|
||||
(when (sink)
|
||||
(send! (sink) (Socket (string->bytes/utf-8 (format "~a\n" (counter)))))))
|
||||
(establish-connection
|
||||
this-turn ds (TcpOutbound host port)
|
||||
#:on-connected
|
||||
(action (peer) (sink peer))
|
||||
#:on-rejected
|
||||
(action (reason) (log-error "Connection failed: ~v" reason))
|
||||
#:on-disconnected
|
||||
(action () (log-info "Disconnected"))
|
||||
#:on-data
|
||||
(action (data)
|
||||
(log-info "Got: ~v" data)
|
||||
(counter (+ (counter) 1)))))))
|
||||
|
||||
(define host #f)
|
||||
(define port 5999)
|
||||
(define mode 'server)
|
||||
|
||||
(command-line #:once-each
|
||||
[("--host" "-H") hostname "Set host/interface to connect to/listen on"
|
||||
(set! host hostname)]
|
||||
[("--port" "-p") port-number "Set port number to connect to/listen on"
|
||||
(set! port (string->number port-number))]
|
||||
#:once-any
|
||||
["--server" "Server mode"
|
||||
(set! mode 'server)]
|
||||
["--client" "Client mode"
|
||||
(set! mode 'client)])
|
||||
|
||||
(actor-system/dataspace (ds)
|
||||
(spawn-timer-driver this-turn ds)
|
||||
(spawn-tcp-driver this-turn ds)
|
||||
(spawn
|
||||
(at ds
|
||||
(during/spawn (Connection $conn (TcpInbound "0.0.0.0" 5999))
|
||||
(on-start (log-info "Starting service ~a" this-facet))
|
||||
(on-stop (log-info "Stopping service ~a" this-facet))
|
||||
(at conn
|
||||
(assert (ActiveSocket-controller
|
||||
(ref (entity #:message
|
||||
(action (m)
|
||||
(match (parse-Socket m)
|
||||
[(Socket #"bye\n")
|
||||
(log-info "Bye!")
|
||||
(stop-current-facet)]
|
||||
[(Socket data)
|
||||
(log-info "Echoing ~v" data)
|
||||
(send! conn (Socket data))]))))))))))))
|
||||
(match mode
|
||||
['server (run-echo-server this-turn ds (or host "0.0.0.0") port)]
|
||||
['client (run-echo-client this-turn ds (or host "127.0.0.1") port)])))
|
||||
|
|
Loading…
Reference in New Issue