diff --git a/syndicate-examples/tcp-client.rkt b/syndicate-examples/tcp-client.rkt index a715a60..e94c47b 100644 --- a/syndicate-examples/tcp-client.rkt +++ b/syndicate-examples/tcp-client.rkt @@ -22,12 +22,12 @@ (establish-connection ds (TcpRemote host port) #:initial-mode (Mode-lines (LineMode-lf)) - #:on-connected (lambda (source sink) - (at ds - (on (message (RacketEvent (read-line-evt (current-input-port)) $vs)) - (match (car vs) - [(? eof-object?) (stop-current-facet (log-info "EOF on stdin."))] - [line (send-line sink line)])))) + #:on-connect (lambda (source sink) + (at ds + (on (message (RacketEvent (read-line-evt (current-input-port)) $vs)) + (match (car vs) + [(? eof-object?) (stop-current-facet (log-info "EOF on stdin."))] + [line (send-line sink line)])))) #:on-rejected (lambda (message) (stop-current-facet (log-error "~a" message))) #:on-disconnect (lambda () (stop-current-facet (log-info "Disconnected"))) #:on-data (lambda (line _mode) diff --git a/syndicate-examples/tcp-echo-server-explicit.rkt b/syndicate-examples/tcp-echo-server-explicit.rkt deleted file mode 100644 index f0f928c..0000000 --- a/syndicate-examples/tcp-echo-server-explicit.rkt +++ /dev/null @@ -1,26 +0,0 @@ -#lang syndicate -;;; SPDX-License-Identifier: LGPL-3.0-or-later -;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones - -(module+ main - (require racket/cmdline) - (require syndicate/drivers/tcp) - - (define host "0.0.0.0") - (define port 5999) - - (command-line #:once-each - [("--host" "-H") hostname "Set hostname to listen on" - (set! host hostname)] - [("--port" "-p") port-number "Set port number to listen on" - (set! port (string->number port-number))]) - - (standard-actor-system (ds) - (at ds - (assert (StreamListener (TcpLocal host port) - (make-connection-handler - (lambda (source sink) - (handle-connection source sink - #:on-data - (lambda (data mode) - (send-data sink data mode)))))))))) diff --git a/syndicate-examples/tcp-echo-server.rkt b/syndicate-examples/tcp-echo-server.rkt index 9c2a77f..161daa4 100644 --- a/syndicate-examples/tcp-echo-server.rkt +++ b/syndicate-examples/tcp-echo-server.rkt @@ -17,7 +17,7 @@ (standard-actor-system (ds) (at ds - (stop-on (asserted (TcpListenError (TcpLocal host port) $message))) + (stop-on (asserted (StreamListenerError (TcpLocal host port) $message))) (during/spawn (StreamConnection $source $sink (TcpLocal host port)) (handle-connection source sink #:on-data (lambda (data mode) (send-data sink data mode))))))) diff --git a/syndicate-examples/tcp-relay-server.rkt b/syndicate-examples/tcp-relay-server.rkt index e6cae8e..5014e16 100644 --- a/syndicate-examples/tcp-relay-server.rkt +++ b/syndicate-examples/tcp-relay-server.rkt @@ -19,7 +19,7 @@ (standard-actor-system (ds) (at ds - (stop-on (asserted (TcpListenError (TcpLocal host port) $message))) + (stop-on (asserted (StreamListenerError (TcpLocal host port) $message))) (during/spawn (StreamConnection $source $sink (TcpLocal host port)) (handle-connection source sink #:initial-mode (Mode-lines (LineMode-lf)) diff --git a/syndicate/distributed/tcp-server.rkt b/syndicate/distributed/tcp-server.rkt index 696d9b8..eec8b5b 100644 --- a/syndicate/distributed/tcp-server.rkt +++ b/syndicate/distributed/tcp-server.rkt @@ -26,7 +26,7 @@ (define spec (TcpLocal "0.0.0.0" 5999)) (at ds - (stop-on (asserted (TcpListenError spec _))) + (stop-on (asserted (StreamListenerError spec _))) (during/spawn (StreamConnection $source $sink spec) #:name (list 'tcp-server source) (run-relay #:packet-writer (lambda (bs) (send-data sink bs)) diff --git a/syndicate/drivers/stream.rkt b/syndicate/drivers/stream.rkt index 2ba37e6..2a4cf92 100644 --- a/syndicate/drivers/stream.rkt +++ b/syndicate/drivers/stream.rkt @@ -7,7 +7,6 @@ port-source port-sink - make-connection-handler make-source make-sink handle-connection @@ -34,85 +33,25 @@ (require syndicate/service) (require syndicate/pattern) (require syndicate/driver-support) -(require syndicate/drivers/racket-event) (require syndicate/schemas/gen/stream) (define-logger syndicate/drivers/stream) (provide-service [ds] - (at ds - (during/spawn (Observe (:pattern (StreamConnection ,_ ,_ ,$spec-pat)) _) - #:name (list 'stream-listener spec-pat) - (match (pattern->constant spec-pat) - [(? void?) (stop-current-facet)] - [spec (at ds - (during (StreamSpecListenable spec) - (assert - (StreamListener spec - (make-connection-handler - (lambda (source sink) - (assert (StreamConnection source sink spec))))))))])) - - (during/spawn (StreamConnection $app-source $app-sink $spec) - #:name (list 'stream-connection spec) - (at ds - (during (StreamSpecConnectable spec) - (assert (StreamConnect spec - (object #:name 'connection-peer - [(ConnectionHandler-connected sys-source sys-sink) - (at sys-source (assert (Source-sink app-sink))) - (at sys-sink (assert (Sink-source app-source)))] - [(ConnectionHandler-rejected message) - (log-syndicate/drivers/stream-error - "Connection to ~a rejected: ~a" spec message) - (at app-source (assert (StreamError message))) - (at app-sink (assert (StreamError message))) - (stop-current-facet)])))))) - - ;; I translate interest in StreamListener with a particular spec-pattern into a facet - ;; that reacts to interest in StreamSpecListenable with a spec matching the spec-pattern - ;; by asserting StreamSpecListenable with that spec. - (during (Observe (:pattern (StreamListener ,$spec-pat ,_)) _) - (define listenable-asserter - (object [bindings - (define spec - (pattern->constant spec-pat (lambda (_name index) (list-ref bindings index)))) - (assert (StreamSpecListenable spec))])) - (assert - (Observe (:pattern - (Observe (:pattern (StreamSpecListenable ,,(:pattern (DLit ,spec-pat)))) _)) - listenable-asserter))) - - ;; I translate interest in StreamConnect with a particular spec-pattern into a facet that - ;; reacts to interest in StreamSpecConnectable with a spec matching the spec-pattern by - ;; asserting StreamSpecConnectable with that spec. - (during (Observe (:pattern (StreamConnect ,$spec-pat ,_)) _) - (define connectable-asserter - (object [bindings - (define spec - (pattern->constant spec-pat (lambda (_name index) (list-ref bindings index)))) - (assert (StreamSpecConnectable spec))])) - (assert - (Observe (:pattern - (Observe (:pattern (StreamSpecConnectable ,,(:pattern (DLit ,spec-pat)))) _)) - connectable-asserter))))) - -(define (make-connection-handler on-connected #:name [name (gensym 'connection-handler)]) - (object #:name name - [(ConnectionHandler-connected source sink) - (on-connected source sink)] - [(ConnectionHandler-rejected message) - (error 'connection-handler "~a" message)])) + ;; No active components. + ) (define (port-source [port (current-input-port)] + #:initial-sink [initial-sink #f] #:custodian [custodian #f] #:name [name (list 'port-source (object-name port))]) - (define active-sink #f) + (define active-sink initial-sink) (define issue-credit (start-inbound-relay #:custodian custodian (lambda () active-sink) port)) (make-source #:name name + #:initial-sink initial-sink #:on-connect (lambda (new-sink) (set! active-sink new-sink)) #:on-credit issue-credit)) @@ -122,7 +61,7 @@ (define control-ch (make-async-channel)) (linked-thread - #:name (cons 'input-thread (object-name port)) + #:name (list 'input-thread (object-name port)) #:custodian custodian #:peer (object #:name 'inbound-relay-monitor [#:asserted _ @@ -196,12 +135,14 @@ issue-credit) (define (port-sink [port (current-output-port)] + #:initial-source [initial-source #f] #:initial-credit [initial-credit (CreditAmount-unbounded)] #:initial-mode [initial-mode (Mode-bytes)] #:name [name (list 'port-sink (object-name port))]) - (define active-source #f) + (define active-source initial-source) (define relay (outbound-relay port)) (make-sink #:name name + #:initial-source initial-source #:on-connect (lambda (new-source) (set! active-source new-source) @@ -349,7 +290,7 @@ (define (establish-connection ds spec #:name [name (gensym 'establish-connection)] - #:on-connected [on-connected (lambda (source sink) (void))] + #:on-connect [on-connect (lambda (source sink) (void))] #:on-rejected [on-rejected #f] #:on-disconnect [on-disconnect #f] @@ -359,24 +300,39 @@ #:initial-mode [initial-mode (Mode-bytes)] #:on-data on-data #:on-eof [on-eof void]) - (define peer - (object #:name name - [#:asserted (ConnectionHandler-connected source sink) - (handle-connection source sink - #:on-disconnect on-disconnect - #:on-error on-error - #:on-credit on-credit - #:initial-credit initial-credit - #:initial-mode initial-mode - #:on-data on-data - #:on-eof on-eof) - (stop-facet ringing-facet) - (on-connected source sink)] - [#:asserted (ConnectionHandler-rejected message) - (stop-facet ringing-facet) - ((or on-rejected (lambda (_message) (stop-current-facet))) message)])) - (define ringing-facet (react (at ds (assert (StreamConnect spec peer))))) - (void)) + (define connection-state 'pending) + (begin/dataflow (log-info "connection-state ~a" connection-state)) + (define (transition new-state) + (when (not (equal? connection-state new-state)) + (match* (connection-state new-state) + [('pending 'connected) + (when initial-credit (send-credit (peer-source) initial-credit initial-mode)) + (on-connect (peer-source) (peer-sink))] + [(_ 'disconnected) + (on-disconnect)] + [('pending (list 'error m)) + (on-rejected m)] + [(_ (list 'error m)) + (on-error m)]))) + + (define-field peer-source #f) + (define-field peer-sink #f) + (begin/dataflow + (when (and (peer-source) (peer-sink)) + (transition 'connected))) + + (define source (make-source #:name (list 'source name) + #:on-connect peer-sink + #:on-disconnect (lambda () (transition 'disconnected)) + #:on-error (lambda (m) (transition (list 'error m))) + #:on-credit on-credit)) + (define sink (make-sink #:name (list 'sink name) + #:on-connect peer-source + #:on-disconnect (lambda () (transition 'disconnected)) + #:on-error (lambda (m) (transition (list 'error m))) + #:on-data on-data + #:on-eof on-eof)) + (at ds (assert (StreamConnection source sink spec)))) ;;--------------------------------------------------------------------------- diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index 3b6f779..9cee497 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -22,45 +22,71 @@ (provide-service [ds] (with-services [syndicate/drivers/stream] (at ds - (during/spawn (StreamListener (TcpLocal $host $port) $peer) + ;; TODO: this is annoying. We have to pay attention to the *syntactic* structure of the + ;; listener's pattern in order to match all possible variants: + ;; - `variable`, where `variable`'s value matches `(TcpLocal _ _)` + ;; - `(TcpLocal variable1 variable2)` + ;; - `(TcpLocal "hostname" variable)` + ;; - `(TcpLocal variable 1234)` + ;; - `(TcpLocal "hostname" 1234)` + ;; + ;; POSSIBLE SOLUTION: have pattern analysis check to see if there are any binds or + ;; discards within a constructor application; if there are none, it may as well be a + ;; constant literal, so make it one. This is what the earlier Syndicate/js + ;; implementations do (because they don't have a compile-time constructor registry and + ;; have to decide whether to assume a compound or just evaluate some expression), and it + ;; works fine there. + + (during/spawn (Observe (:pattern (StreamConnection ,_ ,_ (TcpLocal ,$host-pat ,$port-pat))) _) + #:match [host (pattern->constant host-pat)] + #:match [port (pattern->constant port-pat)] + #:when (not (or (void? host) (void? port))) #:name (TcpLocal host port) - (run-listener ds peer host port)) + (run-listener ds host port)) - (during/spawn (StreamConnect (TcpRemote $host $port) $peer) + (during/spawn + (Observe (:pattern (StreamConnection ,_ ,_ ,(DLit (TcpLocal $host $port)))) _) + #:name (TcpLocal host port) + (run-listener ds host port)) + + (during/spawn (StreamConnection $source $sink (TcpRemote $host $port)) #:name (TcpRemote host port) - (run-outbound ds peer host port))))) + (run-outbound ds source sink host port))))) -(define (run-listener ds peer host port) - (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port)) - (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port)) +(define (run-listener ds host port) + (define spec (TcpLocal host port)) + (on-start (log-syndicate/drivers/tcp-info "+listener on ~v" spec)) + (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v" spec)) (linked-thread #:name (list 'listen-thread host port) (lambda (facet) - (with-connection-error-guard ds peer + (with-connection-error-guard ds (lambda (message) (turn! facet (lambda () - (at ds (assert (TcpListenError (TcpLocal host port) message))) - (at peer (assert (ConnectionHandler-rejected message)))))) + (log-syndicate/drivers/tcp-warning "~a" message) + (at ds (assert (StreamListenerError spec message)))))) (lambda () (define listener (tcp-listen port 512 #t host)) (lambda () + (turn! facet (lambda () + (at ds (assert (StreamListenerReady spec))))) (let loop () (define connection-custodian (make-custodian)) (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-accept listener))) - (turn! facet (lambda () (spawn-connection ds connection-custodian i o peer))) + (turn! facet (lambda () (spawn-connection ds connection-custodian i o spec #f #f))) (loop)))))))) (define (tcp-ends p) (call-with-values (lambda () (tcp-addresses p #t)) (lambda (lh lp rh rp) (list (TcpLocal lh lp) (TcpRemote rh rp))))) -(define (spawn-connection ds custodian i o peer) +(define (spawn-connection ds custodian i o spec peer-source peer-sink) (match-define (and ends (list (and local-end (TcpLocal local-host local-port)) (and remote-end (TcpRemote remote-host remote-port)))) (tcp-ends i)) (define name (format "[~a:~a::~a:~a]" local-host local-port remote-host remote-port)) - (log-syndicate/drivers/tcp-info "Connection ~a established" name) + (log-syndicate/drivers/tcp-info "TCP socket ~a for ~a established" name spec) (spawn #:name name (actor-add-exit-hook! this-actor (lambda () (close-input-port i) @@ -72,28 +98,31 @@ (react (on-stop (facet-count (- (facet-count) 1)) (close-input-port i)) - (set! source (port-source i #:custodian custodian)) + (set! source (port-source i #:initial-sink peer-sink #:custodian custodian)) (at ds (assert (TcpPeerInfo source local-end remote-end)))) (react (on-stop (facet-count (- (facet-count) 1)) (close-output-port o)) - (set! sink (port-sink o)) + (set! sink (port-sink o #:initial-source peer-source)) (at ds (assert (TcpPeerInfo sink local-end remote-end)))) - (at peer - (assert #:when (positive? (facet-count)) - (ConnectionHandler-connected source sink))))) + (when (TcpLocal? spec) + (at ds + (assert #:when (positive? (facet-count)) (StreamConnection source sink spec)))))) -(define (with-connection-error-guard ds peer error-proc proc) +(define (with-connection-error-guard ds error-proc proc) ((with-handlers ([exn:fail:network? (lambda (e) (lambda () (error-proc (exn->string e))))]) (proc)))) -(define (run-outbound ds peer host port) - (with-connection-error-guard ds peer +(define (run-outbound ds source sink host port) + (with-connection-error-guard ds (lambda (message) - (at peer (assert (ConnectionHandler-rejected message)))) + (log-syndicate/drivers/tcp-warning "~a" message) + (at source (assert (StreamError message))) + (at sink (assert (StreamError message)))) (lambda () (define connection-custodian (make-custodian)) (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-connect host port))) - (lambda () (spawn-connection ds connection-custodian i o peer))))) + (lambda () + (spawn-connection ds connection-custodian i o (TcpRemote host port) source sink))))) diff --git a/syndicate/schemas/stream.prs b/syndicate/schemas/stream.prs index a565087..a8c7ecc 100644 --- a/syndicate/schemas/stream.prs +++ b/syndicate/schemas/stream.prs @@ -1,24 +1,14 @@ version 1 . embeddedType EntityRef.Ref . -; Assertions +; Assertion: StreamConnection = . -StreamSpecListenable = . -StreamSpecConnectable = . -; Assertion -StreamListener = . +; Assertions: +StreamListenerReady = . +StreamListenerError = . -; Assertion -StreamConnect = . - -; Assertion -ConnectionHandler = - / @connected - / @rejected -. - -; Assertion +; Assertion: StreamError = . Source = @@ -40,7 +30,9 @@ Sink = / . +; Value: CreditAmount = @count int / @unbounded =unbounded . +; Value: Mode = =bytes / @lines LineMode / / . LineMode = =lf / =crlf . diff --git a/syndicate/schemas/tcp.prs b/syndicate/schemas/tcp.prs index b00df01..7a6d5e1 100644 --- a/syndicate/schemas/tcp.prs +++ b/syndicate/schemas/tcp.prs @@ -5,5 +5,3 @@ TcpRemote = . TcpLocal = . TcpPeerInfo = . - -TcpListenError = . diff --git a/syndicate/syntax-classes.rkt b/syndicate/syntax-classes.rkt index 68f346f..57f59af 100644 --- a/syndicate/syntax-classes.rkt +++ b/syndicate/syntax-classes.rkt @@ -5,7 +5,8 @@ (provide (for-syntax - )) + + )) (require (for-syntax racket/base)) (require (for-syntax syntax/parse)) @@ -25,4 +26,7 @@ (define-splicing-syntax-class (pattern (~seq #:link? L)) - (pattern (~seq) #:attr L #'#f))) + (pattern (~seq) #:attr L #'#f)) + + (define-splicing-syntax-class + (pattern (~seq (~seq #:match [pattern-pieces ...+ discriminant]) ...)))) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index 9eb6a8f..257ce91 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -212,8 +212,10 @@ (define-syntax-rule (on-stop expr ...) (facet-on-stop! this-facet (lambda () expr ...))) -(define-syntax-rule (sync! peer expr ...) - (turn-sync! this-turn peer (lambda (_reply) expr ...))) +(define-syntax (sync! stx) + (syntax-parse stx + [(_ peer expr ...) + (syntax/loc stx (turn-sync! this-turn peer (lambda (_reply) expr ...)))])) (define-for-syntax (with-valid-this-target orig-stx result-stx) ;; Invoke this-target transformer for its side effect: when it's @@ -233,32 +235,44 @@ (define-syntax (spawn stx) (syntax-parse stx - [(_ name: daemon:) + [(_ matches: condition: name: daemon:) (raise-syntax-error #f "Need body in spawn")] - [(_ name: daemon: setup-expr ...) - #'(turn-spawn! #:name name.N - #:daemon? daemon.D - this-turn - (lambda () - (syntax-parameterize ([this-target illegal-use-of-this-target]) - setup-expr ...)))])) + [(_ matches: condition: name: daemon: setup-expr ...) + #'(nested-matches + [[matches.pattern-pieces ... matches.discriminant] ...] + (when condition.E + (turn-spawn! #:name name.N + #:daemon? daemon.D + this-turn + (lambda () + (syntax-parameterize ([this-target illegal-use-of-this-target]) + setup-expr ...)))))])) (define-syntax (spawn/link stx) (syntax-parse stx - [(_ name-stx: daemon: setup-expr ...) - #`(begin - (define name name-stx.N) - (define monitor (ref (entity/stop-on-retract #:name (list name 'monitor-in-parent)))) - (define monitor-handle (turn-assert! this-turn monitor 'alive)) - (turn-spawn! this-turn - #:name name - #:daemon? daemon.D - #:link - (entity/stop-on-retract #:name (list name 'monitor-in-child)) - (lambda () - (syntax-parameterize ([this-target illegal-use-of-this-target]) - setup-expr ...)) - (hasheq monitor-handle #t)))])) + [(_ matches: condition: name-stx: daemon: setup-expr ...) + #`(nested-matches + [[matches.pattern-pieces ... matches.discriminant] ...] + (when condition.E + (define name name-stx.N) + (define monitor (ref (entity/stop-on-retract #:name (list name 'monitor-in-parent)))) + (define monitor-handle (turn-assert! this-turn monitor 'alive)) + (turn-spawn! this-turn + #:name name + #:daemon? daemon.D + #:link + (entity/stop-on-retract #:name (list name 'monitor-in-child)) + (lambda () + (syntax-parameterize ([this-target illegal-use-of-this-target]) + setup-expr ...)) + (hasheq monitor-handle #t))))])) + +(define-syntax nested-matches + (syntax-rules () + [(_ [] body ...) + (begin body ...)] + [(_ [[p ... e] more ...] body ...) + (match e [p ... (nested-matches [more ...] body ...)] [_ (void)])])) (define-syntax-rule (begin/dataflow expr ...) (turn-dataflow! this-turn (lambda () expr ...))) @@ -364,12 +378,12 @@ (define-syntax during/spawn (lambda (stx) (syntax-parse stx - [(_ pat name-stx: daemon: expr ...) + [(_ pat expr ...) (quasisyntax/loc stx (assert (Observe (:pattern pat) (ref (during* (lambda (bindings) (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) - (spawn/link #:name name-stx.N #:daemon? daemon.D expr ...)))))))]))) + (spawn/link expr ...)))))))]))) (define (during* f #:name [name '?]) (define assertion-map (make-hash))