From 6c9926cb11da768a1c6c6c26e651ae4244740db2 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 15 Jun 2021 12:36:25 +0200 Subject: [PATCH] Add ConnectionPeer assertions; rename TcpOutbound -> TcpRemote and TcpInbound -> TcpLocal --- syndicate-examples/tcp-client.rkt | 2 +- syndicate-examples/tcp-echo-server.rkt | 2 +- syndicate-examples/tcp-relay-server.rkt | 2 +- syndicate/distributed/tcp-server.rkt | 2 +- syndicate/drivers/tcp.rkt | 65 +++++++++++++------------ syndicate/schemas/tcp.prs | 4 +- 6 files changed, 39 insertions(+), 38 deletions(-) diff --git a/syndicate-examples/tcp-client.rkt b/syndicate-examples/tcp-client.rkt index 7178422..6c63b4e 100644 --- a/syndicate-examples/tcp-client.rkt +++ b/syndicate-examples/tcp-client.rkt @@ -23,7 +23,7 @@ (spawn-tcp-driver ds) (spawn (establish-connection - ds (TcpOutbound host port) + ds (TcpRemote host port) #:initial-mode (Mode-lines (LineMode-lf)) #:on-connected (lambda (peer) (at ds diff --git a/syndicate-examples/tcp-echo-server.rkt b/syndicate-examples/tcp-echo-server.rkt index ed14293..3fb8db9 100644 --- a/syndicate-examples/tcp-echo-server.rkt +++ b/syndicate-examples/tcp-echo-server.rkt @@ -19,5 +19,5 @@ (spawn-tcp-driver ds) (spawn (at ds - (during/spawn (Connection $conn (TcpInbound host port)) + (during/spawn (Connection $conn (TcpLocal host port)) (accept-connection conn #:on-data (lambda (data mode) (send-data conn data mode)))))))) diff --git a/syndicate-examples/tcp-relay-server.rkt b/syndicate-examples/tcp-relay-server.rkt index 151e573..effefae 100644 --- a/syndicate-examples/tcp-relay-server.rkt +++ b/syndicate-examples/tcp-relay-server.rkt @@ -21,7 +21,7 @@ (spawn-tcp-driver ds) (spawn (at ds - (during/spawn (Connection $conn (TcpInbound host port)) + (during/spawn (Connection $conn (TcpLocal host port)) (accept-connection conn #:initial-mode (Mode-lines (LineMode-lf)) #:on-data (lambda (data mode) (send! ds (Line data)))) diff --git a/syndicate/distributed/tcp-server.rkt b/syndicate/distributed/tcp-server.rkt index fa0592f..72d7d11 100644 --- a/syndicate/distributed/tcp-server.rkt +++ b/syndicate/distributed/tcp-server.rkt @@ -27,7 +27,7 @@ (spawn-tcp-driver ds) (spawn #:name 'tcp-server (at ds - (during/spawn (Connection $conn (TcpInbound "0.0.0.0" 5999)) + (during/spawn (Connection $conn (TcpLocal "0.0.0.0" 5999)) (run-relay #:name conn #:packet-writer (lambda (bs) (send-data conn bs)) #:setup-inputs diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index 57cd962..40f9c55 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -33,20 +33,20 @@ (at ds (during/spawn - (Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $host) ,(DLit $port)))) _) - #:name (TcpInbound host port) + (Observe (:pattern (Connection ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _) + #:name (TcpLocal host port) (run-listener ds host port)) (during/spawn - (Connection $local-peer (TcpOutbound $host $port)) - #:name (TcpOutbound host port) + (Connection $local-peer (TcpRemote $host $port)) + #:name (TcpRemote host port) (run-outbound ds local-peer host port))))) (define (run-listener ds host port) (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port)) (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port)) (linked-thread - #:name (list (TcpInbound host port) 'thread) + #:name (list (TcpLocal host port) 'thread) (lambda (facet) (define listener (tcp-listen port 512 #t host)) (let loop () @@ -54,7 +54,7 @@ (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-accept listener))) (turn! facet - (lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port)))) + (lambda () (spawn-inbound ds connection-custodian i o (TcpLocal host port)))) (loop))))) (define (run-outbound ds local-peer host port) @@ -66,6 +66,7 @@ (tcp-connect host port))) (lambda () (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) + (at ds (assert (ConnectionPeer local-peer (TcpLocal (car name) (cadr name))))) (actor-add-exit-hook! this-actor (lambda () (close-input-port i) (close-output-port o))) @@ -88,33 +89,33 @@ (define issue-credit #f) (define active-controller #f) (define relay (outbound-relay name o)) + (define handle + (object + #:name (list name 'active-socket) + [#:asserted (ActiveSocket-controller controller) + (log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor) + (when (not active-controller) + (set! issue-credit (start-inbound-relay custodian name (lambda () active-controller) i))) + (set! active-controller controller) + #:retracted + (when (eq? controller active-controller) + (log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor) + (stop-current-facet))] + [#:asserted (ActiveSocket-close message) + (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message) + (stop-current-facet)] + [#:asserted (ActiveSocket-Socket (Socket-credit amount mode)) + (if issue-credit + (issue-credit amount mode) + (log-syndicate/drivers/tcp-warning + "Socket-credit ~v/~v ignored because no controller present" amount mode))] + [#:asserted (ActiveSocket-Socket (Socket-data data mode)) + (relay data mode)] + [#:asserted (ActiveSocket-Socket (Socket-eof)) + (close-output-port o)])) (at ds - (assert (Connection - (object - #:name (list name 'active-socket) - [#:asserted (ActiveSocket-controller controller) - (log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor) - (when (not active-controller) - (set! issue-credit - (start-inbound-relay custodian name (lambda () active-controller) i))) - (set! active-controller controller) - #:retracted - (when (eq? controller active-controller) - (log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor) - (stop-current-facet))] - [#:asserted (ActiveSocket-close message) - (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message) - (stop-current-facet)] - [#:asserted (ActiveSocket-Socket (Socket-credit amount mode)) - (if issue-credit - (issue-credit amount mode) - (log-syndicate/drivers/tcp-warning - "Socket-credit ~v/~v ignored because no controller present" amount mode))] - [#:asserted (ActiveSocket-Socket (Socket-data data mode)) - (relay data mode)] - [#:asserted (ActiveSocket-Socket (Socket-eof)) - (close-output-port o)]) - spec))))) + (assert (ConnectionPeer handle (TcpRemote (caddr name) (cadddr name)))) + (assert (Connection handle spec))))) (define (start-inbound-relay custodian name target-proc i) (define eof-received? #f) diff --git a/syndicate/schemas/tcp.prs b/syndicate/schemas/tcp.prs index ad854f8..02714a0 100644 --- a/syndicate/schemas/tcp.prs +++ b/syndicate/schemas/tcp.prs @@ -4,8 +4,8 @@ embeddedType EntityRef.Ref . Connection = . ConnectionPeer = . -TcpOutbound = . -TcpInbound = . +TcpRemote = . +TcpLocal = . ActiveSocket = /