diff --git a/syndicate-examples/box-and-client.rkt b/syndicate-examples/box-and-client.rkt index dc7f2eb..1564742 100644 --- a/syndicate-examples/box-and-client.rkt +++ b/syndicate-examples/box-and-client.rkt @@ -13,18 +13,18 @@ (define-field current-value 0) (at ds (assert (box-state (current-value))) - (when (message (set-box $new-value)) + (on (message (set-box $new-value)) (log-info "box: taking on new-value ~v" new-value) (current-value new-value))) - (stop-when-true (= (current-value) 10) + (stop-on-true (= (current-value) 10) (log-info "box: terminating"))) (spawn #:name 'client (at ds - (stop-when (retracted (Observe (:pattern (set-box ,_)) _)) + (stop-on (retracted (Observe (:pattern (set-box ,_)) _)) (log-info "client: box has gone")) - (when (asserted (box-state $v)) + (on (asserted (box-state $v)) (log-info "client: learned that box's value is now ~v" v) - (send! ds (set-box (+ v 1)))) - (when (retracted (box-state _)) + (send! (set-box (+ v 1)))) + (on (retracted (box-state _)) (log-info "client: box state disappeared")))))) diff --git a/syndicate-examples/speed-tests/box-and-client/with-dataspace.rkt b/syndicate-examples/speed-tests/box-and-client/with-dataspace.rkt index 50ff3cc..b2b3457 100644 --- a/syndicate-examples/speed-tests/box-and-client/with-dataspace.rkt +++ b/syndicate-examples/speed-tests/box-and-client/with-dataspace.rkt @@ -11,7 +11,7 @@ (define reporter (report-stats REPORT_EVERY)) (at ds (assert (BoxState (value))) - (when (message (SetBox $new-value)) + (on (message (SetBox $new-value)) (reporter new-value) (when (= new-value LIMIT) (stop-current-facet)) (value new-value))))) @@ -20,7 +20,7 @@ (spawn #:name 'client (define root-facet this-facet) (at ds - (when (asserted (BoxState $value)) (send! ds (SetBox (+ value 1)))) + (on (asserted (BoxState $value)) (send! (SetBox (+ value 1)))) (during (BoxState _) (on-stop (log-info "Client detected box termination") (stop-facet root-facet)))))) diff --git a/syndicate-examples/tcp-client-naive.rkt b/syndicate-examples/tcp-client-naive.rkt new file mode 100644 index 0000000..5129d1c --- /dev/null +++ b/syndicate-examples/tcp-client-naive.rkt @@ -0,0 +1,27 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(module+ main + (require racket/cmdline) + (require (only-in racket/port read-line-evt)) + (require (only-in racket/string string-trim)) + (require syndicate/drivers/tcp) + (require syndicate/drivers/stream) + (require syndicate/drivers/racket-event) + + (define host "127.0.0.1") + (define port 5999) + + (command-line #:once-each + [("--host" "-H") hostname "Set hostname to connect to" + (set! host hostname)] + [("--port" "-p") port-number "Set port number to connect to" + (set! port (string->number port-number))]) + + (actor-system/dataspace (ds) + (spawn-racket-event-driver ds) + (spawn-tcp-driver ds) + (spawn + (at ds + (assert (StreamConnection (port-lines-source ds) (port-sink) (TcpRemote host port))))))) diff --git a/syndicate-examples/tcp-client.rkt b/syndicate-examples/tcp-client.rkt index 06c5377..a5f5c6a 100644 --- a/syndicate-examples/tcp-client.rkt +++ b/syndicate-examples/tcp-client.rkt @@ -5,7 +5,6 @@ (module+ main (require racket/cmdline) (require (only-in racket/port read-line-evt)) - (require (only-in racket/string string-trim)) (require syndicate/drivers/tcp) (require syndicate/drivers/racket-event) @@ -27,7 +26,7 @@ #:initial-mode (Mode-lines (LineMode-lf)) #:on-connected (lambda (source sink) (at ds - (when (message (RacketEvent (read-line-evt (current-input-port)) $vs)) + (on (message (RacketEvent (read-line-evt (current-input-port)) $vs)) (match (car vs) [(? eof-object?) (stop-current-facet (log-info "EOF on stdin."))] [line (send-line sink line)])))) diff --git a/syndicate-examples/tcp-echo-server-explicit.rkt b/syndicate-examples/tcp-echo-server-explicit.rkt new file mode 100644 index 0000000..15c1581 --- /dev/null +++ b/syndicate-examples/tcp-echo-server-explicit.rkt @@ -0,0 +1,28 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(module+ main + (require racket/cmdline) + (require syndicate/drivers/tcp) + + (define host "0.0.0.0") + (define port 5999) + + (command-line #:once-each + [("--host" "-H") hostname "Set hostname to listen on" + (set! host hostname)] + [("--port" "-p") port-number "Set port number to listen on" + (set! port (string->number port-number))]) + + (actor-system/dataspace (ds) + (spawn-tcp-driver ds) + (spawn + (at ds + (assert (StreamListener (TcpLocal host port) + (make-connection-handler + (lambda (source sink) + (handle-connection source sink + #:on-data + (lambda (data mode) + (send-data sink data mode))))))))))) diff --git a/syndicate-examples/tcp-echo-server.rkt b/syndicate-examples/tcp-echo-server.rkt index f09b91e..7173fdb 100644 --- a/syndicate-examples/tcp-echo-server.rkt +++ b/syndicate-examples/tcp-echo-server.rkt @@ -19,6 +19,7 @@ (spawn-tcp-driver ds) (spawn (at ds + (stop-on (asserted (TcpListenError (TcpLocal host port) $message))) (during/spawn (StreamConnection $source $sink (TcpLocal host port)) (handle-connection source sink #:on-data (lambda (data mode) (send-data sink data mode)))))))) diff --git a/syndicate-examples/tcp-relay-server.rkt b/syndicate-examples/tcp-relay-server.rkt index a7169af..fec555d 100644 --- a/syndicate-examples/tcp-relay-server.rkt +++ b/syndicate-examples/tcp-relay-server.rkt @@ -21,8 +21,9 @@ (spawn-tcp-driver ds) (spawn (at ds + (stop-on (asserted (TcpListenError (TcpLocal host port) $message))) (during/spawn (StreamConnection $source $sink (TcpLocal host port)) (handle-connection source sink #:initial-mode (Mode-lines (LineMode-lf)) #:on-data (lambda (data mode) (send! ds (Line data)))) - (at ds (when (message (Line $data)) (send-line sink data)))))))) + (at ds (on (message (Line $data)) (send-line sink data)))))))) diff --git a/syndicate-examples/timer-demo.rkt b/syndicate-examples/timer-demo.rkt index 95a3386..85b92a8 100644 --- a/syndicate-examples/timer-demo.rkt +++ b/syndicate-examples/timer-demo.rkt @@ -9,5 +9,7 @@ (spawn-timer-driver ds) (spawn (at ds (log-info "waiting...") - (stop-when-timeout 2000 + (on (timeout 1000) + (log-info "still waiting...")) + (stop-on (timeout 2000) (log-info "done!")))))) diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index d1db868..8c5e02a 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -9,6 +9,7 @@ dataspace actor-system/dataspace) +(require racket/pretty) (require racket/match) (require preserves) @@ -23,6 +24,18 @@ (define-logger syndicate/dataspace) +(match (getenv "SYNDICATE_COLUMNS") + [#f (void)] + [n (pretty-print-columns (string->number n))]) + +(define (pretty-assertion indent value) + (define gap (make-string indent #\space)) + (parameterize ((pretty-print-print-line + (lambda (line-number port line-length columns) + (fprintf port "\n~a" gap) + indent))) + (pretty-format value #:mode 'print))) + (define (dataspace #:name [name (gensym 'dataspace)]) (define handles (make-hash)) (define assertions (make-bag)) @@ -30,7 +43,7 @@ (define ds-e (entity #:name name #:assert (lambda (value handle) - (log-syndicate/dataspace-debug "~v + ~v ~v" ds-e handle value) + (log-syndicate/dataspace-debug "~v + ~v~a" ds-e handle (pretty-assertion 4 value)) (define maybe-observe (parse-Observe value)) (hash-set! handles handle (cons value maybe-observe)) (when (eq? (bag-change! assertions value +1) 'absent->present) @@ -42,7 +55,7 @@ (match (hash-ref handles upstream-handle #f) [#f (error 'dataspace "Peer retracted unknown handle ~v" upstream-handle)] [(cons value maybe-observe) - (log-syndicate/dataspace-debug "~v - ~v ~v" ds-e upstream-handle value) + (log-syndicate/dataspace-debug "~v - ~v~a" ds-e upstream-handle (pretty-assertion 4 value)) (hash-remove! handles upstream-handle) (when (eq? (bag-change! assertions value -1) 'present->absent) (remove-assertion! this-turn skeleton value) @@ -50,7 +63,7 @@ [(? eof-object?) (void)] [(Observe pat ref) (remove-interest! this-turn skeleton pat ref)]))])) #:message (lambda (message) - (log-syndicate/dataspace-debug "~v ! ~v" ds-e message) + (log-syndicate/dataspace-debug "~v !~a" ds-e (pretty-assertion 4 message)) (send-assertion! this-turn skeleton message)))) (ref ds-e)) diff --git a/syndicate/driver-support.rkt b/syndicate/driver-support.rkt index 800a598..9f365e1 100644 --- a/syndicate/driver-support.rkt +++ b/syndicate/driver-support.rkt @@ -13,24 +13,26 @@ #:name [name (gensym 'linked-thread)] #:peer [peer (ref (entity/stop-on-retract #:name (list name 'monitor)))] #:custodian [c (make-custodian)]) + (define facet this-facet) + (define actor this-actor) + (define handle #f) (define armed? #t) (define (!) (when armed? - (set! armed? #f) (log-syndicate/driver-support-info "~a shutdown" name) - (turn-retract! this-turn handle) - (queue-task! (actor-engine this-actor) (lambda () (custodian-shutdown-all c))) - (actor-remove-exit-hook! this-actor !))) + (set! armed? #f) + (turn! facet (lambda () (turn-retract! this-turn handle))) + (queue-task! (actor-engine actor) (lambda () (custodian-shutdown-all c))) + (actor-remove-exit-hook! actor !))) (on-stop (!)) - (actor-add-exit-hook! this-actor !) + (actor-add-exit-hook! actor !) (log-syndicate/driver-support-info "~a startup" name) (set! handle (parameterize ((current-custodian c)) - (define facet this-facet) (turn-assert! this-turn peer (embedded @@ -43,4 +45,4 @@ (exn->string e)) (void))]) (thread-proc facet)) - (turn! facet !)))))))) + (!)))))))) diff --git a/syndicate/drivers/racket-event.rkt b/syndicate/drivers/racket-event.rkt index 4be61ef..fc52c64 100644 --- a/syndicate/drivers/racket-event.rkt +++ b/syndicate/drivers/racket-event.rkt @@ -14,17 +14,19 @@ (define-logger syndicate/drivers/racket-event) (define (spawn-racket-event-driver ds) - (at ds - (during/spawn (Observe (:pattern (RacketEvent ,(DLit $embedded-event) ,_)) _) - #:name (embedded-value embedded-event) - (define event (embedded-value embedded-event)) - (on-start (log-syndicate/drivers/racket-event-debug "started listening: ~v" event)) - (on-stop (log-syndicate/drivers/racket-event-debug "stopped listening: ~v" event)) - (linked-thread - #:name (list event 'thread) - (lambda (facet) - (let loop () - (sync (handle-evt event - (lambda args - (turn! facet (lambda () (send! ds (RacketEvent event args)))) - (loop)))))))))) + (spawn #:name 'racket-event-driver + #:daemon? #t + (at ds + (during/spawn (Observe (:pattern (RacketEvent ,(DLit $embedded-event) ,_)) _) + #:name (embedded-value embedded-event) + (define event (embedded-value embedded-event)) + (on-start (log-syndicate/drivers/racket-event-debug "started listening: ~v" event)) + (on-stop (log-syndicate/drivers/racket-event-debug "stopped listening: ~v" event)) + (linked-thread + #:name (list event 'thread) + (lambda (facet) + (let loop () + (sync (handle-evt event + (lambda args + (turn! facet (lambda () (send! ds (RacketEvent event args)))) + (loop))))))))))) diff --git a/syndicate/drivers/stream.rkt b/syndicate/drivers/stream.rkt new file mode 100644 index 0000000..764b610 --- /dev/null +++ b/syndicate/drivers/stream.rkt @@ -0,0 +1,47 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(provide port-lines-source + port-sink) + +(require (only-in racket/port read-line-evt)) + +(require "tcp.rkt") ;; ugh, lots of tcp.rkt actually belongs in this file +(require "racket-event.rkt") + +(define (port-lines-source ds [port (current-input-port)] + #:initial-credit [initial-credit 0] + #:name [name (list 'port-lines-source (object-name port))] + #:line-mode [line-mode (LineMode-lf)]) + (define-field credit initial-credit) + (define-field sink #f) + (at ds + (on (message #:when (and (sink) (positive? (credit))) (RacketEvent (read-line-evt port) $vs)) + (credit (- (credit) 1)) + (match (car vs) + [(? eof-object?) (send-eof (sink))] + [line (send-line (sink) line line-mode)]))) + (make-source #:name name + #:on-connect sink + #:on-credit (lambda (amount mode) + (if (equal? amount (CreditAmount-unbounded)) + (credit +inf.0) + (match mode + [(Mode-lines _) + (credit (+ (credit) (CreditAmount-count-value amount)))] + [_ (void)]))))) + +(define (port-sink [port (current-output-port)] + #:name [name (list 'port-sink (object-name port))]) + (make-sink #:name name + #:on-connect (lambda (source) (send-credit source (CreditAmount-unbounded) (Mode-bytes))) + #:on-eof (lambda () (close-output-port port)) + #:on-data (lambda (data mode) + (when (bytes? data) + (write-bytes data port) + (match mode + [(Mode-bytes) (void)] + [(Mode-lines (LineMode-lf)) (write-bytes #"\n" port)] + [(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" port)]) + (flush-output port))))) diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index e1d72dc..462b274 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -26,6 +26,7 @@ (require (only-in racket/exn exn->string)) (require syndicate/driver-support) (require syndicate/functional-queue) +(require syndicate/pattern) (require syndicate/schemas/gen/stream) (require syndicate/schemas/gen/tcp) (require syndicate/schemas/gen/dataspace-patterns) @@ -40,30 +41,61 @@ #:daemon? #t (at ds - (during/spawn - (Observe (:pattern (StreamConnection ,_ ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _) - #:name (list 'simple (TcpLocal host port)) - (define spec (TcpLocal host port)) - (at ds - (assert (StreamListener spec - (make-connection-handler - (lambda (source sink) - (at ds (assert (StreamConnection source sink spec))))))))) + (during/spawn (Observe (:pattern (StreamConnection ,_ ,_ ,$spec-pat)) _) + #:name (list 'simple-listener spec-pat) + (match (pattern->constant spec-pat) + [(? void?) (stop-current-facet)] + [spec (at ds + (during (StreamSpecListenable spec) + (assert + (StreamListener spec + (make-connection-handler + (lambda (source sink) + (assert (StreamConnection source sink spec))))))))])) - (during/spawn (StreamConnection $app-source $app-sink (TcpRemote $host $port)) - #:name (list 'simple (TcpRemote host port)) - (define spec (TcpRemote host port)) + (during/spawn (StreamConnection $app-source $app-sink $spec) + #:name (list 'simple-connection spec) (at ds (assert (StreamConnect spec (object #:name 'connection-peer - [(ConnectionHandler-connected tcp-source tcp-sink) - (at tcp-source (assert (Source-sink app-sink))) - (at tcp-sink (assert (Sink-source app-source)))] + [(ConnectionHandler-connected sys-source sys-sink) + (at sys-source (assert (Source-sink app-sink))) + (at sys-sink (assert (Sink-source app-source)))] [(ConnectionHandler-rejected message) (log-syndicate/drivers/tcp-error "Connection to ~a rejected: ~a" spec message) + (at app-source (assert (StreamError message))) + (at app-sink (assert (StreamError message))) (stop-current-facet)]))))) + ;; I translate interest in StreamListener with a particular spec-pattern into a facet + ;; that reacts to interest in StreamSpecListenable with a spec matching the spec-pattern + ;; by asserting StreamSpecListenable with that spec. + (during (Observe (:pattern (StreamListener ,$spec-pat ,_)) _) + (define listenable-asserter + (object [bindings + (define spec + (pattern->constant spec-pat (lambda (_name index) (list-ref bindings index)))) + (assert (StreamSpecListenable spec))])) + (assert + (Observe (:pattern + (Observe (:pattern (StreamSpecListenable ,,(:pattern (DLit ,spec-pat)))) _)) + listenable-asserter))) + + ;; I translate interest in StreamConnect with a particular spec-pattern into a facet that + ;; reacts to interest in StreamSpecConnectable with a spec matching the spec-pattern by + ;; asserting StreamSpecConnectable with that spec. + (during (Observe (:pattern (StreamConnect ,$spec-pat ,_)) _) + (define connectable-asserter + (object [bindings + (define spec + (pattern->constant spec-pat (lambda (_name index) (list-ref bindings index)))) + (assert (StreamSpecConnectable spec))])) + (assert + (Observe (:pattern + (Observe (:pattern (StreamSpecConnectable ,,(:pattern (DLit ,spec-pat)))) _)) + connectable-asserter))) + (during/spawn (StreamListener (TcpLocal $host $port) $peer) #:name (TcpLocal host port) (run-listener ds peer host port)) @@ -78,13 +110,20 @@ (linked-thread #:name (list 'listen-thread host port) (lambda (facet) - (define listener (tcp-listen port 512 #t host)) - (let loop () - (define connection-custodian (make-custodian)) - (define-values (i o) (parameterize ((current-custodian connection-custodian)) - (tcp-accept listener))) - (turn! facet (lambda () (spawn-connection ds connection-custodian i o peer))) - (loop))))) + (with-connection-error-guard ds peer + (lambda (message) + (turn! facet (lambda () + (at ds (assert (TcpListenError (TcpLocal host port) message))) + (at peer (assert (ConnectionHandler-rejected message)))))) + (lambda () + (define listener (tcp-listen port 512 #t host)) + (lambda () + (let loop () + (define connection-custodian (make-custodian)) + (define-values (i o) (parameterize ((current-custodian connection-custodian)) + (tcp-accept listener))) + (turn! facet (lambda () (spawn-connection ds connection-custodian i o peer))) + (loop)))))))) (define (tcp-ends p) (call-with-values (lambda () (tcp-addresses p #t)) @@ -115,23 +154,40 @@ (react (on-stop (facet-count (- (facet-count) 1)) (close-output-port o)) + (define active-source #f) + (define relay (outbound-relay o)) (set! sink (make-sink #:name 'socket-out - #:on-data (outbound-relay o) + #:on-connect + (lambda (new-source) + (set! active-source new-source) + (send-credit active-source (CreditAmount-unbounded) (Mode-bytes))) + #:on-data + (lambda (data mode) + (relay data mode) + (match mode + [(Mode-bytes) (send-bytes-credit active-source (bytes-length data))] + [(Mode-lines lm) (send-lines-credit active-source 1 lm)] + [(Mode-packet n) (send-packet-credit active-source n)] + [(Mode-object _) (void)])) #:on-eof (lambda () (stop-current-facet)))) (at ds (assert (TcpPeerInfo sink local-end remote-end)))) (at peer (assert #:when (positive? (facet-count)) (ConnectionHandler-connected source sink))))) +(define (with-connection-error-guard ds peer error-proc proc) + ((with-handlers ([exn:fail:network? (lambda (e) (lambda () (error-proc (exn->string e))))]) + (proc)))) + (define (run-outbound ds peer host port) - ((with-handlers ([exn:fail:network? - (lambda (e) - (lambda () - (at peer (assert (ConnectionHandler-rejected (exn->string e))))))]) - (define connection-custodian (make-custodian)) - (define-values (i o) (parameterize ((current-custodian connection-custodian)) - (tcp-connect host port))) - (lambda () (spawn-connection ds connection-custodian i o peer))))) + (with-connection-error-guard ds peer + (lambda (message) + (at peer (assert (ConnectionHandler-rejected message)))) + (lambda () + (define connection-custodian (make-custodian)) + (define-values (i o) (parameterize ((current-custodian connection-custodian)) + (tcp-connect host port))) + (lambda () (spawn-connection ds connection-custodian i o peer))))) (define (start-inbound-relay custodian target-proc i) (define eof-received? #f) @@ -273,8 +329,8 @@ (define handle #f) (define (set-sink! new-sink) (when (not (eq? sink new-sink)) - (on-connect new-sink) (set! sink new-sink) + (when sink (on-connect sink)) (set! handle (turn-replace! this-turn sink handle (if sink (->preserve (Sink-source self)) (void)))))) @@ -311,8 +367,8 @@ (define handle #f) (define (set-source! new-source) (when (not (eq? new-source source)) - (on-connect new-source) (set! source new-source) + (when source (on-connect source)) (set! handle (turn-replace! this-turn source handle (if source (->preserve (Source-sink self)) (void)))))) @@ -379,28 +435,28 @@ ;;--------------------------------------------------------------------------- -(define (send-credit conn amount mode) - (send! conn (Source-credit amount mode))) +(define (send-credit source amount mode) + (send! source (Source-credit amount mode))) -(define (send-lines-credit conn amount [mode (LineMode-lf)]) - (send-credit conn (CreditAmount-count amount) (Mode-lines mode))) +(define (send-lines-credit source amount [mode (LineMode-lf)]) + (send-credit source (CreditAmount-count amount) (Mode-lines mode))) -(define (send-bytes-credit conn amount) - (send-credit conn (CreditAmount-count amount) (Mode-bytes))) +(define (send-bytes-credit source amount) + (send-credit source (CreditAmount-count amount) (Mode-bytes))) -(define (send-packet-credit conn packet-size #:count [count 1]) - (send-credit conn (CreditAmount-count count) (Mode-packet packet-size))) +(define (send-packet-credit source packet-size #:count [count 1]) + (send-credit source (CreditAmount-count count) (Mode-packet packet-size))) (define (->bytes data) (if (bytes? data) data (string->bytes/utf-8 data))) -(define (send-line conn line [line-mode (LineMode-lf)]) - (send! conn (Sink-data (->bytes line) (Mode-lines line-mode)))) +(define (send-line sink line [line-mode (LineMode-lf)]) + (send! sink (Sink-data (->bytes line) (Mode-lines line-mode)))) -(define (send-data conn data [mode (Mode-bytes)]) - (send! conn (Sink-data (->bytes data) mode))) +(define (send-data sink data [mode (Mode-bytes)]) + (send! sink (Sink-data (->bytes data) mode))) -(define (send-eof conn) - (send! conn (Sink-eof))) +(define (send-eof sink) + (send! sink (Sink-eof))) diff --git a/syndicate/drivers/timer.rkt b/syndicate/drivers/timer.rkt index 87fc3b4..a2a7f5d 100644 --- a/syndicate/drivers/timer.rkt +++ b/syndicate/drivers/timer.rkt @@ -4,8 +4,7 @@ (provide (all-from-out syndicate/schemas/gen/timer) spawn-timer-driver - on-timeout - stop-when-timeout) + timeout) (require syndicate/driver-support) (require syndicate/engine) @@ -90,7 +89,7 @@ (loop)]))))))) (at ds - (when (message ($ instruction (SetTimer _ _ _))) + (on (message ($ instruction (SetTimer _ _ _))) (log-syndicate/drivers/timer-debug "received instruction ~a" instruction) (channel-put control-ch instruction)) @@ -99,18 +98,14 @@ msecs (current-inexact-milliseconds)) (define timer-id (gensym 'timestate)) - (on-start (send! ds (SetTimer timer-id msecs (TimerKind-absolute)))) - (on-stop (send! ds (SetTimer timer-id msecs (TimerKind-clear)))) - (at ds - (when (message (TimerExpired timer-id _)) - (react (at ds (assert (LaterThan msecs)))))))))) + (on-start (send! (SetTimer timer-id msecs (TimerKind-absolute)))) + (on-stop (send! (SetTimer timer-id msecs (TimerKind-clear)))) + (on (message (TimerExpired timer-id _)) + (react (assert (LaterThan msecs)))))))) -(define-syntax-rule (on-timeout relative-msecs body ...) - (let ((timer-id (gensym 'timeout))) - (on-start (send! this-target (SetTimer timer-id relative-msecs (TimerKind-relative)))) - (at this-target - (when (message (TimerExpired timer-id _)) - body ...)))) - -(define-syntax-rule (stop-when-timeout relative-msecs body ...) - (on-timeout relative-msecs (stop-current-facet body ...))) +(define-event-expander timeout + (syntax-rules () + [(_ [relative-msecs] body ...) + (let ((timer-id (gensym 'timeout))) + (on-start (send! (SetTimer timer-id relative-msecs (TimerKind-relative)))) + (on (message (TimerExpired timer-id _)) body ...))])) diff --git a/syndicate/entity-ref.rkt b/syndicate/entity-ref.rkt index 6ebeb20..2dd1a83 100644 --- a/syndicate/entity-ref.rkt +++ b/syndicate/entity-ref.rkt @@ -19,6 +19,7 @@ (require "engine.rkt") (struct entity (id name assert retract message sync data) + #:transparent #:methods gen:custom-write [(define (write-proc e port mode) (fprintf port "#" (entity-id e) (entity-name e)))]) diff --git a/syndicate/pattern.rkt b/syndicate/pattern.rkt index 058eada..b14500e 100644 --- a/syndicate/pattern.rkt +++ b/syndicate/pattern.rkt @@ -17,6 +17,9 @@ pattern->capture-paths pattern->capture-names + pattern->constant + ;; quote-pattern + !dump-registered-preserves-patterns! (all-from-out "schemas/gen/dataspace-patterns.rkt")) @@ -29,6 +32,7 @@ (require (for-syntax syntax/id-table)) (require (for-syntax syntax/stx)) +(require preserves) (require preserves-schema) (require racket/match) (require racket/list) @@ -347,6 +351,81 @@ (lambda (key-rev name-stx) (list name-stx)) (lambda (key-rev value) (list)))) +(define (pattern->constant desc [env (lambda (name index) (void))]) + (define next-binding-index 0) + (define (walk p k) + (match p + [(Pattern-DDiscard (DDiscard)) (void)] + [(Pattern-DBind (DBind name pat)) + (let ((v (env name next-binding-index))) + (set! next-binding-index (+ next-binding-index 1)) + (let ((inner (walk pat values))) + (k (if (void? v) inner v))))] + [(Pattern-DLit (DLit value)) (k value)] + [(Pattern-DCompound (DCompound-rec (CRec label arity) members)) + (let loop ((fields-rev '()) (i 0)) + (if (= i arity) + (k (record label (reverse fields-rev))) + (let ((vpat (hash-ref members i #f))) + (if vpat + (walk vpat (lambda (v) (loop (cons v fields-rev) (+ i 1)))) + (void)))))] + [(Pattern-DCompound (DCompound-arr (CArr arity) members)) + (let loop ((items-rev '()) (i 0)) + (if (= i arity) + (k (reverse items-rev)) + (let ((vpat (hash-ref members i #f))) + (if vpat + (walk vpat (lambda (v) (loop (cons v items-rev) (+ i 1)))) + (void)))))] + [(Pattern-DCompound (DCompound-dict (CDict) members)) + (let loop ((items (hash)) (entries (hash->list members))) + (match entries + ['() (k items)] + [(cons (cons key vpat) more) + (walk vpat (lambda (v) (loop (hash-set items key v) more)))]))])) + (walk (parse-Pattern desc) values)) + +;; (define (quote-pattern p) +;; (match p +;; [(Pattern-DDiscard (DDiscard)) +;; (Pattern-DCompound (DCompound-rec (CRec '_ 0) (hash)))] +;; [(Pattern-DBind (DBind name pat)) +;; (Pattern-DCompound (DCompound-rec (CRec 'bind 2) +;; (hash 0 (Pattern-DLit (DLit name)) 1 (quote-pattern pat))))] +;; [(Pattern-DLit value) +;; (Pattern-DCompound (DCompound-rec (CRec 'lit 1) (hash 0 (Pattern-DLit (DLit value)))))] +;; [(Pattern-DCompound (DCompound-rec (CRec label arity) members)) +;; (Pattern-DCompound +;; (DCompound-rec (CRec 'compound 2) +;; (hash 0 (Pattern-DCompound +;; (DCompound-rec (CRec 'rec 2) +;; (hash 0 (Pattern-DLit (DLit label)) +;; 1 (Pattern-DLit (DLit arity))))) +;; 1 (Pattern-DCompound +;; (DCompound-dict (CDict) +;; (for/hash ([(kv vp) (in-hash members)]) +;; (values kv (quote-pattern vp))))))))] +;; [(Pattern-DCompound (DCompound-arr (CArr arity) members)) +;; (Pattern-DCompound +;; (DCompound-rec (CRec 'compound 2) +;; (hash 0 (Pattern-DCompound +;; (DCompound-rec (CRec 'arr 1) +;; (hash 0 (Pattern-DLit (DLit arity))))) +;; 1 (Pattern-DCompound +;; (DCompound-dict (CDict) +;; (for/hash ([(kv vp) (in-hash members)]) +;; (values kv (quote-pattern vp))))))))] +;; [(Pattern-DCompound (DCompound-dict (CDict) members)) +;; (Pattern-DCompound +;; (DCompound-rec (CRec 'compound 2) +;; (hash 0 (Pattern-DCompound +;; (DCompound-rec (CRec 'dict 0) (hash))) +;; 1 (Pattern-DCompound +;; (DCompound-dict (CDict) +;; (for/hash ([(kv vp) (in-hash members)]) +;; (values kv (quote-pattern vp))))))))])) + (define-syntax (!dump-registered-preserves-patterns! stx) (syntax-case stx () [(_) diff --git a/syndicate/schemas/stream.prs b/syndicate/schemas/stream.prs index 918c8f9..a565087 100644 --- a/syndicate/schemas/stream.prs +++ b/syndicate/schemas/stream.prs @@ -1,7 +1,10 @@ version 1 . embeddedType EntityRef.Ref . +; Assertions StreamConnection = . +StreamSpecListenable = . +StreamSpecConnectable = . ; Assertion StreamListener = . diff --git a/syndicate/schemas/tcp.prs b/syndicate/schemas/tcp.prs index 7a6d5e1..b00df01 100644 --- a/syndicate/schemas/tcp.prs +++ b/syndicate/schemas/tcp.prs @@ -5,3 +5,5 @@ TcpRemote = . TcpLocal = . TcpPeerInfo = . + +TcpListenError = . diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index 34bdf57..9eb6a8f 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -28,14 +28,14 @@ begin/dataflow define/dataflow - stop-when-true + stop-on-true entity/stop-on-retract this-target at assert - stop-when - (rename-out [event:when when]) + stop-on + on during during/spawn during*) @@ -62,7 +62,10 @@ (syntax-case stx () [id (identifier? #'id) - #'(or (current-turn) (error 'this-turn "Illegal use outside an Actor turn"))])))) + #'(this-turn*)])))) + +(define (this-turn*) + (or (current-turn) (error 'this-turn "Illegal use outside an Actor turn"))) (define-syntax-rule (with-this-turn turn-expr expr ...) (parameterize ([current-turn turn-expr]) @@ -187,7 +190,7 @@ [(_ [#:do expr e ...] body ...) #'(begin expr (let-event [e ...] body ...))] [(_ [e0 e ...] body ...) - #'(react (stop-when e0 (let-event [e ...] body ...)))])) + #'(react (stop-on e0 (let-event [e ...] body ...)))])) (define-syntax-rule (define-field id initial-value) (define id (turn-field! this-turn 'id initial-value))) @@ -212,8 +215,21 @@ (define-syntax-rule (sync! peer expr ...) (turn-sync! this-turn peer (lambda (_reply) expr ...))) -(define-syntax-rule (send! peer assertion) - (turn-message! this-turn peer (->preserve assertion))) +(define-for-syntax (with-valid-this-target orig-stx result-stx) + ;; Invoke this-target transformer for its side effect: when it's + ;; illegal to use it, it will signal an error. + (let ((v (syntax-parameter-value #'this-target))) + (when (procedure? v) + (v orig-stx))) + result-stx) + +(define-syntax (send! stx) + (syntax-parse stx + [(_ peer assertion) + (syntax/loc stx (turn-message! this-turn peer (->preserve assertion)))] + [(_ assertion) + (with-valid-this-target stx + (syntax/loc stx (send! this-target assertion)))])) (define-syntax (spawn stx) (syntax-parse stx @@ -223,7 +239,9 @@ #'(turn-spawn! #:name name.N #:daemon? daemon.D this-turn - (lambda () setup-expr ...))])) + (lambda () + (syntax-parameterize ([this-target illegal-use-of-this-target]) + setup-expr ...)))])) (define-syntax (spawn/link stx) (syntax-parse stx @@ -237,7 +255,9 @@ #:daemon? daemon.D #:link (entity/stop-on-retract #:name (list name 'monitor-in-child)) - (lambda () setup-expr ...) + (lambda () + (syntax-parameterize ([this-target illegal-use-of-this-target]) + setup-expr ...)) (hasheq monitor-handle #t)))])) (define-syntax-rule (begin/dataflow expr ...) @@ -247,7 +267,7 @@ (begin (define-field id #f) (begin/dataflow (id expr)))) -(define-syntax-rule (stop-when-true test expr ...) +(define-syntax-rule (stop-on-true test expr ...) (begin/dataflow (when test (stop-current-facet expr ...)))) @@ -260,61 +280,62 @@ (define-for-syntax orig-insp (variable-reference->module-declaration-inspector (#%variable-reference))) -(define-syntax-parameter this-target +(define-for-syntax illegal-use-of-this-target (lambda (stx) - (raise-syntax-error #f "Illegal use outside an `at` expression" stx))) + (raise-syntax-error 'this-target "Illegal use outside an `at` expression" stx))) + +(define-syntax-parameter this-target illegal-use-of-this-target) (define-syntax (at stx) (syntax-case stx () [(_ target-expr items ...) #`(let ((target target-expr)) (syntax-parameterize ([this-target (make-rename-transformer #'target)]) - #,@(for/list [(item-stx (in-list (syntax->list #'(items ...))))] - (let loop ((item-stx item-stx)) - (define disarmed-item-stx (syntax-disarm item-stx orig-insp)) - (syntax-case disarmed-item-stx () - [(expander args ...) - (event-expander-id? #'expander) - (event-expander-transform disarmed-item-stx - (lambda (r) (loop (syntax-rearm r item-stx))))] - [_ - item-stx])))))])) + items ...))])) -(define-event-expander assert +(define-syntax assert (lambda (stx) (syntax-parse stx [(_ condition: expr) - #`(turn-assert/dataflow! this-turn - this-target - (lambda () - (if condition.E - (->preserve expr) - (void))))]))) + (with-valid-this-target stx + (quasisyntax/loc stx + (turn-assert/dataflow! this-turn + this-target + #,(quasisyntax/loc #'expr + (lambda () (if condition.E + (->preserve expr) + (void)))))))]))) -(define-event-expander stop-when - (syntax-rules () - [(_ event expr ...) - (event:when event (stop-current-facet expr ...))])) +(define-syntax-rule (stop-on event expr ...) + (on event (stop-current-facet expr ...))) (require "schemas/gen/dataspace.rkt") -(define-event-expander event:when +(define-syntax on (lambda (stx) - (syntax-parse stx - [(_ ((~datum message) pat) expr ...) - #`(assert (Observe (:pattern pat) - (ref (entity #:message - (lambda (bindings) - (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) - expr ...)))))] - [(_ ((~datum asserted) pat) expr ...) - #`(assert (Observe (:pattern pat) + (define disarmed-stx (syntax-disarm stx orig-insp)) + (syntax-parse disarmed-stx + [(_ ((~datum message) condition: pat) expr ...) + (quasisyntax/loc stx + (assert #:when condition.E + #,(quasisyntax/loc #'pat + (Observe (:pattern pat) + (ref (entity #:message + (lambda (bindings) + (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) + expr ...)))))))] + [(_ ((~datum asserted) condition: pat) expr ...) + (quasisyntax/loc stx + (assert #:when condition.E + (Observe (:pattern pat) (ref (entity #:assert (lambda (bindings _handle) (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) - expr ...)))))] - [(_ ((~datum retracted) pat) expr ...) - #`(assert (Observe (:pattern pat) + expr ...))))))] + [(_ ((~datum retracted) condition: pat) expr ...) + (quasisyntax/loc stx + (assert #:when condition.E + (Observe (:pattern pat) (let ((assertion-map (make-hash))) (ref (entity #:assert (lambda (bindings handle) @@ -324,28 +345,31 @@ (match-define (list #,@(analyse-pattern-bindings #'pat)) (hash-ref assertion-map handle)) (hash-remove! assertion-map handle) - expr ...))))))])) - (syntax-rules () - [(_ test expr ...) - (when test expr ...)])) + expr ...)))))))] + [(_ (expander args ...) body ...) #:when (event-expander-id? #'expander) + (event-expander-transform #'(expander [args ...] body ...) (lambda (r) (syntax-rearm r stx)))] + [_ + (raise-syntax-error #f "Invalid event pattern")]))) -(define-event-expander during +(define-syntax during (lambda (stx) (syntax-case stx () [(_ pat expr ...) - #`(assert (Observe (:pattern pat) + (quasisyntax/loc stx + (assert (Observe (:pattern pat) (ref (during* (lambda (bindings) (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) - expr ...)))))]))) + expr ...))))))]))) -(define-event-expander during/spawn +(define-syntax during/spawn (lambda (stx) (syntax-parse stx [(_ pat name-stx: daemon: expr ...) - #`(assert (Observe (:pattern pat) + (quasisyntax/loc stx + (assert (Observe (:pattern pat) (ref (during* (lambda (bindings) (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) - (spawn/link #:name name-stx.N #:daemon? daemon.D expr ...))))))]))) + (spawn/link #:name name-stx.N #:daemon? daemon.D expr ...)))))))]))) (define (during* f #:name [name '?]) (define assertion-map (make-hash)) @@ -370,10 +394,11 @@ ;;; eval: (put 'actor-system/dataspace 'racket-indent-function 1) ;;; eval: (put 'at 'racket-indent-function 1) ;;; eval: (put 'object 'racket-indent-function 0) +;;; eval: (put 'on 'racket-indent-function 1) ;;; eval: (put 'react 'racket-indent-function 0) ;;; eval: (put 'send! 'racket-indent-function 1) -;;; eval: (put 'sync! 'racket-indent-function 1) ;;; eval: (put 'spawn 'racket-indent-function 0) -;;; eval: (put 'stop-when 'racket-indent-function 1) -;;; eval: (put 'stop-when-true 'racket-indent-function 1) +;;; eval: (put 'stop-on 'racket-indent-function 1) +;;; eval: (put 'stop-on-true 'racket-indent-function 1) +;;; eval: (put 'sync! 'racket-indent-function 1) ;;; End: diff --git a/syndicate/test/core/during-with-spawn.rkt b/syndicate/test/core/during-with-spawn.rkt new file mode 100644 index 0000000..8396897 --- /dev/null +++ b/syndicate/test/core/during-with-spawn.rkt @@ -0,0 +1,18 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(require syndicate/drivers/timer) + +(module+ test + (actor-system/dataspace (ds) + (spawn-timer-driver ds) + (spawn (at ds + (assert 'item) + (on (timeout 50) (stop-current-facet)))) + (spawn #:daemon? #t + (at ds + (during/spawn 'item + #:name 'item-handler + (on-start (log-info "up")) + (on-stop (log-info "down"))))))) diff --git a/syndicate/test/core/self-loop.rkt b/syndicate/test/core/self-loop.rkt new file mode 100644 index 0000000..3a16977 --- /dev/null +++ b/syndicate/test/core/self-loop.rkt @@ -0,0 +1,13 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(module+ test + (actor-system/dataspace (ds) + (spawn + (define (loop n) + (log-info "loop ~v" n) + (when (positive? n) + (react (on-start (send! ds (- n 1))) + (at ds (stop-on (message $n) (loop n)))))) + (loop 5))))