Split out experimental "stream" protocols; make tcp.rkt use them; more inertness checks
Also, a few other important changes: - Better printing of entity-ref structs - Inertness checks on assertion retraction (!) and preventer-disarm - Correct selection of active facet during dataflow recomputations - Repair silly omission in turn-assert/dataflow!
This commit is contained in:
parent
ab9168a5ea
commit
d30c6feeee
|
@ -25,14 +25,14 @@
|
|||
(establish-connection
|
||||
ds (TcpRemote host port)
|
||||
#:initial-mode (Mode-lines (LineMode-lf))
|
||||
#:on-connected (lambda (peer)
|
||||
#:on-connected (lambda (source sink)
|
||||
(at ds
|
||||
(when (message (RacketEvent (read-line-evt (current-input-port)) $vs))
|
||||
(match (car vs)
|
||||
[(? eof-object?) (stop-current-facet (log-info "EOF on stdin."))]
|
||||
[line (send-line peer line)]))))
|
||||
[line (send-line sink line)]))))
|
||||
#:on-rejected (lambda (message) (stop-current-facet (log-error "~a" message)))
|
||||
#:on-disconnected (lambda () (stop-current-facet (log-info "Disconnected")))
|
||||
#:on-disconnect (lambda () (stop-current-facet (log-info "Disconnected")))
|
||||
#:on-data (lambda (line _mode)
|
||||
;; \e7 DECSC, save cursor position
|
||||
;; \n\e[A Force a new line if at end of screen, then back up; effect of \r
|
||||
|
|
|
@ -19,5 +19,6 @@
|
|||
(spawn-tcp-driver ds)
|
||||
(spawn
|
||||
(at ds
|
||||
(during/spawn (Connection $conn (TcpLocal host port))
|
||||
(accept-connection conn #:on-data (lambda (data mode) (send-data conn data mode))))))))
|
||||
(during/spawn (StreamConnection $source $sink (TcpLocal host port))
|
||||
(handle-connection source sink
|
||||
#:on-data (lambda (data mode) (send-data sink data mode))))))))
|
||||
|
|
|
@ -21,8 +21,8 @@
|
|||
(spawn-tcp-driver ds)
|
||||
(spawn
|
||||
(at ds
|
||||
(during/spawn (Connection $conn (TcpLocal host port))
|
||||
(accept-connection conn
|
||||
(during/spawn (StreamConnection $source $sink (TcpLocal host port))
|
||||
(handle-connection source sink
|
||||
#:initial-mode (Mode-lines (LineMode-lf))
|
||||
#:on-data (lambda (data mode) (send! ds (Line data))))
|
||||
(at ds (when (message (Line $data)) (send-line conn data))))))))
|
||||
(at ds (when (message (Line $data)) (send-line sink data))))))))
|
||||
|
|
|
@ -2,8 +2,8 @@
|
|||
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
||||
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||
|
||||
(provide (except-out (struct-out entity) entity)
|
||||
(rename-out [make-entity entity])
|
||||
(provide (struct-out <entity>)
|
||||
entity
|
||||
|
||||
current-turn
|
||||
|
||||
|
@ -67,7 +67,6 @@
|
|||
(require (only-in preserves preserve=?))
|
||||
(require racket/match)
|
||||
(require (only-in racket/exn exn->string))
|
||||
(require struct-defaults)
|
||||
|
||||
(require "rewrite.rkt")
|
||||
(require "engine.rkt")
|
||||
|
@ -76,58 +75,8 @@
|
|||
(require "field.rkt")
|
||||
(require "support/counter.rkt")
|
||||
|
||||
(struct entity (id name assert retract message sync data)
|
||||
#:methods gen:custom-write
|
||||
[(define (write-proc e port mode)
|
||||
(fprintf port "#<entity:~a:~a>" (entity-id e) (entity-name e)))])
|
||||
(define-struct-defaults make-entity entity
|
||||
(#:_id [entity-id (generate-entity-id)]
|
||||
#:name [entity-name '?]
|
||||
#:assert [entity-assert #f]
|
||||
#:retract [entity-retract #f]
|
||||
#:message [entity-message #f]
|
||||
#:sync [entity-sync #f]
|
||||
#:data [entity-data (void)]))
|
||||
|
||||
(struct outbound-assertion (handle peer [established? #:mutable]))
|
||||
|
||||
(struct actor (id
|
||||
name
|
||||
engine
|
||||
[daemon? #:mutable]
|
||||
dataflow
|
||||
[root #:mutable]
|
||||
[exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error
|
||||
[exit-hooks #:mutable])
|
||||
#:methods gen:custom-write
|
||||
[(define (write-proc a port mode)
|
||||
(fprintf port "#<actor:~a/~a:~a>" (engine-id (actor-engine a)) (actor-id a) (actor-name a)))])
|
||||
|
||||
(struct facet (id
|
||||
actor
|
||||
parent
|
||||
children
|
||||
outbound
|
||||
[end-of-turn-actions #:mutable]
|
||||
[shutdown-actions #:mutable]
|
||||
[live? #:mutable]
|
||||
[inert-check-preventers #:mutable])
|
||||
#:methods gen:custom-write
|
||||
[(define (write-proc f port mode)
|
||||
(local-require (only-in racket/string string-join))
|
||||
(fprintf port "#<facet:~a/~a:~a:~a~a>"
|
||||
(engine-id (actor-engine (facet-actor f)))
|
||||
(actor-id (facet-actor f))
|
||||
(actor-name (facet-actor f))
|
||||
(string-join (reverse (let loop ((f f))
|
||||
(if (facet-parent f)
|
||||
(cons (number->string (facet-id f)) (loop (facet-parent f)))
|
||||
'())))
|
||||
"/")
|
||||
(if (facet-live? f)
|
||||
""
|
||||
":(DEAD)")))])
|
||||
|
||||
(struct turn (id
|
||||
active-facet
|
||||
[queues #:mutable])
|
||||
|
@ -139,7 +88,6 @@
|
|||
|
||||
(define current-turn (make-parameter #f))
|
||||
|
||||
(define generate-entity-id (make-counter))
|
||||
(define generate-actor-id (make-counter))
|
||||
(define generate-turn-id (make-counter))
|
||||
(define generate-handle (make-counter))
|
||||
|
@ -293,7 +241,10 @@
|
|||
(lambda ()
|
||||
(when armed
|
||||
(set! armed #f)
|
||||
(set-facet-inert-check-preventers! f (- (facet-inert-check-preventers f) 1)))))
|
||||
(let ((n (- (facet-inert-check-preventers f) 1)))
|
||||
(set-facet-inert-check-preventers! f n)
|
||||
(when (zero? n)
|
||||
(check-for-inertness (current-turn) f))))))
|
||||
|
||||
(define (facet-terminate! f orderly?)
|
||||
(when (facet-live? f)
|
||||
|
@ -429,7 +380,9 @@
|
|||
(lambda ()
|
||||
(log-syndicate/actor-debug " DELIVER link to ~v assert ~v handle ~v"
|
||||
linked-peer link-message handle)
|
||||
(deliver (entity-assert link-entity) link-message handle)))))))
|
||||
(deliver (entity-assert link-entity) link-message handle))))
|
||||
(when (hash-empty? o)
|
||||
(check-for-inertness (current-turn) f)))))
|
||||
|
||||
(define (turn-stop-actor-system! turn)
|
||||
(define ac (facet-actor (turn-active-facet turn)))
|
||||
|
@ -458,7 +411,7 @@
|
|||
|
||||
(define (turn-dataflow! turn action)
|
||||
(define f (turn-active-facet turn))
|
||||
(define (wrapped) (when (facet-live? f) (action)))
|
||||
(define (wrapped) (when (facet-live? f) (with-active-facet f action)))
|
||||
(parameterize ((current-dataflow-subject-id wrapped)) (wrapped)))
|
||||
|
||||
(define (turn-assert/dataflow! turn peer assertion-action)
|
||||
|
@ -468,6 +421,7 @@
|
|||
(lambda ()
|
||||
(define new-assertion (assertion-action))
|
||||
(when (not (preserve=? assertion new-assertion))
|
||||
(set! assertion new-assertion)
|
||||
(set! handle (turn-replace! (current-turn) peer handle new-assertion))))))
|
||||
|
||||
(define (turn-assert! turn peer assertion)
|
||||
|
@ -502,7 +456,6 @@
|
|||
new-handle)
|
||||
|
||||
(define (turn-retract!* turn a)
|
||||
(hash-remove! (facet-outbound (turn-active-facet turn)) (outbound-assertion-handle a))
|
||||
(log-syndicate/actor-debug " ENQ at ~v retract handle ~v"
|
||||
(outbound-assertion-peer a)
|
||||
(outbound-assertion-handle a))
|
||||
|
@ -518,10 +471,15 @@
|
|||
(when (outbound-assertion-established? a)
|
||||
(set-outbound-assertion-established?! a #f)
|
||||
(deliver (entity-retract (entity-ref-target (outbound-assertion-peer a)))
|
||||
(outbound-assertion-handle a))))))
|
||||
(outbound-assertion-handle a)))))
|
||||
(let* ((f (turn-active-facet turn))
|
||||
(o (facet-outbound f)))
|
||||
(hash-remove! o (outbound-assertion-handle a))
|
||||
(when (hash-empty? o)
|
||||
(check-for-inertness turn f))))
|
||||
|
||||
(define (turn-sync! turn peer k)
|
||||
(turn-sync!* turn peer (turn-ref turn (make-entity #:message k))))
|
||||
(turn-sync!* turn peer (turn-ref turn (entity #:message k))))
|
||||
|
||||
(define (turn-sync!* turn peer-to-sync-with peer-k)
|
||||
(log-syndicate/actor-debug " ENQ sync ~v" peer-to-sync-with)
|
||||
|
@ -556,12 +514,17 @@
|
|||
(lambda ()
|
||||
(define f (turn-active-facet (current-turn)))
|
||||
(action)
|
||||
(turn-enqueue! (current-turn)
|
||||
f
|
||||
(lambda ()
|
||||
(when (or (and (facet-parent f) (not (facet-live? (facet-parent f))))
|
||||
(facet-inert? f))
|
||||
(turn-stop!))))))
|
||||
(check-for-inertness (current-turn) f)))
|
||||
|
||||
(define (check-for-inertness turn f)
|
||||
(log-syndicate/actor-debug " ENQ checking ~a" f)
|
||||
(turn-enqueue! turn
|
||||
f
|
||||
(lambda ()
|
||||
(log-syndicate/actor-debug " DEQ checking ~a" f)
|
||||
(when (or (and (facet-parent f) (not (facet-live? (facet-parent f))))
|
||||
(facet-inert? f))
|
||||
(turn-stop!)))))
|
||||
|
||||
(define (deliver maybe-proc . args)
|
||||
(when maybe-proc
|
||||
|
|
|
@ -27,14 +27,15 @@
|
|||
(spawn-tcp-driver ds)
|
||||
(spawn #:name 'tcp-server
|
||||
(at ds
|
||||
(during/spawn (Connection $conn (TcpLocal "0.0.0.0" 5999))
|
||||
(run-relay #:name conn
|
||||
#:packet-writer (lambda (bs) (send-data conn bs))
|
||||
(during/spawn (StreamConnection $source $sink (TcpLocal "0.0.0.0" 5999))
|
||||
#:name (list 'tcp-server source)
|
||||
(run-relay #:packet-writer (lambda (bs) (send-data sink bs))
|
||||
#:setup-inputs
|
||||
(lambda (tr)
|
||||
(accept-connection conn #:on-data (lambda (d _m) (accept-bytes tr d))))
|
||||
(handle-connection source sink
|
||||
#:on-data (lambda (d _m) (accept-bytes tr d))))
|
||||
#:initial-ref
|
||||
(object #:name (list conn 'gatekeeper)
|
||||
(object #:name 'gatekeeper
|
||||
[(Resolve unvalidated-sturdyref observer)
|
||||
(at ds
|
||||
(during (Bind (SturdyRef-oid unvalidated-sturdyref) $key $target)
|
||||
|
|
|
@ -2,11 +2,16 @@
|
|||
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
||||
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||
|
||||
(provide (all-from-out syndicate/schemas/gen/tcp)
|
||||
(provide (all-from-out syndicate/schemas/gen/stream)
|
||||
(all-from-out syndicate/schemas/gen/tcp)
|
||||
spawn-tcp-driver
|
||||
accept-connection
|
||||
assert-control
|
||||
|
||||
handle-connection
|
||||
make-source
|
||||
make-sink
|
||||
make-connection-handler
|
||||
establish-connection
|
||||
|
||||
send-credit
|
||||
send-lines-credit
|
||||
send-bytes-credit
|
||||
|
@ -21,6 +26,7 @@
|
|||
(require (only-in racket/exn exn->string))
|
||||
(require syndicate/driver-support)
|
||||
(require syndicate/functional-queue)
|
||||
(require syndicate/schemas/gen/stream)
|
||||
(require syndicate/schemas/gen/tcp)
|
||||
(require syndicate/schemas/gen/dataspace-patterns)
|
||||
|
||||
|
@ -35,95 +41,103 @@
|
|||
|
||||
(at ds
|
||||
(during/spawn
|
||||
(Observe (:pattern (Connection ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _)
|
||||
(Observe (:pattern (StreamConnection ,_ ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _)
|
||||
#:name (list 'simple (TcpLocal host port))
|
||||
(define spec (TcpLocal host port))
|
||||
(at ds
|
||||
(assert (StreamListener spec
|
||||
(make-connection-handler
|
||||
(lambda (source sink)
|
||||
(at ds (assert (StreamConnection source sink spec)))))))))
|
||||
|
||||
(during/spawn (StreamConnection $app-source $app-sink (TcpRemote $host $port))
|
||||
#:name (list 'simple (TcpRemote host port))
|
||||
(define spec (TcpRemote host port))
|
||||
(at ds
|
||||
(assert (StreamConnect spec
|
||||
(object #:name 'connection-peer
|
||||
[(ConnectionHandler-connected tcp-source tcp-sink)
|
||||
(at tcp-source (assert (Source-sink app-sink)))
|
||||
(at tcp-sink (assert (Sink-source app-source)))]
|
||||
[(ConnectionHandler-rejected message)
|
||||
(log-syndicate/drivers/tcp-error
|
||||
"Connection to ~a rejected: ~a" spec message)
|
||||
(stop-current-facet)])))))
|
||||
|
||||
(during/spawn (StreamListener (TcpLocal $host $port) $peer)
|
||||
#:name (TcpLocal host port)
|
||||
(run-listener ds host port))
|
||||
(run-listener ds peer host port))
|
||||
|
||||
(during/spawn
|
||||
(Connection $local-peer (TcpRemote $host $port))
|
||||
(during/spawn (StreamConnect (TcpRemote $host $port) $peer)
|
||||
#:name (TcpRemote host port)
|
||||
(run-outbound ds local-peer host port)))))
|
||||
(run-outbound ds peer host port)))))
|
||||
|
||||
(define (run-listener ds host port)
|
||||
(define (run-listener ds peer host port)
|
||||
(on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port))
|
||||
(on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port))
|
||||
(linked-thread
|
||||
#:name (list (TcpLocal host port) 'thread)
|
||||
#:name (list 'listen-thread host port)
|
||||
(lambda (facet)
|
||||
(define listener (tcp-listen port 512 #t host))
|
||||
(let loop ()
|
||||
(define connection-custodian (make-custodian))
|
||||
(define-values (i o) (parameterize ((current-custodian connection-custodian))
|
||||
(tcp-accept listener)))
|
||||
(turn! facet
|
||||
(lambda () (spawn-inbound ds connection-custodian i o (TcpLocal host port))))
|
||||
(turn! facet (lambda () (spawn-connection ds connection-custodian i o peer)))
|
||||
(loop)))))
|
||||
|
||||
(define (run-outbound ds local-peer host port)
|
||||
(define connection-custodian (make-custodian))
|
||||
(define (tcp-ends p)
|
||||
(call-with-values (lambda () (tcp-addresses p #t))
|
||||
(lambda (lh lp rh rp) (list (TcpLocal lh lp) (TcpRemote rh rp)))))
|
||||
|
||||
(define (spawn-connection ds custodian i o peer)
|
||||
(match-define (and ends (list (and local-end (TcpLocal local-host local-port))
|
||||
(and remote-end (TcpRemote remote-host remote-port))))
|
||||
(tcp-ends i))
|
||||
(define name (format "[~a:~a::~a:~a]" local-host local-port remote-host remote-port))
|
||||
(spawn #:name name
|
||||
(actor-add-exit-hook! this-actor (lambda ()
|
||||
(close-input-port i)
|
||||
(close-output-port o)))
|
||||
|
||||
(define-field facet-count 2)
|
||||
(define source #f)
|
||||
(define sink #f)
|
||||
|
||||
(react (on-stop (facet-count (- (facet-count) 1))
|
||||
(close-input-port i))
|
||||
(define active-sink #f)
|
||||
(define issue-credit (start-inbound-relay custodian (lambda () active-sink) i))
|
||||
(set! source (make-source #:name 'socket-in
|
||||
#:on-connect (lambda (new-sink) (set! active-sink new-sink))
|
||||
#:on-credit issue-credit))
|
||||
(at ds (assert (TcpPeerInfo source local-end remote-end))))
|
||||
|
||||
(react (on-stop (facet-count (- (facet-count) 1))
|
||||
(close-output-port o))
|
||||
(set! sink (make-sink #:name 'socket-out
|
||||
#:on-data (outbound-relay o)
|
||||
#:on-eof (lambda () (stop-current-facet))))
|
||||
(at ds (assert (TcpPeerInfo sink local-end remote-end))))
|
||||
|
||||
(at peer (assert #:when (positive? (facet-count))
|
||||
(ConnectionHandler-connected source sink)))))
|
||||
|
||||
(define (run-outbound ds peer host port)
|
||||
((with-handlers ([exn:fail:network?
|
||||
(lambda (e)
|
||||
(lambda () (at local-peer (assert (ActiveSocket-close (exn->string e))))))])
|
||||
(lambda ()
|
||||
(at peer (assert (ConnectionHandler-rejected (exn->string e))))))])
|
||||
(define connection-custodian (make-custodian))
|
||||
(define-values (i o) (parameterize ((current-custodian connection-custodian))
|
||||
(tcp-connect host port)))
|
||||
(lambda ()
|
||||
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
|
||||
(at ds (assert (ConnectionPeer local-peer (TcpLocal (car name) (cadr name)))))
|
||||
(actor-add-exit-hook! this-actor (lambda ()
|
||||
(close-input-port i)
|
||||
(close-output-port o)))
|
||||
(define issue-credit (start-inbound-relay connection-custodian name (lambda () local-peer) i))
|
||||
(define relay (outbound-relay name o))
|
||||
(at local-peer
|
||||
(assert (ActiveSocket-controller
|
||||
(object #:name (list name 'socket)
|
||||
[#:message (Socket-credit amount mode) (issue-credit amount mode)]
|
||||
[#:message (Socket-data data mode) (relay data mode)]
|
||||
[#:message (Socket-eof) (close-output-port o)]))))))))
|
||||
(lambda () (spawn-connection ds connection-custodian i o peer)))))
|
||||
|
||||
(define (spawn-inbound ds custodian i o spec)
|
||||
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
|
||||
(spawn
|
||||
#:name name
|
||||
(actor-add-exit-hook! this-actor (lambda ()
|
||||
(close-input-port i)
|
||||
(close-output-port o)))
|
||||
(define issue-credit #f)
|
||||
(define active-controller #f)
|
||||
(define relay (outbound-relay name o))
|
||||
(define handle
|
||||
(object
|
||||
#:name (list name 'active-socket)
|
||||
[#:asserted (ActiveSocket-controller controller)
|
||||
(log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor)
|
||||
(when (not active-controller)
|
||||
(set! issue-credit (start-inbound-relay custodian name (lambda () active-controller) i)))
|
||||
(set! active-controller controller)
|
||||
#:retracted
|
||||
(when (eq? controller active-controller)
|
||||
(log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor)
|
||||
(stop-current-facet))]
|
||||
[#:asserted (ActiveSocket-close message)
|
||||
(log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message)
|
||||
(stop-current-facet)]
|
||||
[#:message (ActiveSocket-Socket (Socket-credit amount mode))
|
||||
(if issue-credit
|
||||
(issue-credit amount mode)
|
||||
(log-syndicate/drivers/tcp-warning
|
||||
"Socket-credit ~v/~v ignored because no controller present" amount mode))]
|
||||
[#:message (ActiveSocket-Socket (Socket-data data mode))
|
||||
(relay data mode)]
|
||||
[#:message (ActiveSocket-Socket (Socket-eof))
|
||||
(close-output-port o)]))
|
||||
(at ds
|
||||
(assert (ConnectionPeer handle (TcpRemote (caddr name) (cadddr name))))
|
||||
(assert (Connection handle spec)))))
|
||||
|
||||
(define (start-inbound-relay custodian name target-proc i)
|
||||
(define (start-inbound-relay custodian target-proc i)
|
||||
(define eof-received? #f)
|
||||
(define control-ch (make-async-channel))
|
||||
(linked-thread
|
||||
#:name (list name 'input-thread)
|
||||
#:name (cons 'input-thread (tcp-ends i))
|
||||
#:custodian custodian
|
||||
#:peer (object #:name 'inbound-relay-monitor
|
||||
[#:asserted _
|
||||
|
@ -136,7 +150,7 @@
|
|||
q
|
||||
(undequeue (cons remaining-count mode) q)))
|
||||
(define (eof-and-finish)
|
||||
(log-syndicate/drivers/tcp-debug "inbound eof for ~v" name)
|
||||
(log-syndicate/drivers/tcp-debug "inbound eof for ~a" (tcp-ends i))
|
||||
(turn! facet (lambda () (send-eof (target-proc)))))
|
||||
(let loop ((credits (make-queue)))
|
||||
(sync (handle-evt control-ch
|
||||
|
@ -161,7 +175,7 @@
|
|||
(match-lambda
|
||||
[(? number? read-count)
|
||||
(define bs (subbytes buffer 0 read-count))
|
||||
(log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name)
|
||||
(log-syndicate/drivers/tcp-debug "inbound data ~v for ~a" bs (tcp-ends i))
|
||||
(turn! facet (lambda () (send-data (target-proc) bs)))
|
||||
(loop (update-count (- count read-count) mode q))]
|
||||
[(? eof-object?) (eof-and-finish)]))]
|
||||
|
@ -170,13 +184,13 @@
|
|||
(match-lambda
|
||||
[(? bytes? packet) #:when (< (bytes-length packet) packet-size)
|
||||
(log-syndicate/drivers/tcp-debug
|
||||
"short inbound packet (length ~a; expected ~a bytes) ~v for ~v"
|
||||
(bytes-length packet) packet-size packet name)
|
||||
"short inbound packet (length ~a; expected ~a bytes) ~v for ~a"
|
||||
(bytes-length packet) packet-size packet (tcp-ends i))
|
||||
(eof-and-finish)]
|
||||
[(? bytes? packet)
|
||||
(log-syndicate/drivers/tcp-debug
|
||||
"inbound packet (length ~a) ~v for ~v"
|
||||
(bytes-length packet) packet name)
|
||||
"inbound packet (length ~a) ~v for ~a"
|
||||
(bytes-length packet) packet (tcp-ends i))
|
||||
(turn! facet (lambda () (send-data (target-proc) packet mode)))
|
||||
(loop (update-count (- count 1) mode q))]
|
||||
[(? eof-object?) (eof-and-finish)]))]
|
||||
|
@ -186,7 +200,7 @@
|
|||
[(LineMode-crlf) 'return-linefeed]))
|
||||
(match-lambda
|
||||
[(? bytes? line)
|
||||
(log-syndicate/drivers/tcp-debug "inbound line ~v for ~v" line name)
|
||||
(log-syndicate/drivers/tcp-debug "inbound line ~v for ~a" line (tcp-ends i))
|
||||
(turn! facet (lambda () (send-line (target-proc) line line-mode)))
|
||||
(loop (update-count (- count 1) mode q))]
|
||||
[(? eof-object?) (eof-and-finish)]))])))))
|
||||
|
@ -209,10 +223,10 @@
|
|||
(stop-current-facet))])
|
||||
(thunk)))
|
||||
|
||||
(define (outbound-relay name o)
|
||||
(define (outbound-relay o)
|
||||
(define flush-pending #f)
|
||||
(lambda (payload mode)
|
||||
(log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name)
|
||||
(log-syndicate/drivers/tcp-debug "outbound data ~v on ~a" payload (tcp-ends o))
|
||||
(with-stop-current-facet-on-epipe 'writing
|
||||
(lambda ()
|
||||
(write-bytes payload o)
|
||||
|
@ -228,53 +242,145 @@
|
|||
(with-stop-current-facet-on-epipe 'flushing
|
||||
(lambda () (flush-output o))))))))
|
||||
|
||||
(define (accept-connection conn
|
||||
;;---------------------------------------------------------------------------
|
||||
|
||||
(define (handle-connection source sink
|
||||
#:on-disconnect [on-disconnect #f]
|
||||
#:on-error [on-error #f]
|
||||
#:on-credit [on-credit void]
|
||||
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||
#:initial-mode [initial-mode (Mode-bytes)]
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof void]
|
||||
#:on-credit [on-credit void])
|
||||
(assert-control conn
|
||||
#:on-data on-data
|
||||
#:on-eof on-eof
|
||||
#:on-credit on-credit)
|
||||
(when initial-credit (send-credit conn initial-credit initial-mode)))
|
||||
#:on-eof [on-eof void])
|
||||
(make-source #:initial-sink sink
|
||||
#:name 'app-out
|
||||
#:on-disconnect on-disconnect #:on-error on-error
|
||||
#:on-credit on-credit)
|
||||
(make-sink #:initial-source source
|
||||
#:name 'app-in
|
||||
#:on-disconnect on-disconnect #:on-error on-error
|
||||
#:on-data on-data #:on-eof on-eof)
|
||||
(when initial-credit (send-credit source initial-credit initial-mode)))
|
||||
|
||||
(define (assert-control conn
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof void]
|
||||
#:on-credit [on-credit void])
|
||||
(at conn
|
||||
(assert (ActiveSocket-controller
|
||||
(object #:name 'inbound-socket-controller
|
||||
[#:message (Socket-credit amount mode) (on-credit amount mode)]
|
||||
[#:message (Socket-data data mode) (on-data data mode)]
|
||||
[#:message (Socket-eof) (on-eof)])))))
|
||||
(define (make-source
|
||||
#:initial-sink [initial-sink #f]
|
||||
#:name [name (gensym 'source)]
|
||||
#:on-connect [on-connect (lambda (new-sink) (void))]
|
||||
#:on-disconnect [on-disconnect0 #f]
|
||||
#:on-error [on-error0 #f]
|
||||
#:on-credit [on-credit (lambda (amount mode) (void))])
|
||||
(define sink #f)
|
||||
(define handle #f)
|
||||
(define (set-sink! new-sink)
|
||||
(when (not (eq? sink new-sink))
|
||||
(on-connect new-sink)
|
||||
(set! sink new-sink)
|
||||
(set! handle (turn-replace! this-turn sink handle
|
||||
(if sink (->preserve (Sink-source self)) (void))))))
|
||||
|
||||
(define on-disconnect
|
||||
(or on-disconnect0 (lambda ()
|
||||
(log-syndicate/drivers/tcp-debug "~a disconnected" self)
|
||||
(stop-current-facet))))
|
||||
(define on-error
|
||||
(or on-error0 (lambda (message)
|
||||
(log-syndicate/drivers/tcp-debug "~a error: ~v" self message)
|
||||
(stop-current-facet))))
|
||||
|
||||
(define self
|
||||
(object #:name name
|
||||
[#:asserted (Source-sink new-sink) (set-sink! new-sink)
|
||||
#:retracted (when (eq? sink new-sink)
|
||||
(set-sink! #f)
|
||||
(on-disconnect))]
|
||||
[#:asserted (StreamError message) (on-error message)]
|
||||
[#:message (Source-credit amount mode) (on-credit amount mode)]))
|
||||
|
||||
(set-sink! initial-sink)
|
||||
self)
|
||||
|
||||
(define (make-sink
|
||||
#:initial-source [initial-source #f]
|
||||
#:name [name (gensym 'sink)]
|
||||
#:on-connect [on-connect (lambda (new-source) (void))]
|
||||
#:on-disconnect [on-disconnect0 #f]
|
||||
#:on-error [on-error0 #f]
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof (lambda () (void))])
|
||||
(define source #f)
|
||||
(define handle #f)
|
||||
(define (set-source! new-source)
|
||||
(when (not (eq? new-source source))
|
||||
(on-connect new-source)
|
||||
(set! source new-source)
|
||||
(set! handle (turn-replace! this-turn source handle
|
||||
(if source (->preserve (Source-sink self)) (void))))))
|
||||
|
||||
(define on-disconnect
|
||||
(or on-disconnect0 (lambda ()
|
||||
(log-syndicate/drivers/tcp-debug "~a disconnected" self)
|
||||
(stop-current-facet))))
|
||||
(define on-error
|
||||
(or on-error0 (lambda (message)
|
||||
(log-syndicate/drivers/tcp-debug "~a error: ~v" self message)
|
||||
(stop-current-facet))))
|
||||
|
||||
(define self
|
||||
(object #:name name
|
||||
[#:asserted (Sink-source new-source) (set-source! new-source)
|
||||
#:retracted (when (eq? source new-source)
|
||||
(set-source! #f)
|
||||
(on-disconnect))]
|
||||
[#:asserted (StreamError message) (on-error message)]
|
||||
[#:message (Sink-data payload mode) (on-data payload mode)]
|
||||
[#:message (Sink-eof) (on-eof)]))
|
||||
|
||||
(set-source! initial-source)
|
||||
self)
|
||||
|
||||
(define (make-connection-handler on-connected #:name [name (gensym 'connection-handler)])
|
||||
(object #:name name
|
||||
[(ConnectionHandler-connected source sink)
|
||||
(on-connected source sink)]
|
||||
[(ConnectionHandler-rejected message)
|
||||
(error 'connection-handler "~a" message)]))
|
||||
|
||||
(define (establish-connection ds spec
|
||||
#:name [name (gensym 'establish-connection)]
|
||||
|
||||
#:on-connected [on-connected (lambda (source sink) (void))]
|
||||
#:on-rejected [on-rejected #f]
|
||||
|
||||
#:on-disconnect [on-disconnect #f]
|
||||
#:on-error [on-error #f]
|
||||
#:on-credit [on-credit void]
|
||||
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||
#:initial-mode [initial-mode (Mode-bytes)]
|
||||
#:on-connected on-connected
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof void]
|
||||
#:on-credit [on-credit void]
|
||||
#:on-disconnected [on-disconnected (lambda () (stop-current-facet))]
|
||||
#:on-rejected [on-rejected (lambda () (stop-current-facet))])
|
||||
(define s
|
||||
(object #:name 'outbound-socket
|
||||
[#:asserted (ActiveSocket-controller peer)
|
||||
(on-connected peer)
|
||||
(when initial-credit (send-credit peer initial-credit initial-mode))
|
||||
#:retracted
|
||||
(on-disconnected)]
|
||||
[#:asserted (ActiveSocket-close message) (on-rejected message)]
|
||||
[#:message (ActiveSocket-Socket (Socket-credit amount mode)) (on-credit amount mode)]
|
||||
[#:message (ActiveSocket-Socket (Socket-data data mode)) (on-data data mode)]
|
||||
[#:message (ActiveSocket-Socket (Socket-eof)) (on-eof)]))
|
||||
(at ds (assert (Connection s spec))))
|
||||
#:on-eof [on-eof void])
|
||||
(define peer
|
||||
(object #:name name
|
||||
[#:asserted (ConnectionHandler-connected source sink)
|
||||
(handle-connection source sink
|
||||
#:on-disconnect on-disconnect
|
||||
#:on-error on-error
|
||||
#:on-credit on-credit
|
||||
#:initial-credit initial-credit
|
||||
#:initial-mode initial-mode
|
||||
#:on-data on-data
|
||||
#:on-eof on-eof)
|
||||
(stop-facet ringing-facet)
|
||||
(on-connected source sink)]
|
||||
[#:asserted (ConnectionHandler-rejected message)
|
||||
(stop-facet ringing-facet)
|
||||
((or on-rejected (lambda (_message) (stop-current-facet))) message)]))
|
||||
(define ringing-facet (react (at ds (assert (StreamConnect spec peer)))))
|
||||
(void))
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
|
||||
(define (send-credit conn amount mode)
|
||||
(send! conn (Socket-credit amount mode)))
|
||||
(send! conn (Source-credit amount mode)))
|
||||
|
||||
(define (send-lines-credit conn amount [mode (LineMode-lf)])
|
||||
(send-credit conn (CreditAmount-count amount) (Mode-lines mode)))
|
||||
|
@ -291,10 +397,10 @@
|
|||
(string->bytes/utf-8 data)))
|
||||
|
||||
(define (send-line conn line [line-mode (LineMode-lf)])
|
||||
(send! conn (Socket-data (->bytes line) (Mode-lines line-mode))))
|
||||
(send! conn (Sink-data (->bytes line) (Mode-lines line-mode))))
|
||||
|
||||
(define (send-data conn data [mode (Mode-bytes)])
|
||||
(send! conn (Socket-data (->bytes data) mode)))
|
||||
(send! conn (Sink-data (->bytes data) mode)))
|
||||
|
||||
(define (send-eof conn)
|
||||
(send! conn (Socket-eof)))
|
||||
(send! conn (Sink-eof)))
|
||||
|
|
|
@ -2,10 +2,97 @@
|
|||
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
||||
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||
|
||||
(provide (struct-out entity-ref)
|
||||
(provide (except-out (struct-out entity) entity)
|
||||
(rename-out [make-entity entity] [entity <entity>])
|
||||
|
||||
(struct-out actor)
|
||||
(struct-out facet)
|
||||
(struct-out entity-ref)
|
||||
|
||||
parse-Ref!)
|
||||
|
||||
(struct entity-ref (relay target attenuation) #:transparent)
|
||||
(require racket/match)
|
||||
(require (only-in racket/string string-join))
|
||||
|
||||
(require struct-defaults)
|
||||
(require "support/counter.rkt")
|
||||
(require "engine.rkt")
|
||||
|
||||
(struct entity (id name assert retract message sync data)
|
||||
#:methods gen:custom-write
|
||||
[(define (write-proc e port mode)
|
||||
(fprintf port "#<entity:~a:~a>" (entity-id e) (entity-name e)))])
|
||||
|
||||
(define generate-entity-id (make-counter))
|
||||
|
||||
(define-struct-defaults make-entity entity
|
||||
(#:_id [entity-id (generate-entity-id)]
|
||||
#:name [entity-name '?]
|
||||
#:assert [entity-assert #f]
|
||||
#:retract [entity-retract #f]
|
||||
#:message [entity-message #f]
|
||||
#:sync [entity-sync #f]
|
||||
#:data [entity-data (void)]))
|
||||
|
||||
(struct actor (id
|
||||
name
|
||||
engine
|
||||
[daemon? #:mutable]
|
||||
dataflow
|
||||
[root #:mutable]
|
||||
[exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error
|
||||
[exit-hooks #:mutable])
|
||||
#:methods gen:custom-write
|
||||
[(define (write-proc a port mode)
|
||||
(fprintf port "#<actor:~a/~a:~a>" (engine-id (actor-engine a)) (actor-id a) (actor-name a)))])
|
||||
|
||||
(define (facet-path-to-root f)
|
||||
(string-join (reverse (let loop ((f f))
|
||||
(if (facet-parent f)
|
||||
(cons (number->string (facet-id f)) (loop (facet-parent f)))
|
||||
'())))
|
||||
"/"))
|
||||
|
||||
(define (facet-liveness f)
|
||||
(if (facet-live? f)
|
||||
""
|
||||
":(DEAD)"))
|
||||
|
||||
(struct facet (id
|
||||
actor
|
||||
parent
|
||||
children
|
||||
outbound
|
||||
[end-of-turn-actions #:mutable]
|
||||
[shutdown-actions #:mutable]
|
||||
[live? #:mutable]
|
||||
[inert-check-preventers #:mutable])
|
||||
#:methods gen:custom-write
|
||||
[(define (write-proc f port mode)
|
||||
(local-require (only-in racket/string string-join))
|
||||
(fprintf port "#<facet:~a/~a:~a:~a~a>"
|
||||
(engine-id (actor-engine (facet-actor f)))
|
||||
(actor-id (facet-actor f))
|
||||
(actor-name (facet-actor f))
|
||||
(facet-path-to-root f)
|
||||
(facet-liveness f)))])
|
||||
|
||||
(struct entity-ref (relay target attenuation)
|
||||
#:transparent
|
||||
#:methods gen:custom-write
|
||||
[(define (write-proc r port mode)
|
||||
(match-define (entity-ref f e a) r)
|
||||
(fprintf port "#<ref:~a:~a:~a/~a:~a:~a~a~a>"
|
||||
(entity-id e)
|
||||
(entity-name e)
|
||||
(engine-id (actor-engine (facet-actor f)))
|
||||
(actor-id (facet-actor f))
|
||||
(actor-name (facet-actor f))
|
||||
(facet-path-to-root f)
|
||||
(facet-liveness f)
|
||||
(if (null? a)
|
||||
""
|
||||
(format " ~s" a))))])
|
||||
|
||||
(define (parse-Ref! r)
|
||||
(if (entity-ref? r)
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
#lang racket/base
|
||||
|
||||
(provide (except-out (all-from-out "actor.rkt") current-turn)
|
||||
(all-from-out "entity-ref.rkt")
|
||||
(struct-out entity-ref)
|
||||
(all-from-out "syntax.rkt")
|
||||
(all-from-out "event-expander.rkt")
|
||||
(all-from-out preserves)
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
version 1 .
|
||||
embeddedType EntityRef.Ref .
|
||||
|
||||
StreamConnection = <stream-connection @source #!Source @sink #!Sink @spec any>.
|
||||
|
||||
; Assertion
|
||||
StreamListener = <stream-listener @spec any @handle #!ConnectionHandler>.
|
||||
|
||||
; Assertion
|
||||
StreamConnect = <stream-connect @spec any @handle #!ConnectionHandler>.
|
||||
|
||||
; Assertion
|
||||
ConnectionHandler =
|
||||
/ @connected <stream-connected @source #!Source @sink #!Sink>
|
||||
/ @rejected <stream-rejected @message string>
|
||||
.
|
||||
|
||||
; Assertion
|
||||
StreamError = <error @message string>.
|
||||
|
||||
Source =
|
||||
; Assertions:
|
||||
/ <sink @controller #!Sink>
|
||||
/ StreamError
|
||||
|
||||
; Messages:
|
||||
/ <credit @amount CreditAmount @mode Mode>
|
||||
.
|
||||
|
||||
Sink =
|
||||
; Assertions:
|
||||
/ <source @controller #!Source>
|
||||
/ StreamError
|
||||
|
||||
; Messages:
|
||||
/ <data @payload any @mode Mode>
|
||||
/ <eof>
|
||||
.
|
||||
|
||||
CreditAmount = @count int / @unbounded =unbounded .
|
||||
|
||||
Mode = =bytes / @lines LineMode / <packet @size int> / <object @description any>.
|
||||
LineMode = =lf / =crlf .
|
|
@ -1,25 +1,7 @@
|
|||
version 1 .
|
||||
embeddedType EntityRef.Ref .
|
||||
|
||||
Connection = <connection @handle #!ActiveSocket @spec any>.
|
||||
ConnectionPeer = <connection-peer @handle #!ActiveSocket @spec any>.
|
||||
TcpRemote = <tcp-remote @host string @port int>.
|
||||
TcpLocal = <tcp-local @host string @port int>.
|
||||
|
||||
TcpRemote = <remote @host string @port int>.
|
||||
TcpLocal = <local @host string @port int>.
|
||||
|
||||
ActiveSocket =
|
||||
/ <controller @controller #!Sink>
|
||||
/ <close @message string>
|
||||
/ Socket
|
||||
.
|
||||
|
||||
Socket =
|
||||
/ <credit @amount CreditAmount @mode Mode>
|
||||
/ <data @payload bytes @mode Mode>
|
||||
/ <eof>
|
||||
.
|
||||
|
||||
CreditAmount = @count int / @unbounded =unbounded .
|
||||
|
||||
Mode = =bytes / @lines LineMode / <packet @size int>.
|
||||
LineMode = =lf / =crlf .
|
||||
TcpPeerInfo = <tcp-peer @handle #!any @local TcpLocal @remote TcpRemote>.
|
||||
|
|
Loading…
Reference in New Issue