Add ConnectionPeer assertions; rename TcpOutbound -> TcpRemote and TcpInbound -> TcpLocal

This commit is contained in:
Tony Garnock-Jones 2021-06-15 12:36:25 +02:00
parent a0a963f1e2
commit 6c9926cb11
6 changed files with 39 additions and 38 deletions

View File

@ -23,7 +23,7 @@
(spawn-tcp-driver ds) (spawn-tcp-driver ds)
(spawn (spawn
(establish-connection (establish-connection
ds (TcpOutbound host port) ds (TcpRemote host port)
#:initial-mode (Mode-lines (LineMode-lf)) #:initial-mode (Mode-lines (LineMode-lf))
#:on-connected (lambda (peer) #:on-connected (lambda (peer)
(at ds (at ds

View File

@ -19,5 +19,5 @@
(spawn-tcp-driver ds) (spawn-tcp-driver ds)
(spawn (spawn
(at ds (at ds
(during/spawn (Connection $conn (TcpInbound host port)) (during/spawn (Connection $conn (TcpLocal host port))
(accept-connection conn #:on-data (lambda (data mode) (send-data conn data mode)))))))) (accept-connection conn #:on-data (lambda (data mode) (send-data conn data mode))))))))

View File

@ -21,7 +21,7 @@
(spawn-tcp-driver ds) (spawn-tcp-driver ds)
(spawn (spawn
(at ds (at ds
(during/spawn (Connection $conn (TcpInbound host port)) (during/spawn (Connection $conn (TcpLocal host port))
(accept-connection conn (accept-connection conn
#:initial-mode (Mode-lines (LineMode-lf)) #:initial-mode (Mode-lines (LineMode-lf))
#:on-data (lambda (data mode) (send! ds (Line data)))) #:on-data (lambda (data mode) (send! ds (Line data))))

View File

@ -27,7 +27,7 @@
(spawn-tcp-driver ds) (spawn-tcp-driver ds)
(spawn #:name 'tcp-server (spawn #:name 'tcp-server
(at ds (at ds
(during/spawn (Connection $conn (TcpInbound "0.0.0.0" 5999)) (during/spawn (Connection $conn (TcpLocal "0.0.0.0" 5999))
(run-relay #:name conn (run-relay #:name conn
#:packet-writer (lambda (bs) (send-data conn bs)) #:packet-writer (lambda (bs) (send-data conn bs))
#:setup-inputs #:setup-inputs

View File

@ -33,20 +33,20 @@
(at ds (at ds
(during/spawn (during/spawn
(Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $host) ,(DLit $port)))) _) (Observe (:pattern (Connection ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _)
#:name (TcpInbound host port) #:name (TcpLocal host port)
(run-listener ds host port)) (run-listener ds host port))
(during/spawn (during/spawn
(Connection $local-peer (TcpOutbound $host $port)) (Connection $local-peer (TcpRemote $host $port))
#:name (TcpOutbound host port) #:name (TcpRemote host port)
(run-outbound ds local-peer host port))))) (run-outbound ds local-peer host port)))))
(define (run-listener ds host port) (define (run-listener ds host port)
(on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port)) (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port))
(on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port)) (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port))
(linked-thread (linked-thread
#:name (list (TcpInbound host port) 'thread) #:name (list (TcpLocal host port) 'thread)
(lambda (facet) (lambda (facet)
(define listener (tcp-listen port 512 #t host)) (define listener (tcp-listen port 512 #t host))
(let loop () (let loop ()
@ -54,7 +54,7 @@
(define-values (i o) (parameterize ((current-custodian connection-custodian)) (define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-accept listener))) (tcp-accept listener)))
(turn! facet (turn! facet
(lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port)))) (lambda () (spawn-inbound ds connection-custodian i o (TcpLocal host port))))
(loop))))) (loop)))))
(define (run-outbound ds local-peer host port) (define (run-outbound ds local-peer host port)
@ -66,6 +66,7 @@
(tcp-connect host port))) (tcp-connect host port)))
(lambda () (lambda ()
(define name (call-with-values (lambda () (tcp-addresses i #t)) list)) (define name (call-with-values (lambda () (tcp-addresses i #t)) list))
(at ds (assert (ConnectionPeer local-peer (TcpLocal (car name) (cadr name)))))
(actor-add-exit-hook! this-actor (lambda () (actor-add-exit-hook! this-actor (lambda ()
(close-input-port i) (close-input-port i)
(close-output-port o))) (close-output-port o)))
@ -88,33 +89,33 @@
(define issue-credit #f) (define issue-credit #f)
(define active-controller #f) (define active-controller #f)
(define relay (outbound-relay name o)) (define relay (outbound-relay name o))
(define handle
(object
#:name (list name 'active-socket)
[#:asserted (ActiveSocket-controller controller)
(log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor)
(when (not active-controller)
(set! issue-credit (start-inbound-relay custodian name (lambda () active-controller) i)))
(set! active-controller controller)
#:retracted
(when (eq? controller active-controller)
(log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor)
(stop-current-facet))]
[#:asserted (ActiveSocket-close message)
(log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message)
(stop-current-facet)]
[#:asserted (ActiveSocket-Socket (Socket-credit amount mode))
(if issue-credit
(issue-credit amount mode)
(log-syndicate/drivers/tcp-warning
"Socket-credit ~v/~v ignored because no controller present" amount mode))]
[#:asserted (ActiveSocket-Socket (Socket-data data mode))
(relay data mode)]
[#:asserted (ActiveSocket-Socket (Socket-eof))
(close-output-port o)]))
(at ds (at ds
(assert (Connection (assert (ConnectionPeer handle (TcpRemote (caddr name) (cadddr name))))
(object (assert (Connection handle spec)))))
#:name (list name 'active-socket)
[#:asserted (ActiveSocket-controller controller)
(log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor)
(when (not active-controller)
(set! issue-credit
(start-inbound-relay custodian name (lambda () active-controller) i)))
(set! active-controller controller)
#:retracted
(when (eq? controller active-controller)
(log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor)
(stop-current-facet))]
[#:asserted (ActiveSocket-close message)
(log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message)
(stop-current-facet)]
[#:asserted (ActiveSocket-Socket (Socket-credit amount mode))
(if issue-credit
(issue-credit amount mode)
(log-syndicate/drivers/tcp-warning
"Socket-credit ~v/~v ignored because no controller present" amount mode))]
[#:asserted (ActiveSocket-Socket (Socket-data data mode))
(relay data mode)]
[#:asserted (ActiveSocket-Socket (Socket-eof))
(close-output-port o)])
spec)))))
(define (start-inbound-relay custodian name target-proc i) (define (start-inbound-relay custodian name target-proc i)
(define eof-received? #f) (define eof-received? #f)

View File

@ -4,8 +4,8 @@ embeddedType EntityRef.Ref .
Connection = <connection @handle #!ActiveSocket @spec any>. Connection = <connection @handle #!ActiveSocket @spec any>.
ConnectionPeer = <connection-peer @handle #!ActiveSocket @spec any>. ConnectionPeer = <connection-peer @handle #!ActiveSocket @spec any>.
TcpOutbound = <outbound @host string @port int>. TcpRemote = <remote @host string @port int>.
TcpInbound = <inbound @host string @port int>. TcpLocal = <local @host string @port int>.
ActiveSocket = ActiveSocket =
/ <controller @controller #!Sink> / <controller @controller #!Sink>