`when` -> `on`; StreamConnection API; better `this-target`; tcp-listen errors

- spec-generic StreamConnection translators, for simple TCP API
 - `when` -> `on`, better use for event-expanders
 - Removal of special processing of `at`, making `this-target` properly lexically scopeable
 - TcpListenError and handling of tcp-listen errors
 - SYNDICATE_COLUMNS for pretty-printing of dataspace traces
 - Repair driver-support.rkt thread shutdown turn-taking
 - Refinements to stream protocols and implementation
 - Improvements to syntax location preservation in syntax.rkt
This commit is contained in:
Tony Garnock-Jones 2021-06-17 13:38:30 +02:00
parent a43b6d39aa
commit ac8ea67ab1
21 changed files with 473 additions and 159 deletions

View File

@ -13,18 +13,18 @@
(define-field current-value 0)
(at ds
(assert (box-state (current-value)))
(when (message (set-box $new-value))
(on (message (set-box $new-value))
(log-info "box: taking on new-value ~v" new-value)
(current-value new-value)))
(stop-when-true (= (current-value) 10)
(stop-on-true (= (current-value) 10)
(log-info "box: terminating")))
(spawn #:name 'client
(at ds
(stop-when (retracted (Observe (:pattern (set-box ,_)) _))
(stop-on (retracted (Observe (:pattern (set-box ,_)) _))
(log-info "client: box has gone"))
(when (asserted (box-state $v))
(on (asserted (box-state $v))
(log-info "client: learned that box's value is now ~v" v)
(send! ds (set-box (+ v 1))))
(when (retracted (box-state _))
(send! (set-box (+ v 1))))
(on (retracted (box-state _))
(log-info "client: box state disappeared"))))))

View File

@ -11,7 +11,7 @@
(define reporter (report-stats REPORT_EVERY))
(at ds
(assert (BoxState (value)))
(when (message (SetBox $new-value))
(on (message (SetBox $new-value))
(reporter new-value)
(when (= new-value LIMIT) (stop-current-facet))
(value new-value)))))
@ -20,7 +20,7 @@
(spawn #:name 'client
(define root-facet this-facet)
(at ds
(when (asserted (BoxState $value)) (send! ds (SetBox (+ value 1))))
(on (asserted (BoxState $value)) (send! (SetBox (+ value 1))))
(during (BoxState _)
(on-stop (log-info "Client detected box termination")
(stop-facet root-facet))))))

View File

@ -0,0 +1,27 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(module+ main
(require racket/cmdline)
(require (only-in racket/port read-line-evt))
(require (only-in racket/string string-trim))
(require syndicate/drivers/tcp)
(require syndicate/drivers/stream)
(require syndicate/drivers/racket-event)
(define host "127.0.0.1")
(define port 5999)
(command-line #:once-each
[("--host" "-H") hostname "Set hostname to connect to"
(set! host hostname)]
[("--port" "-p") port-number "Set port number to connect to"
(set! port (string->number port-number))])
(actor-system/dataspace (ds)
(spawn-racket-event-driver ds)
(spawn-tcp-driver ds)
(spawn
(at ds
(assert (StreamConnection (port-lines-source ds) (port-sink) (TcpRemote host port)))))))

View File

@ -5,7 +5,6 @@
(module+ main
(require racket/cmdline)
(require (only-in racket/port read-line-evt))
(require (only-in racket/string string-trim))
(require syndicate/drivers/tcp)
(require syndicate/drivers/racket-event)
@ -27,7 +26,7 @@
#:initial-mode (Mode-lines (LineMode-lf))
#:on-connected (lambda (source sink)
(at ds
(when (message (RacketEvent (read-line-evt (current-input-port)) $vs))
(on (message (RacketEvent (read-line-evt (current-input-port)) $vs))
(match (car vs)
[(? eof-object?) (stop-current-facet (log-info "EOF on stdin."))]
[line (send-line sink line)]))))

View File

@ -0,0 +1,28 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(module+ main
(require racket/cmdline)
(require syndicate/drivers/tcp)
(define host "0.0.0.0")
(define port 5999)
(command-line #:once-each
[("--host" "-H") hostname "Set hostname to listen on"
(set! host hostname)]
[("--port" "-p") port-number "Set port number to listen on"
(set! port (string->number port-number))])
(actor-system/dataspace (ds)
(spawn-tcp-driver ds)
(spawn
(at ds
(assert (StreamListener (TcpLocal host port)
(make-connection-handler
(lambda (source sink)
(handle-connection source sink
#:on-data
(lambda (data mode)
(send-data sink data mode)))))))))))

View File

@ -19,6 +19,7 @@
(spawn-tcp-driver ds)
(spawn
(at ds
(stop-on (asserted (TcpListenError (TcpLocal host port) $message)))
(during/spawn (StreamConnection $source $sink (TcpLocal host port))
(handle-connection source sink
#:on-data (lambda (data mode) (send-data sink data mode))))))))

View File

@ -21,8 +21,9 @@
(spawn-tcp-driver ds)
(spawn
(at ds
(stop-on (asserted (TcpListenError (TcpLocal host port) $message)))
(during/spawn (StreamConnection $source $sink (TcpLocal host port))
(handle-connection source sink
#:initial-mode (Mode-lines (LineMode-lf))
#:on-data (lambda (data mode) (send! ds (Line data))))
(at ds (when (message (Line $data)) (send-line sink data))))))))
(at ds (on (message (Line $data)) (send-line sink data))))))))

View File

@ -9,5 +9,7 @@
(spawn-timer-driver ds)
(spawn (at ds
(log-info "waiting...")
(stop-when-timeout 2000
(on (timeout 1000)
(log-info "still waiting..."))
(stop-on (timeout 2000)
(log-info "done!"))))))

View File

@ -9,6 +9,7 @@
dataspace
actor-system/dataspace)
(require racket/pretty)
(require racket/match)
(require preserves)
@ -23,6 +24,18 @@
(define-logger syndicate/dataspace)
(match (getenv "SYNDICATE_COLUMNS")
[#f (void)]
[n (pretty-print-columns (string->number n))])
(define (pretty-assertion indent value)
(define gap (make-string indent #\space))
(parameterize ((pretty-print-print-line
(lambda (line-number port line-length columns)
(fprintf port "\n~a" gap)
indent)))
(pretty-format value #:mode 'print)))
(define (dataspace #:name [name (gensym 'dataspace)])
(define handles (make-hash))
(define assertions (make-bag))
@ -30,7 +43,7 @@
(define ds-e
(entity #:name name
#:assert (lambda (value handle)
(log-syndicate/dataspace-debug "~v + ~v ~v" ds-e handle value)
(log-syndicate/dataspace-debug "~v + ~v~a" ds-e handle (pretty-assertion 4 value))
(define maybe-observe (parse-Observe value))
(hash-set! handles handle (cons value maybe-observe))
(when (eq? (bag-change! assertions value +1) 'absent->present)
@ -42,7 +55,7 @@
(match (hash-ref handles upstream-handle #f)
[#f (error 'dataspace "Peer retracted unknown handle ~v" upstream-handle)]
[(cons value maybe-observe)
(log-syndicate/dataspace-debug "~v - ~v ~v" ds-e upstream-handle value)
(log-syndicate/dataspace-debug "~v - ~v~a" ds-e upstream-handle (pretty-assertion 4 value))
(hash-remove! handles upstream-handle)
(when (eq? (bag-change! assertions value -1) 'present->absent)
(remove-assertion! this-turn skeleton value)
@ -50,7 +63,7 @@
[(? eof-object?) (void)]
[(Observe pat ref) (remove-interest! this-turn skeleton pat ref)]))]))
#:message (lambda (message)
(log-syndicate/dataspace-debug "~v ! ~v" ds-e message)
(log-syndicate/dataspace-debug "~v !~a" ds-e (pretty-assertion 4 message))
(send-assertion! this-turn skeleton message))))
(ref ds-e))

View File

@ -13,24 +13,26 @@
#:name [name (gensym 'linked-thread)]
#:peer [peer (ref (entity/stop-on-retract #:name (list name 'monitor)))]
#:custodian [c (make-custodian)])
(define facet this-facet)
(define actor this-actor)
(define handle #f)
(define armed? #t)
(define (!)
(when armed?
(set! armed? #f)
(log-syndicate/driver-support-info "~a shutdown" name)
(turn-retract! this-turn handle)
(queue-task! (actor-engine this-actor) (lambda () (custodian-shutdown-all c)))
(actor-remove-exit-hook! this-actor !)))
(set! armed? #f)
(turn! facet (lambda () (turn-retract! this-turn handle)))
(queue-task! (actor-engine actor) (lambda () (custodian-shutdown-all c)))
(actor-remove-exit-hook! actor !)))
(on-stop (!))
(actor-add-exit-hook! this-actor !)
(actor-add-exit-hook! actor !)
(log-syndicate/driver-support-info "~a startup" name)
(set! handle
(parameterize ((current-custodian c))
(define facet this-facet)
(turn-assert! this-turn
peer
(embedded
@ -43,4 +45,4 @@
(exn->string e))
(void))])
(thread-proc facet))
(turn! facet !))))))))
(!))))))))

View File

@ -14,17 +14,19 @@
(define-logger syndicate/drivers/racket-event)
(define (spawn-racket-event-driver ds)
(at ds
(during/spawn (Observe (:pattern (RacketEvent ,(DLit $embedded-event) ,_)) _)
#:name (embedded-value embedded-event)
(define event (embedded-value embedded-event))
(on-start (log-syndicate/drivers/racket-event-debug "started listening: ~v" event))
(on-stop (log-syndicate/drivers/racket-event-debug "stopped listening: ~v" event))
(linked-thread
#:name (list event 'thread)
(lambda (facet)
(let loop ()
(sync (handle-evt event
(lambda args
(turn! facet (lambda () (send! ds (RacketEvent event args))))
(loop))))))))))
(spawn #:name 'racket-event-driver
#:daemon? #t
(at ds
(during/spawn (Observe (:pattern (RacketEvent ,(DLit $embedded-event) ,_)) _)
#:name (embedded-value embedded-event)
(define event (embedded-value embedded-event))
(on-start (log-syndicate/drivers/racket-event-debug "started listening: ~v" event))
(on-stop (log-syndicate/drivers/racket-event-debug "stopped listening: ~v" event))
(linked-thread
#:name (list event 'thread)
(lambda (facet)
(let loop ()
(sync (handle-evt event
(lambda args
(turn! facet (lambda () (send! ds (RacketEvent event args))))
(loop)))))))))))

View File

@ -0,0 +1,47 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide port-lines-source
port-sink)
(require (only-in racket/port read-line-evt))
(require "tcp.rkt") ;; ugh, lots of tcp.rkt actually belongs in this file
(require "racket-event.rkt")
(define (port-lines-source ds [port (current-input-port)]
#:initial-credit [initial-credit 0]
#:name [name (list 'port-lines-source (object-name port))]
#:line-mode [line-mode (LineMode-lf)])
(define-field credit initial-credit)
(define-field sink #f)
(at ds
(on (message #:when (and (sink) (positive? (credit))) (RacketEvent (read-line-evt port) $vs))
(credit (- (credit) 1))
(match (car vs)
[(? eof-object?) (send-eof (sink))]
[line (send-line (sink) line line-mode)])))
(make-source #:name name
#:on-connect sink
#:on-credit (lambda (amount mode)
(if (equal? amount (CreditAmount-unbounded))
(credit +inf.0)
(match mode
[(Mode-lines _)
(credit (+ (credit) (CreditAmount-count-value amount)))]
[_ (void)])))))
(define (port-sink [port (current-output-port)]
#:name [name (list 'port-sink (object-name port))])
(make-sink #:name name
#:on-connect (lambda (source) (send-credit source (CreditAmount-unbounded) (Mode-bytes)))
#:on-eof (lambda () (close-output-port port))
#:on-data (lambda (data mode)
(when (bytes? data)
(write-bytes data port)
(match mode
[(Mode-bytes) (void)]
[(Mode-lines (LineMode-lf)) (write-bytes #"\n" port)]
[(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" port)])
(flush-output port)))))

View File

@ -26,6 +26,7 @@
(require (only-in racket/exn exn->string))
(require syndicate/driver-support)
(require syndicate/functional-queue)
(require syndicate/pattern)
(require syndicate/schemas/gen/stream)
(require syndicate/schemas/gen/tcp)
(require syndicate/schemas/gen/dataspace-patterns)
@ -40,30 +41,61 @@
#:daemon? #t
(at ds
(during/spawn
(Observe (:pattern (StreamConnection ,_ ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _)
#:name (list 'simple (TcpLocal host port))
(define spec (TcpLocal host port))
(at ds
(assert (StreamListener spec
(make-connection-handler
(lambda (source sink)
(at ds (assert (StreamConnection source sink spec)))))))))
(during/spawn (Observe (:pattern (StreamConnection ,_ ,_ ,$spec-pat)) _)
#:name (list 'simple-listener spec-pat)
(match (pattern->constant spec-pat)
[(? void?) (stop-current-facet)]
[spec (at ds
(during (StreamSpecListenable spec)
(assert
(StreamListener spec
(make-connection-handler
(lambda (source sink)
(assert (StreamConnection source sink spec))))))))]))
(during/spawn (StreamConnection $app-source $app-sink (TcpRemote $host $port))
#:name (list 'simple (TcpRemote host port))
(define spec (TcpRemote host port))
(during/spawn (StreamConnection $app-source $app-sink $spec)
#:name (list 'simple-connection spec)
(at ds
(assert (StreamConnect spec
(object #:name 'connection-peer
[(ConnectionHandler-connected tcp-source tcp-sink)
(at tcp-source (assert (Source-sink app-sink)))
(at tcp-sink (assert (Sink-source app-source)))]
[(ConnectionHandler-connected sys-source sys-sink)
(at sys-source (assert (Source-sink app-sink)))
(at sys-sink (assert (Sink-source app-source)))]
[(ConnectionHandler-rejected message)
(log-syndicate/drivers/tcp-error
"Connection to ~a rejected: ~a" spec message)
(at app-source (assert (StreamError message)))
(at app-sink (assert (StreamError message)))
(stop-current-facet)])))))
;; I translate interest in StreamListener with a particular spec-pattern into a facet
;; that reacts to interest in StreamSpecListenable with a spec matching the spec-pattern
;; by asserting StreamSpecListenable with that spec.
(during (Observe (:pattern (StreamListener ,$spec-pat ,_)) _)
(define listenable-asserter
(object [bindings
(define spec
(pattern->constant spec-pat (lambda (_name index) (list-ref bindings index))))
(assert (StreamSpecListenable spec))]))
(assert
(Observe (:pattern
(Observe (:pattern (StreamSpecListenable ,,(:pattern (DLit ,spec-pat)))) _))
listenable-asserter)))
;; I translate interest in StreamConnect with a particular spec-pattern into a facet that
;; reacts to interest in StreamSpecConnectable with a spec matching the spec-pattern by
;; asserting StreamSpecConnectable with that spec.
(during (Observe (:pattern (StreamConnect ,$spec-pat ,_)) _)
(define connectable-asserter
(object [bindings
(define spec
(pattern->constant spec-pat (lambda (_name index) (list-ref bindings index))))
(assert (StreamSpecConnectable spec))]))
(assert
(Observe (:pattern
(Observe (:pattern (StreamSpecConnectable ,,(:pattern (DLit ,spec-pat)))) _))
connectable-asserter)))
(during/spawn (StreamListener (TcpLocal $host $port) $peer)
#:name (TcpLocal host port)
(run-listener ds peer host port))
@ -78,13 +110,20 @@
(linked-thread
#:name (list 'listen-thread host port)
(lambda (facet)
(define listener (tcp-listen port 512 #t host))
(let loop ()
(define connection-custodian (make-custodian))
(define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-accept listener)))
(turn! facet (lambda () (spawn-connection ds connection-custodian i o peer)))
(loop)))))
(with-connection-error-guard ds peer
(lambda (message)
(turn! facet (lambda ()
(at ds (assert (TcpListenError (TcpLocal host port) message)))
(at peer (assert (ConnectionHandler-rejected message))))))
(lambda ()
(define listener (tcp-listen port 512 #t host))
(lambda ()
(let loop ()
(define connection-custodian (make-custodian))
(define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-accept listener)))
(turn! facet (lambda () (spawn-connection ds connection-custodian i o peer)))
(loop))))))))
(define (tcp-ends p)
(call-with-values (lambda () (tcp-addresses p #t))
@ -115,23 +154,40 @@
(react (on-stop (facet-count (- (facet-count) 1))
(close-output-port o))
(define active-source #f)
(define relay (outbound-relay o))
(set! sink (make-sink #:name 'socket-out
#:on-data (outbound-relay o)
#:on-connect
(lambda (new-source)
(set! active-source new-source)
(send-credit active-source (CreditAmount-unbounded) (Mode-bytes)))
#:on-data
(lambda (data mode)
(relay data mode)
(match mode
[(Mode-bytes) (send-bytes-credit active-source (bytes-length data))]
[(Mode-lines lm) (send-lines-credit active-source 1 lm)]
[(Mode-packet n) (send-packet-credit active-source n)]
[(Mode-object _) (void)]))
#:on-eof (lambda () (stop-current-facet))))
(at ds (assert (TcpPeerInfo sink local-end remote-end))))
(at peer (assert #:when (positive? (facet-count))
(ConnectionHandler-connected source sink)))))
(define (with-connection-error-guard ds peer error-proc proc)
((with-handlers ([exn:fail:network? (lambda (e) (lambda () (error-proc (exn->string e))))])
(proc))))
(define (run-outbound ds peer host port)
((with-handlers ([exn:fail:network?
(lambda (e)
(lambda ()
(at peer (assert (ConnectionHandler-rejected (exn->string e))))))])
(define connection-custodian (make-custodian))
(define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-connect host port)))
(lambda () (spawn-connection ds connection-custodian i o peer)))))
(with-connection-error-guard ds peer
(lambda (message)
(at peer (assert (ConnectionHandler-rejected message))))
(lambda ()
(define connection-custodian (make-custodian))
(define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-connect host port)))
(lambda () (spawn-connection ds connection-custodian i o peer)))))
(define (start-inbound-relay custodian target-proc i)
(define eof-received? #f)
@ -273,8 +329,8 @@
(define handle #f)
(define (set-sink! new-sink)
(when (not (eq? sink new-sink))
(on-connect new-sink)
(set! sink new-sink)
(when sink (on-connect sink))
(set! handle (turn-replace! this-turn sink handle
(if sink (->preserve (Sink-source self)) (void))))))
@ -311,8 +367,8 @@
(define handle #f)
(define (set-source! new-source)
(when (not (eq? new-source source))
(on-connect new-source)
(set! source new-source)
(when source (on-connect source))
(set! handle (turn-replace! this-turn source handle
(if source (->preserve (Source-sink self)) (void))))))
@ -379,28 +435,28 @@
;;---------------------------------------------------------------------------
(define (send-credit conn amount mode)
(send! conn (Source-credit amount mode)))
(define (send-credit source amount mode)
(send! source (Source-credit amount mode)))
(define (send-lines-credit conn amount [mode (LineMode-lf)])
(send-credit conn (CreditAmount-count amount) (Mode-lines mode)))
(define (send-lines-credit source amount [mode (LineMode-lf)])
(send-credit source (CreditAmount-count amount) (Mode-lines mode)))
(define (send-bytes-credit conn amount)
(send-credit conn (CreditAmount-count amount) (Mode-bytes)))
(define (send-bytes-credit source amount)
(send-credit source (CreditAmount-count amount) (Mode-bytes)))
(define (send-packet-credit conn packet-size #:count [count 1])
(send-credit conn (CreditAmount-count count) (Mode-packet packet-size)))
(define (send-packet-credit source packet-size #:count [count 1])
(send-credit source (CreditAmount-count count) (Mode-packet packet-size)))
(define (->bytes data)
(if (bytes? data)
data
(string->bytes/utf-8 data)))
(define (send-line conn line [line-mode (LineMode-lf)])
(send! conn (Sink-data (->bytes line) (Mode-lines line-mode))))
(define (send-line sink line [line-mode (LineMode-lf)])
(send! sink (Sink-data (->bytes line) (Mode-lines line-mode))))
(define (send-data conn data [mode (Mode-bytes)])
(send! conn (Sink-data (->bytes data) mode)))
(define (send-data sink data [mode (Mode-bytes)])
(send! sink (Sink-data (->bytes data) mode)))
(define (send-eof conn)
(send! conn (Sink-eof)))
(define (send-eof sink)
(send! sink (Sink-eof)))

View File

@ -4,8 +4,7 @@
(provide (all-from-out syndicate/schemas/gen/timer)
spawn-timer-driver
on-timeout
stop-when-timeout)
timeout)
(require syndicate/driver-support)
(require syndicate/engine)
@ -90,7 +89,7 @@
(loop)])))))))
(at ds
(when (message ($ instruction (SetTimer _ _ _)))
(on (message ($ instruction (SetTimer _ _ _)))
(log-syndicate/drivers/timer-debug "received instruction ~a" instruction)
(channel-put control-ch instruction))
@ -99,18 +98,14 @@
msecs
(current-inexact-milliseconds))
(define timer-id (gensym 'timestate))
(on-start (send! ds (SetTimer timer-id msecs (TimerKind-absolute))))
(on-stop (send! ds (SetTimer timer-id msecs (TimerKind-clear))))
(at ds
(when (message (TimerExpired timer-id _))
(react (at ds (assert (LaterThan msecs))))))))))
(on-start (send! (SetTimer timer-id msecs (TimerKind-absolute))))
(on-stop (send! (SetTimer timer-id msecs (TimerKind-clear))))
(on (message (TimerExpired timer-id _))
(react (assert (LaterThan msecs))))))))
(define-syntax-rule (on-timeout relative-msecs body ...)
(let ((timer-id (gensym 'timeout)))
(on-start (send! this-target (SetTimer timer-id relative-msecs (TimerKind-relative))))
(at this-target
(when (message (TimerExpired timer-id _))
body ...))))
(define-syntax-rule (stop-when-timeout relative-msecs body ...)
(on-timeout relative-msecs (stop-current-facet body ...)))
(define-event-expander timeout
(syntax-rules ()
[(_ [relative-msecs] body ...)
(let ((timer-id (gensym 'timeout)))
(on-start (send! (SetTimer timer-id relative-msecs (TimerKind-relative))))
(on (message (TimerExpired timer-id _)) body ...))]))

View File

@ -19,6 +19,7 @@
(require "engine.rkt")
(struct entity (id name assert retract message sync data)
#:transparent
#:methods gen:custom-write
[(define (write-proc e port mode)
(fprintf port "#<entity:~a:~a>" (entity-id e) (entity-name e)))])

View File

@ -17,6 +17,9 @@
pattern->capture-paths
pattern->capture-names
pattern->constant
;; quote-pattern
!dump-registered-preserves-patterns!
(all-from-out "schemas/gen/dataspace-patterns.rkt"))
@ -29,6 +32,7 @@
(require (for-syntax syntax/id-table))
(require (for-syntax syntax/stx))
(require preserves)
(require preserves-schema)
(require racket/match)
(require racket/list)
@ -347,6 +351,81 @@
(lambda (key-rev name-stx) (list name-stx))
(lambda (key-rev value) (list))))
(define (pattern->constant desc [env (lambda (name index) (void))])
(define next-binding-index 0)
(define (walk p k)
(match p
[(Pattern-DDiscard (DDiscard)) (void)]
[(Pattern-DBind (DBind name pat))
(let ((v (env name next-binding-index)))
(set! next-binding-index (+ next-binding-index 1))
(let ((inner (walk pat values)))
(k (if (void? v) inner v))))]
[(Pattern-DLit (DLit value)) (k value)]
[(Pattern-DCompound (DCompound-rec (CRec label arity) members))
(let loop ((fields-rev '()) (i 0))
(if (= i arity)
(k (record label (reverse fields-rev)))
(let ((vpat (hash-ref members i #f)))
(if vpat
(walk vpat (lambda (v) (loop (cons v fields-rev) (+ i 1))))
(void)))))]
[(Pattern-DCompound (DCompound-arr (CArr arity) members))
(let loop ((items-rev '()) (i 0))
(if (= i arity)
(k (reverse items-rev))
(let ((vpat (hash-ref members i #f)))
(if vpat
(walk vpat (lambda (v) (loop (cons v items-rev) (+ i 1))))
(void)))))]
[(Pattern-DCompound (DCompound-dict (CDict) members))
(let loop ((items (hash)) (entries (hash->list members)))
(match entries
['() (k items)]
[(cons (cons key vpat) more)
(walk vpat (lambda (v) (loop (hash-set items key v) more)))]))]))
(walk (parse-Pattern desc) values))
;; (define (quote-pattern p)
;; (match p
;; [(Pattern-DDiscard (DDiscard))
;; (Pattern-DCompound (DCompound-rec (CRec '_ 0) (hash)))]
;; [(Pattern-DBind (DBind name pat))
;; (Pattern-DCompound (DCompound-rec (CRec 'bind 2)
;; (hash 0 (Pattern-DLit (DLit name)) 1 (quote-pattern pat))))]
;; [(Pattern-DLit value)
;; (Pattern-DCompound (DCompound-rec (CRec 'lit 1) (hash 0 (Pattern-DLit (DLit value)))))]
;; [(Pattern-DCompound (DCompound-rec (CRec label arity) members))
;; (Pattern-DCompound
;; (DCompound-rec (CRec 'compound 2)
;; (hash 0 (Pattern-DCompound
;; (DCompound-rec (CRec 'rec 2)
;; (hash 0 (Pattern-DLit (DLit label))
;; 1 (Pattern-DLit (DLit arity)))))
;; 1 (Pattern-DCompound
;; (DCompound-dict (CDict)
;; (for/hash ([(kv vp) (in-hash members)])
;; (values kv (quote-pattern vp))))))))]
;; [(Pattern-DCompound (DCompound-arr (CArr arity) members))
;; (Pattern-DCompound
;; (DCompound-rec (CRec 'compound 2)
;; (hash 0 (Pattern-DCompound
;; (DCompound-rec (CRec 'arr 1)
;; (hash 0 (Pattern-DLit (DLit arity)))))
;; 1 (Pattern-DCompound
;; (DCompound-dict (CDict)
;; (for/hash ([(kv vp) (in-hash members)])
;; (values kv (quote-pattern vp))))))))]
;; [(Pattern-DCompound (DCompound-dict (CDict) members))
;; (Pattern-DCompound
;; (DCompound-rec (CRec 'compound 2)
;; (hash 0 (Pattern-DCompound
;; (DCompound-rec (CRec 'dict 0) (hash)))
;; 1 (Pattern-DCompound
;; (DCompound-dict (CDict)
;; (for/hash ([(kv vp) (in-hash members)])
;; (values kv (quote-pattern vp))))))))]))
(define-syntax (!dump-registered-preserves-patterns! stx)
(syntax-case stx ()
[(_)

View File

@ -1,7 +1,10 @@
version 1 .
embeddedType EntityRef.Ref .
; Assertions
StreamConnection = <stream-connection @source #!Source @sink #!Sink @spec any>.
StreamSpecListenable = <stream-spec-listenable @spec any>.
StreamSpecConnectable = <stream-spec-connectable @spec any>.
; Assertion
StreamListener = <stream-listener @spec any @handle #!ConnectionHandler>.

View File

@ -5,3 +5,5 @@ TcpRemote = <tcp-remote @host string @port int>.
TcpLocal = <tcp-local @host string @port int>.
TcpPeerInfo = <tcp-peer @handle #!any @local TcpLocal @remote TcpRemote>.
TcpListenError = <tcp-listen-error @spec TcpLocal @message string>.

View File

@ -28,14 +28,14 @@
begin/dataflow
define/dataflow
stop-when-true
stop-on-true
entity/stop-on-retract
this-target
at
assert
stop-when
(rename-out [event:when when])
stop-on
on
during
during/spawn
during*)
@ -62,7 +62,10 @@
(syntax-case stx ()
[id
(identifier? #'id)
#'(or (current-turn) (error 'this-turn "Illegal use outside an Actor turn"))]))))
#'(this-turn*)]))))
(define (this-turn*)
(or (current-turn) (error 'this-turn "Illegal use outside an Actor turn")))
(define-syntax-rule (with-this-turn turn-expr expr ...)
(parameterize ([current-turn turn-expr])
@ -187,7 +190,7 @@
[(_ [#:do expr e ...] body ...)
#'(begin expr (let-event [e ...] body ...))]
[(_ [e0 e ...] body ...)
#'(react (stop-when e0 (let-event [e ...] body ...)))]))
#'(react (stop-on e0 (let-event [e ...] body ...)))]))
(define-syntax-rule (define-field id initial-value)
(define id (turn-field! this-turn 'id initial-value)))
@ -212,8 +215,21 @@
(define-syntax-rule (sync! peer expr ...)
(turn-sync! this-turn peer (lambda (_reply) expr ...)))
(define-syntax-rule (send! peer assertion)
(turn-message! this-turn peer (->preserve assertion)))
(define-for-syntax (with-valid-this-target orig-stx result-stx)
;; Invoke this-target transformer for its side effect: when it's
;; illegal to use it, it will signal an error.
(let ((v (syntax-parameter-value #'this-target)))
(when (procedure? v)
(v orig-stx)))
result-stx)
(define-syntax (send! stx)
(syntax-parse stx
[(_ peer assertion)
(syntax/loc stx (turn-message! this-turn peer (->preserve assertion)))]
[(_ assertion)
(with-valid-this-target stx
(syntax/loc stx (send! this-target assertion)))]))
(define-syntax (spawn stx)
(syntax-parse stx
@ -223,7 +239,9 @@
#'(turn-spawn! #:name name.N
#:daemon? daemon.D
this-turn
(lambda () setup-expr ...))]))
(lambda ()
(syntax-parameterize ([this-target illegal-use-of-this-target])
setup-expr ...)))]))
(define-syntax (spawn/link stx)
(syntax-parse stx
@ -237,7 +255,9 @@
#:daemon? daemon.D
#:link
(entity/stop-on-retract #:name (list name 'monitor-in-child))
(lambda () setup-expr ...)
(lambda ()
(syntax-parameterize ([this-target illegal-use-of-this-target])
setup-expr ...))
(hasheq monitor-handle #t)))]))
(define-syntax-rule (begin/dataflow expr ...)
@ -247,7 +267,7 @@
(begin (define-field id #f)
(begin/dataflow (id expr))))
(define-syntax-rule (stop-when-true test expr ...)
(define-syntax-rule (stop-on-true test expr ...)
(begin/dataflow
(when test
(stop-current-facet expr ...))))
@ -260,61 +280,62 @@
(define-for-syntax orig-insp
(variable-reference->module-declaration-inspector (#%variable-reference)))
(define-syntax-parameter this-target
(define-for-syntax illegal-use-of-this-target
(lambda (stx)
(raise-syntax-error #f "Illegal use outside an `at` expression" stx)))
(raise-syntax-error 'this-target "Illegal use outside an `at` expression" stx)))
(define-syntax-parameter this-target illegal-use-of-this-target)
(define-syntax (at stx)
(syntax-case stx ()
[(_ target-expr items ...)
#`(let ((target target-expr))
(syntax-parameterize ([this-target (make-rename-transformer #'target)])
#,@(for/list [(item-stx (in-list (syntax->list #'(items ...))))]
(let loop ((item-stx item-stx))
(define disarmed-item-stx (syntax-disarm item-stx orig-insp))
(syntax-case disarmed-item-stx ()
[(expander args ...)
(event-expander-id? #'expander)
(event-expander-transform disarmed-item-stx
(lambda (r) (loop (syntax-rearm r item-stx))))]
[_
item-stx])))))]))
items ...))]))
(define-event-expander assert
(define-syntax assert
(lambda (stx)
(syntax-parse stx
[(_ condition:<when> expr)
#`(turn-assert/dataflow! this-turn
this-target
(lambda ()
(if condition.E
(->preserve expr)
(void))))])))
(with-valid-this-target stx
(quasisyntax/loc stx
(turn-assert/dataflow! this-turn
this-target
#,(quasisyntax/loc #'expr
(lambda () (if condition.E
(->preserve expr)
(void)))))))])))
(define-event-expander stop-when
(syntax-rules ()
[(_ event expr ...)
(event:when event (stop-current-facet expr ...))]))
(define-syntax-rule (stop-on event expr ...)
(on event (stop-current-facet expr ...)))
(require "schemas/gen/dataspace.rkt")
(define-event-expander event:when
(define-syntax on
(lambda (stx)
(syntax-parse stx
[(_ ((~datum message) pat) expr ...)
#`(assert (Observe (:pattern pat)
(ref (entity #:message
(lambda (bindings)
(match-define (list #,@(analyse-pattern-bindings #'pat)) bindings)
expr ...)))))]
[(_ ((~datum asserted) pat) expr ...)
#`(assert (Observe (:pattern pat)
(define disarmed-stx (syntax-disarm stx orig-insp))
(syntax-parse disarmed-stx
[(_ ((~datum message) condition:<when> pat) expr ...)
(quasisyntax/loc stx
(assert #:when condition.E
#,(quasisyntax/loc #'pat
(Observe (:pattern pat)
(ref (entity #:message
(lambda (bindings)
(match-define (list #,@(analyse-pattern-bindings #'pat)) bindings)
expr ...)))))))]
[(_ ((~datum asserted) condition:<when> pat) expr ...)
(quasisyntax/loc stx
(assert #:when condition.E
(Observe (:pattern pat)
(ref (entity #:assert
(lambda (bindings _handle)
(match-define (list #,@(analyse-pattern-bindings #'pat)) bindings)
expr ...)))))]
[(_ ((~datum retracted) pat) expr ...)
#`(assert (Observe (:pattern pat)
expr ...))))))]
[(_ ((~datum retracted) condition:<when> pat) expr ...)
(quasisyntax/loc stx
(assert #:when condition.E
(Observe (:pattern pat)
(let ((assertion-map (make-hash)))
(ref (entity #:assert
(lambda (bindings handle)
@ -324,28 +345,31 @@
(match-define (list #,@(analyse-pattern-bindings #'pat))
(hash-ref assertion-map handle))
(hash-remove! assertion-map handle)
expr ...))))))]))
(syntax-rules ()
[(_ test expr ...)
(when test expr ...)]))
expr ...)))))))]
[(_ (expander args ...) body ...) #:when (event-expander-id? #'expander)
(event-expander-transform #'(expander [args ...] body ...) (lambda (r) (syntax-rearm r stx)))]
[_
(raise-syntax-error #f "Invalid event pattern")])))
(define-event-expander during
(define-syntax during
(lambda (stx)
(syntax-case stx ()
[(_ pat expr ...)
#`(assert (Observe (:pattern pat)
(quasisyntax/loc stx
(assert (Observe (:pattern pat)
(ref (during* (lambda (bindings)
(match-define (list #,@(analyse-pattern-bindings #'pat)) bindings)
expr ...)))))])))
expr ...))))))])))
(define-event-expander during/spawn
(define-syntax during/spawn
(lambda (stx)
(syntax-parse stx
[(_ pat name-stx:<name> daemon:<daemon?> expr ...)
#`(assert (Observe (:pattern pat)
(quasisyntax/loc stx
(assert (Observe (:pattern pat)
(ref (during* (lambda (bindings)
(match-define (list #,@(analyse-pattern-bindings #'pat)) bindings)
(spawn/link #:name name-stx.N #:daemon? daemon.D expr ...))))))])))
(spawn/link #:name name-stx.N #:daemon? daemon.D expr ...)))))))])))
(define (during* f #:name [name '?])
(define assertion-map (make-hash))
@ -370,10 +394,11 @@
;;; eval: (put 'actor-system/dataspace 'racket-indent-function 1)
;;; eval: (put 'at 'racket-indent-function 1)
;;; eval: (put 'object 'racket-indent-function 0)
;;; eval: (put 'on 'racket-indent-function 1)
;;; eval: (put 'react 'racket-indent-function 0)
;;; eval: (put 'send! 'racket-indent-function 1)
;;; eval: (put 'sync! 'racket-indent-function 1)
;;; eval: (put 'spawn 'racket-indent-function 0)
;;; eval: (put 'stop-when 'racket-indent-function 1)
;;; eval: (put 'stop-when-true 'racket-indent-function 1)
;;; eval: (put 'stop-on 'racket-indent-function 1)
;;; eval: (put 'stop-on-true 'racket-indent-function 1)
;;; eval: (put 'sync! 'racket-indent-function 1)
;;; End:

View File

@ -0,0 +1,18 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(require syndicate/drivers/timer)
(module+ test
(actor-system/dataspace (ds)
(spawn-timer-driver ds)
(spawn (at ds
(assert 'item)
(on (timeout 50) (stop-current-facet))))
(spawn #:daemon? #t
(at ds
(during/spawn 'item
#:name 'item-handler
(on-start (log-info "up"))
(on-stop (log-info "down")))))))

View File

@ -0,0 +1,13 @@
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(module+ test
(actor-system/dataspace (ds)
(spawn
(define (loop n)
(log-info "loop ~v" n)
(when (positive? n)
(react (on-start (send! ds (- n 1)))
(at ds (stop-on (message $n) (loop n))))))
(loop 5))))