Credit-based flow control on tcp driver; line mode
This commit is contained in:
parent
5e1518c2bb
commit
d304d1e87d
|
@ -6,13 +6,17 @@
|
||||||
spawn-tcp-driver
|
spawn-tcp-driver
|
||||||
accept-connection
|
accept-connection
|
||||||
establish-connection
|
establish-connection
|
||||||
|
send-credit
|
||||||
|
send-line
|
||||||
send-data
|
send-data
|
||||||
|
send-eof)
|
||||||
|
|
||||||
read-bytes-avail)
|
(require racket/async-channel)
|
||||||
|
|
||||||
(require racket/tcp)
|
(require racket/tcp)
|
||||||
|
(require racket/port)
|
||||||
(require (only-in racket/exn exn->string))
|
(require (only-in racket/exn exn->string))
|
||||||
(require syndicate/driver-support)
|
(require syndicate/driver-support)
|
||||||
|
(require syndicate/functional-queue)
|
||||||
(require syndicate/schemas/gen/tcp)
|
(require syndicate/schemas/gen/tcp)
|
||||||
(require syndicate/schemas/gen/dataspace-patterns)
|
(require syndicate/schemas/gen/dataspace-patterns)
|
||||||
|
|
||||||
|
@ -51,6 +55,14 @@
|
||||||
(lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port))))
|
(lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port))))
|
||||||
(loop)))))
|
(loop)))))
|
||||||
|
|
||||||
|
(define (mode->line-terminator mode)
|
||||||
|
(match mode
|
||||||
|
[(LineMode-lf) "\n"]
|
||||||
|
[(LineMode-crlf) "\r\n"]))
|
||||||
|
|
||||||
|
(define (render-line line mode)
|
||||||
|
(string->bytes/utf-8 (string-append line (mode->line-terminator mode))))
|
||||||
|
|
||||||
(define (run-outbound ds local-peer host port)
|
(define (run-outbound ds local-peer host port)
|
||||||
(define connection-custodian (make-custodian))
|
(define connection-custodian (make-custodian))
|
||||||
((with-handlers ([exn:fail:network?
|
((with-handlers ([exn:fail:network?
|
||||||
|
@ -62,12 +74,15 @@
|
||||||
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
|
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
|
||||||
(on-stop (close-input-port i)
|
(on-stop (close-input-port i)
|
||||||
(close-output-port o))
|
(close-output-port o))
|
||||||
(start-inbound-relay connection-custodian name local-peer i)
|
(define issue-credit (start-inbound-relay connection-custodian name (lambda () local-peer) i))
|
||||||
(define relay (outbound-relay name o))
|
(define relay (outbound-relay name o))
|
||||||
(at local-peer
|
(at local-peer
|
||||||
(assert (ActiveSocket-controller
|
(assert (ActiveSocket-controller
|
||||||
(object #:name (list name 'socket)
|
(object #:name (list name 'socket)
|
||||||
[#:asserted (Socket data) (relay data)]))))))))
|
[#:asserted (Socket-Credit c) (issue-credit c)]
|
||||||
|
[#:asserted (Socket-line line mode) (relay (render-line line mode))]
|
||||||
|
[#:asserted (Socket-data data) (relay data)]
|
||||||
|
[#:asserted (Socket-eof) (close-output-port o)]))))))))
|
||||||
|
|
||||||
(define (spawn-inbound ds custodian i o spec)
|
(define (spawn-inbound ds custodian i o spec)
|
||||||
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
|
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
|
||||||
|
@ -76,6 +91,7 @@
|
||||||
(on-stop (close-input-port i)
|
(on-stop (close-input-port i)
|
||||||
(close-output-port o))
|
(close-output-port o))
|
||||||
|
|
||||||
|
(define issue-credit #f)
|
||||||
(define active-controller #f)
|
(define active-controller #f)
|
||||||
(define relay (outbound-relay name o))
|
(define relay (outbound-relay name o))
|
||||||
(at ds
|
(at ds
|
||||||
|
@ -85,7 +101,8 @@
|
||||||
[#:asserted (ActiveSocket-controller controller)
|
[#:asserted (ActiveSocket-controller controller)
|
||||||
(log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor)
|
(log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor)
|
||||||
(when (not active-controller)
|
(when (not active-controller)
|
||||||
(start-inbound-relay custodian name controller i))
|
(set! issue-credit
|
||||||
|
(start-inbound-relay custodian name (lambda () active-controller) i)))
|
||||||
(set! active-controller controller)
|
(set! active-controller controller)
|
||||||
#:retracted
|
#:retracted
|
||||||
(when (eq? controller active-controller)
|
(when (eq? controller active-controller)
|
||||||
|
@ -94,21 +111,77 @@
|
||||||
[#:asserted (ActiveSocket-close message)
|
[#:asserted (ActiveSocket-close message)
|
||||||
(log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message)
|
(log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message)
|
||||||
(stop-current-facet)]
|
(stop-current-facet)]
|
||||||
[#:asserted (ActiveSocket-Socket (Socket data))
|
[#:asserted (ActiveSocket-Socket (Socket-Credit c))
|
||||||
(relay data)])
|
(if issue-credit
|
||||||
|
(issue-credit c)
|
||||||
|
(log-syndicate/drivers/tcp-warning
|
||||||
|
"Socket-Credit ~v ignored because no controller present" c))]
|
||||||
|
[#:asserted (ActiveSocket-Socket (Socket-line line mode))
|
||||||
|
(relay (render-line line mode))]
|
||||||
|
[#:asserted (ActiveSocket-Socket (Socket-data data))
|
||||||
|
(relay data)]
|
||||||
|
[#:asserted (ActiveSocket-Socket (Socket-eof))
|
||||||
|
(close-output-port o)])
|
||||||
spec)))))
|
spec)))))
|
||||||
|
|
||||||
(define (start-inbound-relay custodian name target i)
|
(define (start-inbound-relay custodian name target-proc i)
|
||||||
|
(define eof-received? #f)
|
||||||
|
(define control-ch (make-async-channel))
|
||||||
(linked-thread
|
(linked-thread
|
||||||
#:name (list name 'input-thread)
|
#:name (list name 'input-thread)
|
||||||
#:custodian custodian
|
#:custodian custodian
|
||||||
|
#:peer (object #:name 'inbound-relay-monitor
|
||||||
|
[#:asserted _
|
||||||
|
#:retracted
|
||||||
|
(close-input-port i)
|
||||||
|
(when (not eof-received?) (stop-current-facet))])
|
||||||
(lambda (facet)
|
(lambda (facet)
|
||||||
(let loop ()
|
(define (update-count remaining-count mode q)
|
||||||
(define bs (read-bytes-avail i))
|
(if (zero? remaining-count)
|
||||||
(when (bytes? bs)
|
q
|
||||||
(log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name)
|
(undequeue (cons remaining-count mode) q)))
|
||||||
(turn! facet (lambda () (send-data target bs)))
|
(let loop ((credits (make-queue)))
|
||||||
(loop))))))
|
(sync (handle-evt control-ch
|
||||||
|
(match-lambda
|
||||||
|
[(Credit (CreditAmount-count 0) _mode) (loop credits)]
|
||||||
|
[(Credit (CreditAmount-count n) mode)
|
||||||
|
(loop (match (unenqueue* credits)
|
||||||
|
[(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))]
|
||||||
|
[_ (enqueue credits (cons n mode))]))]
|
||||||
|
[(Credit (CreditAmount-unbounded) mode)
|
||||||
|
(loop (match (unenqueue* credits)
|
||||||
|
[(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))]
|
||||||
|
[_ (enqueue credits (cons +inf.0 mode))]))]))
|
||||||
|
(match (dequeue* credits)
|
||||||
|
[(list)
|
||||||
|
never-evt]
|
||||||
|
[(list (cons count (and mode (CreditMode-bytes))) q)
|
||||||
|
(define buffer (make-bytes (inexact->exact (min count 131072))))
|
||||||
|
(handle-evt (read-bytes-avail!-evt buffer i)
|
||||||
|
(match-lambda
|
||||||
|
[(? number? read-count)
|
||||||
|
(define bs (subbytes buffer 0 read-count))
|
||||||
|
(log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name)
|
||||||
|
(turn! facet (lambda () (send-data (target-proc) bs)))
|
||||||
|
(loop (update-count (- count read-count) mode q))]
|
||||||
|
[(? eof-object?)
|
||||||
|
(log-syndicate/drivers/tcp-debug "inbound eof for ~v" name)
|
||||||
|
(turn! facet (lambda () (send-eof (target-proc))))]))]
|
||||||
|
[(list (cons count (and mode (CreditMode-lines line-mode))) q)
|
||||||
|
(handle-evt (read-line-evt i (match line-mode
|
||||||
|
[(LineMode-lf) 'linefeed]
|
||||||
|
[(LineMode-crlf) 'return-linefeed]))
|
||||||
|
(match-lambda
|
||||||
|
[(? string? line)
|
||||||
|
(log-syndicate/drivers/tcp-debug "inbound line ~v for ~v" line name)
|
||||||
|
(turn! facet (lambda () (send-line (target-proc) line line-mode)))
|
||||||
|
(loop (update-count (- count 1) mode q))]
|
||||||
|
[(? eof-object?)
|
||||||
|
(log-syndicate/drivers/tcp-debug "inbound eof for ~v" name)
|
||||||
|
(turn! facet (lambda () (send-eof (target-proc))))]))])))))
|
||||||
|
(define (issue-credit c)
|
||||||
|
(async-channel-put control-ch c))
|
||||||
|
issue-credit)
|
||||||
|
|
||||||
(define-syntax (EPIPE stx)
|
(define-syntax (EPIPE stx)
|
||||||
(local-require ffi/unsafe)
|
(local-require ffi/unsafe)
|
||||||
|
@ -138,34 +211,52 @@
|
||||||
(with-stop-current-facet-on-epipe 'flushing
|
(with-stop-current-facet-on-epipe 'flushing
|
||||||
(lambda () (flush-output o))))))))
|
(lambda () (flush-output o))))))))
|
||||||
|
|
||||||
(define (read-bytes-avail input-port #:limit [limit 65536])
|
(define (accept-connection conn
|
||||||
(define buffer (make-bytes limit))
|
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||||
(match (read-bytes-avail! buffer input-port)
|
#:initial-mode [initial-mode (CreditMode-bytes)]
|
||||||
[(? number? count) (subbytes buffer 0 count)]
|
#:on-data on-data
|
||||||
[other other]))
|
#:on-eof [on-eof void]
|
||||||
|
#:on-credit [on-credit (lambda (c) (void))])
|
||||||
(define (accept-connection conn #:on-data on-data)
|
|
||||||
(at conn
|
(at conn
|
||||||
(assert (ActiveSocket-controller
|
(assert (ActiveSocket-controller
|
||||||
(object #:name 'inbound-socket-controller
|
(object #:name 'inbound-socket-controller
|
||||||
[#:asserted (Socket data) (on-data data)])))))
|
[#:asserted (Socket-Credit c) (on-credit c)]
|
||||||
|
[#:asserted (Socket-line line mode) (on-data line mode)]
|
||||||
|
[#:asserted (Socket-data data) (on-data data)]
|
||||||
|
[#:asserted (Socket-eof) (on-eof)]))))
|
||||||
|
(when initial-credit (send-credit conn initial-credit initial-mode)))
|
||||||
|
|
||||||
(define (establish-connection ds spec
|
(define (establish-connection ds spec
|
||||||
|
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||||
|
#:initial-mode [initial-mode (CreditMode-bytes)]
|
||||||
#:on-connected on-connected
|
#:on-connected on-connected
|
||||||
#:on-data on-data
|
#:on-data on-data
|
||||||
|
#:on-eof [on-eof void]
|
||||||
|
#:on-credit [on-credit (lambda (c) (void))]
|
||||||
#:on-disconnected [on-disconnected (lambda () (stop-current-facet))]
|
#:on-disconnected [on-disconnected (lambda () (stop-current-facet))]
|
||||||
#:on-rejected [on-rejected (lambda () (stop-current-facet))])
|
#:on-rejected [on-rejected (lambda () (stop-current-facet))])
|
||||||
(define s
|
(define s
|
||||||
(object #:name 'outbound-socket
|
(object #:name 'outbound-socket
|
||||||
[#:asserted (ActiveSocket-controller peer)
|
[#:asserted (ActiveSocket-controller peer)
|
||||||
(on-connected peer)
|
(on-connected peer)
|
||||||
|
(when initial-credit (send-credit peer initial-credit initial-mode))
|
||||||
#:retracted
|
#:retracted
|
||||||
(on-disconnected)]
|
(on-disconnected)]
|
||||||
[#:asserted (ActiveSocket-close message)
|
[#:asserted (ActiveSocket-close message) (on-rejected message)]
|
||||||
(on-rejected message)]
|
[#:asserted (ActiveSocket-Socket (Socket-Credit c)) (on-credit c)]
|
||||||
[#:asserted (ActiveSocket-Socket (Socket data))
|
[#:asserted (ActiveSocket-Socket (Socket-line line mode)) (on-data line mode)]
|
||||||
(on-data data)]))
|
[#:asserted (ActiveSocket-Socket (Socket-data data)) (on-data data)]
|
||||||
|
[#:asserted (ActiveSocket-Socket (Socket-eof)) (on-eof)]))
|
||||||
(at ds (assert (Connection s spec))))
|
(at ds (assert (Connection s spec))))
|
||||||
|
|
||||||
|
(define (send-credit conn amount unit)
|
||||||
|
(send! conn (Socket-Credit (Credit amount unit))))
|
||||||
|
|
||||||
|
(define (send-line conn line [mode (LineMode-crlf)])
|
||||||
|
(send! conn (Socket-line line mode)))
|
||||||
|
|
||||||
(define (send-data conn data)
|
(define (send-data conn data)
|
||||||
(send! conn (Socket (if (bytes? data) data (string->bytes/utf-8 data)))))
|
(send! conn (Socket-data (if (bytes? data) data (string->bytes/utf-8 data)))))
|
||||||
|
|
||||||
|
(define (send-eof conn)
|
||||||
|
(send! conn (Socket-eof)))
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
#lang racket/base
|
||||||
|
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
||||||
|
;;; SPDX-FileCopyrightText: Copyright © 2011-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||||
|
|
||||||
|
(provide make-queue
|
||||||
|
queue?
|
||||||
|
enqueue
|
||||||
|
enqueue-all
|
||||||
|
undequeue
|
||||||
|
queue-prepare-for-dequeue
|
||||||
|
dequeue
|
||||||
|
dequeue*
|
||||||
|
queue-prepare-for-unenqueue
|
||||||
|
unenqueue
|
||||||
|
unenqueue*
|
||||||
|
list->queue
|
||||||
|
queue->list
|
||||||
|
queue-length
|
||||||
|
queue-empty?
|
||||||
|
queue-append
|
||||||
|
queue-append-list
|
||||||
|
queue-extract
|
||||||
|
queue-filter
|
||||||
|
queue-remove
|
||||||
|
queue-partition)
|
||||||
|
|
||||||
|
(require (only-in racket/list partition))
|
||||||
|
|
||||||
|
(struct queue (head tail) #:transparent)
|
||||||
|
|
||||||
|
(define (make-queue)
|
||||||
|
(queue '() '()))
|
||||||
|
|
||||||
|
(define (enqueue q v)
|
||||||
|
(queue (queue-head q)
|
||||||
|
(cons v (queue-tail q))))
|
||||||
|
|
||||||
|
(define (enqueue-all q v)
|
||||||
|
(queue (queue-head q)
|
||||||
|
(append (reverse v) (queue-tail q))))
|
||||||
|
|
||||||
|
(define (undequeue v q)
|
||||||
|
(queue (cons v (queue-head q))
|
||||||
|
(queue-tail q)))
|
||||||
|
|
||||||
|
(define (queue-prepare-for-dequeue q)
|
||||||
|
(if (null? (queue-head q))
|
||||||
|
(queue (reverse (queue-tail q)) '())
|
||||||
|
q))
|
||||||
|
|
||||||
|
(define (dequeue q)
|
||||||
|
(let* ((q1 (queue-prepare-for-dequeue q))
|
||||||
|
(h (queue-head q1)))
|
||||||
|
(if (pair? h)
|
||||||
|
(values (car h) (queue (cdr h) (queue-tail q1)))
|
||||||
|
(values))))
|
||||||
|
|
||||||
|
(define (dequeue* q)
|
||||||
|
(call-with-values (lambda () (dequeue q)) list))
|
||||||
|
|
||||||
|
(define (queue-prepare-for-unenqueue q)
|
||||||
|
(if (null? (queue-tail q))
|
||||||
|
(queue '() (reverse (queue-head q)))
|
||||||
|
q))
|
||||||
|
|
||||||
|
(define (unenqueue q)
|
||||||
|
(let* ((q1 (queue-prepare-for-unenqueue q))
|
||||||
|
(t (queue-tail q1)))
|
||||||
|
(if (pair? t)
|
||||||
|
(values (queue (queue-head q1) (cdr t)) (car t))
|
||||||
|
(values))))
|
||||||
|
|
||||||
|
(define (unenqueue* q)
|
||||||
|
(call-with-values (lambda () (unenqueue q)) list))
|
||||||
|
|
||||||
|
(define (list->queue xs)
|
||||||
|
(queue xs '()))
|
||||||
|
|
||||||
|
(define (queue->list q)
|
||||||
|
(append (queue-head q) (reverse (queue-tail q))))
|
||||||
|
|
||||||
|
(define (queue-length q)
|
||||||
|
(+ (length (queue-head q))
|
||||||
|
(length (queue-tail q))))
|
||||||
|
|
||||||
|
(define (queue-empty? q)
|
||||||
|
(and (null? (queue-head q))
|
||||||
|
(null? (queue-tail q))))
|
||||||
|
|
||||||
|
(define (queue-append q1 q2)
|
||||||
|
(queue (append (queue-head q1)
|
||||||
|
(reverse (queue-tail q1))
|
||||||
|
(queue-head q2))
|
||||||
|
(queue-tail q2)))
|
||||||
|
|
||||||
|
(define (queue-append-list q1 xs)
|
||||||
|
(queue (queue-head q1)
|
||||||
|
(append (reverse xs) (queue-tail q1))))
|
||||||
|
|
||||||
|
(define (queue-extract q predicate [default-value #f])
|
||||||
|
(let search-head ((head (queue-head q))
|
||||||
|
(rejected-head-rev '()))
|
||||||
|
(cond
|
||||||
|
((null? head) (let search-tail ((tail (reverse (queue-tail q)))
|
||||||
|
(rejected-tail-rev '()))
|
||||||
|
(cond
|
||||||
|
((null? tail) (values default-value q))
|
||||||
|
((predicate (car tail)) (values (car tail)
|
||||||
|
(queue (queue-head q)
|
||||||
|
(append (reverse (cdr tail))
|
||||||
|
rejected-tail-rev))))
|
||||||
|
(else (search-tail (cdr tail) (cons (car tail) rejected-tail-rev))))))
|
||||||
|
((predicate (car head)) (values (car head)
|
||||||
|
(queue (append (reverse rejected-head-rev)
|
||||||
|
(cdr head))
|
||||||
|
(queue-tail q))))
|
||||||
|
(else (search-head (cdr head) (cons (car head) rejected-head-rev))))))
|
||||||
|
|
||||||
|
(define (queue-filter pred q)
|
||||||
|
(queue (filter pred (queue-head q))
|
||||||
|
(filter pred (queue-tail q))))
|
||||||
|
|
||||||
|
(define (queue-remove item q)
|
||||||
|
(list->queue (remove item (queue->list q))))
|
||||||
|
|
||||||
|
(define (queue-partition pred q)
|
||||||
|
(define-values (head-t head-f) (partition pred (queue-head q)))
|
||||||
|
(define-values (tail-t tail-f) (partition pred (queue-tail q)))
|
||||||
|
(values (queue head-t tail-t)
|
||||||
|
(queue head-f tail-f)))
|
|
@ -13,5 +13,15 @@ ActiveSocket =
|
||||||
/ Socket
|
/ Socket
|
||||||
.
|
.
|
||||||
|
|
||||||
; TODO: <credit @amount int>
|
Socket =
|
||||||
Socket = <data @payload bytes>.
|
/ Credit
|
||||||
|
/ <line @text string @mode LineMode>
|
||||||
|
/ <data @payload bytes>
|
||||||
|
/ <eof>
|
||||||
|
.
|
||||||
|
|
||||||
|
LineMode = =lf / =crlf .
|
||||||
|
|
||||||
|
Credit = <credit @amount CreditAmount @unit CreditMode>.
|
||||||
|
CreditMode = =bytes / @lines LineMode .
|
||||||
|
CreditAmount = @count int / @unbounded =unbounded .
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
|
|
||||||
ref
|
ref
|
||||||
react
|
react
|
||||||
|
let-event
|
||||||
define-field
|
define-field
|
||||||
stop-facet
|
stop-facet
|
||||||
stop-current-facet
|
stop-current-facet
|
||||||
|
@ -52,6 +53,8 @@
|
||||||
(require "pattern.rkt")
|
(require "pattern.rkt")
|
||||||
(require "syntax-classes.rkt")
|
(require "syntax-classes.rkt")
|
||||||
|
|
||||||
|
(define-logger syndicate/object) ;; used by the (object) macro
|
||||||
|
|
||||||
(define-syntax this-turn
|
(define-syntax this-turn
|
||||||
(make-set!-transformer
|
(make-set!-transformer
|
||||||
(lambda (stx)
|
(lambda (stx)
|
||||||
|
@ -79,11 +82,12 @@
|
||||||
|
|
||||||
(define-syntax (object stx)
|
(define-syntax (object stx)
|
||||||
(syntax-parse stx
|
(syntax-parse stx
|
||||||
[(_ name:<name> handler ...)
|
[(_ name-stx:<name> handler ...)
|
||||||
#`(let ((state (make-hash)))
|
#`(let ((state (make-hash)))
|
||||||
|
(define name name-stx.N)
|
||||||
(define (handler-function assertion)
|
(define (handler-function assertion)
|
||||||
(-object-clauses assertion [] [handler ...]))
|
(-object-clauses name assertion [] [handler ...]))
|
||||||
(ref (entity #:name name.N
|
(ref (entity #:name name
|
||||||
#:assert (lambda (m h) (-object-assert state handler-function m h))
|
#:assert (lambda (m h) (-object-assert state handler-function m h))
|
||||||
#:retract (lambda (h) (-object-retract state h))
|
#:retract (lambda (h) (-object-retract state h))
|
||||||
#:message (lambda (m) (-object-message handler-function m)))))]))
|
#:message (lambda (m) (-object-message handler-function m)))))]))
|
||||||
|
@ -105,18 +109,22 @@
|
||||||
|
|
||||||
(define-syntax (-object-clauses stx)
|
(define-syntax (-object-clauses stx)
|
||||||
(syntax-parse stx
|
(syntax-parse stx
|
||||||
[(_ input [completed ...] [])
|
[(_ name input [completed ...] [])
|
||||||
#'(match input
|
#'(match input
|
||||||
completed ...
|
completed ...
|
||||||
[_ #f])]
|
[_
|
||||||
|
(log-syndicate/object-debug "Unhandled assertion ~v in ~v" input name)
|
||||||
|
#f])]
|
||||||
|
|
||||||
[(_ input [completed ...] [ [#:spawn pat body ...] more ... ])
|
[(_ name input [completed ...] [ [#:spawn pat body ...] more ... ])
|
||||||
#'(-object-clauses input
|
#'(-object-clauses name
|
||||||
|
input
|
||||||
[completed ...]
|
[completed ...]
|
||||||
[ [#:during pat (spawn/link body ...)] more ... ])]
|
[ [#:during pat (spawn/link body ...)] more ... ])]
|
||||||
|
|
||||||
[(_ input [completed ...] [ [#:asserted pat body+ ... #:retracted body- ...] more ... ])
|
[(_ name input [completed ...] [ [#:asserted pat body+ ... #:retracted body- ...] more ... ])
|
||||||
#`(-object-clauses input
|
#`(-object-clauses name
|
||||||
|
input
|
||||||
[ completed ... [(-object-pattern pat)
|
[ completed ... [(-object-pattern pat)
|
||||||
body+ ...
|
body+ ...
|
||||||
#,(if (null? (syntax->list #'(body- ...)))
|
#,(if (null? (syntax->list #'(body- ...)))
|
||||||
|
@ -124,13 +132,15 @@
|
||||||
#`(lambda () body- ...))] ]
|
#`(lambda () body- ...))] ]
|
||||||
[more ...])]
|
[more ...])]
|
||||||
|
|
||||||
[(_ input [completed ...] [ [#:asserted pat body+ ...] more ... ])
|
[(_ name input [completed ...] [ [#:asserted pat body+ ...] more ... ])
|
||||||
#'(-object-clauses input
|
#'(-object-clauses name
|
||||||
|
input
|
||||||
[completed ...]
|
[completed ...]
|
||||||
[ [#:asserted pat body+ ... #:retracted] more ... ])]
|
[ [#:asserted pat body+ ... #:retracted] more ... ])]
|
||||||
|
|
||||||
[(_ input [completed ...] [ [pat body ...] more ... ])
|
[(_ name input [completed ...] [ [pat body ...] more ... ])
|
||||||
#'(-object-clauses input
|
#'(-object-clauses name
|
||||||
|
input
|
||||||
[completed ...]
|
[completed ...]
|
||||||
[ [#:asserted pat
|
[ [#:asserted pat
|
||||||
(define f (react (facet-prevent-inert-check! this-facet) body ...))
|
(define f (react (facet-prevent-inert-check! this-facet) body ...))
|
||||||
|
@ -150,6 +160,15 @@
|
||||||
(define-syntax-rule (react setup-expr ...)
|
(define-syntax-rule (react setup-expr ...)
|
||||||
(turn-facet! this-turn (lambda () setup-expr ...)))
|
(turn-facet! this-turn (lambda () setup-expr ...)))
|
||||||
|
|
||||||
|
(define-syntax (let-event stx)
|
||||||
|
(syntax-parse stx
|
||||||
|
[(_ [] body ...)
|
||||||
|
#'(begin body ...)]
|
||||||
|
[(_ [#:do expr e ...] body ...)
|
||||||
|
#'(begin expr (let-event [e ...] body ...))]
|
||||||
|
[(_ [e0 e ...] body ...)
|
||||||
|
#'(react (stop-when e0 (let-event [e ...] body ...)))]))
|
||||||
|
|
||||||
(define-syntax-rule (define-field id initial-value)
|
(define-syntax-rule (define-field id initial-value)
|
||||||
(define id (turn-field! this-turn 'id initial-value)))
|
(define id (turn-field! this-turn 'id initial-value)))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue