From d30c6feeeec85d3bb947ad367b1682d1d968ce7e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 16 Jun 2021 21:41:53 +0200 Subject: [PATCH] Split out experimental "stream" protocols; make tcp.rkt use them; more inertness checks Also, a few other important changes: - Better printing of entity-ref structs - Inertness checks on assertion retraction (!) and preventer-disarm - Correct selection of active facet during dataflow recomputations - Repair silly omission in turn-assert/dataflow! --- syndicate-examples/tcp-client.rkt | 6 +- syndicate-examples/tcp-echo-server.rkt | 5 +- syndicate-examples/tcp-relay-server.rkt | 6 +- syndicate/actor.rkt | 95 ++----- syndicate/distributed/tcp-server.rkt | 11 +- syndicate/drivers/tcp.rkt | 340 ++++++++++++++++-------- syndicate/entity-ref.rkt | 91 ++++++- syndicate/main.rkt | 2 +- syndicate/schemas/stream.prs | 43 +++ syndicate/schemas/tcp.prs | 24 +- 10 files changed, 403 insertions(+), 220 deletions(-) create mode 100644 syndicate/schemas/stream.prs diff --git a/syndicate-examples/tcp-client.rkt b/syndicate-examples/tcp-client.rkt index 6c63b4e..06c5377 100644 --- a/syndicate-examples/tcp-client.rkt +++ b/syndicate-examples/tcp-client.rkt @@ -25,14 +25,14 @@ (establish-connection ds (TcpRemote host port) #:initial-mode (Mode-lines (LineMode-lf)) - #:on-connected (lambda (peer) + #:on-connected (lambda (source sink) (at ds (when (message (RacketEvent (read-line-evt (current-input-port)) $vs)) (match (car vs) [(? eof-object?) (stop-current-facet (log-info "EOF on stdin."))] - [line (send-line peer line)])))) + [line (send-line sink line)])))) #:on-rejected (lambda (message) (stop-current-facet (log-error "~a" message))) - #:on-disconnected (lambda () (stop-current-facet (log-info "Disconnected"))) + #:on-disconnect (lambda () (stop-current-facet (log-info "Disconnected"))) #:on-data (lambda (line _mode) ;; \e7 DECSC, save cursor position ;; \n\e[A Force a new line if at end of screen, then back up; effect of \r diff --git a/syndicate-examples/tcp-echo-server.rkt b/syndicate-examples/tcp-echo-server.rkt index 3fb8db9..f09b91e 100644 --- a/syndicate-examples/tcp-echo-server.rkt +++ b/syndicate-examples/tcp-echo-server.rkt @@ -19,5 +19,6 @@ (spawn-tcp-driver ds) (spawn (at ds - (during/spawn (Connection $conn (TcpLocal host port)) - (accept-connection conn #:on-data (lambda (data mode) (send-data conn data mode)))))))) + (during/spawn (StreamConnection $source $sink (TcpLocal host port)) + (handle-connection source sink + #:on-data (lambda (data mode) (send-data sink data mode)))))))) diff --git a/syndicate-examples/tcp-relay-server.rkt b/syndicate-examples/tcp-relay-server.rkt index effefae..a7169af 100644 --- a/syndicate-examples/tcp-relay-server.rkt +++ b/syndicate-examples/tcp-relay-server.rkt @@ -21,8 +21,8 @@ (spawn-tcp-driver ds) (spawn (at ds - (during/spawn (Connection $conn (TcpLocal host port)) - (accept-connection conn + (during/spawn (StreamConnection $source $sink (TcpLocal host port)) + (handle-connection source sink #:initial-mode (Mode-lines (LineMode-lf)) #:on-data (lambda (data mode) (send! ds (Line data)))) - (at ds (when (message (Line $data)) (send-line conn data)))))))) + (at ds (when (message (Line $data)) (send-line sink data)))))))) diff --git a/syndicate/actor.rkt b/syndicate/actor.rkt index a0a07d2..b67c8ee 100644 --- a/syndicate/actor.rkt +++ b/syndicate/actor.rkt @@ -2,8 +2,8 @@ ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones -(provide (except-out (struct-out entity) entity) - (rename-out [make-entity entity]) +(provide (struct-out ) + entity current-turn @@ -67,7 +67,6 @@ (require (only-in preserves preserve=?)) (require racket/match) (require (only-in racket/exn exn->string)) -(require struct-defaults) (require "rewrite.rkt") (require "engine.rkt") @@ -76,58 +75,8 @@ (require "field.rkt") (require "support/counter.rkt") -(struct entity (id name assert retract message sync data) - #:methods gen:custom-write - [(define (write-proc e port mode) - (fprintf port "#" (entity-id e) (entity-name e)))]) -(define-struct-defaults make-entity entity - (#:_id [entity-id (generate-entity-id)] - #:name [entity-name '?] - #:assert [entity-assert #f] - #:retract [entity-retract #f] - #:message [entity-message #f] - #:sync [entity-sync #f] - #:data [entity-data (void)])) - (struct outbound-assertion (handle peer [established? #:mutable])) -(struct actor (id - name - engine - [daemon? #:mutable] - dataflow - [root #:mutable] - [exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error - [exit-hooks #:mutable]) - #:methods gen:custom-write - [(define (write-proc a port mode) - (fprintf port "#" (engine-id (actor-engine a)) (actor-id a) (actor-name a)))]) - -(struct facet (id - actor - parent - children - outbound - [end-of-turn-actions #:mutable] - [shutdown-actions #:mutable] - [live? #:mutable] - [inert-check-preventers #:mutable]) - #:methods gen:custom-write - [(define (write-proc f port mode) - (local-require (only-in racket/string string-join)) - (fprintf port "#" - (engine-id (actor-engine (facet-actor f))) - (actor-id (facet-actor f)) - (actor-name (facet-actor f)) - (string-join (reverse (let loop ((f f)) - (if (facet-parent f) - (cons (number->string (facet-id f)) (loop (facet-parent f))) - '()))) - "/") - (if (facet-live? f) - "" - ":(DEAD)")))]) - (struct turn (id active-facet [queues #:mutable]) @@ -139,7 +88,6 @@ (define current-turn (make-parameter #f)) -(define generate-entity-id (make-counter)) (define generate-actor-id (make-counter)) (define generate-turn-id (make-counter)) (define generate-handle (make-counter)) @@ -293,7 +241,10 @@ (lambda () (when armed (set! armed #f) - (set-facet-inert-check-preventers! f (- (facet-inert-check-preventers f) 1))))) + (let ((n (- (facet-inert-check-preventers f) 1))) + (set-facet-inert-check-preventers! f n) + (when (zero? n) + (check-for-inertness (current-turn) f)))))) (define (facet-terminate! f orderly?) (when (facet-live? f) @@ -429,7 +380,9 @@ (lambda () (log-syndicate/actor-debug " DELIVER link to ~v assert ~v handle ~v" linked-peer link-message handle) - (deliver (entity-assert link-entity) link-message handle))))))) + (deliver (entity-assert link-entity) link-message handle)))) + (when (hash-empty? o) + (check-for-inertness (current-turn) f))))) (define (turn-stop-actor-system! turn) (define ac (facet-actor (turn-active-facet turn))) @@ -458,7 +411,7 @@ (define (turn-dataflow! turn action) (define f (turn-active-facet turn)) - (define (wrapped) (when (facet-live? f) (action))) + (define (wrapped) (when (facet-live? f) (with-active-facet f action))) (parameterize ((current-dataflow-subject-id wrapped)) (wrapped))) (define (turn-assert/dataflow! turn peer assertion-action) @@ -468,6 +421,7 @@ (lambda () (define new-assertion (assertion-action)) (when (not (preserve=? assertion new-assertion)) + (set! assertion new-assertion) (set! handle (turn-replace! (current-turn) peer handle new-assertion)))))) (define (turn-assert! turn peer assertion) @@ -502,7 +456,6 @@ new-handle) (define (turn-retract!* turn a) - (hash-remove! (facet-outbound (turn-active-facet turn)) (outbound-assertion-handle a)) (log-syndicate/actor-debug " ENQ at ~v retract handle ~v" (outbound-assertion-peer a) (outbound-assertion-handle a)) @@ -518,10 +471,15 @@ (when (outbound-assertion-established? a) (set-outbound-assertion-established?! a #f) (deliver (entity-retract (entity-ref-target (outbound-assertion-peer a))) - (outbound-assertion-handle a)))))) + (outbound-assertion-handle a))))) + (let* ((f (turn-active-facet turn)) + (o (facet-outbound f))) + (hash-remove! o (outbound-assertion-handle a)) + (when (hash-empty? o) + (check-for-inertness turn f)))) (define (turn-sync! turn peer k) - (turn-sync!* turn peer (turn-ref turn (make-entity #:message k)))) + (turn-sync!* turn peer (turn-ref turn (entity #:message k)))) (define (turn-sync!* turn peer-to-sync-with peer-k) (log-syndicate/actor-debug " ENQ sync ~v" peer-to-sync-with) @@ -556,12 +514,17 @@ (lambda () (define f (turn-active-facet (current-turn))) (action) - (turn-enqueue! (current-turn) - f - (lambda () - (when (or (and (facet-parent f) (not (facet-live? (facet-parent f)))) - (facet-inert? f)) - (turn-stop!)))))) + (check-for-inertness (current-turn) f))) + +(define (check-for-inertness turn f) + (log-syndicate/actor-debug " ENQ checking ~a" f) + (turn-enqueue! turn + f + (lambda () + (log-syndicate/actor-debug " DEQ checking ~a" f) + (when (or (and (facet-parent f) (not (facet-live? (facet-parent f)))) + (facet-inert? f)) + (turn-stop!))))) (define (deliver maybe-proc . args) (when maybe-proc diff --git a/syndicate/distributed/tcp-server.rkt b/syndicate/distributed/tcp-server.rkt index 72d7d11..41af355 100644 --- a/syndicate/distributed/tcp-server.rkt +++ b/syndicate/distributed/tcp-server.rkt @@ -27,14 +27,15 @@ (spawn-tcp-driver ds) (spawn #:name 'tcp-server (at ds - (during/spawn (Connection $conn (TcpLocal "0.0.0.0" 5999)) - (run-relay #:name conn - #:packet-writer (lambda (bs) (send-data conn bs)) + (during/spawn (StreamConnection $source $sink (TcpLocal "0.0.0.0" 5999)) + #:name (list 'tcp-server source) + (run-relay #:packet-writer (lambda (bs) (send-data sink bs)) #:setup-inputs (lambda (tr) - (accept-connection conn #:on-data (lambda (d _m) (accept-bytes tr d)))) + (handle-connection source sink + #:on-data (lambda (d _m) (accept-bytes tr d)))) #:initial-ref - (object #:name (list conn 'gatekeeper) + (object #:name 'gatekeeper [(Resolve unvalidated-sturdyref observer) (at ds (during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target) diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index 553194a..e1d72dc 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -2,11 +2,16 @@ ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones -(provide (all-from-out syndicate/schemas/gen/tcp) +(provide (all-from-out syndicate/schemas/gen/stream) + (all-from-out syndicate/schemas/gen/tcp) spawn-tcp-driver - accept-connection - assert-control + + handle-connection + make-source + make-sink + make-connection-handler establish-connection + send-credit send-lines-credit send-bytes-credit @@ -21,6 +26,7 @@ (require (only-in racket/exn exn->string)) (require syndicate/driver-support) (require syndicate/functional-queue) +(require syndicate/schemas/gen/stream) (require syndicate/schemas/gen/tcp) (require syndicate/schemas/gen/dataspace-patterns) @@ -35,95 +41,103 @@ (at ds (during/spawn - (Observe (:pattern (Connection ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _) + (Observe (:pattern (StreamConnection ,_ ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _) + #:name (list 'simple (TcpLocal host port)) + (define spec (TcpLocal host port)) + (at ds + (assert (StreamListener spec + (make-connection-handler + (lambda (source sink) + (at ds (assert (StreamConnection source sink spec))))))))) + + (during/spawn (StreamConnection $app-source $app-sink (TcpRemote $host $port)) + #:name (list 'simple (TcpRemote host port)) + (define spec (TcpRemote host port)) + (at ds + (assert (StreamConnect spec + (object #:name 'connection-peer + [(ConnectionHandler-connected tcp-source tcp-sink) + (at tcp-source (assert (Source-sink app-sink))) + (at tcp-sink (assert (Sink-source app-source)))] + [(ConnectionHandler-rejected message) + (log-syndicate/drivers/tcp-error + "Connection to ~a rejected: ~a" spec message) + (stop-current-facet)]))))) + + (during/spawn (StreamListener (TcpLocal $host $port) $peer) #:name (TcpLocal host port) - (run-listener ds host port)) + (run-listener ds peer host port)) - (during/spawn - (Connection $local-peer (TcpRemote $host $port)) + (during/spawn (StreamConnect (TcpRemote $host $port) $peer) #:name (TcpRemote host port) - (run-outbound ds local-peer host port))))) + (run-outbound ds peer host port))))) -(define (run-listener ds host port) +(define (run-listener ds peer host port) (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port)) (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port)) (linked-thread - #:name (list (TcpLocal host port) 'thread) + #:name (list 'listen-thread host port) (lambda (facet) (define listener (tcp-listen port 512 #t host)) (let loop () (define connection-custodian (make-custodian)) (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-accept listener))) - (turn! facet - (lambda () (spawn-inbound ds connection-custodian i o (TcpLocal host port)))) + (turn! facet (lambda () (spawn-connection ds connection-custodian i o peer))) (loop))))) -(define (run-outbound ds local-peer host port) - (define connection-custodian (make-custodian)) +(define (tcp-ends p) + (call-with-values (lambda () (tcp-addresses p #t)) + (lambda (lh lp rh rp) (list (TcpLocal lh lp) (TcpRemote rh rp))))) + +(define (spawn-connection ds custodian i o peer) + (match-define (and ends (list (and local-end (TcpLocal local-host local-port)) + (and remote-end (TcpRemote remote-host remote-port)))) + (tcp-ends i)) + (define name (format "[~a:~a::~a:~a]" local-host local-port remote-host remote-port)) + (spawn #:name name + (actor-add-exit-hook! this-actor (lambda () + (close-input-port i) + (close-output-port o))) + + (define-field facet-count 2) + (define source #f) + (define sink #f) + + (react (on-stop (facet-count (- (facet-count) 1)) + (close-input-port i)) + (define active-sink #f) + (define issue-credit (start-inbound-relay custodian (lambda () active-sink) i)) + (set! source (make-source #:name 'socket-in + #:on-connect (lambda (new-sink) (set! active-sink new-sink)) + #:on-credit issue-credit)) + (at ds (assert (TcpPeerInfo source local-end remote-end)))) + + (react (on-stop (facet-count (- (facet-count) 1)) + (close-output-port o)) + (set! sink (make-sink #:name 'socket-out + #:on-data (outbound-relay o) + #:on-eof (lambda () (stop-current-facet)))) + (at ds (assert (TcpPeerInfo sink local-end remote-end)))) + + (at peer (assert #:when (positive? (facet-count)) + (ConnectionHandler-connected source sink))))) + +(define (run-outbound ds peer host port) ((with-handlers ([exn:fail:network? (lambda (e) - (lambda () (at local-peer (assert (ActiveSocket-close (exn->string e))))))]) + (lambda () + (at peer (assert (ConnectionHandler-rejected (exn->string e))))))]) + (define connection-custodian (make-custodian)) (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-connect host port))) - (lambda () - (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) - (at ds (assert (ConnectionPeer local-peer (TcpLocal (car name) (cadr name))))) - (actor-add-exit-hook! this-actor (lambda () - (close-input-port i) - (close-output-port o))) - (define issue-credit (start-inbound-relay connection-custodian name (lambda () local-peer) i)) - (define relay (outbound-relay name o)) - (at local-peer - (assert (ActiveSocket-controller - (object #:name (list name 'socket) - [#:message (Socket-credit amount mode) (issue-credit amount mode)] - [#:message (Socket-data data mode) (relay data mode)] - [#:message (Socket-eof) (close-output-port o)])))))))) + (lambda () (spawn-connection ds connection-custodian i o peer))))) -(define (spawn-inbound ds custodian i o spec) - (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) - (spawn - #:name name - (actor-add-exit-hook! this-actor (lambda () - (close-input-port i) - (close-output-port o))) - (define issue-credit #f) - (define active-controller #f) - (define relay (outbound-relay name o)) - (define handle - (object - #:name (list name 'active-socket) - [#:asserted (ActiveSocket-controller controller) - (log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor) - (when (not active-controller) - (set! issue-credit (start-inbound-relay custodian name (lambda () active-controller) i))) - (set! active-controller controller) - #:retracted - (when (eq? controller active-controller) - (log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor) - (stop-current-facet))] - [#:asserted (ActiveSocket-close message) - (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message) - (stop-current-facet)] - [#:message (ActiveSocket-Socket (Socket-credit amount mode)) - (if issue-credit - (issue-credit amount mode) - (log-syndicate/drivers/tcp-warning - "Socket-credit ~v/~v ignored because no controller present" amount mode))] - [#:message (ActiveSocket-Socket (Socket-data data mode)) - (relay data mode)] - [#:message (ActiveSocket-Socket (Socket-eof)) - (close-output-port o)])) - (at ds - (assert (ConnectionPeer handle (TcpRemote (caddr name) (cadddr name)))) - (assert (Connection handle spec))))) - -(define (start-inbound-relay custodian name target-proc i) +(define (start-inbound-relay custodian target-proc i) (define eof-received? #f) (define control-ch (make-async-channel)) (linked-thread - #:name (list name 'input-thread) + #:name (cons 'input-thread (tcp-ends i)) #:custodian custodian #:peer (object #:name 'inbound-relay-monitor [#:asserted _ @@ -136,7 +150,7 @@ q (undequeue (cons remaining-count mode) q))) (define (eof-and-finish) - (log-syndicate/drivers/tcp-debug "inbound eof for ~v" name) + (log-syndicate/drivers/tcp-debug "inbound eof for ~a" (tcp-ends i)) (turn! facet (lambda () (send-eof (target-proc))))) (let loop ((credits (make-queue))) (sync (handle-evt control-ch @@ -161,7 +175,7 @@ (match-lambda [(? number? read-count) (define bs (subbytes buffer 0 read-count)) - (log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name) + (log-syndicate/drivers/tcp-debug "inbound data ~v for ~a" bs (tcp-ends i)) (turn! facet (lambda () (send-data (target-proc) bs))) (loop (update-count (- count read-count) mode q))] [(? eof-object?) (eof-and-finish)]))] @@ -170,13 +184,13 @@ (match-lambda [(? bytes? packet) #:when (< (bytes-length packet) packet-size) (log-syndicate/drivers/tcp-debug - "short inbound packet (length ~a; expected ~a bytes) ~v for ~v" - (bytes-length packet) packet-size packet name) + "short inbound packet (length ~a; expected ~a bytes) ~v for ~a" + (bytes-length packet) packet-size packet (tcp-ends i)) (eof-and-finish)] [(? bytes? packet) (log-syndicate/drivers/tcp-debug - "inbound packet (length ~a) ~v for ~v" - (bytes-length packet) packet name) + "inbound packet (length ~a) ~v for ~a" + (bytes-length packet) packet (tcp-ends i)) (turn! facet (lambda () (send-data (target-proc) packet mode))) (loop (update-count (- count 1) mode q))] [(? eof-object?) (eof-and-finish)]))] @@ -186,7 +200,7 @@ [(LineMode-crlf) 'return-linefeed])) (match-lambda [(? bytes? line) - (log-syndicate/drivers/tcp-debug "inbound line ~v for ~v" line name) + (log-syndicate/drivers/tcp-debug "inbound line ~v for ~a" line (tcp-ends i)) (turn! facet (lambda () (send-line (target-proc) line line-mode))) (loop (update-count (- count 1) mode q))] [(? eof-object?) (eof-and-finish)]))]))))) @@ -209,10 +223,10 @@ (stop-current-facet))]) (thunk))) -(define (outbound-relay name o) +(define (outbound-relay o) (define flush-pending #f) (lambda (payload mode) - (log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name) + (log-syndicate/drivers/tcp-debug "outbound data ~v on ~a" payload (tcp-ends o)) (with-stop-current-facet-on-epipe 'writing (lambda () (write-bytes payload o) @@ -228,53 +242,145 @@ (with-stop-current-facet-on-epipe 'flushing (lambda () (flush-output o)))))))) -(define (accept-connection conn +;;--------------------------------------------------------------------------- + +(define (handle-connection source sink + #:on-disconnect [on-disconnect #f] + #:on-error [on-error #f] + #:on-credit [on-credit void] #:initial-credit [initial-credit (CreditAmount-unbounded)] #:initial-mode [initial-mode (Mode-bytes)] #:on-data on-data - #:on-eof [on-eof void] - #:on-credit [on-credit void]) - (assert-control conn - #:on-data on-data - #:on-eof on-eof - #:on-credit on-credit) - (when initial-credit (send-credit conn initial-credit initial-mode))) + #:on-eof [on-eof void]) + (make-source #:initial-sink sink + #:name 'app-out + #:on-disconnect on-disconnect #:on-error on-error + #:on-credit on-credit) + (make-sink #:initial-source source + #:name 'app-in + #:on-disconnect on-disconnect #:on-error on-error + #:on-data on-data #:on-eof on-eof) + (when initial-credit (send-credit source initial-credit initial-mode))) -(define (assert-control conn - #:on-data on-data - #:on-eof [on-eof void] - #:on-credit [on-credit void]) - (at conn - (assert (ActiveSocket-controller - (object #:name 'inbound-socket-controller - [#:message (Socket-credit amount mode) (on-credit amount mode)] - [#:message (Socket-data data mode) (on-data data mode)] - [#:message (Socket-eof) (on-eof)]))))) +(define (make-source + #:initial-sink [initial-sink #f] + #:name [name (gensym 'source)] + #:on-connect [on-connect (lambda (new-sink) (void))] + #:on-disconnect [on-disconnect0 #f] + #:on-error [on-error0 #f] + #:on-credit [on-credit (lambda (amount mode) (void))]) + (define sink #f) + (define handle #f) + (define (set-sink! new-sink) + (when (not (eq? sink new-sink)) + (on-connect new-sink) + (set! sink new-sink) + (set! handle (turn-replace! this-turn sink handle + (if sink (->preserve (Sink-source self)) (void)))))) + + (define on-disconnect + (or on-disconnect0 (lambda () + (log-syndicate/drivers/tcp-debug "~a disconnected" self) + (stop-current-facet)))) + (define on-error + (or on-error0 (lambda (message) + (log-syndicate/drivers/tcp-debug "~a error: ~v" self message) + (stop-current-facet)))) + + (define self + (object #:name name + [#:asserted (Source-sink new-sink) (set-sink! new-sink) + #:retracted (when (eq? sink new-sink) + (set-sink! #f) + (on-disconnect))] + [#:asserted (StreamError message) (on-error message)] + [#:message (Source-credit amount mode) (on-credit amount mode)])) + + (set-sink! initial-sink) + self) + +(define (make-sink + #:initial-source [initial-source #f] + #:name [name (gensym 'sink)] + #:on-connect [on-connect (lambda (new-source) (void))] + #:on-disconnect [on-disconnect0 #f] + #:on-error [on-error0 #f] + #:on-data on-data + #:on-eof [on-eof (lambda () (void))]) + (define source #f) + (define handle #f) + (define (set-source! new-source) + (when (not (eq? new-source source)) + (on-connect new-source) + (set! source new-source) + (set! handle (turn-replace! this-turn source handle + (if source (->preserve (Source-sink self)) (void)))))) + + (define on-disconnect + (or on-disconnect0 (lambda () + (log-syndicate/drivers/tcp-debug "~a disconnected" self) + (stop-current-facet)))) + (define on-error + (or on-error0 (lambda (message) + (log-syndicate/drivers/tcp-debug "~a error: ~v" self message) + (stop-current-facet)))) + + (define self + (object #:name name + [#:asserted (Sink-source new-source) (set-source! new-source) + #:retracted (when (eq? source new-source) + (set-source! #f) + (on-disconnect))] + [#:asserted (StreamError message) (on-error message)] + [#:message (Sink-data payload mode) (on-data payload mode)] + [#:message (Sink-eof) (on-eof)])) + + (set-source! initial-source) + self) + +(define (make-connection-handler on-connected #:name [name (gensym 'connection-handler)]) + (object #:name name + [(ConnectionHandler-connected source sink) + (on-connected source sink)] + [(ConnectionHandler-rejected message) + (error 'connection-handler "~a" message)])) (define (establish-connection ds spec + #:name [name (gensym 'establish-connection)] + + #:on-connected [on-connected (lambda (source sink) (void))] + #:on-rejected [on-rejected #f] + + #:on-disconnect [on-disconnect #f] + #:on-error [on-error #f] + #:on-credit [on-credit void] #:initial-credit [initial-credit (CreditAmount-unbounded)] #:initial-mode [initial-mode (Mode-bytes)] - #:on-connected on-connected #:on-data on-data - #:on-eof [on-eof void] - #:on-credit [on-credit void] - #:on-disconnected [on-disconnected (lambda () (stop-current-facet))] - #:on-rejected [on-rejected (lambda () (stop-current-facet))]) - (define s - (object #:name 'outbound-socket - [#:asserted (ActiveSocket-controller peer) - (on-connected peer) - (when initial-credit (send-credit peer initial-credit initial-mode)) - #:retracted - (on-disconnected)] - [#:asserted (ActiveSocket-close message) (on-rejected message)] - [#:message (ActiveSocket-Socket (Socket-credit amount mode)) (on-credit amount mode)] - [#:message (ActiveSocket-Socket (Socket-data data mode)) (on-data data mode)] - [#:message (ActiveSocket-Socket (Socket-eof)) (on-eof)])) - (at ds (assert (Connection s spec)))) + #:on-eof [on-eof void]) + (define peer + (object #:name name + [#:asserted (ConnectionHandler-connected source sink) + (handle-connection source sink + #:on-disconnect on-disconnect + #:on-error on-error + #:on-credit on-credit + #:initial-credit initial-credit + #:initial-mode initial-mode + #:on-data on-data + #:on-eof on-eof) + (stop-facet ringing-facet) + (on-connected source sink)] + [#:asserted (ConnectionHandler-rejected message) + (stop-facet ringing-facet) + ((or on-rejected (lambda (_message) (stop-current-facet))) message)])) + (define ringing-facet (react (at ds (assert (StreamConnect spec peer))))) + (void)) + +;;--------------------------------------------------------------------------- (define (send-credit conn amount mode) - (send! conn (Socket-credit amount mode))) + (send! conn (Source-credit amount mode))) (define (send-lines-credit conn amount [mode (LineMode-lf)]) (send-credit conn (CreditAmount-count amount) (Mode-lines mode))) @@ -291,10 +397,10 @@ (string->bytes/utf-8 data))) (define (send-line conn line [line-mode (LineMode-lf)]) - (send! conn (Socket-data (->bytes line) (Mode-lines line-mode)))) + (send! conn (Sink-data (->bytes line) (Mode-lines line-mode)))) (define (send-data conn data [mode (Mode-bytes)]) - (send! conn (Socket-data (->bytes data) mode))) + (send! conn (Sink-data (->bytes data) mode))) (define (send-eof conn) - (send! conn (Socket-eof))) + (send! conn (Sink-eof))) diff --git a/syndicate/entity-ref.rkt b/syndicate/entity-ref.rkt index 6769d81..6ebeb20 100644 --- a/syndicate/entity-ref.rkt +++ b/syndicate/entity-ref.rkt @@ -2,10 +2,97 @@ ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones -(provide (struct-out entity-ref) +(provide (except-out (struct-out entity) entity) + (rename-out [make-entity entity] [entity ]) + + (struct-out actor) + (struct-out facet) + (struct-out entity-ref) + parse-Ref!) -(struct entity-ref (relay target attenuation) #:transparent) +(require racket/match) +(require (only-in racket/string string-join)) + +(require struct-defaults) +(require "support/counter.rkt") +(require "engine.rkt") + +(struct entity (id name assert retract message sync data) + #:methods gen:custom-write + [(define (write-proc e port mode) + (fprintf port "#" (entity-id e) (entity-name e)))]) + +(define generate-entity-id (make-counter)) + +(define-struct-defaults make-entity entity + (#:_id [entity-id (generate-entity-id)] + #:name [entity-name '?] + #:assert [entity-assert #f] + #:retract [entity-retract #f] + #:message [entity-message #f] + #:sync [entity-sync #f] + #:data [entity-data (void)])) + +(struct actor (id + name + engine + [daemon? #:mutable] + dataflow + [root #:mutable] + [exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error + [exit-hooks #:mutable]) + #:methods gen:custom-write + [(define (write-proc a port mode) + (fprintf port "#" (engine-id (actor-engine a)) (actor-id a) (actor-name a)))]) + +(define (facet-path-to-root f) + (string-join (reverse (let loop ((f f)) + (if (facet-parent f) + (cons (number->string (facet-id f)) (loop (facet-parent f))) + '()))) + "/")) + +(define (facet-liveness f) + (if (facet-live? f) + "" + ":(DEAD)")) + +(struct facet (id + actor + parent + children + outbound + [end-of-turn-actions #:mutable] + [shutdown-actions #:mutable] + [live? #:mutable] + [inert-check-preventers #:mutable]) + #:methods gen:custom-write + [(define (write-proc f port mode) + (local-require (only-in racket/string string-join)) + (fprintf port "#" + (engine-id (actor-engine (facet-actor f))) + (actor-id (facet-actor f)) + (actor-name (facet-actor f)) + (facet-path-to-root f) + (facet-liveness f)))]) + +(struct entity-ref (relay target attenuation) + #:transparent + #:methods gen:custom-write + [(define (write-proc r port mode) + (match-define (entity-ref f e a) r) + (fprintf port "#" + (entity-id e) + (entity-name e) + (engine-id (actor-engine (facet-actor f))) + (actor-id (facet-actor f)) + (actor-name (facet-actor f)) + (facet-path-to-root f) + (facet-liveness f) + (if (null? a) + "" + (format " ~s" a))))]) (define (parse-Ref! r) (if (entity-ref? r) diff --git a/syndicate/main.rkt b/syndicate/main.rkt index 4311fd1..4a8722f 100644 --- a/syndicate/main.rkt +++ b/syndicate/main.rkt @@ -4,7 +4,7 @@ #lang racket/base (provide (except-out (all-from-out "actor.rkt") current-turn) - (all-from-out "entity-ref.rkt") + (struct-out entity-ref) (all-from-out "syntax.rkt") (all-from-out "event-expander.rkt") (all-from-out preserves) diff --git a/syndicate/schemas/stream.prs b/syndicate/schemas/stream.prs new file mode 100644 index 0000000..918c8f9 --- /dev/null +++ b/syndicate/schemas/stream.prs @@ -0,0 +1,43 @@ +version 1 . +embeddedType EntityRef.Ref . + +StreamConnection = . + +; Assertion +StreamListener = . + +; Assertion +StreamConnect = . + +; Assertion +ConnectionHandler = + / @connected + / @rejected +. + +; Assertion +StreamError = . + +Source = + ; Assertions: + / + / StreamError + + ; Messages: + / +. + +Sink = + ; Assertions: + / + / StreamError + + ; Messages: + / + / +. + +CreditAmount = @count int / @unbounded =unbounded . + +Mode = =bytes / @lines LineMode / / . +LineMode = =lf / =crlf . diff --git a/syndicate/schemas/tcp.prs b/syndicate/schemas/tcp.prs index 9239133..7a6d5e1 100644 --- a/syndicate/schemas/tcp.prs +++ b/syndicate/schemas/tcp.prs @@ -1,25 +1,7 @@ version 1 . embeddedType EntityRef.Ref . -Connection = . -ConnectionPeer = . +TcpRemote = . +TcpLocal = . -TcpRemote = . -TcpLocal = . - -ActiveSocket = - / - / - / Socket -. - -Socket = - / - / - / -. - -CreditAmount = @count int / @unbounded =unbounded . - -Mode = =bytes / @lines LineMode / . -LineMode = =lf / =crlf . +TcpPeerInfo = .