From 0bc775a89f983e294bd7f19ec94a749789104297 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 24 Aug 2016 17:35:38 +0100 Subject: [PATCH] First commit moving from (actor (react ...)) to (actor ...) --- racket/syndicate-gl/examples/basic.rkt | 76 +++++---- racket/syndicate/actor.rkt | 14 +- racket/syndicate/broker/server.rkt | 43 ++--- racket/syndicate/drivers/line-reader.rkt | 22 ++- racket/syndicate/drivers/web.rkt | 50 +++--- .../syndicate/examples/actor/bank-account.rkt | 18 +- racket/syndicate/examples/actor/big-bang.rkt | 28 ++-- .../examples/actor/box-and-client.rkt | 24 +-- .../examples/actor/broadcast-messages.rkt | 10 +- racket/syndicate/examples/actor/chain.rkt | 8 +- .../syndicate/examples/actor/chat-client.rkt | 20 +-- .../actor/chat-no-quit-world-no-nesting.rkt | 26 +-- .../examples/actor/chat-no-quit-world.rkt | 19 ++- .../actor/chat-simplified-internals.rkt | 42 ++--- racket/syndicate/examples/actor/chat.rkt | 25 +-- racket/syndicate/examples/actor/echo.rkt | 15 +- .../actor/example-action-after-suspension.rkt | 33 ++-- .../example-during-criterion-snapshotting.rkt | 13 +- .../actor/example-partial-retraction.rkt | 64 ++++---- .../actor/example-synthetic-patch.rkt | 28 ++-- .../syndicate/examples/actor/fib-server.rkt | 45 +++-- .../examples/actor/file-system-during.rkt | 20 +-- .../examples/actor/file-system-during2.rkt | 14 +- .../syndicate/examples/actor/file-system.rkt | 22 +-- .../syndicate/examples/actor/file-system2.rkt | 38 ++--- .../examples/actor/forward-chaining-mini.rkt | 18 +- racket/syndicate/examples/actor/fs-shell.rkt | 58 +++---- racket/syndicate/examples/actor/mini-echo.rkt | 22 +-- racket/syndicate/examples/actor/mutex.rkt | 114 +++++++------ .../syndicate/examples/actor/parameters.rkt | 13 +- racket/syndicate/examples/actor/query-set.rkt | 104 ++++++------ .../examples/actor/queue-no-credit.rkt | 86 +++++----- .../examples/actor/queue-no-credit2.rkt | 84 +++++----- racket/syndicate/examples/actor/queue.rkt | 126 +++++++------- .../actor/show-field-scope-enforcement1.rkt | 15 +- .../actor/show-field-scope-enforcement2.rkt | 31 ++-- .../actor/show-field-scope-enforcement3.rkt | 22 +-- .../actor/show-field-scope-enforcement4.rkt | 24 +-- .../syndicate/examples/actor/spreadsheet.rkt | 42 ++--- .../examples/actor/two-buyer-protocol.rkt | 154 +++++++++--------- racket/syndicate/examples/actor/web-demo.rkt | 44 +++-- .../examples/actor/web-sanity-check.rkt | 135 ++++++++------- .../all-pairs-shortest-paths.rkt | 54 +++--- .../all-pairs-shortest-paths2.rkt | 56 +++---- racket/syndicate/examples/udp-multicast.rkt | 21 ++- racket/syndicate/examples/ws-echo-client.rkt | 22 +-- racket/syndicate/examples/ws-echo.rkt | 26 +-- racket/syndicate/threaded.rkt | 13 +- 48 files changed, 996 insertions(+), 1005 deletions(-) diff --git a/racket/syndicate-gl/examples/basic.rkt b/racket/syndicate-gl/examples/basic.rkt index 8bca6bd..48274ad 100644 --- a/racket/syndicate-gl/examples/basic.rkt +++ b/racket/syndicate-gl/examples/basic.rkt @@ -6,58 +6,56 @@ (define (spawn-background) (actor - (react - (during (inbound (window $width $height)) - (assert (outbound - (scene (seal `((push-matrix (scale ,width ,(* height 2)) - (translate 0 -0.25) - (texture - ,(overlay/xy (rectangle 1 1 "solid" "white") - 0 0 - (rectangle 1 2 "solid" "black")))) - ;; (rotate -30) - ;; (scale 5 5) - )) - (seal `())))))))) + (during (inbound (window $width $height)) + (assert (outbound + (scene (seal `((push-matrix (scale ,width ,(* height 2)) + (translate 0 -0.25) + (texture + ,(overlay/xy (rectangle 1 1 "solid" "white") + 0 0 + (rectangle 1 2 "solid" "black")))) + ;; (rotate -30) + ;; (scale 5 5) + )) + (seal `()))))))) (define (spawn-player-avatar) (local-require 2htdp/planetcute) (define CC character-cat-girl) - (actor (react - (field [x 100] [y 100]) - (assert (outbound (simple-sprite -0.5 (x) (y) (image-width CC) (image-height CC) CC))) + (actor (field [x 100] [y 100]) + (assert (outbound (simple-sprite -0.5 (x) (y) (image-width CC) (image-height CC) CC))) - (field [keys-down (set)]) - (on (asserted (key-pressed $k)) (keys-down (set-add (keys-down) k))) - (on (retracted (key-pressed $k)) (keys-down (set-remove (keys-down) k))) - (define (key->delta k distance) (if (set-member? (keys-down) k) distance 0)) + (field [keys-down (set)]) + (on (asserted (key-pressed $k)) (keys-down (set-add (keys-down) k))) + (on (retracted (key-pressed $k)) (keys-down (set-remove (keys-down) k))) + (define (key->delta k distance) (if (set-member? (keys-down) k) distance 0)) - (on (message (inbound (frame-event _ _ $elapsed-ms _))) - (define-values (old-x old-y) (values (x) (y))) - (define distance (* 0.360 elapsed-ms)) - (define nx (+ old-x (key->delta 'right distance) (key->delta 'left (- distance)))) - (define ny (+ old-y (key->delta 'down distance) (key->delta 'up (- distance)))) - (when (not (and (= nx old-x) (= ny old-y))) - (x nx) - (y ny)))))) + (on (message (inbound (frame-event _ _ $elapsed-ms _))) + (define-values (old-x old-y) (values (x) (y))) + (define distance (* 0.360 elapsed-ms)) + (define nx (+ old-x (key->delta 'right distance) (key->delta 'left (- distance)))) + (define ny (+ old-y (key->delta 'down distance) (key->delta 'up (- distance)))) + (when (not (and (= nx old-x) (= ny old-y))) + (x nx) + (y ny))))) (define (spawn-frame-counter) - (actor (react (field [i empty-image]) - (assert (outbound - (simple-sprite -10 300 10 (image-width (i)) (image-height (i)) (i)))) - (on (message (inbound (frame-event $counter $sim-time-ms _ _))) - (when (> sim-time-ms 0) - (define fps (/ counter (/ sim-time-ms 1000.0))) - (i (text (format "~a fps" fps) 22 "black"))))))) + (actor (field [i empty-image]) + (assert (outbound + (simple-sprite -10 300 10 (image-width (i)) (image-height (i)) (i)))) + (on (message (inbound (frame-event $counter $sim-time-ms _ _))) + (when (> sim-time-ms 0) + (define fps (/ counter (/ sim-time-ms 1000.0))) + (i (text (format "~a fps" fps) 22 "black")))))) (spawn-keyboard-integrator) (spawn-background) ;; (spawn-frame-counter) (spawn-player-avatar) -(actor (react (assert (outbound (simple-sprite 0 50 50 50 50 (circle 50 "solid" "orange")))) - (assert (outbound (simple-sprite -1 60 60 50 50 (circle 50 "solid" "green")))))) -(actor (until (message (inbound (key-event #\q #t _)))) - (assert! (outbound 'stop))) +(actor (assert (outbound (simple-sprite 0 50 50 50 50 (circle 50 "solid" "orange")))) + (assert (outbound (simple-sprite -1 60 60 50 50 (circle 50 "solid" "green"))))) +(actor* (until (message (inbound (key-event #\q #t _)))) + (assert! (outbound 'stop))) (module+ main (current-ground-dataspace (2d-dataspace))) diff --git a/racket/syndicate/actor.rkt b/racket/syndicate/actor.rkt index be1615f..2151ac7 100644 --- a/racket/syndicate/actor.rkt +++ b/racket/syndicate/actor.rkt @@ -1,6 +1,7 @@ #lang racket/base (provide actor + actor* dataspace react @@ -277,6 +278,15 @@ name.N))))])) (define-syntax (actor stx) + (syntax-parse stx + [(_ name:name O ...) + (quasisyntax/loc stx + (let ((spawn-action (actor-action #:name name.N (react O ...)))) + (if (syndicate-effects-available?) + (schedule-action! spawn-action) + spawn-action)))])) + +(define-syntax (actor* stx) (syntax-parse stx [(_ name:name script ...) (quasisyntax/loc stx @@ -433,8 +443,8 @@ (on #,E-stx (let ((p #,instantiated)) (w.wrapper #:name name.N - (react (stop-when (retracted p)) - O ...)))))])) + (stop-when (retracted p)) + O ...))))])) (define-syntax (begin/dataflow stx) (syntax-parse stx diff --git a/racket/syndicate/broker/server.rkt b/racket/syndicate/broker/server.rkt index 96da6c6..2dae7cf 100644 --- a/racket/syndicate/broker/server.rkt +++ b/racket/syndicate/broker/server.rkt @@ -51,32 +51,33 @@ (define (send-event e) (send! (outbound (websocket-message server-id c (jsexpr->string (lift-json-event e)))))) - (arm-ping-timer!) + (on-start (arm-ping-timer!) + (log-syndicate-broker-info "Starting broker connection from ~v" c)) - (log-syndicate-broker-info "Starting broker connection from ~v" c) - (until (retracted (inbound (advertise (websocket-message c server-id _)))) - (assert (outbound (advertise (websocket-message server-id c _)))) + (stop-when (retracted (inbound (advertise (websocket-message c server-id _))))) + (assert (outbound (advertise (websocket-message server-id c _)))) - (on (asserted (inbound - (websocket-peer-details server-id c _ _ $remote-addr $remote-port))) - (log-syndicate-broker-info "Connection ~v is from ~a:~a" c remote-addr remote-port)) + (on (asserted (inbound + (websocket-peer-details server-id c _ _ $remote-addr $remote-port))) + (log-syndicate-broker-info "Connection ~v is from ~a:~a" c remote-addr remote-port)) - (on (message (inbound (timer-expired c _))) - (arm-ping-timer!) - (send-event 'ping)) + (on (message (inbound (timer-expired c _))) + (arm-ping-timer!) + (send-event 'ping)) - (on (message (inbound (websocket-message c server-id $data))) - (match (drop-json-action (string->jsexpr data)) - ['ping (send-event 'pong)] - ['pong (void)] - [(? patch? p) (patch! (log-packet c 'inbound 'patch (wrap-patch scope p)))] - [(message body) (send! (log-packet c 'inbound 'message (broker-data scope body)))])) + (on (message (inbound (websocket-message c server-id $data))) + (match (drop-json-action (string->jsexpr data)) + ['ping (send-event 'pong)] + ['pong (void)] + [(? patch? p) (patch! (log-packet c 'inbound 'patch (wrap-patch scope p)))] + [(message body) (send! (log-packet c 'inbound 'message (broker-data scope body)))])) - (on-event - [(? patch? p) (send-event (log-packet c 'outbound 'patch (unwrap-patch scope p)))] - [(message (broker-data (== scope) body)) - (send-event (message (log-packet c 'outbound 'message body)))])) - (log-syndicate-broker-info "Ending broker connection from ~v" c))) + (on-event + [(? patch? p) (send-event (log-packet c 'outbound 'patch (unwrap-patch scope p)))] + [(message (broker-data (== scope) body)) + (send-event (message (log-packet c 'outbound 'message body)))]) + + (on-stop (log-syndicate-broker-info "Ending broker connection from ~v" c)))) (define (log-packet c direction kind value) (log-syndicate-broker-debug "Broker: ~v: ~a ~a\n~v" c direction kind value) diff --git a/racket/syndicate/drivers/line-reader.rkt b/racket/syndicate/drivers/line-reader.rkt index 0e6b8ee..6ab6f2e 100644 --- a/racket/syndicate/drivers/line-reader.rkt +++ b/racket/syndicate/drivers/line-reader.rkt @@ -14,15 +14,13 @@ [(eqv? (bytes-ref bs i) b) i] [else (loop (+ i 1))]))) -(actor - (react - (during/actor (observe (tcp-channel-line $src $dst _)) - (field [buffer #""]) - (on (message (tcp-channel src dst $bs)) - (buffer (bytes-append (buffer) bs))) - (begin/dataflow - (define newline-pos (bytes-index (buffer) (char->integer #\newline))) - (when newline-pos - (define line (subbytes (buffer) 0 newline-pos)) - (buffer (subbytes (buffer) (+ newline-pos 1))) - (send! (tcp-channel-line src dst line))))))) +(actor (during/actor (observe (tcp-channel-line $src $dst _)) + (field [buffer #""]) + (on (message (tcp-channel src dst $bs)) + (buffer (bytes-append (buffer) bs))) + (begin/dataflow + (define newline-pos (bytes-index (buffer) (char->integer #\newline))) + (when newline-pos + (define line (subbytes (buffer) 0 newline-pos)) + (buffer (subbytes (buffer) (+ newline-pos 1))) + (send! (tcp-channel-line src dst line)))))) diff --git a/racket/syndicate/drivers/web.rkt b/racket/syndicate/drivers/web.rkt index 560f6d6..1366879 100644 --- a/racket/syndicate/drivers/web.rkt +++ b/racket/syndicate/drivers/web.rkt @@ -143,18 +143,16 @@ (define (spawn-web-driver) (actor #:name 'web-server-manager - (react - (during/actor (web-virtual-host "http" _ $port) - #:name (list 'web-server port) - (setup-web-server "http" - (or (web-server-connection-manager) - (start-connection-manager)) - port)))) + (during/actor (web-virtual-host "http" _ $port) + #:name (list 'web-server port) + (setup-web-server "http" + (or (web-server-connection-manager) + (start-connection-manager)) + port))) (actor #:name 'web-client-manager - (react - (on (message (web-request $id 'outbound $req $body)) - (actor #:name (list 'web-client id) - (do-client-request id req body)))))) + (on (message (web-request $id 'outbound $req $body)) + (actor #:name (list 'web-client id) + (do-client-request id req body))))) (define (setup-web-server scheme cm port) (define listener (tcp-listen port (web-server-max-waiting) #t)) @@ -184,21 +182,21 @@ (url-query (request-uri lowlevel-req))) (request-post-data/raw lowlevel-req))) (actor #:name (list 'web-req id) - (react (on-start (send! (set-timer (list 'web-req id) 100 'relative)) - (send! web-req)) - (stop-when (message (timer-expired (list 'web-req id) _)) - (do-response-complete control-ch - id - (make-web-response-header - #:code 404 - #:message #"Not found") - '())) - (stop-when (message (web-response-complete id $rh $body)) - (do-response-complete control-ch id rh body)) - (stop-when (asserted (web-response-chunked id $rh)) - (do-response-chunked control-ch id rh)) - (stop-when (asserted (web-response-websocket id $headers)) - (do-response-websocket control-ch id headers)))))) + (on-start (send! (set-timer (list 'web-req id) 100 'relative)) + (send! web-req)) + (stop-when (message (timer-expired (list 'web-req id) _)) + (do-response-complete control-ch + id + (make-web-response-header + #:code 404 + #:message #"Not found") + '())) + (stop-when (message (web-response-complete id $rh $body)) + (do-response-complete control-ch id rh body)) + (stop-when (asserted (web-response-chunked id $rh)) + (do-response-chunked control-ch id rh)) + (stop-when (asserted (web-response-websocket id $headers)) + (do-response-websocket control-ch id headers))))) (define (do-response-complete control-ch id rh constree-of-bytes) (match-define (web-response-header code resp-message last-modified-seconds mime-type headers) rh) diff --git a/racket/syndicate/examples/actor/bank-account.rkt b/racket/syndicate/examples/actor/bank-account.rkt index e1bd314..fc32bcd 100644 --- a/racket/syndicate/examples/actor/bank-account.rkt +++ b/racket/syndicate/examples/actor/bank-account.rkt @@ -4,14 +4,14 @@ (struct account (balance) #:prefab) (struct deposit (amount) #:prefab) -(actor (react (field [balance 0]) - (assert (account (balance))) - (on (message (deposit $amount)) - (balance (+ (balance) amount))))) +(actor (field [balance 0]) + (assert (account (balance))) + (on (message (deposit $amount)) + (balance (+ (balance) amount)))) -(actor (react (on (asserted (account $balance)) - (printf "Balance changed to ~a\n" balance)))) +(actor (on (asserted (account $balance)) + (printf "Balance changed to ~a\n" balance))) -(actor (until (asserted (observe (deposit _)))) - (send! (deposit +100)) - (send! (deposit -30))) +(actor* (until (asserted (observe (deposit _)))) + (send! (deposit +100)) + (send! (deposit -30))) diff --git a/racket/syndicate/examples/actor/big-bang.rkt b/racket/syndicate/examples/actor/big-bang.rkt index ce77dfe..08d3efe 100644 --- a/racket/syndicate/examples/actor/big-bang.rkt +++ b/racket/syndicate/examples/actor/big-bang.rkt @@ -8,16 +8,15 @@ #:font-size [font-size 22] name x y label callback) (define label-image (text label font-size foreground)) - (actor (forever - (on (message (inbound (mouse-event _ _ name "button-down"))) (callback)) - (assert (outbound - (window name x y 0 - (seal - (overlay label-image - (rectangle (+ (image-width label-image) 20) - (+ (image-height label-image) 20) - "solid" - background))))))))) + (actor (on (message (inbound (mouse-event _ _ name "button-down"))) (callback)) + (assert (outbound + (window name x y 0 + (seal + (overlay label-image + (rectangle (+ (image-width label-image) 20) + (+ (image-height label-image) 20) + "solid" + background)))))))) (define (draggable-shape name orig-x orig-y image) (define (window-at x y) (window name x y 10 (seal image))) @@ -38,12 +37,11 @@ (my nmy)) (stop-when (message (inbound (mouse-event $mx $my _ (? mouse-left-event-type? $t)))) (idle 0 (- mx dx) (- my dy))))) - (actor (idle 0 orig-x orig-y))) + (actor* (idle 0 orig-x orig-y))) -(actor (forever - (during (inbound (active-window $id)) - (assert (outbound (window 'active-window-label 300 0 0 - (seal (text (format "~v" id) 22 "black")))))))) +(actor (during (inbound (active-window $id)) + (assert (outbound (window 'active-window-label 300 0 0 + (seal (text (format "~v" id) 22 "black"))))))) (button #:background "red" 'stop-button 0 0 "Exit" (lambda () (assert! (outbound 'stop)))) (draggable-shape 'c1 50 50 (circle 30 "solid" "orange")) diff --git a/racket/syndicate/examples/actor/box-and-client.rkt b/racket/syndicate/examples/actor/box-and-client.rkt index c77bd21..edafd6c 100644 --- a/racket/syndicate/examples/actor/box-and-client.rkt +++ b/racket/syndicate/examples/actor/box-and-client.rkt @@ -4,16 +4,16 @@ (struct set-box (new-value) #:transparent) (struct box-state (value) #:transparent) -(actor (react (field [current-value 0]) - (assert (box-state (current-value))) - (stop-when (rising-edge (= (current-value) 10)) - (log-info "box: terminating")) - (on (message (set-box $new-value)) - (log-info "box: taking on new-value ~v" new-value) - (current-value new-value)))) +(actor (field [current-value 0]) + (assert (box-state (current-value))) + (stop-when (rising-edge (= (current-value) 10)) + (log-info "box: terminating")) + (on (message (set-box $new-value)) + (log-info "box: taking on new-value ~v" new-value) + (current-value new-value))) -(actor (react (stop-when (retracted (observe (set-box _))) - (log-info "client: box has gone")) - (on (asserted (box-state $v)) - (log-info "client: learned that box's value is now ~v" v) - (send! (set-box (+ v 1)))))) +(actor (stop-when (retracted (observe (set-box _))) + (log-info "client: box has gone")) + (on (asserted (box-state $v)) + (log-info "client: learned that box's value is now ~v" v) + (send! (set-box (+ v 1))))) diff --git a/racket/syndicate/examples/actor/broadcast-messages.rkt b/racket/syndicate/examples/actor/broadcast-messages.rkt index d390e7f..9405429 100644 --- a/racket/syndicate/examples/actor/broadcast-messages.rkt +++ b/racket/syndicate/examples/actor/broadcast-messages.rkt @@ -3,13 +3,13 @@ (struct envelope (destination message) #:prefab) -(actor (react (on (message (envelope 'alice $message)) - (log-info "Alice received ~v" message)))) +(actor (on (message (envelope 'alice $message)) + (log-info "Alice received ~v" message))) -(actor (react (on (message (envelope 'bob $message)) - (log-info "Bob received ~v" message)))) +(actor (on (message (envelope 'bob $message)) + (log-info "Bob received ~v" message))) -(actor +(actor* (log-info "Waiting for Alice and Bob.") (until (asserted (observe (envelope 'alice _)))) (until (asserted (observe (envelope 'bob _)))) diff --git a/racket/syndicate/examples/actor/chain.rkt b/racket/syndicate/examples/actor/chain.rkt index 62257a3..a7cb4ce 100644 --- a/racket/syndicate/examples/actor/chain.rkt +++ b/racket/syndicate/examples/actor/chain.rkt @@ -9,9 +9,9 @@ (define (chain-step n) (printf "chain-step ~v\n" n) - (actor (sleep 1) - (if (< n 5) - (chain-step (+ n 1)) - (printf "done.\n")))) + (actor* (sleep 1) + (if (< n 5) + (chain-step (+ n 1)) + (printf "done.\n")))) (chain-step 0) diff --git a/racket/syndicate/examples/actor/chat-client.rkt b/racket/syndicate/examples/actor/chat-client.rkt index a4c1def..4013eac 100644 --- a/racket/syndicate/examples/actor/chat-client.rkt +++ b/racket/syndicate/examples/actor/chat-client.rkt @@ -8,15 +8,13 @@ (define remote-handle (tcp-address "localhost" 5999)) (define stdin-evt (read-bytes-line-evt (current-input-port) 'any)) -(actor - (react/suspend (quit) - (on (message (inbound (external-event stdin-evt (list $line)))) - (if (eof-object? line) - (quit) - (send! (tcp-channel local-handle remote-handle line)))) +(actor (stop-when (message (inbound (external-event stdin-evt (list (? eof-object? _)))))) + (stop-when (retracted (advertise (tcp-channel remote-handle local-handle _)))) + (assert (advertise (tcp-channel local-handle remote-handle _))) - (assert (advertise (tcp-channel local-handle remote-handle _))) - (on (retracted (advertise (tcp-channel remote-handle local-handle _))) (quit)) - (on (message (tcp-channel remote-handle local-handle $bs)) - (write-bytes bs) - (flush-output)))) + (on (message (inbound (external-event stdin-evt (list (? bytes? $line))))) + (send! (tcp-channel local-handle remote-handle line))) + + (on (message (tcp-channel remote-handle local-handle $bs)) + (write-bytes bs) + (flush-output))) diff --git a/racket/syndicate/examples/actor/chat-no-quit-world-no-nesting.rkt b/racket/syndicate/examples/actor/chat-no-quit-world-no-nesting.rkt index 6e4000e..d743d2d 100644 --- a/racket/syndicate/examples/actor/chat-no-quit-world-no-nesting.rkt +++ b/racket/syndicate/examples/actor/chat-no-quit-world-no-nesting.rkt @@ -16,21 +16,21 @@ (send-to-remote "~a ~a\n" who (apply format fmt vs)))) (define user (gensym 'user)) - (send-to-remote "Welcome, ~a.\n" user) + (on-start (send-to-remote "Welcome, ~a.\n" user)) - (until (retracted (advertise (tcp-channel them us _))) - (assert (present user)) - (on (asserted (present $who)) (say who "arrived.")) - (on (retracted (present $who)) (say who "departed.")) + (stop-when (retracted (advertise (tcp-channel them us _)))) - (on (message (says $who $what)) (say who "says: ~a" what)) + (assert (present user)) + (on (asserted (present $who)) (say who "arrived.")) + (on (retracted (present $who)) (say who "departed.")) - (assert (advertise (tcp-channel us them _))) - (on (message (tcp-channel them us $bs)) - (send! (says user (string-trim (bytes->string/utf-8 bs)))))))) + (on (message (says $who $what)) (say who "says: ~a" what)) + + (assert (advertise (tcp-channel us them _))) + (on (message (tcp-channel them us $bs)) + (send! (says user (string-trim (bytes->string/utf-8 bs))))))) (define us (tcp-listener 5999)) -(actor - (forever (assert (advertise (observe (tcp-channel _ us _)))) - (on (asserted (advertise (tcp-channel $them us _))) - (spawn-session them us)))) +(actor (assert (advertise (observe (tcp-channel _ us _)))) + (on (asserted (advertise (tcp-channel $them us _))) + (spawn-session them us))) diff --git a/racket/syndicate/examples/actor/chat-no-quit-world.rkt b/racket/syndicate/examples/actor/chat-no-quit-world.rkt index b437783..e9a63e2 100644 --- a/racket/syndicate/examples/actor/chat-no-quit-world.rkt +++ b/racket/syndicate/examples/actor/chat-no-quit-world.rkt @@ -16,18 +16,19 @@ (send-to-remote "~a ~a\n" who (apply format fmt vs)))) (define user (gensym 'user)) - (send-to-remote "Welcome, ~a.\n" user) + (on-start (send-to-remote "Welcome, ~a.\n" user)) - (until (retracted (inbound (advertise (tcp-channel them us _)))) - (assert (present user)) - (on (asserted (present $who)) (say who "arrived.")) - (on (retracted (present $who)) (say who "departed.")) + (stop-when (retracted (inbound (advertise (tcp-channel them us _))))) - (on (message (says $who $what)) (say who "says: ~a" what)) + (assert (present user)) + (on (asserted (present $who)) (say who "arrived.")) + (on (retracted (present $who)) (say who "departed.")) - (assert (outbound (advertise (tcp-channel us them _)))) - (on (message (inbound (tcp-channel them us $bs))) - (send! (says user (string-trim (bytes->string/utf-8 bs)))))))) + (on (message (says $who $what)) (say who "says: ~a" what)) + + (assert (outbound (advertise (tcp-channel us them _)))) + (on (message (inbound (tcp-channel them us $bs))) + (send! (says user (string-trim (bytes->string/utf-8 bs))))))) (dataspace (define us (tcp-listener 5999)) (forever (assert (outbound (advertise (observe (tcp-channel _ us _))))) diff --git a/racket/syndicate/examples/actor/chat-simplified-internals.rkt b/racket/syndicate/examples/actor/chat-simplified-internals.rkt index afe8ce2..cacae58 100644 --- a/racket/syndicate/examples/actor/chat-simplified-internals.rkt +++ b/racket/syndicate/examples/actor/chat-simplified-internals.rkt @@ -21,30 +21,30 @@ (send-to-remote "~a ~a\n" who (apply format fmt vs)))) (define user (gensym 'user)) - (send-to-remote "Welcome, ~a.\n" user) + (on-start (send-to-remote "Welcome, ~a.\n" user)) - (until (retracted (tcp-remote-open id)) - (assert (tcp-local-open id)) - (assert (present user)) + (stop-when (retracted (tcp-remote-open id))) + (assert (tcp-local-open id)) + (assert (present user)) - (on (asserted (present $who)) (say who "arrived.")) - (on (retracted (present $who)) (say who "departed.")) - (on (message (says $who $what)) (say who "says: ~a" what)) + (on (asserted (present $who)) (say who "arrived.")) + (on (retracted (present $who)) (say who "departed.")) + (on (message (says $who $what)) (say who "says: ~a" what)) - (on (message (tcp-incoming-data id $bs)) - (send! (says user (string-trim (bytes->string/utf-8 bs)))))))) + (on (message (tcp-incoming-data id $bs)) + (send! (says user (string-trim (bytes->string/utf-8 bs))))))) (define us (tcp-listener 5999)) -(actor (forever (assert (advertise (observe (tcp-channel _ us _)))) - (on (asserted (advertise (tcp-channel $them us _))) - (define id (seal (list them us))) - (actor (react (stop-when (retracted (advertise (tcp-channel them us _)))) - (stop-when (retracted (tcp-local-open id))) - (assert (tcp-remote-open id)) - (on (message (tcp-channel them us $bs)) - (send! (tcp-incoming-data id bs))) - (on (message (tcp-outgoing-data id $bs)) - (send! (tcp-channel us them bs)))))))) +(actor (assert (advertise (observe (tcp-channel _ us _)))) + (on (asserted (advertise (tcp-channel $them us _))) + (define id (seal (list them us))) + (actor (stop-when (retracted (advertise (tcp-channel them us _)))) + (stop-when (retracted (tcp-local-open id))) + (assert (tcp-remote-open id)) + (on (message (tcp-channel them us $bs)) + (send! (tcp-incoming-data id bs))) + (on (message (tcp-outgoing-data id $bs)) + (send! (tcp-channel us them bs)))))) -(actor (forever (on (asserted (tcp-remote-open $id)) - (spawn-session id)))) +(actor (on (asserted (tcp-remote-open $id)) + (spawn-session id))) diff --git a/racket/syndicate/examples/actor/chat.rkt b/racket/syndicate/examples/actor/chat.rkt index 825ee62..b564add 100644 --- a/racket/syndicate/examples/actor/chat.rkt +++ b/racket/syndicate/examples/actor/chat.rkt @@ -17,21 +17,22 @@ (send-to-remote "~a ~a\n" who (apply format fmt vs)))) (define user (gensym 'user)) - (send-to-remote "Welcome, ~a.\n" user) + (on-start (send-to-remote "Welcome, ~a.\n" user)) - (until (retracted (inbound (advertise (tcp-channel them us _)))) - (assert (present user)) - (on (asserted (present $who)) (say who "arrived.")) - (on (retracted (present $who)) (say who "departed.")) + (stop-when (retracted (inbound (advertise (tcp-channel them us _))))) - (on (message (says $who $what)) (say who "says: ~a" what)) + (assert (present user)) + (on (asserted (present $who)) (say who "arrived.")) + (on (retracted (present $who)) (say who "departed.")) - (assert (outbound (advertise (tcp-channel us them _)))) - (on (message (inbound (tcp-channel them us $bs))) - (define input-string (string-trim (bytes->string/utf-8 bs))) - (if (equal? input-string "quit-dataspace") - (send! (shutdown)) - (send! (says user input-string))))))) + (on (message (says $who $what)) (say who "says: ~a" what)) + + (assert (outbound (advertise (tcp-channel us them _)))) + (on (message (inbound (tcp-channel them us $bs))) + (define input-string (string-trim (bytes->string/utf-8 bs))) + (if (equal? input-string "quit-dataspace") + (send! (shutdown)) + (send! (says user input-string)))))) (dataspace (define us (tcp-listener 5999)) (until (message (shutdown)) diff --git a/racket/syndicate/examples/actor/echo.rkt b/racket/syndicate/examples/actor/echo.rkt index d000838..1ac0360 100644 --- a/racket/syndicate/examples/actor/echo.rkt +++ b/racket/syndicate/examples/actor/echo.rkt @@ -4,11 +4,10 @@ (require/activate syndicate/drivers/tcp) (define server-id (tcp-listener 5999)) -(actor - (forever (assert (advertise (observe (tcp-channel _ server-id _)))) - (during/actor (advertise (tcp-channel $c server-id _)) - (on-start (printf "Accepted connection from ~v\n" c)) - (assert (advertise (tcp-channel server-id c _))) - (on (message (tcp-channel c server-id $bs)) - (send! (tcp-channel server-id c bs))) - (on-stop (printf "Closed connection ~v\n" c))))) +(actor (assert (advertise (observe (tcp-channel _ server-id _)))) + (during/actor (advertise (tcp-channel $c server-id _)) + (on-start (printf "Accepted connection from ~v\n" c)) + (assert (advertise (tcp-channel server-id c _))) + (on (message (tcp-channel c server-id $bs)) + (send! (tcp-channel server-id c bs))) + (on-stop (printf "Closed connection ~v\n" c)))) diff --git a/racket/syndicate/examples/actor/example-action-after-suspension.rkt b/racket/syndicate/examples/actor/example-action-after-suspension.rkt index d4f20f2..d9b7acd 100644 --- a/racket/syndicate/examples/actor/example-action-after-suspension.rkt +++ b/racket/syndicate/examples/actor/example-action-after-suspension.rkt @@ -16,24 +16,23 @@ (struct x (v) #:prefab) -(actor (forever (on (message (x 'ping)) - (send! (x 'pong))))) +(actor (on (message (x 'ping)) + (send! (x 'pong)))) -(actor (react - (field [flag 'clear]) - (begin/dataflow - (printf "flag: ~v\n" (flag))) +(actor (field [flag 'clear]) + (begin/dataflow + (printf "flag: ~v\n" (flag))) - (field [spec #f]) - (begin/dataflow - (when (spec) - (let-event [(asserted (observe (x (spec))))] - (send! (x (list 'saw (spec)))) - (flag 'set)))) + (field [spec #f]) + (begin/dataflow + (when (spec) + (let-event [(asserted (observe (x (spec))))] + (send! (x (list 'saw (spec)))) + (flag 'set)))) - (on-start (send! (x 'first))) - (on (message (x 'first)) - (spec 'ping)))) + (on-start (send! (x 'first))) + (on (message (x 'first)) + (spec 'ping))) -(actor (forever (on (message (x $v)) - (printf "- ~v\n" v)))) +(actor (on (message (x $v)) + (printf "- ~v\n" v))) diff --git a/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt b/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt index f4e0945..b4ccd31 100644 --- a/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt +++ b/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt @@ -17,10 +17,9 @@ (struct foo (x y) #:prefab) (actor (field [x 123]) - (react - (assert (foo (x) 999)) - (during (foo (x) $v) - (log-info "x=~a v=~a" (x) v) - (when (= (x) 123) (x 124)) - (on-stop - (log-info "finally for x=~a v=~a" (x) v))))) + (assert (foo (x) 999)) + (during (foo (x) $v) + (log-info "x=~a v=~a" (x) v) + (when (= (x) 123) (x 124)) + (on-stop + (log-info "finally for x=~a v=~a" (x) v)))) diff --git a/racket/syndicate/examples/actor/example-partial-retraction.rkt b/racket/syndicate/examples/actor/example-partial-retraction.rkt index e513e7a..a5dd170 100644 --- a/racket/syndicate/examples/actor/example-partial-retraction.rkt +++ b/racket/syndicate/examples/actor/example-partial-retraction.rkt @@ -5,25 +5,23 @@ (struct ready (what) #:prefab) (struct entry (key val) #:prefab) -(actor (react - (assert (ready 'listener)) - (on (asserted (entry $key _)) - (log-info "key ~v asserted" key) - (until (retracted (entry key _)) - (on (asserted (entry key $value)) - (log-info "add binding: ~v -> ~v" key value)) - (on (retracted (entry key $value)) - (log-info "del binding: ~v -> ~v" key value))) - (log-info "key ~v retracted" key)))) +(actor (assert (ready 'listener)) + (on (asserted (entry $key _)) + (log-info "key ~v asserted" key) + (until (retracted (entry key _)) + (on (asserted (entry key $value)) + (log-info "add binding: ~v -> ~v" key value)) + (on (retracted (entry key $value)) + (log-info "del binding: ~v -> ~v" key value))) + (log-info "key ~v retracted" key))) -(actor (react - (assert (ready 'other-listener)) - (during (entry $key _) - (log-info "(other-listener) key ~v asserted" key) - (on-stop (log-info "(other-listener) key ~v retracted" key)) - (during (entry key $value) - (log-info "(other-listener) ~v ---> ~v" key value) - (on-stop (log-info "(other-listener) ~v -/-> ~v" key value)))))) +(actor (assert (ready 'other-listener)) + (during (entry $key _) + (log-info "(other-listener) key ~v asserted" key) + (on-stop (log-info "(other-listener) key ~v retracted" key)) + (during (entry key $value) + (log-info "(other-listener) ~v ---> ~v" key value) + (on-stop (log-info "(other-listener) ~v -/-> ~v" key value))))) (define (pause) (log-info "pause") @@ -31,18 +29,18 @@ (until (asserted (ready token)) (assert (ready token)))) -(actor (until (asserted (ready 'listener))) - (until (asserted (ready 'other-listener))) - (assert! (entry 'a 1)) - (assert! (entry 'a 2)) - (assert! (entry 'b 3)) - (assert! (entry 'c 33)) - (assert! (entry 'a 4)) - (assert! (entry 'a 5)) - (pause) - (retract! (entry 'a 2)) - (retract! (entry 'c 33)) - (assert! (entry 'a 9)) - (pause) - (retract! (entry 'a ?)) - (pause)) +(actor* (until (asserted (ready 'listener))) + (until (asserted (ready 'other-listener))) + (assert! (entry 'a 1)) + (assert! (entry 'a 2)) + (assert! (entry 'b 3)) + (assert! (entry 'c 33)) + (assert! (entry 'a 4)) + (assert! (entry 'a 5)) + (pause) + (retract! (entry 'a 2)) + (retract! (entry 'c 33)) + (assert! (entry 'a 9)) + (pause) + (retract! (entry 'a ?)) + (pause)) diff --git a/racket/syndicate/examples/actor/example-synthetic-patch.rkt b/racket/syndicate/examples/actor/example-synthetic-patch.rkt index d1278cc..dbd3bbb 100644 --- a/racket/syndicate/examples/actor/example-synthetic-patch.rkt +++ b/racket/syndicate/examples/actor/example-synthetic-patch.rkt @@ -24,18 +24,18 @@ (struct outer (v) #:prefab) (struct show () #:prefab) -(actor (react (field [v "first"]) - (assert (outer (v))) - (assert (show)) - (on (message 2) - (v "second")))) +(actor (field [v "first"]) + (assert (outer (v))) + (assert (show)) + (on (message 2) + (v "second"))) -(actor (react (on-start (send! 1)) - (during (outer $v) - (on-start (log-info "+outer ~v" v)) - (on-stop (log-info "-outer ~v" v)) - (during (show) - (on-start (log-info "+show")) - (on-stop (log-info "-show")))) - (on (message 1) - (send! 2)))) +(actor (on-start (send! 1)) + (during (outer $v) + (on-start (log-info "+outer ~v" v)) + (on-stop (log-info "-outer ~v" v)) + (during (show) + (on-start (log-info "+show")) + (on-stop (log-info "-show")))) + (on (message 1) + (send! 2))) diff --git a/racket/syndicate/examples/actor/fib-server.rkt b/racket/syndicate/examples/actor/fib-server.rkt index bea1f18..9be86e5 100644 --- a/racket/syndicate/examples/actor/fib-server.rkt +++ b/racket/syndicate/examples/actor/fib-server.rkt @@ -4,25 +4,24 @@ (require/activate syndicate/drivers/timer) (actor - (react - (during/actor (observe `(fib ,$n ,_)) - #:actor actor/thread - (on-start (log-info "Computing fib ~a..." n)) - (on-stop (log-info "Leaving fib ~a" n)) - (assert `(up ,n)) - (on-start - (flush!) - (react - (assert `(fib ,n - ,(let ((answer - (let f ((n n)) - (if (< n 2) - n - (+ (f (- n 1)) - (f (- n 2))))))) - (if (= n 36) - (error 'fib "Deliberate, hardcoded failure for n=36") - answer))))))))) + (during/actor (observe `(fib ,$n ,_)) + #:actor actor/thread + (on-start (log-info "Computing fib ~a..." n)) + (on-stop (log-info "Leaving fib ~a" n)) + (assert `(up ,n)) + (on-start + (flush!) + (react + (assert `(fib ,n + ,(let ((answer + (let f ((n n)) + (if (< n 2) + n + (+ (f (- n 1)) + (f (- n 2))))))) + (if (= n 36) + (error 'fib "Deliberate, hardcoded failure for n=36") + answer)))))))) (dataspace/thread (actor @@ -30,10 +29,10 @@ (define (arm!) (log-info "Tick ~v!" (tick-count)) (send! (outbound (set-timer 'tick 1000 'relative)))) - (react (on (message (inbound (timer-expired 'tick _))) - (tick-count (+ (tick-count) 1)) - (arm!)) - (on-start (arm!)))) + (on (message (inbound (timer-expired 'tick _))) + (tick-count (+ (tick-count) 1)) + (arm!)) + (on-start (arm!))) (field [counter 0]) (react diff --git a/racket/syndicate/examples/actor/file-system-during.rkt b/racket/syndicate/examples/actor/file-system-during.rkt index da3dde3..c0a5418 100644 --- a/racket/syndicate/examples/actor/file-system-during.rkt +++ b/racket/syndicate/examples/actor/file-system-during.rkt @@ -5,13 +5,13 @@ (require/activate "fs-shell.rkt") (require/activate "fs-protocol.rkt") -(actor (react (field [files (hash)]) - (during (observe (file $name _)) - (on-start (printf "At least one reader exists for ~v\n" name)) - (on-stop (printf "No remaining readers exist for ~v\n" name)) - (field [content (hash-ref (files) name #f)]) - (assert (file name (content))) - (on (message (save (file name $new-content))) (content new-content)) - (on (message (delete name)) (content #f))) - (on (message (save (file $name $content))) (files (hash-set (files) name content))) - (on (message (delete $name)) (files (hash-remove (files) name))))) +(actor (field [files (hash)]) + (during (observe (file $name _)) + (on-start (printf "At least one reader exists for ~v\n" name)) + (on-stop (printf "No remaining readers exist for ~v\n" name)) + (field [content (hash-ref (files) name #f)]) + (assert (file name (content))) + (on (message (save (file name $new-content))) (content new-content)) + (on (message (delete name)) (content #f))) + (on (message (save (file $name $content))) (files (hash-set (files) name content))) + (on (message (delete $name)) (files (hash-remove (files) name)))) diff --git a/racket/syndicate/examples/actor/file-system-during2.rkt b/racket/syndicate/examples/actor/file-system-during2.rkt index 900cf5f..03e7f28 100644 --- a/racket/syndicate/examples/actor/file-system-during2.rkt +++ b/racket/syndicate/examples/actor/file-system-during2.rkt @@ -5,10 +5,10 @@ (require/activate "fs-shell.rkt") (require/activate "fs-protocol.rkt") -(actor (react (field [files (hash)]) - (during (observe (file $name _)) - (on-start (printf "At least one reader exists for ~v\n" name)) - (assert (file name (hash-ref (files) name #f))) - (on-stop (printf "No remaining readers exist for ~v\n" name))) - (on (message (save (file $name $content))) (files (hash-set (files) name content))) - (on (message (delete $name)) (files (hash-remove (files) name))))) +(actor (field [files (hash)]) + (during (observe (file $name _)) + (on-start (printf "At least one reader exists for ~v\n" name)) + (assert (file name (hash-ref (files) name #f))) + (on-stop (printf "No remaining readers exist for ~v\n" name))) + (on (message (save (file $name $content))) (files (hash-set (files) name content))) + (on (message (delete $name)) (files (hash-remove (files) name)))) diff --git a/racket/syndicate/examples/actor/file-system.rkt b/racket/syndicate/examples/actor/file-system.rkt index 3516257..4c19948 100644 --- a/racket/syndicate/examples/actor/file-system.rkt +++ b/racket/syndicate/examples/actor/file-system.rkt @@ -5,14 +5,14 @@ (require/activate "fs-shell.rkt") (require/activate "fs-protocol.rkt") -(actor (react (field [files (hash)]) - (on (asserted (observe (file $name _))) - (printf "At least one reader exists for ~v\n" name) - (until (retracted (observe (file name _))) - (field [content (hash-ref (files) name #f)]) - (assert (file name (content))) - (on (message (save (file name $new-content))) (content new-content)) - (on (message (delete name)) (content #f))) - (printf "No remaining readers exist for ~v\n" name)) - (on (message (save (file $name $content))) (files (hash-set (files) name content))) - (on (message (delete $name)) (files (hash-remove (files) name))))) +(actor (field [files (hash)]) + (on (asserted (observe (file $name _))) + (printf "At least one reader exists for ~v\n" name) + (until (retracted (observe (file name _))) + (field [content (hash-ref (files) name #f)]) + (assert (file name (content))) + (on (message (save (file name $new-content))) (content new-content)) + (on (message (delete name)) (content #f))) + (printf "No remaining readers exist for ~v\n" name)) + (on (message (save (file $name $content))) (files (hash-set (files) name content))) + (on (message (delete $name)) (files (hash-remove (files) name)))) diff --git a/racket/syndicate/examples/actor/file-system2.rkt b/racket/syndicate/examples/actor/file-system2.rkt index 3ef3af2..23456ed 100644 --- a/racket/syndicate/examples/actor/file-system2.rkt +++ b/racket/syndicate/examples/actor/file-system2.rkt @@ -6,22 +6,22 @@ (require/activate "fs-protocol.rkt") (require racket/set) -(actor (react (field [files (hash)] [monitored (set)]) - (on (asserted (observe (file $name _))) - (printf "At least one reader exists for ~v\n" name) - (assert! (file name (hash-ref (files) name #f))) - (monitored (set-add (monitored) name))) - (on (retracted (observe (file $name _))) - (printf "No remaining readers exist for ~v\n" name) - (retract! (file name (hash-ref (files) name #f))) - (monitored (set-remove (monitored) name))) - (on (message (save (file $name $content))) - (when (set-member? (monitored) name) - (retract! (file name (hash-ref (files) name #f))) - (assert! (file name content))) - (files (hash-set (files) name content))) - (on (message (delete $name)) - (when (set-member? (monitored) name) - (retract! (file name (hash-ref (files) name #f))) - (assert! (file name #f))) - (files (hash-remove (files) name))))) +(actor (field [files (hash)] [monitored (set)]) + (on (asserted (observe (file $name _))) + (printf "At least one reader exists for ~v\n" name) + (assert! (file name (hash-ref (files) name #f))) + (monitored (set-add (monitored) name))) + (on (retracted (observe (file $name _))) + (printf "No remaining readers exist for ~v\n" name) + (retract! (file name (hash-ref (files) name #f))) + (monitored (set-remove (monitored) name))) + (on (message (save (file $name $content))) + (when (set-member? (monitored) name) + (retract! (file name (hash-ref (files) name #f))) + (assert! (file name content))) + (files (hash-set (files) name content))) + (on (message (delete $name)) + (when (set-member? (monitored) name) + (retract! (file name (hash-ref (files) name #f))) + (assert! (file name #f))) + (files (hash-remove (files) name)))) diff --git a/racket/syndicate/examples/actor/forward-chaining-mini.rkt b/racket/syndicate/examples/actor/forward-chaining-mini.rkt index 1614663..6317c9b 100644 --- a/racket/syndicate/examples/actor/forward-chaining-mini.rkt +++ b/racket/syndicate/examples/actor/forward-chaining-mini.rkt @@ -1,17 +1,17 @@ #lang syndicate/actor ;; Minimal syndicate/actor variation on examples/forward-chaining.rkt. -(actor (react (assert `(parent john douglas)))) -(actor (react (assert `(parent bob john)))) -(actor (react (assert `(parent ebbon bob)))) +(actor (assert `(parent john douglas))) +(actor (assert `(parent bob john))) +(actor (assert `(parent ebbon bob))) ;; This looks like an implication: ;; (parent A C) ⇒ ((ancestor A C) ∧ ((ancestor C B) ⇒ (ancestor A B))) ;; -(actor (react (during `(parent ,$A ,$C) - (assert `(ancestor ,A ,C)) - (during `(ancestor ,C ,$B) - (assert `(ancestor ,A ,B)))))) +(actor (during `(parent ,$A ,$C) + (assert `(ancestor ,A ,C)) + (during `(ancestor ,C ,$B) + (assert `(ancestor ,A ,B))))) -(actor (react (on (asserted `(ancestor ,$A ,$B)) - (log-info "~a is an ancestor of ~a" A B)))) +(actor (on (asserted `(ancestor ,$A ,$B)) + (log-info "~a is an ancestor of ~a" A B))) diff --git a/racket/syndicate/examples/actor/fs-shell.rkt b/racket/syndicate/examples/actor/fs-shell.rkt index bf68ec7..cc5fbe0 100644 --- a/racket/syndicate/examples/actor/fs-shell.rkt +++ b/racket/syndicate/examples/actor/fs-shell.rkt @@ -18,32 +18,32 @@ (printf "> ") (flush-output)) (actor (field [reader-count 0]) - (print-prompt) - (until (message (inbound (external-event e (list (? eof-object? _))))) - (on (message (inbound (external-event e (list (? bytes? $bs))))) - (match (string-split (string-trim (bytes->string/utf-8 bs))) - [(list "open" name) - (define reader-id (reader-count)) - (reader-count (+ (reader-count) 1)) - (actor (printf "Reader ~a opening file ~v.\n" reader-id name) - (until (message `(stop-watching ,name)) - (on (asserted (file name $contents)) - (printf "Reader ~a sees that ~v contains: ~v\n" - reader-id - name - contents))) - (printf "Reader ~a closing file ~v.\n" reader-id name))] - [(list "close" name) - (send! `(stop-watching ,name))] - [(list* "write" name words) - (send! (save (file name words)))] - [(list "delete" name) - (send! (delete name))] - [_ - (printf "I'm afraid I didn't understand that.\n") - (printf "Try: open filename\n") - (printf " close filename\n") - (printf " write filename some text goes here\n") - (printf " delete filename\n")]) - (sleep 0.1) - (print-prompt))))) + (on-start (print-prompt)) + (stop-when (message (inbound (external-event e (list (? eof-object? _)))))) + (on (message (inbound (external-event e (list (? bytes? $bs))))) + (match (string-split (string-trim (bytes->string/utf-8 bs))) + [(list "open" name) + (define reader-id (reader-count)) + (reader-count (+ (reader-count) 1)) + (actor (on-start (printf "Reader ~a opening file ~v.\n" reader-id name)) + (stop-when (message `(stop-watching ,name))) + (on (asserted (file name $contents)) + (printf "Reader ~a sees that ~v contains: ~v\n" + reader-id + name + contents)) + (on-stop (printf "Reader ~a closing file ~v.\n" reader-id name)))] + [(list "close" name) + (send! `(stop-watching ,name))] + [(list* "write" name words) + (send! (save (file name words)))] + [(list "delete" name) + (send! (delete name))] + [_ + (printf "I'm afraid I didn't understand that.\n") + (printf "Try: open filename\n") + (printf " close filename\n") + (printf " write filename some text goes here\n") + (printf " delete filename\n")]) + (sleep 0.1) + (print-prompt)))) diff --git a/racket/syndicate/examples/actor/mini-echo.rkt b/racket/syndicate/examples/actor/mini-echo.rkt index 08e17ab..eb92f10 100644 --- a/racket/syndicate/examples/actor/mini-echo.rkt +++ b/racket/syndicate/examples/actor/mini-echo.rkt @@ -3,16 +3,16 @@ (struct echo-req (body) #:prefab) (struct echo-resp (body) #:prefab) -(actor (react (field [count 0]) - (on (message (echo-req $body)) - (send! (echo-resp body)) - (count (+ (count) 1))))) +(actor (field [count 0]) + (on (message (echo-req $body)) + (send! (echo-resp body)) + (count (+ (count) 1)))) -(actor (react (on (message (echo-resp $body)) - (printf "Received: ~v\n" body)))) +(actor (on (message (echo-resp $body)) + (printf "Received: ~v\n" body))) -(actor (until (asserted (observe (echo-req _)))) - (until (asserted (observe (echo-resp _)))) - (send! (echo-req 0)) - (send! (echo-req 1)) - (send! (echo-req 2))) +(actor* (until (asserted (observe (echo-req _)))) + (until (asserted (observe (echo-resp _)))) + (send! (echo-req 0)) + (send! (echo-req 1)) + (send! (echo-req 2))) diff --git a/racket/syndicate/examples/actor/mutex.rkt b/racket/syndicate/examples/actor/mutex.rkt index a97d584..c2244fd 100644 --- a/racket/syndicate/examples/actor/mutex.rkt +++ b/racket/syndicate/examples/actor/mutex.rkt @@ -9,37 +9,36 @@ (struct resource-status (resource-id waiter-count) #:prefab) (define (spawn-resource resource-id total-available-leases) - (actor - (react (field [waiters (make-queue)] - [free-lease-count total-available-leases]) + (actor (field [waiters (make-queue)] + [free-lease-count total-available-leases]) - (begin/dataflow (log-info "~as available: ~a" resource-id (free-lease-count))) + (begin/dataflow (log-info "~as available: ~a" resource-id (free-lease-count))) - (begin/dataflow ;; This might be a nice place to put a kind of "class contract" - (unless (and (>= (free-lease-count) 0) - (<= (free-lease-count) total-available-leases) - (or (zero? (free-lease-count)) - (queue-empty? (waiters)))) - (error 'resource "~a: Invariant violated" resource-id))) + (begin/dataflow ;; This might be a nice place to put a kind of "class contract" + (unless (and (>= (free-lease-count) 0) + (<= (free-lease-count) total-available-leases) + (or (zero? (free-lease-count)) + (queue-empty? (waiters)))) + (error 'resource "~a: Invariant violated" resource-id))) - (on (asserted (lease-request resource-id $w)) - (cond [(positive? (free-lease-count)) - (assert! (lease-assignment resource-id w)) - (free-lease-count (- (free-lease-count) 1))] - [else - (waiters (enqueue (waiters) w))])) + (on (asserted (lease-request resource-id $w)) + (cond [(positive? (free-lease-count)) + (assert! (lease-assignment resource-id w)) + (free-lease-count (- (free-lease-count) 1))] + [else + (waiters (enqueue (waiters) w))])) - (on (retracted (lease-request resource-id $w)) - (waiters (queue-filter (lambda (x) (not (equal? w x))) (waiters))) - (retract! (lease-assignment resource-id w))) + (on (retracted (lease-request resource-id $w)) + (waiters (queue-filter (lambda (x) (not (equal? w x))) (waiters))) + (retract! (lease-assignment resource-id w))) - (on (retracted (lease-assignment resource-id $w)) - (cond [(queue-empty? (waiters)) - (free-lease-count (+ (free-lease-count) 1))] - [else - (define-values (w remainder) (dequeue (waiters))) - (assert! (lease-assignment resource-id w)) - (waiters remainder)]))))) + (on (retracted (lease-assignment resource-id $w)) + (cond [(queue-empty? (waiters)) + (free-lease-count (+ (free-lease-count) 1))] + [else + (define-values (w remainder) (dequeue (waiters))) + (assert! (lease-assignment resource-id w)) + (waiters remainder)])))) ;;--------------------------------------------------------------------------- @@ -47,42 +46,41 @@ (struct philosopher-status (name status) #:prefab) -(actor (react (define/query-hash-set thinkers (philosopher-status $who $status) status who) - (begin/dataflow - (log-info "~a" (for/list (((status names) (in-hash (thinkers)))) - (format "~a: ~a" status (set->list names))))))) +(actor (define/query-hash-set thinkers (philosopher-status $who $status) status who) + (begin/dataflow + (log-info "~a" (for/list (((status names) (in-hash (thinkers)))) + (format "~a: ~a" status (set->list names)))))) (define (philosopher name) - (actor - (react (field [status 'starting]) - (assert (philosopher-status name (status))) + (actor (field [status 'starting]) + (assert (philosopher-status name (status))) - (stop-when (rising-edge (eq? (status) 'inspired))) + (stop-when (rising-edge (eq? (status) 'inspired))) - (on-start - (let loop () - (define thinking-duration (* (random) 4)) - (log-info "~a thinks for ~a seconds" name thinking-duration) - (status 'thinking) - (until (message (timer-expired name _)) - (on-start (send! (set-timer name (* thinking-duration 1000.0) 'relative)))) - (if (> (random) 0.95) - (begin - (log-info "~a stops thinking, leaps up, shouts \"EUREKA!\", and leaves.\n" name) - (status 'inspired)) - (begin - (log-info "~a stops thinking, and waits for a fork" name) - (status 'waiting) - (react (assert (lease-request 'fork name)) - (on (asserted (lease-assignment 'fork name)) - (status 'eating) - (log-info "~a claims a fork" name) - (define eating-duration (* (random) 4)) - (log-info "~a is eating for ~a seconds" name eating-duration) - (send! (set-timer name (* eating-duration 1000.0) 'relative))) - (stop-when (message (timer-expired name _)) - (log-info "~a finishes eating and puts down the fork" name) - (loop)))))))))) + (on-start + (let loop () + (define thinking-duration (* (random) 4)) + (log-info "~a thinks for ~a seconds" name thinking-duration) + (status 'thinking) + (until (message (timer-expired name _)) + (on-start (send! (set-timer name (* thinking-duration 1000.0) 'relative)))) + (if (> (random) 0.95) + (begin + (log-info "~a stops thinking, leaps up, shouts \"EUREKA!\", and leaves.\n" name) + (status 'inspired)) + (begin + (log-info "~a stops thinking, and waits for a fork" name) + (status 'waiting) + (react (assert (lease-request 'fork name)) + (on (asserted (lease-assignment 'fork name)) + (status 'eating) + (log-info "~a claims a fork" name) + (define eating-duration (* (random) 4)) + (log-info "~a is eating for ~a seconds" name eating-duration) + (send! (set-timer name (* eating-duration 1000.0) 'relative))) + (stop-when (message (timer-expired name _)) + (log-info "~a finishes eating and puts down the fork" name) + (loop))))))))) (spawn-resource 'fork 2) (philosopher 'Socrates) diff --git a/racket/syndicate/examples/actor/parameters.rkt b/racket/syndicate/examples/actor/parameters.rkt index 265a382..a4f2c16 100644 --- a/racket/syndicate/examples/actor/parameters.rkt +++ b/racket/syndicate/examples/actor/parameters.rkt @@ -33,14 +33,13 @@ (define p-at-spawn-time (p)) (actor #:name (list 'spawn-one p-at-spawn-time) (define p-at-start-time (p)) - (react - (assert `(p-at-spawn-time ,p-at-spawn-time)) - (assert `(p-at-start-time ,p-at-start-time)) - (assert `(p ,(p))) - (on (message 'survey) - (send! `(survey-response ,(p))))))) + (assert `(p-at-spawn-time ,p-at-spawn-time)) + (assert `(p-at-start-time ,p-at-start-time)) + (assert `(p ,(p))) + (on (message 'survey) + (send! `(survey-response ,(p)))))) -(actor +(actor* (spawn-one) (parameterize ((p 'first)) (spawn-one)) (parameterize ((p 'second)) (spawn-one)) diff --git a/racket/syndicate/examples/actor/query-set.rkt b/racket/syndicate/examples/actor/query-set.rkt index b733abc..1f6ef1c 100644 --- a/racket/syndicate/examples/actor/query-set.rkt +++ b/racket/syndicate/examples/actor/query-set.rkt @@ -3,58 +3,57 @@ (require racket/set) (actor #:name 'queryer - (forever - (define/query-value as-value 'absent `(item ,$a ,$b) (list a b)) - (define/query-set as-set `(item ,$a ,$b) (list a b) - #:on-add (log-info "as-set adding ~v/~v" a b) - #:on-remove (log-info "as-set removing ~v/~v" a b)) - (define/query-hash as-hash `(item ,$a ,$b) a b) - (define/query-hash-set as-hash-set `(item ,$a ,$b) a b) + (define/query-value as-value 'absent `(item ,$a ,$b) (list a b)) + (define/query-set as-set `(item ,$a ,$b) (list a b) + #:on-add (log-info "as-set adding ~v/~v" a b) + #:on-remove (log-info "as-set removing ~v/~v" a b)) + (define/query-hash as-hash `(item ,$a ,$b) a b) + (define/query-hash-set as-hash-set `(item ,$a ,$b) a b) - (field [as-value-notification-counter 0]) + (field [as-value-notification-counter 0]) - (begin/dataflow - (log-info "Notification counter: ~v" (as-value-notification-counter)) - (local-require (only-in racket/base sleep)) - (sleep 1)) + (begin/dataflow + (log-info "Notification counter: ~v" (as-value-notification-counter)) + (local-require (only-in racket/base sleep)) + (sleep 1)) - (let ((shadow-counter 0)) - (begin/dataflow - (log-info "as-value is now: ~v" (as-value)) - (set! shadow-counter (+ shadow-counter 1)) - (as-value-notification-counter shadow-counter))) + (let ((shadow-counter 0)) + (begin/dataflow + (log-info "as-value is now: ~v" (as-value)) + (set! shadow-counter (+ shadow-counter 1)) + (as-value-notification-counter shadow-counter))) - (on (message 'dump) - (printf "----------------------------------------\n") - (printf "Queried as-value: ~v\n" (as-value)) - (newline) - (printf "Queried as-set:\n") - (for [(item (as-set))] - (match-define (list a b) item) - (printf " ~v -> ~v\n" a b)) - (newline) - (printf "Queried as-hash:\n") - (for [((k v) (in-hash (as-hash)))] - (printf " ~v -> ~v\n" k v)) - (newline) - (printf "Queried as-hash-set:\n") - (for [((k vs) (in-hash (as-hash-set)))] - (printf " ~v -> ~v\n" k vs)) - (printf "----------------------------------------\n") - (flush-output)))) + (on (message 'dump) + (printf "----------------------------------------\n") + (printf "Queried as-value: ~v\n" (as-value)) + (newline) + (printf "Queried as-set:\n") + (for [(item (as-set))] + (match-define (list a b) item) + (printf " ~v -> ~v\n" a b)) + (newline) + (printf "Queried as-hash:\n") + (for [((k v) (in-hash (as-hash)))] + (printf " ~v -> ~v\n" k v)) + (newline) + (printf "Queried as-hash-set:\n") + (for [((k vs) (in-hash (as-hash-set)))] + (printf " ~v -> ~v\n" k vs)) + (printf "----------------------------------------\n") + (flush-output))) -(actor #:name 'mutator - (until (asserted 'observer-in-ds-ready)) - (assert! `(item a 1)) - (assert! `(item b 2)) - (assert! `(item b 3)) - (send! 'dump) - (retract! `(item b ,?)) - (send! 'dump) - (assert! `(item c 1)) - (assert! `(item c 4)) - (send! 'dump) - (forever)) +(actor* #:name 'mutator + (until (asserted 'observer-in-ds-ready)) + (assert! `(item a 1)) + (assert! `(item b 2)) + (assert! `(item b 3)) + (send! 'dump) + (retract! `(item b ,?)) + (send! 'dump) + (assert! `(item c 1)) + (assert! `(item c 4)) + (send! 'dump) + (forever)) (let ((anchor (level-anchor))) (dataspace (define LEVEL (level-anchor->meta-level anchor)) @@ -62,10 +61,9 @@ (log-info "Inner level anchor: ~a" (level-anchor)) (log-info "Computed meta-level: ~v" LEVEL) (actor #:name 'observer-in-ds - (forever - (assert (outbound* LEVEL 'observer-in-ds-ready)) - (on-start (log-info "observer-in-ds: STARTING")) - (define/query-set items (inbound* LEVEL `(item ,$a ,$b)) (list a b)) - (on (message (inbound* LEVEL 'dump)) - (log-info "observer-in-ds: ~v" (items))))) + (assert (outbound* LEVEL 'observer-in-ds-ready)) + (on-start (log-info "observer-in-ds: STARTING")) + (define/query-set items (inbound* LEVEL `(item ,$a ,$b)) (list a b)) + (on (message (inbound* LEVEL 'dump)) + (log-info "observer-in-ds: ~v" (items)))) (forever))) diff --git a/racket/syndicate/examples/actor/queue-no-credit.rkt b/racket/syndicate/examples/actor/queue-no-credit.rkt index dc2c9dc..6508fbe 100644 --- a/racket/syndicate/examples/actor/queue-no-credit.rkt +++ b/racket/syndicate/examples/actor/queue-no-credit.rkt @@ -41,65 +41,65 @@ ;; EFFECT: Spawn a queue process named `queue-id`. (define (spawn-queue queue-id) (actor #:name (list 'queue queue-id) - (react (field [waiters (make-queue)]) - (field [messages (make-queue)]) + (field [waiters (make-queue)]) + (field [messages (make-queue)]) - (define/query-set subscribers (subscription queue-id $who) who - #:on-add (enq! waiters who)) + (define/query-set subscribers (subscription queue-id $who) who + #:on-add (enq! waiters who)) - (on (message (delivery $who queue-id $body)) (enq! messages body)) + (on (message (delivery $who queue-id $body)) (enq! messages body)) - (begin/dataflow - (when (and (not (queue-empty? (waiters))) - (not (queue-empty? (messages)))) - (define who (deq! waiters)) - (when (set-member? (subscribers) who) ;; lazily remove entries from waiters - (enq! waiters who) - (define msg (deq! messages)) - (log-info "~a: sending ~a message ~a" queue-id who msg) - (send! (delivery queue-id who msg))))) + (begin/dataflow + (when (and (not (queue-empty? (waiters))) + (not (queue-empty? (messages)))) + (define who (deq! waiters)) + (when (set-member? (subscribers) who) ;; lazily remove entries from waiters + (enq! waiters who) + (define msg (deq! messages)) + (log-info "~a: sending ~a message ~a" queue-id who msg) + (send! (delivery queue-id who msg))))) - (assert (metric (list 'subscriber-count queue-id) (set-count (subscribers)))) - (assert (metric (list 'backlog queue-id) (queue-length (messages)))) + (assert (metric (list 'subscriber-count queue-id) (set-count (subscribers)))) + (assert (metric (list 'backlog queue-id) (queue-length (messages)))) - ;;------------------------------------------------------------ + ;;------------------------------------------------------------ - (local-require (submod syndicate/actor priorities)) - (begin/dataflow #:priority *idle-priority* ;; Check invariants - (define has-waiters? (not (queue-empty? (waiters)))) - (define has-messages? (not (queue-empty? (messages)))) - (unless (and (or (not has-waiters?) (not has-messages?)) - (or (not has-messages?) (not has-waiters?))) - (error 'queue - "~a: invariant violated: ~v" - queue-id - `((has-waiters? ,has-waiters?) - (has-messages? ,has-messages?) - (waiters ,(queue->list (waiters))) - (messages ,(queue->list (messages)))))))))) + (local-require (submod syndicate/actor priorities)) + (begin/dataflow #:priority *idle-priority* ;; Check invariants + (define has-waiters? (not (queue-empty? (waiters)))) + (define has-messages? (not (queue-empty? (messages)))) + (unless (and (or (not has-waiters?) (not has-messages?)) + (or (not has-messages?) (not has-waiters?))) + (error 'queue + "~a: invariant violated: ~v" + queue-id + `((has-waiters? ,has-waiters?) + (has-messages? ,has-messages?) + (waiters ,(queue->list (waiters))) + (messages ,(queue->list (messages))))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Example (define (spawn-consumer consumer-id #:variant [variant 'normal]) (actor #:name (list 'consumer consumer-id) - (react (assert (subscription 'q consumer-id)) - (on (message (delivery 'q consumer-id $body)) - (log-info "Consumer ~a got: ~a" consumer-id body) - (when (eq? variant 'crashy) - (error consumer-id - "Hark, canst thou hear me? I will play the swan / and die in music.")))))) + (assert (subscription 'q consumer-id)) + (on (message (delivery 'q consumer-id $body)) + (log-info "Consumer ~a got: ~a" consumer-id body) + (when (eq? variant 'crashy) + (error consumer-id + "Hark, canst thou hear me? I will play the swan / and die in music."))))) -(actor (react (define/query-hash metrics (metric $k $v) k v) - (begin/dataflow (log-info " ~a" (hash->list (metrics)))))) +(actor (define/query-hash metrics (metric $k $v) k v) + (begin/dataflow (log-info " ~a" (hash->list (metrics))))) (spawn-queue 'q) (spawn-consumer 'c1) (spawn-consumer 'c2 #:variant 'crashy) (spawn-consumer 'c3) -(actor (until (asserted (observe (delivery _ 'q _)))) - (for ((n (in-range 10))) - (send! (delivery #f 'q n)) - ;; (flush!) - )) +(actor* (until (asserted (observe (delivery _ 'q _)))) + (for ((n (in-range 10))) + (send! (delivery #f 'q n)) + ;; (flush!) + )) diff --git a/racket/syndicate/examples/actor/queue-no-credit2.rkt b/racket/syndicate/examples/actor/queue-no-credit2.rkt index 55e99da..8816dcf 100644 --- a/racket/syndicate/examples/actor/queue-no-credit2.rkt +++ b/racket/syndicate/examples/actor/queue-no-credit2.rkt @@ -41,63 +41,63 @@ ;; EFFECT: Spawn a queue process named `queue-id`. (define (spawn-queue queue-id) (actor #:name (list 'queue queue-id) - (react (field [waiters (make-queue)]) - (field [messages (make-queue)]) + (field [waiters (make-queue)]) + (field [messages (make-queue)]) - (on (asserted (subscription queue-id $who)) (enq! waiters who)) - (on (retracted (subscription queue-id $who)) (waiters (queue-remove who (waiters)))) - (on (message (delivery $who queue-id $body)) (enq! messages body)) + (on (asserted (subscription queue-id $who)) (enq! waiters who)) + (on (retracted (subscription queue-id $who)) (waiters (queue-remove who (waiters)))) + (on (message (delivery $who queue-id $body)) (enq! messages body)) - (begin/dataflow - (when (and (not (queue-empty? (waiters))) - (not (queue-empty? (messages)))) - (define who (deq! waiters)) - (define msg (deq! messages)) - (log-info "~a: sending ~a message ~a" queue-id who msg) - (send! (delivery queue-id who msg)) - (enq! waiters who))) + (begin/dataflow + (when (and (not (queue-empty? (waiters))) + (not (queue-empty? (messages)))) + (define who (deq! waiters)) + (define msg (deq! messages)) + (log-info "~a: sending ~a message ~a" queue-id who msg) + (send! (delivery queue-id who msg)) + (enq! waiters who))) - (assert (metric (list 'subscriber-count queue-id) (queue-length (waiters)))) - (assert (metric (list 'backlog queue-id) (queue-length (messages)))) + (assert (metric (list 'subscriber-count queue-id) (queue-length (waiters)))) + (assert (metric (list 'backlog queue-id) (queue-length (messages)))) - ;;------------------------------------------------------------ + ;;------------------------------------------------------------ - (local-require (submod syndicate/actor priorities)) - (begin/dataflow #:priority *idle-priority* ;; Check invariants - (define has-waiters? (not (queue-empty? (waiters)))) - (define has-messages? (not (queue-empty? (messages)))) - (unless (and (or (not has-waiters?) (not has-messages?)) - (or (not has-messages?) (not has-waiters?))) - (error 'queue - "~a: invariant violated: ~v" - queue-id - `((has-waiters? ,has-waiters?) - (has-messages? ,has-messages?) - (waiters ,(queue->list (waiters))) - (messages ,(queue->list (messages)))))))))) + (local-require (submod syndicate/actor priorities)) + (begin/dataflow #:priority *idle-priority* ;; Check invariants + (define has-waiters? (not (queue-empty? (waiters)))) + (define has-messages? (not (queue-empty? (messages)))) + (unless (and (or (not has-waiters?) (not has-messages?)) + (or (not has-messages?) (not has-waiters?))) + (error 'queue + "~a: invariant violated: ~v" + queue-id + `((has-waiters? ,has-waiters?) + (has-messages? ,has-messages?) + (waiters ,(queue->list (waiters))) + (messages ,(queue->list (messages))))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Example (define (spawn-consumer consumer-id #:variant [variant 'normal]) (actor #:name (list 'consumer consumer-id) - (react (assert (subscription 'q consumer-id)) - (on (message (delivery 'q consumer-id $body)) - (log-info "Consumer ~a got: ~a" consumer-id body) - (when (eq? variant 'crashy) - (error consumer-id - "Hark, canst thou hear me? I will play the swan / and die in music.")))))) + (assert (subscription 'q consumer-id)) + (on (message (delivery 'q consumer-id $body)) + (log-info "Consumer ~a got: ~a" consumer-id body) + (when (eq? variant 'crashy) + (error consumer-id + "Hark, canst thou hear me? I will play the swan / and die in music."))))) -(actor (react (define/query-hash metrics (metric $k $v) k v) - (begin/dataflow (log-info " ~a" (hash->list (metrics)))))) +(actor (define/query-hash metrics (metric $k $v) k v) + (begin/dataflow (log-info " ~a" (hash->list (metrics))))) (spawn-queue 'q) (spawn-consumer 'c1) (spawn-consumer 'c2 #:variant 'crashy) (spawn-consumer 'c3) -(actor (until (asserted (observe (delivery _ 'q _)))) - (for ((n (in-range 10))) - (send! (delivery #f 'q n)) - (when (odd? n) (flush!)) - )) +(actor* (until (asserted (observe (delivery _ 'q _)))) + (for ((n (in-range 10))) + (send! (delivery #f 'q n)) + (when (odd? n) (flush!)) + )) diff --git a/racket/syndicate/examples/actor/queue.rkt b/racket/syndicate/examples/actor/queue.rkt index 9682df4..4fc4a9c 100644 --- a/racket/syndicate/examples/actor/queue.rkt +++ b/racket/syndicate/examples/actor/queue.rkt @@ -44,86 +44,86 @@ ;; EFFECT: Spawn a queue process named `queue-id`. (define (spawn-queue queue-id) (actor #:name (list 'queue queue-id) - (react (field [waiters (make-queue)]) - (field [messages (make-queue)]) + (field [waiters (make-queue)]) + (field [messages (make-queue)]) - (define/query-hash credits (subscription queue-id $who) who 0) ;; Start with no credit + (define/query-hash credits (subscription queue-id $who) who 0) ;; Start with no credit - (on (message (credit queue-id $who $amount)) - (define old-credit (hash-ref (credits) who #f)) - (when old-credit - (credits (hash-set (credits) who (+ amount old-credit))) - (when (zero? old-credit) (enq! waiters who)))) + (on (message (credit queue-id $who $amount)) + (define old-credit (hash-ref (credits) who #f)) + (when old-credit + (credits (hash-set (credits) who (+ amount old-credit))) + (when (zero? old-credit) (enq! waiters who)))) - (on (message (delivery $who queue-id $body)) - (send! (credit who queue-id 1)) - (enq! messages body)) + (on (message (delivery $who queue-id $body)) + (send! (credit who queue-id 1)) + (enq! messages body)) - (begin/dataflow - (when (and (not (queue-empty? (waiters))) - (not (queue-empty? (messages)))) - (define who (deq! waiters)) - (define old-credit (hash-ref (credits) who 0)) - (when (positive? old-credit) ;; lazily remove entries from waiters - (define new-credit (- old-credit 1)) - (credits (hash-set (credits) who new-credit)) - (when (positive? new-credit) (enq! waiters who)) - (define msg (deq! messages)) - (log-info "~a: sending ~a message ~a" queue-id who msg) - (send! (delivery queue-id who msg))))) + (begin/dataflow + (when (and (not (queue-empty? (waiters))) + (not (queue-empty? (messages)))) + (define who (deq! waiters)) + (define old-credit (hash-ref (credits) who 0)) + (when (positive? old-credit) ;; lazily remove entries from waiters + (define new-credit (- old-credit 1)) + (credits (hash-set (credits) who new-credit)) + (when (positive? new-credit) (enq! waiters who)) + (define msg (deq! messages)) + (log-info "~a: sending ~a message ~a" queue-id who msg) + (send! (delivery queue-id who msg))))) - (during (subscription queue-id $who) - (assert (metric (list 'credit queue-id who) (hash-ref (credits) who 0)))) - (assert (metric (list 'backlog queue-id) (queue-length (messages)))) + (during (subscription queue-id $who) + (assert (metric (list 'credit queue-id who) (hash-ref (credits) who 0)))) + (assert (metric (list 'backlog queue-id) (queue-length (messages)))) - ;;------------------------------------------------------------ + ;;------------------------------------------------------------ - (local-require (submod syndicate/actor priorities)) - (begin/dataflow #:priority *idle-priority* ;; Check invariants - (define has-waiters? (not (queue-empty? (waiters)))) - (define has-messages? (not (queue-empty? (messages)))) - (define total-credits (for/sum ((v (in-hash-values (credits)))) v)) - (unless (and (or (not has-messages?) (zero? total-credits)) - (or (not has-waiters?) (not has-messages?)) - (equal? has-waiters? (positive? total-credits))) - (error 'queue - "~a: invariant violated: ~v" - queue-id - `((has-waiters? ,has-waiters?) - (has-messages? ,has-messages?) - (total-credits ,total-credits) - (waiters ,(queue->list (waiters))) - (messages ,(queue->list (messages))) - (credits ,(hash->list (credits)))))))))) + (local-require (submod syndicate/actor priorities)) + (begin/dataflow #:priority *idle-priority* ;; Check invariants + (define has-waiters? (not (queue-empty? (waiters)))) + (define has-messages? (not (queue-empty? (messages)))) + (define total-credits (for/sum ((v (in-hash-values (credits)))) v)) + (unless (and (or (not has-messages?) (zero? total-credits)) + (or (not has-waiters?) (not has-messages?)) + (equal? has-waiters? (positive? total-credits))) + (error 'queue + "~a: invariant violated: ~v" + queue-id + `((has-waiters? ,has-waiters?) + (has-messages? ,has-messages?) + (total-credits ,total-credits) + (waiters ,(queue->list (waiters))) + (messages ,(queue->list (messages))) + (credits ,(hash->list (credits))))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Example (define (spawn-consumer consumer-id initial-credit #:variant [variant 'normal]) (actor #:name (list 'consumer consumer-id) - (react (assert (subscription 'q consumer-id)) - (on-start (send! (credit 'q consumer-id initial-credit))) - (on (message (delivery 'q consumer-id $body)) - (log-info "Consumer ~a got: ~a" consumer-id body) - (case variant - [(normal) - (send! (credit 'q consumer-id 1))] - [(crashy) - (error consumer-id - "Hark, canst thou hear me? I will play the swan / and die in music.")] - [(overloaded) ;; don't issue credit - (void)]))))) + (assert (subscription 'q consumer-id)) + (on-start (send! (credit 'q consumer-id initial-credit))) + (on (message (delivery 'q consumer-id $body)) + (log-info "Consumer ~a got: ~a" consumer-id body) + (case variant + [(normal) + (send! (credit 'q consumer-id 1))] + [(crashy) + (error consumer-id + "Hark, canst thou hear me? I will play the swan / and die in music.")] + [(overloaded) ;; don't issue credit + (void)])))) -(actor (react (define/query-hash metrics (metric $k $v) k v) - (begin/dataflow (log-info " ~a" (hash->list (metrics)))))) +(actor (define/query-hash metrics (metric $k $v) k v) + (begin/dataflow (log-info " ~a" (hash->list (metrics))))) (spawn-queue 'q) (spawn-consumer 'c1 2) (spawn-consumer 'c2 2 #:variant 'crashy) (spawn-consumer 'c3 3 #:variant 'overloaded) -(actor (until (asserted (observe (delivery _ 'q _)))) - (for ((n (in-range 10))) - (send! (delivery #f 'q n)) - ;; (flush!) - )) +(actor* (until (asserted (observe (delivery _ 'q _)))) + (for ((n (in-range 10))) + (send! (delivery #f 'q n)) + ;; (flush!) + )) diff --git a/racket/syndicate/examples/actor/show-field-scope-enforcement1.rkt b/racket/syndicate/examples/actor/show-field-scope-enforcement1.rkt index 73e7bab..86c5bad 100644 --- a/racket/syndicate/examples/actor/show-field-scope-enforcement1.rkt +++ b/racket/syndicate/examples/actor/show-field-scope-enforcement1.rkt @@ -2,14 +2,13 @@ ;; Demonstrates that fields may not be passed between actors. (actor #:name 'reading-actor - (react - (on (message `(read-from ,$this-field)) - (log-info "Trying to read from ~a" this-field) - (log-info "Read: ~a" (this-field)) - (send! `(read-successfully ,this-field))))) + (on (message `(read-from ,$this-field)) + (log-info "Trying to read from ~a" this-field) + (log-info "Read: ~a" (this-field)) + (send! `(read-successfully ,this-field)))) (actor #:name 'requesting-actor (field [a 123]) - (send! `(read-from ,a)) - (until (message `(read-successfully ,a))) - (log-info "Done.")) + (on-start (send! `(read-from ,a))) + (stop-when (message `(read-successfully ,a))) + (on-stop (log-info "Done."))) diff --git a/racket/syndicate/examples/actor/show-field-scope-enforcement2.rkt b/racket/syndicate/examples/actor/show-field-scope-enforcement2.rkt index 2b207ed..5748613 100644 --- a/racket/syndicate/examples/actor/show-field-scope-enforcement2.rkt +++ b/racket/syndicate/examples/actor/show-field-scope-enforcement2.rkt @@ -3,19 +3,18 @@ ;; facet, but not the other way around. (actor #:name 'reading-actor - (react - (field [top 123]) - (on (message `(read-from ,$this-field)) - (log-info "Trying to read from ~a" this-field) - (log-info "Read: ~a" (this-field)) - (send! `(read-successfully ,this-field))) - (on-start - (react (field [inner 234]) - (on-start - (log-info "Inner access to ~a: ~a" top (top)) ;; OK - (log-info "Inner access to ~a: ~a" inner (inner)) ;; OK - (send! `(read-from ,top)) ;; OK - (until (message `(read-successfully ,top))) - (send! `(read-from ,inner)) ;; Will cause a failure. - (until (message `(read-successfully ,inner))) ;; Will never happen. - (log-info "Done.")))))) + (field [top 123]) + (on (message `(read-from ,$this-field)) + (log-info "Trying to read from ~a" this-field) + (log-info "Read: ~a" (this-field)) + (send! `(read-successfully ,this-field))) + (on-start + (react (field [inner 234]) + (on-start + (log-info "Inner access to ~a: ~a" top (top)) ;; OK + (log-info "Inner access to ~a: ~a" inner (inner)) ;; OK + (send! `(read-from ,top)) ;; OK + (until (message `(read-successfully ,top))) + (send! `(read-from ,inner)) ;; Will cause a failure. + (until (message `(read-successfully ,inner))) ;; Will never happen. + (log-info "Done."))))) diff --git a/racket/syndicate/examples/actor/show-field-scope-enforcement3.rkt b/racket/syndicate/examples/actor/show-field-scope-enforcement3.rkt index ccc976b..d162e1b 100644 --- a/racket/syndicate/examples/actor/show-field-scope-enforcement3.rkt +++ b/racket/syndicate/examples/actor/show-field-scope-enforcement3.rkt @@ -1,14 +1,14 @@ #lang syndicate/actor ;; Demonstrates that fields may not be passed between sibling facets. -(actor (react - (on (message `(read-from ,$this-field)) - (log-info "Trying to read from ~a" this-field) - (log-info "Read: ~a" (this-field)) - (send! `(read-successfully ,this-field)))) - (react - (field [a 123]) - (on-start - (send! `(read-from ,a)) - (until (message `(read-successfully ,a))) - (log-info "Done.")))) +(actor (on (message `(read-from ,$this-field)) + (log-info "Trying to read from ~a" this-field) + (log-info "Read: ~a" (this-field)) + (send! `(read-successfully ,this-field))) + (on-start + (react + (field [a 123]) + (on-start + (send! `(read-from ,a)) + (until (message `(read-successfully ,a))) + (log-info "Done."))))) diff --git a/racket/syndicate/examples/actor/show-field-scope-enforcement4.rkt b/racket/syndicate/examples/actor/show-field-scope-enforcement4.rkt index 99d1292..2307329 100644 --- a/racket/syndicate/examples/actor/show-field-scope-enforcement4.rkt +++ b/racket/syndicate/examples/actor/show-field-scope-enforcement4.rkt @@ -1,15 +1,15 @@ #lang syndicate/actor ;; Demonstrates that fields at actor scope are visible to facets. -(actor (field [x 123]) - (react - (on (message `(read-from ,$this-field)) - (log-info "Trying to read from ~a" this-field) - (log-info "Read: ~a" (this-field)) - (send! `(read-successfully ,this-field)))) - (react - (on-start - (log-info "x in second facet: ~v (should be 123)" (x)) - (send! `(read-from ,x)) - (until (message `(read-successfully ,x))) - (log-info "Done.")))) +(actor* (field [x 123]) + (react + (on (message `(read-from ,$this-field)) + (log-info "Trying to read from ~a" this-field) + (log-info "Read: ~a" (this-field)) + (send! `(read-successfully ,this-field)))) + (react + (on-start + (log-info "x in second facet: ~v (should be 123)" (x)) + (send! `(read-from ,x)) + (until (message `(read-successfully ,x))) + (log-info "Done.")))) diff --git a/racket/syndicate/examples/actor/spreadsheet.rkt b/racket/syndicate/examples/actor/spreadsheet.rkt index 1105184..9462b0b 100644 --- a/racket/syndicate/examples/actor/spreadsheet.rkt +++ b/racket/syndicate/examples/actor/spreadsheet.rkt @@ -26,33 +26,33 @@ (define (cell-expr->actor-expr name expr) (define bindings (set->list (extract-bindings expr))) - `(actor (until (message (set-cell ',name _)) - (field ,@(for/list [(b bindings)] `[,b (void)])) - (assert #:when (andmap non-void-field? (list ,@bindings)) - (cell ',name - (let (,@(for/list [(b bindings)] `(,b (,b)))) - ,expr))) - ,@(for/list [(b bindings)] - `(on (asserted (cell ',b $value)) - (,b value)))))) + `(actor (stop-when (message (set-cell ',name _))) + (field ,@(for/list [(b bindings)] `[,b (void)])) + (assert #:when (andmap non-void-field? (list ,@bindings)) + (cell ',name + (let (,@(for/list [(b bindings)] `(,b (,b)))) + ,expr))) + ,@(for/list [(b bindings)] + `(on (asserted (cell ',b $value)) + (,b value))))) -(actor (react (on (message (set-cell $name $expr)) - (define actor-expr (cell-expr->actor-expr name expr)) - ;; (local-require racket/pretty) (pretty-print actor-expr) - (eval actor-expr (namespace-anchor->namespace ns))))) +(actor (on (message (set-cell $name $expr)) + (define actor-expr (cell-expr->actor-expr name expr)) + ;; (local-require racket/pretty) (pretty-print actor-expr) + (eval actor-expr (namespace-anchor->namespace ns)))) -(actor (react (on (asserted (cell $name $value)) - (printf ">>> ~a ~v\n" name value) - (flush-output)))) +(actor (on (asserted (cell $name $value)) + (printf ">>> ~a ~v\n" name value) + (flush-output))) -(actor (void (thread (lambda () +(actor (stop-when (message (inbound 'quit))) + (on (message (inbound (set-cell $name $expr))) + (send! (set-cell name expr))) + (void (thread (lambda () (let loop () (define cell-name (read)) (if (eof-object? cell-name) (send-ground-message 'quit) (let ((new-expr (read))) (send-ground-message (set-cell cell-name new-expr)) - (loop))))))) - (until (message (inbound 'quit)) - (on (message (inbound (set-cell $name $expr))) - (send! (set-cell name expr))))) + (loop)))))))) diff --git a/racket/syndicate/examples/actor/two-buyer-protocol.rkt b/racket/syndicate/examples/actor/two-buyer-protocol.rkt index d307ec1..9c76e96 100644 --- a/racket/syndicate/examples/actor/two-buyer-protocol.rkt +++ b/racket/syndicate/examples/actor/two-buyer-protocol.rkt @@ -75,7 +75,7 @@ (define-syntax while-relevant-assert (syntax-rules () [(_ P) - (until (retracted (observe P)) + (begin (stop-when (retracted (observe P))) (assert P))])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -83,43 +83,43 @@ ;; SELLER ;; (define (seller) - (actor (react (field [books (hash "The Wind in the Willows" 3.95 - "Catch 22" 2.22 - "Candide" 34.95)] - [next-order-id 10001483]) + (actor (field [books (hash "The Wind in the Willows" 3.95 + "Catch 22" 2.22 + "Candide" 34.95)] + [next-order-id 10001483]) - ;; Give quotes to interested parties. + ;; Give quotes to interested parties. + ;; + (during (observe (book-quote $title _)) + (assert (book-quote title (hash-ref (books) title #f)))) + + ;; Respond to order requests. + ;; + (on (asserted (observe (order $title $offer-price _ _))) + (define asking-price (hash-ref (books) title #f)) + (cond + + [(or (not asking-price) (< offer-price asking-price)) + ;; We cannot sell a book we do not have, and we will not sell for less + ;; than our asking price. ;; - (during (observe (book-quote $title _)) - (assert (book-quote title (hash-ref (books) title #f)))) + (react (while-relevant-assert (order title offer-price #f #f)))] - ;; Respond to order requests. + [else + ;; Allocate an order ID. ;; - (on (asserted (observe (order $title $offer-price _ _))) - (define asking-price (hash-ref (books) title #f)) - (cond + (define order-id (next-order-id)) + (next-order-id (+ order-id 1)) - [(or (not asking-price) (< offer-price asking-price)) - ;; We cannot sell a book we do not have, and we will not sell for less - ;; than our asking price. - ;; - (while-relevant-assert (order title offer-price #f #f))] + ;; Remove the book from our shelves. + ;; + (books (hash-remove (books) title)) - [else - ;; Allocate an order ID. - ;; - (define order-id (next-order-id)) - (next-order-id (+ order-id 1)) - - ;; Remove the book from our shelves. - ;; - (books (hash-remove (books) title)) - - ;; Tell the ordering party their order ID and delivery date. - ;; - (actor - (while-relevant-assert - (order title offer-price order-id "March 9th")))]))))) + ;; Tell the ordering party their order ID and delivery date. + ;; + (actor + (while-relevant-assert + (order title offer-price order-id "March 9th")))])))) ;; Serial SPLIT-PROPOSER ;; @@ -175,64 +175,62 @@ (log-info "A learns that the split-proposal for ~v was rejected" title) (try-to-split (+ contribution (/ (- price contribution) 2)))))]))])])) - (actor (try-to-buy (list "Catch 22" - "Encyclopaedia Brittannica" - "Candide" - "The Wind in the Willows") - 35.00))) + (actor* (try-to-buy (list "Catch 22" + "Encyclopaedia Brittannica" + "Candide" + "The Wind in the Willows") + 35.00))) ;; Serial SPLIT-DISPOSER ;; (define (buyer-b) - (actor (react + (actor ;; This actor maintains a record of the amount of money it has to spend. + ;; + (field [funds 5.00]) - ;; This actor maintains a record of the amount of money it has to spend. - ;; - (field [funds 5.00]) + (on (asserted (observe (split-proposal $title $price $their-contribution _))) - (on (asserted (observe (split-proposal $title $price $their-contribution _))) + (define my-contribution (- price their-contribution)) + (log-info "B is being asked to contribute ~a toward ~v at price ~a" + my-contribution + title + price) - (define my-contribution (- price their-contribution)) - (log-info "B is being asked to contribute ~a toward ~v at price ~a" - my-contribution - title - price) + (cond + [(> my-contribution (funds)) + (log-info "B hasn't enough funds (~a remaining)" (funds)) + (react (while-relevant-assert (split-proposal title price their-contribution #f)))] - (cond - [(> my-contribution (funds)) - (log-info "B hasn't enough funds (~a remaining)" (funds)) - (while-relevant-assert (split-proposal title price their-contribution #f))] + [else - [else - - ;; Spawn a small actor (TODO: when we revise actor.rkt's implementation style, - ;; this could perhaps be a facet rather than a full actor) to handle the - ;; actual purchase now that we have agreed on a split. - ;; - (actor (define-values (order-id delivery-date) + ;; Spawn a small actor (TODO: when we revise actor.rkt's implementation style, + ;; this could perhaps be a facet rather than a full actor) to handle the + ;; actual purchase now that we have agreed on a split. + ;; + (actor* (define-values (order-id delivery-date) (react/suspend (yield) - ;; While we are in this state, waiting for order confirmation, take - ;; the opportunity to signal to our SPLIT-PROPOSER that we accepted - ;; their proposal. - ;; - (assert (split-proposal title price their-contribution #t)) + ;; While we are in this state, waiting for order confirmation, take + ;; the opportunity to signal to our SPLIT-PROPOSER that we accepted + ;; their proposal. + ;; + (assert (split-proposal title price their-contribution #t)) - (stop-when (asserted (order title price $id $date)) - ;; We have received order confirmation from the SELLER. - ;; - (yield id date)))) - (log-info "The order for ~v has id ~a, and will be delivered on ~a" - title - order-id - delivery-date)) + (stop-when (asserted (order title price $id $date)) + ;; We have received order confirmation from the SELLER. + ;; + (yield id date)))) + (log-info "The order for ~v has id ~a, and will be delivered on ~a" + title + order-id + delivery-date)) - ;; Meanwhile, update our records of our available funds, and continue to wait - ;; for more split-proposals to arrive. - ;; - (define remaining-funds (- (funds) my-contribution)) - (log-info "B accepts the offer, leaving them with ~a remaining funds" - remaining-funds) - (funds remaining-funds)]))))) + ;; Meanwhile, update our records of our available funds, and continue to wait + ;; for more split-proposals to arrive. + ;; + (define remaining-funds (- (funds) my-contribution)) + (log-info "B accepts the offer, leaving them with ~a remaining funds" + remaining-funds) + (funds remaining-funds)])))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/racket/syndicate/examples/actor/web-demo.rkt b/racket/syndicate/examples/actor/web-demo.rkt index 23dc152..11555c6 100644 --- a/racket/syndicate/examples/actor/web-demo.rkt +++ b/racket/syndicate/examples/actor/web-demo.rkt @@ -6,29 +6,27 @@ (require net/url) (actor #:name 'server - (react - (define vh (web-virtual-host "http" ? 9090)) + (define vh (web-virtual-host "http" ? 9090)) - (assert vh) + (assert vh) - (on (web-request-incoming (id req) vh _ ("ws" ())) - (actor - (react - (assert (web-response-websocket id)) - (stop-when (websocket-connection-closed id) (log-info "Connection dropped")) - (stop-when (websocket-message-recv id "quit") (log-info "Received quit command")) - (on (websocket-message-recv id $str) - (log-info "Got ~v" str) - (websocket-message-send! id str))))) + (on (web-request-incoming (id req) vh _ ("ws" ())) + (actor + (assert (web-response-websocket id)) + (stop-when (websocket-connection-closed id) (log-info "Connection dropped")) + (stop-when (websocket-message-recv id "quit") (log-info "Received quit command")) + (on (websocket-message-recv id $str) + (log-info "Got ~v" str) + (websocket-message-send! id str)))) - (field [counter 0]) - (on (web-request-get (id req) vh ("foo" ,$path)) - (define req-num (counter)) - (counter (+ (counter) 1)) - (web-respond/xexpr! id - `(html - (body - (h1 "Hi there.") - (p ,(format "Your path was ~v, and this is request ~a" - path - req-num)))))))) + (field [counter 0]) + (on (web-request-get (id req) vh ("foo" ,$path)) + (define req-num (counter)) + (counter (+ (counter) 1)) + (web-respond/xexpr! id + `(html + (body + (h1 "Hi there.") + (p ,(format "Your path was ~v, and this is request ~a" + path + req-num))))))) diff --git a/racket/syndicate/examples/actor/web-sanity-check.rkt b/racket/syndicate/examples/actor/web-sanity-check.rkt index 0e7b12c..0ad3068 100644 --- a/racket/syndicate/examples/actor/web-sanity-check.rkt +++ b/racket/syndicate/examples/actor/web-sanity-check.rkt @@ -13,74 +13,73 @@ (on-start (send! (set-timer timer-id (* sec 1000.0) 'relative))))) (actor #:name 'server - (react - (field [counter 0]) - (assert vh) + (field [counter 0]) + (assert vh) - (on (message (web-request $id - 'inbound - ($ req (web-request-header _ (web-resource vh `("ws" ())) _ _)) - _)) - (actor (react - (assert (web-response-websocket id)) - (stop-when (retracted (observe (websocket-message id 'outbound _))) - (log-info "Connection dropped")) - (stop-when (message (websocket-message id 'inbound "quit")) - (log-info "Received quit command")) - (on (message (websocket-message id 'inbound $str)) - (log-info "Got ~v" str) - (define u (string->url str)) - (when (url-scheme u) - (let ((r (gensym 'client))) - (react (on-start - (send! (web-request r - 'outbound - (web-request-header 'get - (url->resource u) - '() - '()) - #""))) - (stop-when (asserted (web-response-complete r $h $body)) - (log-info "Got headers back: ~v" h) - (log-info "Got body back: ~v" body))))) - (send! (websocket-message id 'outbound str)))))) + (on (message (web-request $id + 'inbound + ($ req (web-request-header _ (web-resource vh `("ws" ())) _ _)) + _)) + (actor (react + (assert (web-response-websocket id)) + (stop-when (retracted (observe (websocket-message id 'outbound _))) + (log-info "Connection dropped")) + (stop-when (message (websocket-message id 'inbound "quit")) + (log-info "Received quit command")) + (on (message (websocket-message id 'inbound $str)) + (log-info "Got ~v" str) + (define u (string->url str)) + (when (url-scheme u) + (let ((r (gensym 'client))) + (react (on-start + (send! (web-request r + 'outbound + (web-request-header 'get + (url->resource u) + '() + '()) + #""))) + (stop-when (asserted (web-response-complete r $h $body)) + (log-info "Got headers back: ~v" h) + (log-info "Got body back: ~v" body))))) + (send! (websocket-message id 'outbound str)))))) - (on (message (web-request $id - 'inbound - (web-request-header 'get (web-resource vh `("slow" ())) _ _) - _)) - (react (field [done? #f]) - (stop-when (rising-edge (done?))) - (assert (web-response-chunked id - (web-response-header #:message #"Slowly" - #:mime-type #"text/plain"))) - (on (asserted (observe (web-response-chunk id _))) - ;; - ;; TODO: output-response-body/chunked in web-server's response.rkt - ;; doesn't flush each chunk as it appears. Should it? - ;; - ;; TODO: this kind of protocol pattern appears quite frequently. Perhaps - ;; we want a general-purpose *stream* protocol? For use by TCP, - ;; websockets, etc etc. - ;; - (send! (web-response-chunk id #"first\n")) - (sleep 2) - (send! (web-response-chunk id #"second\n")) - (sleep 2) - (send! (web-response-chunk id #"third\n")) - (sleep 2) - (done? #t)))) + (on (message (web-request $id + 'inbound + (web-request-header 'get (web-resource vh `("slow" ())) _ _) + _)) + (react (field [done? #f]) + (stop-when (rising-edge (done?))) + (assert (web-response-chunked id + (web-response-header #:message #"Slowly" + #:mime-type #"text/plain"))) + (on (asserted (observe (web-response-chunk id _))) + ;; + ;; TODO: output-response-body/chunked in web-server's response.rkt + ;; doesn't flush each chunk as it appears. Should it? + ;; + ;; TODO: this kind of protocol pattern appears quite frequently. Perhaps + ;; we want a general-purpose *stream* protocol? For use by TCP, + ;; websockets, etc etc. + ;; + (send! (web-response-chunk id #"first\n")) + (sleep 2) + (send! (web-response-chunk id #"second\n")) + (sleep 2) + (send! (web-response-chunk id #"third\n")) + (sleep 2) + (done? #t)))) - (on (message (web-request $id - 'inbound - (web-request-header 'get (web-resource vh `("foo" ,$path)) _ _) - _)) - (define req-num (counter)) - (counter (+ (counter) 1)) - (send! (web-response-complete - id - (web-response-header #:mime-type #"text/plain") - (string->bytes/utf-8 - (format "Hi there. Your path was ~v, and this is request ~a" - path - req-num))))))) + (on (message (web-request $id + 'inbound + (web-request-header 'get (web-resource vh `("foo" ,$path)) _ _) + _)) + (define req-num (counter)) + (counter (+ (counter) 1)) + (send! (web-response-complete + id + (web-response-header #:mime-type #"text/plain") + (string->bytes/utf-8 + (format "Hi there. Your path was ~v, and this is request ~a" + path + req-num)))))) diff --git a/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths.rkt b/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths.rkt index ec5924a..3ec5a90 100644 --- a/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths.rkt +++ b/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths.rkt @@ -12,34 +12,34 @@ (struct path-exists (from to) #:prefab) ;; Hmm. (struct min-cost (from to cost) #:prefab) -(actor (forever (assert (link 1 3 -2)) - (assert (link 2 1 4)) - (assert (link 2 3 3)) - (assert (link 3 4 2)) - (assert (link 4 2 -1)))) +(actor (assert (link 1 3 -2)) + (assert (link 2 1 4)) + (assert (link 2 3 3)) + (assert (link 3 4 2)) + (assert (link 4 2 -1))) -(actor (forever (during (link $from $to $cost) - (assert (path-exists from to)) - (assert (path from to cost))))) +(actor (during (link $from $to $cost) + (assert (path-exists from to)) + (assert (path from to cost)))) -(actor (forever (during (link $A $B $link-cost) - (during (path B $C $path-cost) - (assert (path-exists A C)) - (assert (path A C (+ link-cost path-cost))))))) +(actor (during (link $A $B $link-cost) + (during (path B $C $path-cost) + (assert (path-exists A C)) + (assert (path A C (+ link-cost path-cost)))))) -(actor (forever (during (path-exists $from $to) - (field [costs (set)] [least +inf.0]) - (assert (min-cost from to (least))) - (on (asserted (path from to $cost)) - (costs (set-add (costs) cost)) - (least (min (least) cost))) - (on (retracted (path from to $cost)) - (define new-costs (set-remove (costs) cost)) - (costs new-costs) - (least (for/fold [(least +inf.0)] [(x new-costs)] (min x least))))))) +(actor (during (path-exists $from $to) + (field [costs (set)] [least +inf.0]) + (assert (min-cost from to (least))) + (on (asserted (path from to $cost)) + (costs (set-add (costs) cost)) + (least (min (least) cost))) + (on (retracted (path from to $cost)) + (define new-costs (set-remove (costs) cost)) + (costs new-costs) + (least (for/fold [(least +inf.0)] [(x new-costs)] (min x least)))))) -;; (actor (forever (during (path $from $to $cost) -;; (on-start (displayln `(+ ,(path from to cost)))) -;; (on-stop (displayln `(- ,(path from to cost))))))) -(actor (forever (on (asserted (min-cost $from $to $cost)) - (displayln (min-cost from to cost))))) +(actor (during (path $from $to $cost) + (on-start (displayln `(+ ,(path from to cost)))) + (on-stop (displayln `(- ,(path from to cost)))))) +(actor (on (asserted (min-cost $from $to $cost)) + (displayln (min-cost from to cost)))) diff --git a/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths2.rkt b/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths2.rkt index 0786fca..dba50e8 100644 --- a/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths2.rkt +++ b/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths2.rkt @@ -11,35 +11,35 @@ (struct path-exists (from to) #:prefab) ;; Hmm. (struct min-cost (from to cost) #:prefab) -(actor (forever (assert (link 1 3 -2)) - (assert (link 2 1 4)) - (assert (link 2 3 3)) - (assert (link 3 4 2)) - (assert (link 4 2 -1)))) +(actor (assert (link 1 3 -2)) + (assert (link 2 1 4)) + (assert (link 2 3 3)) + (assert (link 3 4 2)) + (assert (link 4 2 -1))) -(actor (forever (during (link $from $to $cost) - (assert (path-exists from to)) - (assert (path from to (set from to) cost))))) +(actor (during (link $from $to $cost) + (assert (path-exists from to)) + (assert (path from to (set from to) cost)))) -(actor (forever (during (link $A $B $link-cost) - (during (path B $C $seen $path-cost) - (assert #:when (not (set-member? seen A)) (path-exists A C)) - (assert #:when (not (set-member? seen A)) - (path A C (set-add seen A) (+ link-cost path-cost))))))) +(actor (during (link $A $B $link-cost) + (during (path B $C $seen $path-cost) + (assert #:when (not (set-member? seen A)) (path-exists A C)) + (assert #:when (not (set-member? seen A)) + (path A C (set-add seen A) (+ link-cost path-cost)))))) -(actor (forever (during (path-exists $from $to) - (field [costs (set)] [least +inf.0]) - (assert (min-cost from to (least))) - (on (asserted (path from to _ $cost)) - (costs (set-add (costs) cost)) - (least (min (least) cost))) - (on (retracted (path from to _ $cost)) - (define new-costs (set-remove (costs) cost)) - (costs new-costs) - (least (for/fold [(least +inf.0)] [(x new-costs)] (min x least))))))) +(actor (during (path-exists $from $to) + (field [costs (set)] [least +inf.0]) + (assert (min-cost from to (least))) + (on (asserted (path from to _ $cost)) + (costs (set-add (costs) cost)) + (least (min (least) cost))) + (on (retracted (path from to _ $cost)) + (define new-costs (set-remove (costs) cost)) + (costs new-costs) + (least (for/fold [(least +inf.0)] [(x new-costs)] (min x least)))))) -(actor (forever (during (path $from $to $seen $cost) - (on-start (displayln `(+ ,(path from to seen cost)))) - (on-stop (displayln `(- ,(path from to seen cost))))))) -(actor (forever (on (asserted (min-cost $from $to $cost)) - (displayln (min-cost from to cost))))) +(actor (during (path $from $to $seen $cost) + (on-start (displayln `(+ ,(path from to seen cost)))) + (on-stop (displayln `(- ,(path from to seen cost)))))) +(actor (on (asserted (min-cost $from $to $cost)) + (displayln (min-cost from to cost)))) diff --git a/racket/syndicate/examples/udp-multicast.rkt b/racket/syndicate/examples/udp-multicast.rkt index 6b7f88f..51162bc 100644 --- a/racket/syndicate/examples/udp-multicast.rkt +++ b/racket/syndicate/examples/udp-multicast.rkt @@ -26,15 +26,14 @@ (define (rearm!) (send! (set-timer h 1000 'relative))) - (rearm!) + (on-start (rearm!)) - (forever - (assert (udp-multicast-group-member h group-address #f)) - (assert (udp-multicast-loopback h #t)) - (on (message (udp-packet $source h $body)) - (printf "~a: ~a\n" source body)) - (on (message (timer-expired h $now)) - (rearm!) - (send! (udp-packet h - (udp-remote-address group-address group-port) - (string->bytes/utf-8 (format "~a ~a" me now))))))) + (assert (udp-multicast-group-member h group-address #f)) + (assert (udp-multicast-loopback h #t)) + (on (message (udp-packet $source h $body)) + (printf "~a: ~a\n" source body)) + (on (message (timer-expired h $now)) + (rearm!) + (send! (udp-packet h + (udp-remote-address group-address group-port) + (string->bytes/utf-8 (format "~a ~a" me now)))))) diff --git a/racket/syndicate/examples/ws-echo-client.rkt b/racket/syndicate/examples/ws-echo-client.rkt index 1e88704..c98834e 100644 --- a/racket/syndicate/examples/ws-echo-client.rkt +++ b/racket/syndicate/examples/ws-echo-client.rkt @@ -20,14 +20,14 @@ (define (generate-reader-id) (begin0 reader-count (set! reader-count (+ reader-count 1)))) - (actor (react (assert (advertise (websocket-message c s _))) - (on (asserted (websocket-peer-details c s $la _ $ra _)) - (log-info "~a: local ~v :: remote ~v" c la ra)) - (on (message (inbound (external-event e (list (? bytes? $bs))))) - (send! (websocket-message c s bs))) - (on (message (websocket-message s c $bs)) - (printf "(From server: ~v)\n" bs)) - (stop-when (message (inbound (external-event e (list (? eof-object? _))))) - (printf "Local EOF. Terminating.\n")) - (stop-when (retracted (advertise (websocket-message s c _))) - (printf "Server disconnected.\n"))))) + (actor (assert (advertise (websocket-message c s _))) + (on (asserted (websocket-peer-details c s $la _ $ra _)) + (log-info "~a: local ~v :: remote ~v" c la ra)) + (on (message (inbound (external-event e (list (? bytes? $bs))))) + (send! (websocket-message c s bs))) + (on (message (websocket-message s c $bs)) + (printf "(From server: ~v)\n" bs)) + (stop-when (message (inbound (external-event e (list (? eof-object? _))))) + (printf "Local EOF. Terminating.\n")) + (stop-when (retracted (advertise (websocket-message s c _))) + (printf "Server disconnected.\n")))) diff --git a/racket/syndicate/examples/ws-echo.rkt b/racket/syndicate/examples/ws-echo.rkt index 2f22e91..7afaf76 100644 --- a/racket/syndicate/examples/ws-echo.rkt +++ b/racket/syndicate/examples/ws-echo.rkt @@ -9,18 +9,18 @@ (define ssl-server-id (websocket-local-server 8084 (websocket-ssl-options "server-cert.pem" "private-key.pem"))) -(actor (forever (assert (advertise (observe (websocket-message any-client tcp-server-id _)))) - (on (asserted (advertise (websocket-message ($ c any-client) tcp-server-id _))) - (handle-connection tcp-server-id c)))) -(actor (forever (assert (advertise (observe (websocket-message any-client ssl-server-id _)))) - (on (asserted (advertise (websocket-message ($ c any-client) ssl-server-id _))) - (handle-connection ssl-server-id c)))) +(actor (assert (advertise (observe (websocket-message any-client tcp-server-id _)))) + (on (asserted (advertise (websocket-message ($ c any-client) tcp-server-id _))) + (handle-connection tcp-server-id c))) +(actor (assert (advertise (observe (websocket-message any-client ssl-server-id _)))) + (on (asserted (advertise (websocket-message ($ c any-client) ssl-server-id _))) + (handle-connection ssl-server-id c))) (define (handle-connection s c) - (actor (until (retracted (advertise (websocket-message c s _))) - (on (asserted (websocket-peer-details s c $la _ $ra _)) - (log-info "~a: local ~v :: remote ~v" c la ra)) - (on (message (websocket-message c s $body)) - (log-info "~a: ~v" c body) - (send! (websocket-message s c body)))) - (log-info "~a: disconnected" c))) + (actor (stop-when (retracted (advertise (websocket-message c s _)))) + (on (asserted (websocket-peer-details s c $la _ $ra _)) + (log-info "~a: local ~v :: remote ~v" c la ra)) + (on (message (websocket-message c s $body)) + (log-info "~a: ~v" c body) + (send! (websocket-message s c body))) + (on-stop (log-info "~a: disconnected" c)))) diff --git a/racket/syndicate/threaded.rkt b/racket/syndicate/threaded.rkt index e7b1021..543625d 100644 --- a/racket/syndicate/threaded.rkt +++ b/racket/syndicate/threaded.rkt @@ -1,14 +1,15 @@ #lang racket/base (provide spawn-threaded-actor - actor/thread ;; \__ once dataspace is split into mux and relay, these two - dataspace/thread) ;; / will be very thin convenience macros over a common impl. + actor/thread ;; \__ once dataspace is split into mux and relay, these three + actor*/thread ;; | will be very thin convenience macros over a common impl. + dataspace/thread) ;; / (require racket/match) (require (for-syntax racket/base)) (require (except-in syndicate dataspace)) -(require (only-in syndicate/actor actor dataspace schedule-action!)) +(require (only-in syndicate/actor actor actor* dataspace schedule-action!)) (require syndicate/hierarchy) (require syndicate/store) @@ -73,6 +74,12 @@ (schedule-action! (spawn-threaded-actor (lambda () (actor body ...))))])) +(define-syntax actor*/thread + (syntax-rules () + [(_ body ...) + (schedule-action! + (spawn-threaded-actor (lambda () (actor* body ...))))])) + (define-syntax dataspace/thread (syntax-rules () [(_ body ...)