Split dataspaces in to relay and mux sublayers.

This is a major change to the previous design, and also a change with
respect to the semantics in the ESOP 2016 paper. All the complexity of
echo-cancellation is stripped out of the core dataspace semantics, and
the relaying protocol is changed from one constructor, `at-meta`, to
two, `inbound` and `outbound`. The relay connecting a dataspace to its
container is now completely symmetric with the contained actors: it
initially asserts interest in what it is to relay, just like any other
actor would. Dataspaces no longer treat relaying specially.

This commit has updated all (I think) of the non-graphical examples. The
graphical code remains to be done in a following commit.
This commit is contained in:
Tony Garnock-Jones 2016-07-30 13:02:07 -04:00
parent b8c109d82b
commit 68ba2f74a6
66 changed files with 830 additions and 740 deletions

View File

@ -50,7 +50,7 @@
(react (on (retracted interface)
(async-channel-put control-ch 'quit)))))
(on (message ($ p (ethernet-packet interface #t _ _ _ _)) #:meta-level 1)
(on (message (inbound ($ p (ethernet-packet interface #t _ _ _ _))))
;; (log-info "Interface ~a inbound packet ~a -> ~a (type 0x~a)"
;; (ethernet-interface-name (ethernet-packet-interface p))
;; (pretty-bytes (ethernet-packet-source p))

View File

@ -12,6 +12,7 @@
(require racket/set)
(require (only-in racket/string string-split))
(require bitsyntax)
(require syndicate/protocol/advertise)
(require "dump-bytes.rkt")
(require "configuration.rkt")

View File

@ -1,7 +1,6 @@
#lang syndicate/actor
;;(log-events-and-actions? #t)
(require syndicate/protocol/advertise)
(require/activate syndicate/drivers/timer)
(require/activate "ethernet.rkt")
@ -21,8 +20,7 @@
(define (spawn-session them us)
(actor (define (send-to-remote fmt . vs)
(send! (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))
#:meta-level 1))
(send! (outbound (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define (say who fmt . vs)
(unless (equal? who user)
@ -31,20 +29,20 @@
(define user (gensym 'user))
(send-to-remote "Welcome, ~a.\n" user)
(until (retracted (advertise (tcp-channel them us _)) #:meta-level 1)
(until (retracted (inbound (advertise (tcp-channel them us _))))
(assert (present user))
(on (asserted (present $who)) (say who "arrived."))
(on (retracted (present $who)) (say who "departed."))
(on (message (says $who $what)) (say who "says: ~a" what))
(assert (advertise (tcp-channel us them _)) #:meta-level 1)
(on (message (tcp-channel them us $bs) #:meta-level 1)
(assert (outbound (advertise (tcp-channel us them _))))
(on (message (inbound (tcp-channel them us $bs)))
(send! (says user (string-trim (bytes->string/utf-8 bs))))))))
(dataspace (define us (tcp-listener 5999))
(forever (assert (advertise (observe (tcp-channel _ us _))) #:meta-level 1)
(on (asserted (advertise (tcp-channel $them us _)) #:meta-level 1)
(forever (assert (outbound (advertise (observe (tcp-channel _ us _)))))
(on (asserted (inbound (advertise (tcp-channel $them us _))))
(spawn-session them us)))))
(let ((dst (udp-listener 6667)))
@ -61,13 +59,13 @@
(counter (+ (counter) 1)))))
(forever (define us (tcp-listener 80))
(assert (advertise (observe (tcp-channel _ us _))) #:meta-level 1)
(during/actor (advertise (tcp-channel ($ them (tcp-address _ _)) us _)) #:meta-level 1
(assert (outbound (advertise (observe (tcp-channel _ us _)))))
(during/actor (inbound (advertise (tcp-channel ($ them (tcp-address _ _)) us _)))
(log-info "Got connection from ~v" them)
(field [done? #f])
(stop-when (rising-edge (done?)))
(assert (advertise (tcp-channel us them _)) #:meta-level 1)
(on (message (tcp-channel them us _) #:meta-level 1)) ;; ignore input
(assert (outbound (advertise (tcp-channel us them _))))
(on (message (inbound (tcp-channel them us _)))) ;; ignore input
(on-start (send! 'bump))
(on (message `(counter ,$counter))
@ -81,5 +79,5 @@
"TCP/IP stack</a>.</p>\n"
"<p>There have been ~a requests prior to this one.</p>\n")
counter)))
(send! (tcp-channel us them response) #:meta-level 1)
(send! (outbound (tcp-channel us them response)))
(done? #t))))))

View File

@ -8,6 +8,7 @@
(require racket/set)
(require bitsyntax)
(require syndicate/protocol/advertise)
(require "dump-bytes.rkt")
(require "checksum.rkt")

View File

@ -10,6 +10,7 @@
(require racket/set)
(require bitsyntax)
(require syndicate/protocol/advertise)
(require "dump-bytes.rkt")
(require "checksum.rkt")

View File

@ -55,7 +55,7 @@
(quit))
(begin (async-channel-put control-ch 'unblock)
#f))]
[(message (at-meta (? ethernet-packet? p)))
[(message (inbound (? ethernet-packet? p)))
;; (log-info "Interface ~a inbound packet ~a -> ~a (type 0x~a)"
;; (ethernet-interface-name (ethernet-packet-interface p))
;; (pretty-bytes (ethernet-packet-source p))
@ -77,7 +77,7 @@
(scn/union (assertion interface)
(subscription (ethernet-packet interface #f ? ? ? ?))
(subscription (observe (ethernet-packet interface #t ? ? ? ?)))
(subscription (ethernet-packet interface #t ? ? ? ?) #:meta-level 1)))]))
(subscription (inbound (ethernet-packet interface #t ? ? ? ?)))))]))
(define (interface-packet-read-loop interface h control-ch)
(define (blocked)

View File

@ -16,6 +16,7 @@
(require syndicate/monolithic)
(require syndicate/drivers/timer)
(require syndicate/demand-matcher)
(require syndicate/protocol/advertise)
(require bitsyntax)
(require "dump-bytes.rkt")

View File

@ -2,6 +2,7 @@
(require syndicate/demand-matcher)
(require syndicate/drivers/timer)
(require syndicate/protocol/advertise)
(require "demo-config.rkt")
(require "ethernet.rkt")
(require "arp.rkt")
@ -26,17 +27,17 @@
(define (spawn-session them us)
(define user (gensym 'user))
(define remote-detector (at-meta (?!)))
(define remote-detector (inbound (?!)))
(define peer-detector (advertise `(,(?!) says ,?)))
(define (send-to-remote fmt . vs)
(message (at-meta (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(message (outbound (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define (say who fmt . vs)
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
(list (send-to-remote "Welcome, ~a.\n" user)
(spawn
(lambda (e peers)
(match e
[(message (at-meta (tcp-channel _ _ bs)))
[(message (inbound (tcp-channel _ _ bs)))
(transition peers (message `(,user says ,(string-trim (bytes->string/utf-8 bs)))))]
[(message `(,who says ,what))
(transition peers (say who "says: ~a" what))]
@ -55,15 +56,14 @@
(subscription `(,? says ,?)) ;; read actual chat messages
(subscription (advertise `(,? says ,?))) ;; observe peer presence
(advertisement `(,user says ,?)) ;; advertise our presence
(subscription (tcp-channel them us ?) #:meta-level 1) ;; read from remote client
(subscription (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client
(advertisement (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client
(subscription (inbound (tcp-channel them us ?))) ;; read from remote client
(subscription (inbound (advertise (tcp-channel them us ?)))) ;; monitor remote client
(advertisement (inbound (tcp-channel us them ?))) ;; we will write to remote client
))))
(spawn-dataspace
(spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
(observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
#:meta-level 1
(spawn-demand-matcher (inbound (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?)))
(inbound (observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?)))
spawn-session))
)
@ -97,13 +97,13 @@
"TCP/IP stack</a>.</p>\n"
"<p>There have been ~a requests prior to this one.</p>")
counter)))
(quit (message (at-meta (tcp-channel us them response))))]
(quit (message (outbound (tcp-channel us them response))))]
[_ #f]))
(void)
(scn/union (subscription `(counter ,?))
(subscription (tcp-channel them us ?) #:meta-level 1)
(subscription (advertise (tcp-channel them us ?)) #:meta-level 1)
(advertisement (tcp-channel us them ?) #:meta-level 1)))))
(subscription (inbound (tcp-channel them us ?)))
(subscription (inbound (advertise (tcp-channel them us ?))))
(advertisement (inbound (tcp-channel us them ?)))))))
(spawn-dataspace
(spawn (lambda (e counter)
@ -113,9 +113,9 @@
[_ #f]))
0
(scn (subscription 'bump)))
(spawn-demand-matcher (advertise (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?))
(observe (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?))
#:meta-level 1
spawn-session))
(spawn-demand-matcher
(inbound (advertise (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?)))
(inbound (observe (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?)))
spawn-session))
)

View File

@ -11,6 +11,7 @@
(require syndicate/monolithic)
(require syndicate/drivers/timer)
(require syndicate/demand-matcher)
(require syndicate/protocol/advertise)
(require bitsyntax)
(require "dump-bytes.rkt")

View File

@ -12,6 +12,7 @@
(require racket/match)
(require syndicate/monolithic)
(require syndicate/demand-matcher)
(require syndicate/protocol/advertise)
(require bitsyntax)
(require "dump-bytes.rkt")

View File

@ -79,6 +79,7 @@
(require (for-syntax syntax/srcloc))
(require (prefix-in core: "core.rkt"))
(require (prefix-in core: "dataspace.rkt"))
(require "mux.rkt")
(require "patch.rkt")
(require "trie.rkt")
@ -255,10 +256,6 @@
(pattern (~seq #:when Pred))
(pattern (~seq) #:attr Pred #'#t))
(define-splicing-syntax-class meta-level
(pattern (~seq #:meta-level level:expr))
(pattern (~seq) #:attr level #'0))
(define-splicing-syntax-class priority
(pattern (~seq #:priority level))
(pattern (~seq) #:attr level #'*normal-priority*))
@ -351,13 +348,13 @@
(define-syntax (assert stx)
(syntax-parse stx
[(_ w:when-pred P L:meta-level)
[(_ w:when-pred P)
(define-values (proj pat bindings _instantiated)
(analyze-pattern stx #'P))
(quasisyntax/loc stx
(add-endpoint! #,(source-location->string stx)
(lambda ()
#,(let ((patch-stx #`(core:assert #,pat #:meta-level L.level)))
#,(let ((patch-stx #`(core:assert #,pat)))
(if #'w.Pred
#`(if w.Pred #,patch-stx patch-empty)
patch-stx)))
@ -413,27 +410,27 @@
(define-syntax (during stx)
(syntax-parse stx
[(_ P L:meta-level O ...)
(define E-stx (syntax/loc #'P (asserted P #:meta-level L.level)))
[(_ P O ...)
(define E-stx (syntax/loc #'P (asserted P)))
(define-values (_proj _pat _bindings instantiated)
(analyze-pattern E-stx #'P))
(quasisyntax/loc stx
(on #,E-stx
(let ((p #,instantiated))
(react (stop-when (retracted p #:meta-level L.level))
(react (stop-when (retracted p))
O ...))))]))
(define-syntax (during/actor stx)
(syntax-parse stx
[(_ P L:meta-level w:actor-wrapper name:name O ...)
(define E-stx (syntax/loc #'P (asserted P #:meta-level L.level)))
[(_ P w:actor-wrapper name:name O ...)
(define E-stx (syntax/loc #'P (asserted P)))
(define-values (_proj _pat _bindings instantiated)
(analyze-pattern E-stx #'P))
(quasisyntax/loc stx
(on #,E-stx
(let ((p #,instantiated))
(w.wrapper #:name name.N
(react (stop-when (retracted p #:meta-level L.level))
(react (stop-when (retracted p))
O ...)))))]))
(define-syntax (begin/dataflow stx)
@ -509,13 +506,13 @@
(define-syntax (query-value* stx)
(syntax-parse stx
[(_ field-name absent-expr P L:meta-level expr on-add:on-add on-remove:on-remove)
[(_ field-name absent-expr P expr on-add:on-add on-remove:on-remove)
(quasisyntax/loc stx
(let ()
(on (asserted P #:meta-level L.level) #:priority *query-priority*
(on (asserted P) #:priority *query-priority*
#,@(schedule-query-handler-stxs (attribute on-add.expr))
(field-name expr))
(on (retracted P #:meta-level L.level) #:priority *query-priority*
(on (retracted P) #:priority *query-priority*
#,@(schedule-query-handler-stxs (attribute on-remove.expr))
(field-name absent-expr))
field-name))]))
@ -530,13 +527,13 @@
(define-syntax (query-set* stx)
(syntax-parse stx
[(_ field-name P L:meta-level expr on-add:on-add on-remove:on-remove)
[(_ field-name P expr on-add:on-add on-remove:on-remove)
(quasisyntax/loc stx
(let ()
(on (asserted P #:meta-level L.level) #:priority *query-priority*
(on (asserted P) #:priority *query-priority*
#,@(schedule-query-handler-stxs (attribute on-add.expr))
(field-name (set-add (field-name) expr)))
(on (retracted P #:meta-level L.level) #:priority *query-priority*
(on (retracted P) #:priority *query-priority*
#,@(schedule-query-handler-stxs (attribute on-remove.expr))
(field-name (set-remove (field-name) expr)))
field-name))]))
@ -551,10 +548,10 @@
(define-syntax (query-hash* stx)
(syntax-parse stx
[(_ field-name P L:meta-level key-expr value-expr on-add:on-add on-remove:on-remove)
[(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove)
(quasisyntax/loc stx
(let ()
(on (asserted P #:meta-level L.level) #:priority *query-priority*
(on (asserted P) #:priority *query-priority*
(let ((key key-expr))
(when (hash-has-key? (field-name) key)
(log-warning "query-hash: field ~v with pattern ~v: overwriting existing entry ~v"
@ -563,7 +560,7 @@
key))
#,@(schedule-query-handler-stxs (attribute on-add.expr))
(field-name (hash-set (field-name) key value-expr))))
(on (retracted P #:meta-level L.level) #:priority *query-priority*
(on (retracted P) #:priority *query-priority*
#,@(schedule-query-handler-stxs (attribute on-remove.expr))
(field-name (hash-remove (field-name) key-expr)))
field-name))]))
@ -578,13 +575,13 @@
(define-syntax (query-hash-set* stx)
(syntax-parse stx
[(_ field-name P L:meta-level key-expr value-expr on-add:on-add on-remove:on-remove)
[(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove)
(quasisyntax/loc stx
(let ()
(on (asserted P #:meta-level L.level) #:priority *query-priority*
(on (asserted P) #:priority *query-priority*
#,@(schedule-query-handler-stxs (attribute on-add.expr))
(field-name (hashset-add (field-name) key-expr value-expr)))
(on (retracted P #:meta-level L.level) #:priority *query-priority*
(on (retracted P) #:priority *query-priority*
#,@(schedule-query-handler-stxs (attribute on-remove.expr))
(field-name (hashset-remove (field-name) key-expr value-expr)))
field-name))]))
@ -643,7 +640,6 @@
script-stx
asserted?
P-stx
meta-level
priority-stx)
(define-values (proj-stx pat bindings _instantiated)
(analyze-pattern event-stx P-stx))
@ -654,12 +650,12 @@
(quasisyntax/loc outer-expr-stx
(add-endpoint! #,(source-location->string outer-expr-stx)
(lambda () (if #,when-pred-stx
(core:sub #,pat #:meta-level #,meta-level)
(core:sub #,pat)
patch-empty))
(lambda (e)
(core:match-event e
[(? #,event-predicate-stx p)
(define proj (core:prepend-at-meta #,proj-stx #,meta-level))
(define proj #,proj-stx)
(define proj-arity (projection-arity proj))
(define entry-set (trie-project/set #:take proj-arity
(#,patch-accessor-stx p)
@ -716,13 +712,13 @@
terminal?
script-stx
priority-stx)))]
[(core:message P L:meta-level)
[(core:message P)
(define-values (proj pat bindings _instantiated)
(analyze-pattern event-stx #'P))
(quasisyntax/loc outer-expr-stx
(add-endpoint! #,(source-location->string outer-expr-stx)
(lambda () (if #,when-pred-stx
(core:sub #,pat #:meta-level L.level)
(core:sub #,pat)
patch-empty))
(lambda (e)
(core:match-event e
@ -730,7 +726,7 @@
(define capture-vals
(match-value/captures
body
(core:prepend-at-meta #,proj L.level)))
#,proj))
(and capture-vals
(schedule-script!
#:priority #,priority-stx
@ -738,12 +734,12 @@
(lambda ()
(apply (lambda #,bindings #,script-stx)
capture-vals))))]))))]
[(asserted P L:meta-level)
[(asserted P)
(analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx terminal? script-stx
#t #'P #'L.level priority-stx)]
[(retracted P L:meta-level)
#t #'P priority-stx)]
[(retracted P)
(analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx terminal? script-stx
#f #'P #'L.level priority-stx)]
#f #'P priority-stx)]
[(rising-edge Pred)
(define field-name
(datum->syntax event-stx
@ -1132,19 +1128,19 @@
(when (not (in-script?))
(error who "Attempt to perform action outside script; are you missing an (on ...)?")))
(define (send! M #:meta-level [meta-level 0])
(define (send! M)
(ensure-in-script! 'send!)
(schedule-action! (core:message (core:prepend-at-meta M meta-level))))
(schedule-action! (core:message M)))
(define *adhoc-label* -1)
(define (assert! P #:meta-level [meta-level 0])
(define (assert! P)
(ensure-in-script! 'assert!)
(update-stream! *adhoc-label* (core:assert P #:meta-level meta-level)))
(update-stream! *adhoc-label* (core:assert P)))
(define (retract! P #:meta-level [meta-level 0])
(define (retract! P)
(ensure-in-script! 'retract!)
(update-stream! *adhoc-label* (core:retract P #:meta-level meta-level)))
(update-stream! *adhoc-label* (core:retract P)))
(define (patch! p)
(ensure-in-script! 'patch!)

View File

@ -8,14 +8,16 @@
(require racket/set)
(require racket/match)
(require net/rfc6455)
(require (except-in "../main.rkt" dataspace assert))
(require "../actor.rkt")
(require "../trie.rkt")
(require "../patch.rkt")
(require "../demand-matcher.rkt")
(require "../drivers/timer.rkt")
(require "../drivers/websocket.rkt")
(require json)
;; (require (except-in "../main.rkt" dataspace assert))
;; (require "../actor.rkt")
(require syndicate/trie)
(require syndicate/patch)
(require syndicate/demand-matcher)
(require syndicate/protocol/advertise)
(require/activate syndicate/drivers/timer)
(require/activate syndicate/drivers/websocket)
(require "protocol.rkt")
(define-logger syndicate-broker)
@ -31,38 +33,39 @@
#:ssl-options [ssl-options #f])
(define any-client any-websocket-remote-client)
(define server-id (websocket-local-server port ssl-options))
(spawn-demand-matcher (advertise (websocket-message (?! any-client) server-id ?))
(observe (websocket-message (?! any-client) server-id ?))
#:meta-level 1
(lambda (c) (spawn-connection-handler c server-id))))
(spawn-demand-matcher (inbound (advertise (websocket-message (?! any-client) server-id ?)))
(inbound (observe (websocket-message (?! any-client) server-id ?)))
(lambda (c) (spawn-connection-handler c server-id))
#:name 'broker:dm))
(define (spawn-connection-handler c server-id)
(actor (define scope (broker-scope (websocket-remote-client-request-host c)
(actor #:name (list 'broker server-id)
(define scope (broker-scope (websocket-remote-client-request-host c)
(websocket-remote-client-request-port c)
(websocket-remote-client-request-path c)))
(define (arm-ping-timer!)
(send! #:meta-level 1 (set-timer c (ping-interval) 'relative)))
(send! (outbound (set-timer c (ping-interval) 'relative))))
(define (send-event e)
(send! #:meta-level 1
(websocket-message server-id c (jsexpr->string (lift-json-event e)))))
(send! (outbound (websocket-message server-id c (jsexpr->string (lift-json-event e))))))
(arm-ping-timer!)
(log-syndicate-broker-info "Starting broker connection from ~v" c)
(until (retracted (advertise (websocket-message c server-id _)) #:meta-level 1)
(assert (advertise (websocket-message server-id c _)) #:meta-level 1)
(until (retracted (inbound (advertise (websocket-message c server-id _))))
(assert (outbound (advertise (websocket-message server-id c _))))
(on (asserted (websocket-peer-details server-id c _ _ $remote-addr $remote-port)
#:meta-level 1)
(on (asserted (inbound
(websocket-peer-details server-id c _ _ $remote-addr $remote-port)))
(log-syndicate-broker-info "Connection ~v is from ~a:~a" c remote-addr remote-port))
(on (message (timer-expired c _) #:meta-level 1)
(on (message (inbound (timer-expired c _)))
(arm-ping-timer!)
(send-event 'ping))
(on (message (websocket-message c server-id $data) #:meta-level 1)
(on (message (inbound (websocket-message c server-id $data)))
(match (drop-json-action (string->jsexpr data))
['ping (send-event 'pong)]
['pong (void)]
@ -110,9 +113,6 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(require/activate syndicate/drivers/timer)
(require/activate syndicate/drivers/websocket)
(let ((ssl-options
(match (current-command-line-arguments)
[(vector c p) (websocket-ssl-options c p)]

View File

@ -1,14 +1,13 @@
#lang racket/base
;; Core implementation of Incremental Syndicate.
;; Core structures and utilities for implementation of Incremental Syndicate.
(provide (struct-out message)
(except-out (struct-out quit) quit)
(struct-out quit-dataspace)
(rename-out [quit <quit>])
(except-out (struct-out spawn) spawn)
(rename-out [spawn <spawn>])
(struct-out quit-dataspace)
(struct-out transition)
(struct-out dataspace)
(struct-out seal)
sealof
@ -38,23 +37,17 @@
meta-label?
prepend-at-meta
observe-at-meta
assert
retract
sub
unsub
pub
unpub
(rename-out [make-quit quit])
make-dataspace
spawn-dataspace
(rename-out [spawn-process spawn])
spawn/stateless
make-spawn-dataspace
general-transition?
ensure-transition
transition-bind
sequence-transitions
@ -62,22 +55,14 @@
sequence-transitions0
sequence-transitions0*
dataspace-handle-event
clean-transition
clean-actions
clean-transition)
pretty-print-dataspace)
(require racket/set)
(require racket/match)
(require (only-in racket/list flatten))
(require "functional-queue.rkt")
(require "trie.rkt")
(require "patch.rkt")
(require "hierarchy.rkt")
(require "trace.rkt")
(require "mux.rkt")
(require "pretty.rkt")
(module+ test (require rackunit))
;; Events = Patches Messages
(struct message (body) #:prefab)
@ -108,24 +93,6 @@
;; A PID is a Nat.
;; A Label is a PID or 'meta.
;; Long-lived process data: (process-info Any Behavior)
(struct process-info (name behavior) #:transparent)
;; Sentinel
(define missing-process-info (process-info #f #f))
;; VM private states
(struct dataspace (mux ;; Multiplexer
pending-action-queue ;; (Queueof (Cons Label (U Action 'quit)))
runnable-pids ;; (Setof PID)
process-table ;; (HashTable PID ProcessInfo)
states ;; (HashTable PID Any)
)
#:transparent
#:methods gen:syndicate-pretty-printable
[(define (syndicate-pretty-print w [p (current-output-port)])
(pretty-print-dataspace w p))])
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Seals are used by protocols to prevent the routing tries from
;; examining internal structure of values.
@ -146,30 +113,15 @@
clause ...
[_ #f]))
(define (prepend-at-meta pattern level)
(if (zero? level)
pattern
(at-meta (prepend-at-meta pattern (- level 1)))))
(define (assert pattern)
(patch (pattern->trie '<assert> pattern) trie-empty))
(define (retract pattern)
(patch trie-empty (pattern->trie '<retract> pattern)))
(define (observe-at-meta pattern level)
(if (zero? level)
(pattern->trie '<observe-at-meta> (observe pattern))
(trie-union
(pattern->trie '<observe-at-meta> (observe (prepend-at-meta pattern level)))
(pattern->trie '<observe-at-meta> (at-meta (embedded-trie (observe-at-meta pattern (- level 1))))))))
(define (assert pattern #:meta-level [level 0])
(patch (pattern->trie '<assert> (prepend-at-meta pattern level)) trie-empty))
(define (retract pattern #:meta-level [level 0])
(patch trie-empty (pattern->trie '<retract> (prepend-at-meta pattern level))))
(define (sub pattern #:meta-level [level 0])
(patch (observe-at-meta pattern level) trie-empty))
(define (unsub pattern #:meta-level [level 0])
(patch trie-empty (observe-at-meta pattern level)))
(define (pub pattern #:meta-level [level 0]) (assert (advertise pattern) #:meta-level level))
(define (unpub pattern #:meta-level [level 0]) (retract (advertise pattern) #:meta-level level))
(define (sub pattern)
(patch (pattern->trie '<sub> (observe pattern)) trie-empty))
(define (unsub pattern)
(patch trie-empty (pattern->trie '<unsub> (observe pattern))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -192,69 +144,6 @@
(define (clean-actions actions)
(filter (lambda (x) (and (action? x) (not (patch-empty? x)))) (flatten actions)))
(define (send-event e pid w)
(define behavior (process-info-behavior
(hash-ref (dataspace-process-table w) pid missing-process-info)))
(define old-state (hash-ref (dataspace-states w) pid #f))
(if (not behavior)
w
(begin
(trace-process-step e pid behavior old-state)
(invoke-process pid
(lambda () (clean-transition (ensure-transition (behavior e old-state))))
(match-lambda
[#f w]
[(and q (quit exn final-actions))
(trace-process-step-result e pid behavior old-state exn q)
(enqueue-actions (disable-process pid exn w) pid (append final-actions
(list 'quit)))]
[(and t (transition new-state new-actions))
(trace-process-step-result e pid behavior old-state #f t)
(enqueue-actions (mark-pid-runnable (update-state w pid new-state) pid)
pid
new-actions)])
(lambda (exn)
(trace-process-step-result e pid behavior old-state exn #f)
(enqueue-actions (disable-process pid exn w) pid (list 'quit)))))))
(define (update-state w pid s)
(struct-copy dataspace w [states (hash-set (dataspace-states w) pid s)]))
(define (send-event/guard e pid w)
(if (patch-empty? e)
w
(send-event e pid w)))
(define (disable-process pid exn w)
(when exn
(log-error "Process ~v ~a died with exception:\n~a"
(process-info-name (hash-ref (dataspace-process-table w) pid missing-process-info))
(append (current-actor-path) (list pid))
(exn->string exn)))
(struct-copy dataspace w
[process-table (hash-remove (dataspace-process-table w) pid)]
[states (hash-remove (dataspace-states w) pid)]))
(define (invoke-process pid thunk k-ok k-exn)
(define-values (ok? result)
(call/extended-actor-path
pid
(lambda ()
(with-handlers ([(lambda (exn) #t) (lambda (exn) (values #f exn))])
(values #t (with-continuation-mark 'minimart-process pid (thunk)))))))
(if ok?
(k-ok result)
(k-exn result)))
(define (mark-pid-runnable w pid)
(struct-copy dataspace w [runnable-pids (set-add (dataspace-runnable-pids w) pid)]))
(define (enqueue-actions w label actions)
(struct-copy dataspace w
[pending-action-queue
(queue-append-list (dataspace-pending-action-queue w)
(for/list [(a actions)] (cons label a)))]))
(define (make-quit #:exception [exn #f] . actions)
(quit exn actions))
@ -289,26 +178,6 @@
[(? quit? q) q]
[actions (transition state actions)]))
(define-syntax spawn-dataspace
(syntax-rules ()
[(spawn-dataspace #:name name-exp boot-action ...)
(make-spawn-dataspace #:name name-exp (lambda () (list boot-action ...)))]
[(spawn-dataspace boot-action ...)
(make-spawn-dataspace (lambda () (list boot-action ...)))]))
(define (make-dataspace boot-actions)
(dataspace (mux)
(list->queue (for/list ((a (in-list (clean-actions boot-actions)))) (cons 'meta a)))
(set)
(hash)
(hash)))
(define (make-spawn-dataspace #:name [name #f] boot-actions-thunk)
(spawn (lambda ()
(list dataspace-handle-event
(transition (make-dataspace (boot-actions-thunk)) '())
name))))
(define (transition-bind k t0)
(match t0
[#f (error 'transition-bind "Cannot bind from transition #f with continuation ~v" k)]
@ -337,173 +206,8 @@
[(? quit? q) q]
[(? transition? t) (sequence-transitions* t rest)])]))
(define (inert? w)
(and (queue-empty? (dataspace-pending-action-queue w))
(set-empty? (dataspace-runnable-pids w))))
(define (dataspace-handle-event e w)
(if (or e (not (inert? w)))
(sequence-transitions (transition w '())
(inject-event e)
perform-actions
(lambda (w) (or (step-children w) (transition w '()))))
(step-children w)))
(define ((inject-event e) w)
(transition (match e
[#f w]
[(? targeted-event?) (enqueue-actions w 'meta (list e))]
[(? patch? delta) (enqueue-actions w 'meta (list (lift-patch delta)))]
[(message body) (enqueue-actions w 'meta (list (message (at-meta body))))])
'()))
(define (perform-actions w)
(for/fold ([wt (transition (struct-copy dataspace w [pending-action-queue (make-queue)]) '())])
((entry (in-list (queue->list (dataspace-pending-action-queue w)))))
#:break (quit? wt) ;; TODO: should a quit action be delayed until the end of the turn?
(match-define [cons label a] entry)
(trace-internal-action label a (transition-state wt))
(define wt1 (transition-bind (perform-action label a) wt))
(trace-internal-action-result label a (transition-state wt) wt1)
wt1))
(define ((perform-action label a) w)
(match a
[(spawn boot)
(invoke-process (mux-next-pid (dataspace-mux w)) ;; anticipate pid allocation
(lambda ()
(match (boot)
[(and results (list (? procedure?) (? general-transition?) _))
results]
[other
(error 'spawn
"Spawn boot procedure must yield boot spec; received ~v"
other)]))
(lambda (results)
(match-define (list behavior initial-transition name) results)
(create-process w behavior initial-transition name))
(lambda (exn)
(log-error "Spawned process in dataspace ~a died with exception:\n~a"
(current-actor-path)
(exn->string exn))
(transition w '())))]
['quit
(define-values (new-mux _label delta delta-aggregate)
(mux-remove-stream (dataspace-mux w) label))
;; behavior & state in w already removed by disable-process
(deliver-patches w new-mux label delta delta-aggregate)]
[(quit-dataspace)
(make-quit)]
[(? patch? delta-orig)
(define-values (new-mux _label delta delta-aggregate)
(mux-update-stream (dataspace-mux w) label delta-orig))
(deliver-patches w new-mux label delta delta-aggregate)]
[(and m (message body))
(when (observe? body)
(log-warning "Stream ~a sent message containing query ~v"
(append (current-actor-path) (list label))
body))
(if (and (not (meta-label? label)) ;; it's from a local process, not envt
(at-meta? body)) ;; it relates to envt, not local
(transition w (message (at-meta-claim body)))
(transition (for/fold [(w w)]
[(pid (in-list (mux-route-message (dataspace-mux w) body)))]
(send-event m pid w))
'()))]
[(targeted-event (cons pid remaining-path) e)
(transition (send-event/guard (target-event remaining-path e) pid w) '())]))
(define (create-process w behavior initial-transition name)
(if (not initial-transition)
(transition w '()) ;; Uh, ok
(let ()
(define-values (postprocess initial-actions)
(match (clean-transition initial-transition)
[(and q (quit exn initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) exn q)
(disable-process pid exn w))
(append initial-actions0 (list 'quit)))]
[(and t (transition initial-state initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) #f t)
(mark-pid-runnable (update-state w pid initial-state) pid))
initial-actions0)]))
(define-values (initial-patch remaining-initial-actions)
(match initial-actions
[(cons (? patch? p) rest) (values p rest)]
[other (values patch-empty other)]))
(define-values (new-mux new-pid delta delta-aggregate)
(mux-add-stream (dataspace-mux w) initial-patch))
(let* ((w (struct-copy dataspace w
[process-table (hash-set (dataspace-process-table w)
new-pid
(process-info name
behavior))]))
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions)))
(deliver-patches w new-mux new-pid delta delta-aggregate)))))
(define (deliver-patches w new-mux acting-label delta delta-aggregate)
(define-values (patches meta-action)
(compute-patches (dataspace-mux w) new-mux acting-label delta delta-aggregate))
(transition (for/fold [(w (struct-copy dataspace w [mux new-mux]))]
[(entry (in-list patches))]
(match-define (cons label event) entry)
(send-event/guard event label w))
meta-action))
(define (step-children w)
(define runnable-pids (dataspace-runnable-pids w))
(if (set-empty? runnable-pids)
#f ;; dataspace is inert.
(transition (for/fold [(w (struct-copy dataspace w [runnable-pids (set)]))]
[(pid (in-set runnable-pids))]
(send-event #f pid w))
'())))
(define (pretty-print-dataspace w [p (current-output-port)])
(match-define (dataspace mux qs runnable process-table states) w)
(fprintf p "DATASPACE:\n")
(fprintf p " - ~a queued actions\n" (queue-length qs))
(fprintf p " - ~a runnable pids ~a\n" (set-count runnable) (set->list runnable))
(fprintf p " - ~a live processes\n" (hash-count states))
(fprintf p " - ")
(display (indented-port-output 3 (lambda (p) (syndicate-pretty-print mux p)) #:first-line? #f) p)
(newline p)
(for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys states))])
(define i (hash-ref process-table pid missing-process-info))
(fprintf p " ---- process ~a, name ~v, behavior ~v, STATE:\n"
pid
(process-info-name i)
(process-info-behavior i))
(define state (hash-ref states pid #f))
(display (indented-port-output 6 (lambda (p) (syndicate-pretty-print state p))) p)
(newline p)
(fprintf p " process ~a, name ~v, behavior ~v, CLAIMS:\n"
pid
(process-info-name i)
(process-info-behavior i))
(display (indented-port-output 6 (lambda (p)
(pretty-print-trie (mux-interests-of mux pid) p)))
p)
(newline p)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(module+ test
(require racket/pretty)
(define (step* w)
(let loop ((w w) (actions '()))
(pretty-print w)
(match (dataspace-handle-event #f w)
[#f (values w #f (flatten actions))]
[(quit exn new-actions) (values w exn (flatten (cons actions new-actions)))]
[(transition new-w new-actions) (loop new-w (cons actions new-actions))])))
(step* (make-dataspace '()))
)
;;; Local Variables:
;;; eval: (put 'match-event 'scheme-indent-function 1)
;;; eval: (put 'match-event 'racket-indent-function 1)

View File

@ -3,6 +3,7 @@
(provide dataflow-graph?
make-dataflow-graph
dataflow-graph-edges-forward
current-dataflow-subject-id

View File

@ -0,0 +1,268 @@
#lang racket/base
;; Dataspaces without configured relaying.
(provide (struct-out dataspace)
make-dataspace
spawn-dataspace
make-spawn-dataspace
dataspace-handle-event
pretty-print-dataspace)
(require racket/set)
(require racket/match)
(require "functional-queue.rkt")
(require "trie.rkt")
(require "patch.rkt")
(require "hierarchy.rkt")
(require "trace.rkt")
(require "mux.rkt")
(require "pretty.rkt")
(require "core.rkt")
(require "protocol/standard-relay.rkt")
;; Long-lived process data: (process-info Any Behavior)
(struct process-info (name behavior) #:transparent)
;; Sentinel
(define missing-process-info (process-info #f #f))
;; VM private states
(struct dataspace (mux ;; Multiplexer
pending-action-queue ;; (Queueof (Cons Label (U Action 'quit)))
runnable-pids ;; (Setof PID)
process-table ;; (HashTable PID ProcessInfo)
states ;; (HashTable PID Any)
)
#:transparent
#:methods gen:syndicate-pretty-printable
[(define (syndicate-pretty-print w [p (current-output-port)])
(pretty-print-dataspace w p))])
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (send-event e pid w)
(define behavior (process-info-behavior
(hash-ref (dataspace-process-table w) pid missing-process-info)))
(define old-state (hash-ref (dataspace-states w) pid #f))
(if (not behavior)
w
(begin
(trace-process-step e pid behavior old-state)
(invoke-process pid
(lambda () (clean-transition (ensure-transition (behavior e old-state))))
(match-lambda
[#f w]
[(and q (<quit> exn final-actions))
(trace-process-step-result e pid behavior old-state exn q)
(enqueue-actions (disable-process pid exn w) pid (append final-actions
(list 'quit)))]
[(and t (transition new-state new-actions))
(trace-process-step-result e pid behavior old-state #f t)
(enqueue-actions (mark-pid-runnable (update-state w pid new-state) pid)
pid
new-actions)])
(lambda (exn)
(trace-process-step-result e pid behavior old-state exn #f)
(enqueue-actions (disable-process pid exn w) pid (list 'quit)))))))
(define (update-state w pid s)
(struct-copy dataspace w [states (hash-set (dataspace-states w) pid s)]))
(define (send-event/guard e pid w)
(if (patch-empty? e)
w
(send-event e pid w)))
(define (disable-process pid exn w)
(when exn
(log-error "Process ~v ~a died with exception:\n~a"
(process-info-name (hash-ref (dataspace-process-table w) pid missing-process-info))
(append (current-actor-path) (list pid))
(exn->string exn)))
(struct-copy dataspace w
[process-table (hash-remove (dataspace-process-table w) pid)]
[states (hash-remove (dataspace-states w) pid)]))
(define (invoke-process pid thunk k-ok k-exn)
(define-values (ok? result)
(call/extended-actor-path
pid
(lambda ()
(with-handlers ([(lambda (exn) #t) (lambda (exn) (values #f exn))])
(values #t (with-continuation-mark 'minimart-process pid (thunk)))))))
(if ok?
(k-ok result)
(k-exn result)))
(define (mark-pid-runnable w pid)
(struct-copy dataspace w [runnable-pids (set-add (dataspace-runnable-pids w) pid)]))
(define (enqueue-actions w label actions)
(struct-copy dataspace w
[pending-action-queue
(queue-append-list (dataspace-pending-action-queue w)
(for/list [(a actions)] (cons label a)))]))
(define-syntax spawn-dataspace
(syntax-rules ()
[(spawn-dataspace #:name name-exp boot-action ...)
(spawn-standard-relay
(make-spawn-dataspace #:name name-exp (lambda () (list boot-action ...))))]
[(spawn-dataspace boot-action ...)
(spawn-standard-relay
(make-spawn-dataspace (lambda () (list boot-action ...))))]))
(define (make-dataspace boot-actions)
(dataspace (mux)
(list->queue (for/list ((a (in-list (clean-actions boot-actions)))) (cons 'meta a)))
(set)
(hash)
(hash)))
(define (make-spawn-dataspace #:name [name #f] boot-actions-thunk)
(<spawn> (lambda ()
(list dataspace-handle-event
(transition (make-dataspace (boot-actions-thunk)) '())
name))))
(define (inert? w)
(and (queue-empty? (dataspace-pending-action-queue w))
(set-empty? (dataspace-runnable-pids w))))
(define (dataspace-handle-event e w)
(if (or e (not (inert? w)))
(sequence-transitions (transition w '())
(inject-event e)
perform-actions
(lambda (w) (or (step-children w) (transition w '()))))
(step-children w)))
(define ((inject-event e) w)
(transition (if (not e) w (enqueue-actions w 'meta (list e))) '()))
(define (perform-actions w)
(for/fold ([wt (transition (struct-copy dataspace w [pending-action-queue (make-queue)]) '())])
((entry (in-list (queue->list (dataspace-pending-action-queue w)))))
#:break (quit? wt) ;; TODO: should a quit action be delayed until the end of the turn?
(match-define [cons label a] entry)
(trace-internal-action label a (transition-state wt))
(define wt1 (transition-bind (perform-action label a) wt))
(trace-internal-action-result label a (transition-state wt) wt1)
wt1))
(define ((perform-action label a) w)
(match a
[(<spawn> boot)
(invoke-process (mux-next-pid (dataspace-mux w)) ;; anticipate pid allocation
(lambda ()
(match (boot)
[(and results (list (? procedure?) (? general-transition?) _))
results]
[other
(error 'spawn
"Spawn boot procedure must yield boot spec; received ~v"
other)]))
(lambda (results)
(match-define (list behavior initial-transition name) results)
(create-process w behavior initial-transition name))
(lambda (exn)
(log-error "Spawned process in dataspace ~a died with exception:\n~a"
(current-actor-path)
(exn->string exn))
(transition w '())))]
['quit
(define-values (new-mux _label delta delta-aggregate)
(mux-remove-stream (dataspace-mux w) label))
;; behavior & state in w already removed by disable-process
(deliver-patches w new-mux label delta delta-aggregate)]
[(quit-dataspace)
(quit)]
[(? patch? delta-orig)
(define-values (new-mux _label delta delta-aggregate)
(mux-update-stream (dataspace-mux w) label delta-orig))
(deliver-patches w new-mux label delta delta-aggregate)]
[(and m (message body))
(when (observe? body)
(log-warning "Stream ~a sent message containing query ~v"
(append (current-actor-path) (list label))
body))
(define-values (affected-pids meta-affected?) (mux-route-message (dataspace-mux w) body))
(transition (for/fold [(w w)] [(pid (in-list affected-pids))] (send-event m pid w))
(and meta-affected? m))]
[(targeted-event (cons pid remaining-path) e)
(transition (send-event/guard (target-event remaining-path e) pid w) '())]))
(define (create-process w behavior initial-transition name)
(if (not initial-transition)
(transition w '()) ;; Uh, ok
(let ()
(define-values (postprocess initial-actions)
(match (clean-transition initial-transition)
[(and q (<quit> exn initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) exn q)
(disable-process pid exn w))
(append initial-actions0 (list 'quit)))]
[(and t (transition initial-state initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) #f t)
(mark-pid-runnable (update-state w pid initial-state) pid))
initial-actions0)]))
(define-values (initial-patch remaining-initial-actions)
(match initial-actions
[(cons (? patch? p) rest) (values p rest)]
[other (values patch-empty other)]))
(define-values (new-mux new-pid delta delta-aggregate)
(mux-add-stream (dataspace-mux w) initial-patch))
(let* ((w (struct-copy dataspace w
[process-table (hash-set (dataspace-process-table w)
new-pid
(process-info name
behavior))]))
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions)))
(deliver-patches w new-mux new-pid delta delta-aggregate)))))
(define (deliver-patches w new-mux acting-label delta delta-aggregate)
(define-values (patches meta-action)
(compute-patches (dataspace-mux w) new-mux acting-label delta delta-aggregate))
(transition (for/fold [(w (struct-copy dataspace w [mux new-mux]))]
[(entry (in-list patches))]
(match-define (cons label event) entry)
(send-event/guard event label w))
(and (patch-non-empty? meta-action) meta-action)))
(define (step-children w)
(define runnable-pids (dataspace-runnable-pids w))
(if (set-empty? runnable-pids)
#f ;; dataspace is inert.
(transition (for/fold [(w (struct-copy dataspace w [runnable-pids (set)]))]
[(pid (in-set runnable-pids))]
(send-event #f pid w))
'())))
(define (pretty-print-dataspace w [p (current-output-port)])
(match-define (dataspace mux qs runnable process-table states) w)
(fprintf p "DATASPACE:\n")
(fprintf p " - ~a queued actions\n" (queue-length qs))
(fprintf p " - ~a runnable pids ~a\n" (set-count runnable) (set->list runnable))
(fprintf p " - ~a live processes\n" (hash-count states))
(fprintf p " - ")
(display (indented-port-output 3 (lambda (p) (syndicate-pretty-print mux p)) #:first-line? #f) p)
(newline p)
(for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys states))])
(define i (hash-ref process-table pid missing-process-info))
(fprintf p " ---- process ~a, name ~v, behavior ~v, STATE:\n"
pid
(process-info-name i)
(process-info-behavior i))
(define state (hash-ref states pid #f))
(display (indented-port-output 6 (lambda (p) (syndicate-pretty-print state p))) p)
(newline p)
(fprintf p " process ~a, name ~v, behavior ~v, CLAIMS:\n"
pid
(process-info-name i)
(process-info-behavior i))
(display (indented-port-output 6 (lambda (p)
(pretty-print-trie (mux-interests-of mux pid) p)))
p)
(newline p)))

View File

@ -7,6 +7,7 @@
(require "drivers/timer.rkt")
(require "pretty.rkt")
(require "support/hash.rkt")
(require "protocol/advertise.rkt")
(provide (except-out (struct-out demand-matcher) demand-matcher)
(rename-out [make-demand-matcher demand-matcher])
@ -206,10 +207,9 @@
start-task
[on-task-exit #f]
#:task-supervisor [task-supervisor default-task-supervisor]
#:name [name #f]
#:meta-level [meta-level 0])
(define d (make-demand-matcher (prepend-at-meta demand-spec meta-level)
(prepend-at-meta supply-spec meta-level)
#:name [name #f])
(define d (make-demand-matcher demand-spec
supply-spec
(lambda (acs . rs) (cons (apply start-task rs) acs))
(if on-task-exit
(lambda (acs . rs) (cons (apply on-task-exit rs) acs))
@ -218,9 +218,9 @@
(spawn #:name name
demand-matcher-handle-event
d
(patch-seq (sub (projection->pattern demand-spec) #:meta-level meta-level)
(sub (projection->pattern supply-spec) #:meta-level meta-level)
(pub (projection->pattern supply-spec) #:meta-level meta-level))))
(patch-seq (sub (projection->pattern demand-spec))
(sub (projection->pattern supply-spec))
(pub (projection->pattern supply-spec)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View File

@ -3,7 +3,8 @@
(require racket/exn)
(require (prefix-in tcp: racket/tcp))
(require (only-in racket/port read-bytes-avail!-evt))
(require "../demand-matcher.rkt")
(require syndicate/demand-matcher)
(require syndicate/protocol/advertise)
(require racket/unit)
(require net/tcp-sig)
@ -36,14 +37,16 @@
(define (spawn-tcp-driver)
(list (spawn-demand-matcher (advertise (observe (tcp-channel ? (?! (tcp-listener ?)) ?)))
(advertise (advertise (tcp-channel ? (?! (tcp-listener ?)) ?)))
spawn-tcp-listener)
spawn-tcp-listener
#:name 'drivers/tcp:dm:listener)
(spawn-demand-matcher (advertise (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
(observe (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
spawn-tcp-connection
(lambda (local-addr remote-addr)
(log-debug "Outbound TCP connection closed: ~v -> ~v"
local-addr
remote-addr)))))
remote-addr))
#:name 'drivers/tcp:dm:connect)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Listener
@ -78,7 +81,7 @@
(cond [(patch/removed? p) (channel-put ch 'quit) (quit)]
[(patch/added? p) (channel-put ch 'unblock) #f]
[else #f])]
[(message (at-meta (tcp-accepted remote-addr _ cin cout)))
[(message (inbound (tcp-accepted remote-addr _ cin cout)))
(transition state (spawn-connection (listener-state-server-addr state)
remote-addr
cin
@ -90,12 +93,13 @@
(define listener (tcp:tcp-listen port 128 #t))
(define control-ch (make-channel))
(thread (lambda () (tcp-listener-thread control-ch listener server-addr)))
(spawn tcp-listener-behavior
(spawn #:name (list 'drivers/tcp:listen port)
tcp-listener-behavior
(listener-state control-ch server-addr)
(patch-seq
(sub (advertise (observe (tcp-channel ? server-addr ?)))) ;; monitor peer
(pub (advertise (tcp-channel ? server-addr ?))) ;; declare we might make connections
(sub (tcp-accepted ? server-addr ? ?) #:meta-level 1) ;; events from driver thread
(sub (inbound (tcp-accepted ? server-addr ? ?))) ;; events from driver thread
)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -158,10 +162,10 @@
(shutdown-connection! state)
(raise exn)))]
(match e
[(message (at-meta (tcp-channel remote-addr local-addr (? eof-object?))))
[(message (inbound (tcp-channel remote-addr local-addr (? eof-object?))))
(shutdown-connection! state)
(quit)]
[(message (at-meta (tcp-channel remote-addr local-addr (? bytes? bs))))
[(message (inbound (tcp-channel remote-addr local-addr (? bytes? bs))))
(transition state (message (tcp-channel remote-addr local-addr bs)))]
[(message (tcp-channel _ _ bs))
(write-bytes bs (connection-state-cout state))
@ -177,13 +181,14 @@
(define (spawn-connection local-addr remote-addr cin cout)
(define control-ch (make-channel))
(thread (lambda () (tcp-connection-thread remote-addr local-addr control-ch cin)))
(spawn tcp-connection
(spawn #:name (list 'drivers/tcp:connect local-addr remote-addr)
tcp-connection
(connection-state control-ch cout)
(patch-seq
(sub (observe (tcp-channel remote-addr local-addr ?))) ;; monitor peer
(pub (tcp-channel remote-addr local-addr ?)) ;; may send segments to peer
(sub (tcp-channel local-addr remote-addr ?)) ;; want segments from peer
(sub (tcp-channel remote-addr local-addr ?) #:meta-level 1) ;; segments from driver thread
(sub (inbound (tcp-channel remote-addr local-addr ?))) ;; segments from driver thread
)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View File

@ -7,6 +7,7 @@
(require racket/set)
(require data/heap)
(require syndicate/protocol/advertise)
(struct pending-timer (deadline label) #:transparent)
@ -17,7 +18,7 @@
(struct set-timer (label msecs kind) #:prefab)
(struct timer-expired (label msecs) #:prefab)
(define expiry-projection (at-meta (?! (timer-expired ? ?))))
(define expiry-projection (inbound (?! (timer-expired ? ?))))
(define (spawn-timer-driver)
(define control-ch (make-channel))
@ -63,13 +64,14 @@
(send-ground-patch interrupt-clearing-patch)
(transition new-count
(cons (reverse actions-rev)
(when (zero? new-count) (unsub (timer-expired ? ?) #:meta-level 1))))]
(when (zero? new-count) (unsub (inbound (timer-expired ? ?))))))]
[(message (and instruction (set-timer _ _ _)))
(channel-put control-ch instruction)
(transition (+ count 1)
(when (= count 0) (sub (timer-expired ? ?) #:meta-level 1)))]
(when (= count 0) (sub (inbound (timer-expired ? ?)))))]
[_ #f]))
(spawn timer-driver
(spawn #:name 'drivers/timer
timer-driver
0 ;; initial count
(patch-seq (sub (set-timer ? ? ?))
(pub (timer-expired ? ?)))))

View File

@ -1,7 +1,8 @@
#lang syndicate
(require (prefix-in udp: racket/udp))
(require "../demand-matcher.rkt")
(require syndicate/demand-matcher)
(require syndicate/protocol/advertise)
(provide (struct-out udp-remote-address)
(struct-out udp-handle)
@ -82,14 +83,14 @@
(when (peer-quit? p)
(channel-put control-ch 'quit)
(quit))]
[(message (at-meta (? udp-packet? p)))
[(message (inbound (? udp-packet? p)))
(transition s (message p))]
[(message (udp-packet _ (udp-remote-address host port) body))
(udp:udp-send-to socket host port body)
#f]
[_ #f]))
(void)
(patch-seq (sub (udp-packet ? local-addr ?) #:meta-level 1)
(patch-seq (sub (inbound (udp-packet ? local-addr ?)))
(sub (udp-packet local-addr (udp-remote-address ? ?) ?))
(pub (udp-packet (udp-remote-address ? ?) local-addr ?))
(sub (udp-multicast-group-member local-addr ? ?))

View File

@ -171,7 +171,7 @@
(on-stop (channel-put listener-control 'quit))
(on (message (web-raw-request $id port $conn $lowlevel-req $control-ch) #:meta-level 1)
(on (message (inbound (web-raw-request $id port $conn $lowlevel-req $control-ch)))
(define web-req (web-request id
'inbound
(web-request-header
@ -245,8 +245,8 @@
(on-stop (channel-put ws-ch 'quit))
(on (message (websocket-message id 'outbound $body))
(channel-put ws-ch (list 'send body)))
(stop-when (message (web-incoming-message id (? eof-object? _)) #:meta-level 1))
(on (message (web-incoming-message id $body) #:meta-level 1)
(stop-when (message (inbound (web-incoming-message id (? eof-object? _)))))
(on (message (inbound (web-incoming-message id $body)))
(unless (eof-object? body) (send! (websocket-message id 'inbound body)))))
(define (req->virtual-host scheme r port)
@ -401,7 +401,7 @@
(log-info "Connected to ~a ~a" url (current-inexact-milliseconds))
((websocket-connection-main id control-ch) c (void))))))
(react
(stop-when (message (web-raw-client-conn id $c) #:meta-level 1)
(stop-when (message (inbound (web-raw-client-conn id $c)))
(react (stop-when (retracted (observe (web-response-websocket id _))))
(if (ws-conn? c)
(begin (assert (web-response-websocket id (ws-conn-headers c)))
@ -447,7 +447,7 @@
(error 'http-sendrecv "Bad first line: ~v" first-line)])))
(send-ground-message (web-raw-client-conn id response))))
(react
(stop-when (message (web-raw-client-conn id $r) #:meta-level 1)
(stop-when (message (inbound (web-raw-client-conn id $r)))
(react (stop-when (retracted (observe (web-response-complete id _ _))))
(if (exn? r)
(assert (web-response-websocket id #f #f))

View File

@ -17,7 +17,8 @@
ws-conn-peer-addresses
ws-conn-host+port
ws-conn-path))
(require "../demand-matcher.rkt")
(require syndicate/demand-matcher)
(require syndicate/protocol/advertise)
(require racket/unit)
(require racket/tcp)
@ -58,14 +59,16 @@
?))
(list (spawn-demand-matcher (advertise (observe inbound-listener-message-pat))
(advertise (advertise inbound-listener-message-pat))
spawn-websocket-listener)
spawn-websocket-listener
#:name 'drivers/websocket:dm:listen)
(spawn-demand-matcher (advertise outbound-conn-message-pat)
(observe outbound-conn-message-pat)
spawn-websocket-connection
(lambda (local-addr remote-addr)
(log-debug "Outbound websocket connection closed: ~v -> ~v"
local-addr
remote-addr)))))
remote-addr))
#:name 'drivers/websocket:dm:connect)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Listener
@ -77,7 +80,7 @@
[(? patch/removed? p)
((listener-state-shutdown-procedure state))
(quit)]
[(message (at-meta (websocket-connection id local-addr remote-addr c control-ch)))
[(message (inbound (websocket-connection id local-addr remote-addr c control-ch)))
(transition state (spawn-connection local-addr remote-addr id c control-ch))]
[_ #f]))
@ -133,12 +136,13 @@
(ssl-options->ssl-tcp@ ssl-options)
tcp@)
(connection-handler server-addr)))
(spawn websocket-listener
(spawn #:name (list 'drivers/websocket:listen port)
websocket-listener
(listener-state shutdown-procedure server-addr)
(patch-seq
(sub (advertise (observe (websocket-message ? server-addr ?)))) ;; monitor peer
(pub (advertise (websocket-message ? server-addr ?))) ;; declare we might make connections
(sub (websocket-connection ? server-addr ? ? ?) #:meta-level 1) ;; events from driver thd
(sub (inbound (websocket-connection ? server-addr ? ? ?))) ;; events from driver thd
)))
(define (spawn-websocket-connection local-addr remote-addr)
@ -156,9 +160,10 @@
(when (not (exn? c))
(log-info "Connected to ~a ~a" url (current-inexact-milliseconds))
(connection-thread-loop control-ch c id))))
(spawn (lambda (e buffered-messages-rev)
(spawn #:name (list 'drivers/websocket:connect/initial local-addr remote-addr id)
(lambda (e buffered-messages-rev)
(match e
[(message (at-meta (websocket-connection _ _ _ c _)))
[(message (inbound (websocket-connection _ _ _ c _)))
(quit
(when (not (exn? c))
(for [(m (reverse buffered-messages-rev))] (ws-send! c m))
@ -175,7 +180,7 @@
;; has been established. This way, if the connection fails,
;; it looks like it came up briefly and went down again.
(sub (websocket-message local-addr remote-addr ?))
(sub (websocket-connection id local-addr remote-addr ? control-ch) #:meta-level 1))))
(sub (inbound (websocket-connection id local-addr remote-addr ? control-ch))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Connection
@ -192,9 +197,9 @@
(shutdown-connection! state)
(raise exn)))]
(match e
[(message (at-meta (websocket-incoming-message _ (? eof-object?))))
[(message (inbound (websocket-incoming-message _ (? eof-object?))))
(shutdown-connection! state)]
[(message (at-meta (websocket-incoming-message _ bytes-or-string)))
[(message (inbound (websocket-incoming-message _ bytes-or-string)))
(transition state (message (websocket-message (connection-state-remote-addr state)
(connection-state-local-addr state)
bytes-or-string)))]
@ -209,7 +214,8 @@
[_ #f])))
(define (spawn-connection local-addr remote-addr id c control-ch)
(spawn websocket-connection-behaviour
(spawn #:name (list 'drivers/websocket:connect local-addr remote-addr id)
websocket-connection-behaviour
(connection-state local-addr remote-addr c control-ch)
(patch-seq
(let-values (((la lp ra rp) (ws-conn-peer-addresses c)))
@ -217,7 +223,7 @@
(sub (observe (websocket-message remote-addr local-addr ?))) ;; monitor peer
(pub (websocket-message remote-addr local-addr ?)) ;; may send messages to peer
(sub (websocket-message local-addr remote-addr ?)) ;; want segments from peer
(sub (websocket-incoming-message id ?) #:meta-level 1) ;; segments from driver thd
(sub (inbound (websocket-incoming-message id ?))) ;; segments from driver thd
)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View File

@ -1,5 +1,6 @@
#lang syndicate/actor
(require syndicate/protocol/advertise)
(require/activate syndicate/drivers/tcp)
(require (only-in racket/port read-bytes-line-evt))
@ -9,7 +10,7 @@
(actor
(react/suspend (quit)
(on (message (external-event stdin-evt (list $line)) #:meta-level 1)
(on (message (inbound (external-event stdin-evt (list $line))))
(if (eof-object? line)
(quit)
(send! (tcp-channel local-handle remote-handle line))))

View File

@ -1,5 +1,6 @@
#lang syndicate/actor
(require syndicate/protocol/advertise)
(require/activate syndicate/drivers/tcp)
(require (only-in racket/string string-trim))

View File

@ -1,5 +1,6 @@
#lang syndicate/actor
(require syndicate/protocol/advertise)
(require/activate syndicate/drivers/tcp)
(require (only-in racket/string string-trim))
@ -8,8 +9,7 @@
(define (spawn-session them us)
(actor (define (send-to-remote fmt . vs)
(send! (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))
#:meta-level 1))
(send! (outbound (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define (say who fmt . vs)
(unless (equal? who user)
@ -18,18 +18,18 @@
(define user (gensym 'user))
(send-to-remote "Welcome, ~a.\n" user)
(until (retracted (advertise (tcp-channel them us _)) #:meta-level 1)
(until (retracted (inbound (advertise (tcp-channel them us _))))
(assert (present user))
(on (asserted (present $who)) (say who "arrived."))
(on (retracted (present $who)) (say who "departed."))
(on (message (says $who $what)) (say who "says: ~a" what))
(assert (advertise (tcp-channel us them _)) #:meta-level 1)
(on (message (tcp-channel them us $bs) #:meta-level 1)
(assert (outbound (advertise (tcp-channel us them _))))
(on (message (inbound (tcp-channel them us $bs)))
(send! (says user (string-trim (bytes->string/utf-8 bs))))))))
(dataspace (define us (tcp-listener 5999))
(forever (assert (advertise (observe (tcp-channel _ us _))) #:meta-level 1)
(on (asserted (advertise (tcp-channel $them us _)) #:meta-level 1)
(forever (assert (outbound (advertise (observe (tcp-channel _ us _)))))
(on (asserted (inbound (advertise (tcp-channel $them us _))))
(spawn-session them us))))

View File

@ -1,5 +1,6 @@
#lang syndicate/actor
(require syndicate/protocol/advertise)
(require/activate syndicate/drivers/tcp)
(require (only-in racket/string string-trim))

View File

@ -1,5 +1,6 @@
#lang syndicate/actor
(require syndicate/protocol/advertise)
(require/activate syndicate/drivers/tcp)
(require (only-in racket/string string-trim))
@ -9,8 +10,7 @@
(define (spawn-session them us)
(actor (define (send-to-remote fmt . vs)
(send! (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))
#:meta-level 1))
(send! (outbound (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define (say who fmt . vs)
(unless (equal? who user)
@ -19,15 +19,15 @@
(define user (gensym 'user))
(send-to-remote "Welcome, ~a.\n" user)
(until (retracted (advertise (tcp-channel them us _)) #:meta-level 1)
(until (retracted (inbound (advertise (tcp-channel them us _))))
(assert (present user))
(on (asserted (present $who)) (say who "arrived."))
(on (retracted (present $who)) (say who "departed."))
(on (message (says $who $what)) (say who "says: ~a" what))
(assert (advertise (tcp-channel us them _)) #:meta-level 1)
(on (message (tcp-channel them us $bs) #:meta-level 1)
(assert (outbound (advertise (tcp-channel us them _))))
(on (message (inbound (tcp-channel them us $bs)))
(define input-string (string-trim (bytes->string/utf-8 bs)))
(if (equal? input-string "quit-dataspace")
(send! (shutdown))
@ -35,6 +35,6 @@
(dataspace (define us (tcp-listener 5999))
(until (message (shutdown))
(assert (advertise (observe (tcp-channel _ us _))) #:meta-level 1)
(on (asserted (advertise (tcp-channel $them us _)) #:meta-level 1)
(assert (outbound (advertise (observe (tcp-channel _ us _)))))
(on (asserted (inbound (advertise (tcp-channel $them us _))))
(spawn-session them us))))

View File

@ -1,5 +1,6 @@
#lang syndicate/actor
(require syndicate/protocol/advertise)
(require/activate syndicate/drivers/tcp)
(define server-id (tcp-listener 5999))

View File

@ -29,26 +29,26 @@
(field [tick-count 0])
(define (arm!)
(log-info "Tick ~v!" (tick-count))
(send! (set-timer 'tick 1000 'relative) #:meta-level 1))
(react (on (message (timer-expired 'tick _) #:meta-level 1)
(send! (outbound (set-timer 'tick 1000 'relative))))
(react (on (message (inbound (timer-expired 'tick _)))
(tick-count (+ (tick-count) 1))
(arm!))
(on-start (arm!))))
(field [counter 0])
(react
(during `(up ,$n) #:meta-level 1
(during (inbound `(up ,$n))
(on-start (log-info "up: ~v" n))
(on-stop (log-info "down: ~v" n)
(counter (+ (counter) 1)))))
(react (stop-when (asserted `(fib 36 ,$v) #:meta-level 1)
(react (stop-when (asserted (inbound `(fib 36 ,$v)))
(log-info "fib 36 is ~a" v)))
(react (stop-when (asserted `(fib 38 ,$v) #:meta-level 1)
(react (stop-when (asserted (inbound `(fib 38 ,$v)))
(log-info "fib 38 is ~a" v)))
(until (rising-edge (= (counter) 2)))
(log-info "Quitting main")
(until (message (timer-expired 'wait _) #:meta-level 1)
(on-start (send! (set-timer 'wait 100 'relative) #:meta-level 1))))
(until (message (inbound (timer-expired 'wait _)))
(on-start (send! (outbound (set-timer 'wait 100 'relative))))))

View File

@ -36,8 +36,8 @@
(begin0 reader-count
(set! reader-count (+ reader-count 1))))
(actor (print-prompt)
(until (message (external-event e (list (? eof-object? _))) #:meta-level 1)
(on (message (external-event e (list (? bytes? $bs))) #:meta-level 1)
(until (message (inbound (external-event e (list (? eof-object? _)))))
(on (message (inbound (external-event e (list (? bytes? $bs)))))
(match (string-split (string-trim (bytes->string/utf-8 bs)))
[(list "open" name)
(define reader-id (generate-reader-id))

View File

@ -67,8 +67,8 @@
(begin0 reader-count
(set! reader-count (+ reader-count 1))))
(actor (print-prompt)
(until (message (external-event e (list (? eof-object? _))) #:meta-level 1)
(on (message (external-event e (list (? bytes? $bs))) #:meta-level 1)
(until (message (inbound (external-event e (list (? eof-object? _)))))
(on (message (inbound (external-event e (list (? bytes? $bs)))))
(match (string-split (string-trim (bytes->string/utf-8 bs)))
[(list "open" name)
(define reader-id (generate-reader-id))

View File

@ -68,8 +68,8 @@
(begin0 reader-count
(set! reader-count (+ reader-count 1))))
(actor (print-prompt)
(until (message (external-event e (list (? eof-object? _))) #:meta-level 1)
(on (message (external-event e (list (? bytes? $bs))) #:meta-level 1)
(until (message (inbound (external-event e (list (? eof-object? _)))))
(on (message (inbound (external-event e (list (? bytes? $bs)))))
(match (string-split (string-trim (bytes->string/utf-8 bs)))
[(list "open" name)
(define reader-id (generate-reader-id))

View File

@ -37,8 +37,8 @@
(begin0 reader-count
(set! reader-count (+ reader-count 1))))
(actor (print-prompt)
(until (message (external-event e (list (? eof-object? _))) #:meta-level 1)
(on (message (external-event e (list (? bytes? $bs))) #:meta-level 1)
(until (message (inbound (external-event e (list (? eof-object? _)))))
(on (message (inbound (external-event e (list (? bytes? $bs)))))
(match (string-split (string-trim (bytes->string/utf-8 bs)))
[(list "open" name)
(define reader-id (generate-reader-id))

View File

@ -46,8 +46,8 @@
(begin0 reader-count
(set! reader-count (+ reader-count 1))))
(actor (print-prompt)
(until (message (external-event e (list (? eof-object? _))) #:meta-level 1)
(on (message (external-event e (list (? bytes? $bs))) #:meta-level 1)
(until (message (inbound (external-event e (list (? eof-object? _)))))
(on (message (inbound (external-event e (list (? bytes? $bs)))))
(match (string-split (string-trim (bytes->string/utf-8 bs)))
[(list "open" name)
(define reader-id (generate-reader-id))

View File

@ -63,9 +63,9 @@
(log-info "Computed meta-level: ~v" LEVEL)
(actor #:name 'observer-in-ds
(forever
(assert 'observer-in-ds-ready #:meta-level LEVEL)
(assert (outbound* LEVEL 'observer-in-ds-ready))
(on-start (log-info "observer-in-ds: STARTING"))
(define/query-set items `(item ,$a ,$b) #:meta-level LEVEL (list a b))
(on (message 'dump #:meta-level LEVEL)
(define/query-set items (inbound* LEVEL `(item ,$a ,$b)) (list a b))
(on (message (inbound* LEVEL 'dump))
(log-info "observer-in-ds: ~v" (items)))))
(forever)))

View File

@ -53,6 +53,6 @@
(let ((new-expr (read)))
(send-ground-message (set-cell cell-name new-expr))
(loop)))))))
(until (message 'quit #:meta-level 1)
(on (message (set-cell $name $expr) #:meta-level 1)
(until (message (inbound 'quit))
(on (message (inbound (set-cell $name $expr)))
(send! (set-cell name expr)))))

View File

@ -1,5 +1,6 @@
#lang syndicate
(require syndicate/protocol/advertise)
(require (only-in racket/port read-bytes-line-evt))
(require/activate "../drivers/tcp.rkt")
@ -9,9 +10,9 @@
(spawn/stateless (lambda (e)
(match e
[(? patch/removed?) (quit)]
[(message (at-meta (external-event _ (list (? eof-object?)))))
[(message (inbound (external-event _ (list (? eof-object?)))))
(quit)]
[(message (at-meta (external-event _ (list line))))
[(message (inbound (external-event _ (list line))))
(message (tcp-channel local-handle remote-handle line))]
[(message (tcp-channel _ _ bs))
(write-bytes bs)
@ -19,8 +20,7 @@
#f]
[_ #f]))
(patch-seq
(sub (external-event (read-bytes-line-evt (current-input-port) 'any) ?)
#:meta-level 1)
(sub (inbound (external-event (read-bytes-line-evt (current-input-port) 'any) ?)))
(sub (tcp-channel remote-handle local-handle ?))
(sub (advertise (tcp-channel remote-handle local-handle ?)))
(pub (tcp-channel local-handle remote-handle ?))))

View File

@ -1,5 +1,6 @@
#lang syndicate
(require syndicate/protocol/advertise)
(require (only-in racket/string string-trim))
(require/activate "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")

View File

@ -1,22 +1,23 @@
#lang syndicate
(require syndicate/protocol/advertise)
(require (only-in racket/string string-trim))
(require/activate "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")
(define (spawn-session them us)
(define user (gensym 'user))
(define remote-detector (at-meta (?!)))
(define remote-detector (inbound (?!)))
(define peer-detector (advertise `(,(?!) says ,?)))
(define (send-to-remote fmt . vs)
(message (at-meta (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(message (outbound (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define (say who fmt . vs)
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
(list (send-to-remote "Welcome, ~a.\n" user)
(spawn/stateless
(lambda (e)
(match e
[(message (at-meta (tcp-channel _ _ bs)))
[(message (inbound (tcp-channel _ _ bs)))
(message `(,user says ,(string-trim (bytes->string/utf-8 bs))))]
[(message `(,who says ,what))
(say who "says: ~a" what)]
@ -31,13 +32,12 @@
(sub `(,? says ,?)) ;; read actual chat messages
(sub (advertise `(,? says ,?))) ;; observe peer presence
(pub `(,user says ,?)) ;; advertise our presence
(sub (tcp-channel them us ?) #:meta-level 1) ;; read from remote client
(sub (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client
(pub (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client
(sub (inbound (tcp-channel them us ?))) ;; read from remote client
(sub (inbound (advertise (tcp-channel them us ?)))) ;; monitor remote client
(pub (inbound (tcp-channel us them ?))) ;; we will write to remote client
))))
(spawn-dataspace
(spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
(observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
#:meta-level 1
(spawn-demand-matcher (inbound (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?)))
(inbound (observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?)))
spawn-session))

View File

@ -2,6 +2,7 @@
;; The chat server, using a proxy abstracting over details of the TCP
;; driver's protocol.
(require syndicate/protocol/advertise)
(require (only-in racket/string string-trim))
(require/activate "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")

View File

@ -1,22 +1,23 @@
#lang syndicate
(require syndicate/protocol/advertise)
(require (only-in racket/string string-trim))
(require/activate "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")
(define (spawn-session them us)
(define user (gensym 'user))
(define remote-detector (at-meta (?!)))
(define remote-detector (inbound (?!)))
(define peer-detector (advertise `(,(?!) says ,?)))
(define (send-to-remote fmt . vs)
(message (at-meta (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(message (outbound (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define (say who fmt . vs)
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
(list (send-to-remote "Welcome, ~a.\n" user)
(spawn/stateless
(lambda (e)
(match e
[(message (at-meta (tcp-channel _ _ bs)))
[(message (inbound (tcp-channel _ _ bs)))
(define input-string (string-trim (bytes->string/utf-8 bs)))
(if (equal? input-string "quit-dataspace")
(quit-dataspace)
@ -34,13 +35,12 @@
(sub `(,? says ,?)) ;; read actual chat messages
(sub (advertise `(,? says ,?))) ;; observe peer presence
(pub `(,user says ,?)) ;; advertise our presence
(sub (tcp-channel them us ?) #:meta-level 1) ;; read from remote client
(sub (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client
(pub (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client
(sub (inbound (tcp-channel them us ?))) ;; read from remote client
(sub (inbound (advertise (tcp-channel them us ?)))) ;; monitor remote client
(pub (inbound (tcp-channel us them ?))) ;; we will write to remote client
))))
(spawn-dataspace
(spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
(observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
#:meta-level 1
(spawn-demand-matcher (inbound (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?)))
(inbound (observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?)))
spawn-session))

View File

@ -1,5 +1,6 @@
#lang syndicate
(require syndicate/protocol/advertise)
(require/activate "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")

View File

@ -3,6 +3,11 @@
(require (only-in racket/port read-line-evt))
(require/activate "../drivers/timer.rkt")
(define sub-all-except-meta
(patch-seq (sub ?)
(unsub (inbound ?))
(unsub (outbound ?))))
(define (quasi-spy e s)
(printf "----------------------------------------\n")
(printf "QUASI-SPY:\n")
@ -13,11 +18,11 @@
(newline)])
(printf "========================================\n")
#f)
(spawn quasi-spy (void) (sub ?))
(spawn quasi-spy (void) sub-all-except-meta)
(define (r e s)
(match e
[(message body) (transition s (message (at-meta `(print (got ,body)))))]
[(message body) (transition s (message (outbound `(print (got ,body)))))]
[_ #f]))
(define (b e n)
@ -27,20 +32,20 @@
#f)]
[_ #f]))
(spawn-dataspace (spawn r (void) (sub ?))
(spawn-dataspace (spawn r (void) sub-all-except-meta)
(spawn b 0 '()))
(define (echoer e s)
(match e
[(message (at-meta (external-event _ (list (? eof-object?)))))
[(message (inbound (external-event _ (list (? eof-object?)))))
(quit)]
[(message (at-meta (external-event _ (list line))))
[(message (inbound (external-event _ (list line))))
(transition s (message `(print (got-line ,line))))]
[_ #f]))
(spawn echoer
(void)
(sub (external-event (read-line-evt (current-input-port) 'any) ?) #:meta-level 1))
(sub (inbound (external-event (read-line-evt (current-input-port) 'any) ?))))
(define (ticker e s)
(match e

View File

@ -13,11 +13,11 @@
(spawn-dataspace
(spawn (lambda (e s)
(match e
[(message (at-meta 'die)) (quit)]
[(message (inbound 'die)) (quit)]
[_ #f]))
(void)
(patch-seq (sub 'die #:meta-level 1)
(sub (observe 'die) #:meta-level 1))))
(patch-seq (sub (inbound 'die))
(sub (inbound (observe 'die))))))
(spawn (lambda (e s)
(match e

View File

@ -6,8 +6,8 @@
(spawn (lambda (e u)
(match u
[0 (transition 1 '())]
[1 (transition 2 (retract 'a #:meta-level 1))]
[1 (transition 2 (retract (outbound 'a)))]
[_ #f]))
0
(patch-seq (assert 'a #:meta-level 1)
(assert (observe 'a) #:meta-level 1))))
(patch-seq (assert (outbound 'a))
(assert (observe (inbound 'a))))))

View File

@ -12,6 +12,15 @@
;;
;; The fix was to adjust the implementation of state change
;; notifications to cancel the echo for metaassertions.
;;
;; 20160730 I'm in the process of revising the design of dataspace
;; relaying to avoid this problem in a different way. Instead of just
;; having `at-meta` for both inbound and outbound assertions, there
;; are now two constructors, `inbound` and `outbound`, and the relay
;; function of the dataspace pays attention to each in a different
;; way. Now there cannot be (accidental) routing loops: asserting
;; something `outbound`, no matter how briefly, will only ever result
;; in a pulse of an `inbound` assertion.
(require syndicate/pretty)
@ -22,6 +31,6 @@
(printf "Received event ~a:\n~a\n" new-counter (syndicate-pretty-print->string e))
(transition (+ counter 1) '()))))
0
(list (patch-seq (assert (at-meta 'x))
(sub 'x #:meta-level 1))
(retract (at-meta 'x)))))
(list (patch-seq (assert (outbound 'x))
(sub (inbound 'x)))
(retract (outbound 'x)))))

View File

@ -1,5 +1,5 @@
#lang syndicate
;; The actor should receive a single event adding the (at-meta x) assertion.
;; The actor should receive a single event adding an (inbound 'x) assertion.
(require syndicate/pretty)
@ -10,5 +10,5 @@
(printf "Received event ~a:\n~a\n" new-counter (syndicate-pretty-print->string e))
(transition (+ counter 1) '()))))
0
(list (patch-seq (sub 'x #:meta-level 1)
(assert (at-meta 'x))))))
(list (patch-seq (sub (inbound 'x))
(assert (outbound 'x))))))

View File

@ -5,6 +5,11 @@
(require "../main.rkt")
(require "../drivers/timer.rkt")
(define sub-all-except-meta
(patch-seq (sub ?)
(unsub (inbound ?))
(unsub (outbound ?))))
(define (quasi-spy e s)
(printf "----------------------------------------\n")
(printf "QUASI-SPY:\n")
@ -18,7 +23,7 @@
(define (r e s)
(match e
[(message body) (transition s (message (at-meta `(print (got ,body)))))]
[(message body) (transition s (message (outbound `(print (got ,body)))))]
[_ #f]))
(define (b e n)
@ -30,9 +35,9 @@
(define (echoer e s)
(match e
[(message (at-meta (external-event _ (list (? eof-object?)))))
[(message (inbound (external-event _ (list (? eof-object?)))))
(quit)]
[(message (at-meta (external-event _ (list line))))
[(message (inbound (external-event _ (list line))))
(transition s (message `(print (got-line ,line))))]
[_ #f]))
@ -56,17 +61,16 @@
#f]
[_ #f]))
(run-ground (spawn quasi-spy (void) (sub ?))
(run-ground (spawn quasi-spy (void) sub-all-except-meta)
(spawn-timer-driver)
(message (set-timer 'tick 1000 'relative))
(spawn ticker
1
(patch-seq (sub (observe (set-timer ? ? ?)))
(sub (timer-expired 'tick ?))))
(spawn-dataspace (spawn r (void) (sub ?))
(spawn-dataspace (spawn r (void) sub-all-except-meta)
(spawn b 0 '()))
(spawn echoer
(void)
(sub (external-event (read-line-evt (current-input-port) 'any) ?)
#:meta-level 1))
(sub (inbound (external-event (read-line-evt (current-input-port) 'any) ?))))
(spawn printer (void) (sub `(print ,?))))

View File

@ -6,23 +6,24 @@
(define (spawn-command-listener)
(spawn (lambda (e s)
(match e
[(message (at-meta (at-meta (external-event _ (list #"quit")))))
[(message (inbound (inbound (external-event _ (list #"quit")))))
(printf "Quitting just the leaf actor.\n")
(quit)]
[(message (at-meta (at-meta (external-event _ (list #"quit-dataspace")))))
[(message (inbound (inbound (external-event _ (list #"quit-dataspace")))))
(printf "Terminating the whole dataspace.\n")
(transition s (quit-dataspace))]
[_ #f]))
(void)
(sub (external-event (read-bytes-line-evt (current-input-port) 'any) ?)
#:meta-level 2)))
(sub (inbound (inbound
(external-event (read-bytes-line-evt (current-input-port) 'any) ?))))))
(define (spawn-ticker)
(define (sub-to-alarm)
(sub (external-event (alarm-evt (+ (current-inexact-milliseconds) 1000)) ?) #:meta-level 2))
(sub (inbound (inbound
(external-event (alarm-evt (+ (current-inexact-milliseconds) 1000)) ?)))))
(spawn (lambda (e s)
(match e
[(message (at-meta (at-meta (external-event _ _))))
[(message (inbound (inbound (external-event _ _))))
(printf "Tick!\n")
(transition s
(list (retract ?)

View File

@ -1,6 +1,6 @@
#lang syndicate
;; Demonstrate wildcard assertions.
;; One actor asserts everything except at-meta assertions (which break
;; One actor asserts everything except inbound/outbound assertions (which break
;; the ground VM). It therefore *subscribes* to everything too.
(require syndicate/pretty)
@ -16,4 +16,7 @@
#f))
trie-empty
(patch-seq (assert ?)
(retract (at-meta ?))))
(retract (outbound ?))
(retract (observe (inbound ?)))
(retract (inbound ?)) ;; not actually required for the purposes of this demo
))

View File

@ -1,9 +1,9 @@
#lang syndicate
;; Demonstrate almost-wildcard assertions.
;; One actor subscribes to everything - and so initially sees itself.
;; The other advertises everything except subscriptions and at-meta assertions.
;; One actor subscribes to everything except inbound assertions - and so initially sees itself.
;; The other advertises everything except subscriptions and inbound/outbound assertions.
;; The first actor's aggregate view of the dataspace then includes everything
;; except at-meta assertions.
;; except inbound assertions.
(require syndicate/pretty)
@ -17,7 +17,8 @@
(transition (update-interests s e) '())
#f))
trie-empty
(sub ?))
(patch-seq (sub ?)
(unsub (inbound ?))))
(spawn (lambda (e s)
(printf "Asserter\n")
@ -27,4 +28,5 @@
(void)
(patch-seq (assert ?)
(retract (observe ?))
(retract (at-meta ?))))
(retract (outbound ?))
(retract (inbound ?))))

View File

@ -1,5 +1,6 @@
#lang syndicate
(require syndicate/protocol/advertise)
(require/activate "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")

View File

@ -1,6 +1,6 @@
#lang syndicate
(require/activate "../drivers/udp.rkt")
(require/activate syndicate/drivers/udp)
(spawn (lambda (e s)
(match e

View File

@ -4,6 +4,7 @@
;; racket ws-echo-client.rkt wss://localhost:8084/
(require/activate syndicate/drivers/websocket)
(require syndicate/protocol/advertise)
(require racket/port)
(define url
@ -22,12 +23,11 @@
(actor (react (assert (advertise (websocket-message c s _)))
(on (asserted (websocket-peer-details c s $la _ $ra _))
(log-info "~a: local ~v :: remote ~v" c la ra))
(on (message (external-event e (list (? bytes? $bs))) #:meta-level 1)
(on (message (inbound (external-event e (list (? bytes? $bs)))))
(send! (websocket-message c s bs)))
(on (message (websocket-message s c $bs))
(printf "(From server: ~v)\n" bs))
(stop-when (message (external-event e (list (? eof-object? _)))
#:meta-level 1)
(stop-when (message (inbound (external-event e (list (? eof-object? _)))))
(printf "Local EOF. Terminating.\n"))
(stop-when (retracted (advertise (websocket-message s c _)))
(printf "Server disconnected.\n")))))

View File

@ -1,6 +1,7 @@
#lang syndicate/actor
;; Websocket server that echoes all it receives
(require syndicate/protocol/advertise)
(require/activate syndicate/drivers/websocket)
(define any-client any-websocket-remote-client)

View File

@ -1,5 +1,6 @@
#lang syndicate
(require syndicate/protocol/advertise)
(require/activate "../drivers/websocket.rkt")
(require "../demand-matcher.rkt")

View File

@ -1,5 +1,6 @@
#lang syndicate
(require syndicate/protocol/advertise)
(require/activate "../drivers/websocket.rkt")
(require "../demand-matcher.rkt")

View File

@ -6,10 +6,12 @@
(require racket/match)
(require racket/list)
(require "core.rkt")
(require "dataspace.rkt")
(require "hierarchy.rkt")
(require "trace.rkt")
(require "trace/stderr.rkt")
(require "tset.rkt")
(require "protocol/standard-relay.rkt")
(provide (struct-out external-event)
send-ground-message
@ -94,48 +96,61 @@
(define poll-handler
(handle-evt always-evt (lambda _ #f)))
;; Boolean Behavior State AssertionSet Natural -> Void
(define (await-interrupt inert? beh st interests background-activity-count)
;; (log-info "~a ~a GROUND INTERESTS:\n~a"
;; inert?
;; background-activity-count
;; (trie->pretty-string interests))
(if (and inert? (zero? background-activity-count) (trie-empty? interests))
(begin (log-info "run-ground: Terminating because inert")
(void))
(match (apply sync
(current-ground-event-async-channel)
(cond
[inert? never-evt]
[(zero? background-activity-count) idle-handler]
[else poll-handler])
(extract-active-events interests))
[(background-activity-signal delta)
;; (log-info "background-activity-count ~v" (+ background-activity-count delta))
(await-interrupt inert? beh st interests (+ background-activity-count delta))]
[e
(inject-event e beh st interests background-activity-count)])))
;; Event Behavior State AssertionSet Natural -> Void
(define (inject-event e beh st interests background-activity-count)
(trace-process-step e #f beh st)
(define resulting-transition (clean-transition (beh e st)))
(trace-process-step-result e #f beh st #f resulting-transition)
(process-transition resulting-transition beh st interests background-activity-count))
;; Transition Behavior State AssertionSet Natural -> Void
(define (process-transition resulting-transition beh st interests background-activity-count)
(match resulting-transition
[#f ;; inert
(await-interrupt #t beh st interests background-activity-count)]
[(<quit> _ _)
(log-info "run-ground: Terminating by request")
(void)]
[(transition st actions)
(let process-actions ((actions actions) (interests interests))
(match actions
['() (await-interrupt #f beh st interests background-activity-count)]
[(cons a actions)
(match a
[(? patch? p)
(process-actions actions (apply-patch interests (label-patch p (datum-tset 'root))))]
[_
(log-warning "run-ground: ignoring useless meta-action ~v" a)
(process-actions actions interests)])]))]))
;; Action* -> Void
;; Runs a ground VM, booting the outermost Dataspace with the given Actions.
(define (run-ground . boot-actions)
(let await-interrupt ((inert? #f)
(w (make-dataspace boot-actions))
(interests trie-empty)
(background-activity-count 0))
;; (log-info "~a ~a GROUND INTERESTS:\n~a"
;; inert?
;; background-activity-count
;; (trie->pretty-string interests))
(if (and inert? (zero? background-activity-count) (trie-empty? interests))
(begin (log-info "run-ground: Terminating because inert")
(void))
(match (apply sync
(current-ground-event-async-channel)
(cond
[inert? never-evt]
[(zero? background-activity-count) idle-handler]
[else poll-handler])
(extract-active-events interests))
[(background-activity-signal delta)
;; (log-info "background-activity-count ~v" (+ background-activity-count delta))
(await-interrupt inert? w interests (+ background-activity-count delta))]
[e
(trace-process-step e #f dataspace-handle-event w)
(define resulting-transition (clean-transition (dataspace-handle-event e w)))
(trace-process-step-result e #f dataspace-handle-event w #f resulting-transition)
(match resulting-transition
[#f ;; inert
(await-interrupt #t w interests background-activity-count)]
[(<quit> _ _)
(log-info "run-ground: Terminating by request")
(void)]
[(transition w actions)
(let process-actions ((actions actions) (interests interests))
(match actions
['() (await-interrupt #f w interests background-activity-count)]
[(cons a actions)
(match a
[(? patch? p)
(process-actions actions (apply-patch interests (label-patch p (datum-tset 'root))))]
[_
(log-warning "run-ground: ignoring useless meta-action ~v" a)
(process-actions actions interests)])]))])]))))
(run-ground* (spawn-dataspace #:name 'ground boot-actions)))
;; Spawn -> Void
(define (run-ground* s)
(match-define (list beh t _name) ((spawn-boot s)))
(process-transition t beh 'undefined-initial-ground-state trie-empty 0))

View File

@ -1,9 +1,13 @@
#lang racket/base
(require "core.rkt")
(require "dataspace.rkt")
(require "protocol/standard-relay.rkt")
(require "ground.rkt")
(require "comprehensions.rkt")
(provide (all-from-out "core.rkt")
(all-from-out "dataspace.rkt")
(all-from-out "protocol/standard-relay.rkt")
(all-from-out "comprehensions.rkt")
(all-from-out "ground.rkt"))

View File

@ -7,7 +7,6 @@
assertion
subscription
advertisement
assertion-set-union
assertion-set-union*
@ -46,14 +45,11 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (assertion pattern #:meta-level [level 0])
(pattern->trie '<assertion> (prepend-at-meta pattern level)))
(define (assertion pattern)
(pattern->trie '<assertion> pattern))
(define (subscription pattern #:meta-level [level 0])
(observe-at-meta pattern level))
(define (advertisement pattern #:meta-level [level 0])
(assertion (advertise pattern) #:meta-level level))
(define (subscription pattern)
(pattern->trie '<subscription> (observe pattern)))
(define (assertion-set-union . tries)
(assertion-set-union* tries))
@ -154,4 +150,4 @@
(match (b e)
[#f #f]
[(? quit? q) q]
[actions (transition state actions)]))
[actions (transition state actions)]))

View File

@ -2,8 +2,6 @@
;; State Change Notifications, and assorted protocol constructors
(provide (struct-out scn)
lift-scn
drop-scn
strip-scn
label-scn)
@ -23,12 +21,6 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (lift-scn s)
(scn (pattern->trie '<lift-scn> (at-meta (embedded-trie (scn-trie s))))))
(define (drop-scn s)
(scn (drop-interests (scn-trie s))))
(define (strip-scn s)
(scn (strip-interests (scn-trie s))))

View File

@ -68,44 +68,27 @@
delta
delta-aggregate))
(define at-meta-everything (pattern->trie '<at-meta-everything> (at-meta ?)))
(define (echo-cancelled-trie t)
(trie-subtract t
at-meta-everything
#:combiner (lambda (v1 v2)
(if (tset-member? v1 'meta)
(trie-success only-meta-tset)
trie-empty))))
(define (compute-patches old-m new-m label delta delta-aggregate)
(define delta-aggregate/no-echo
(if (meta-label? label)
delta
(patch-without-at-meta delta-aggregate)))
(define old-routing-table (mux-routing-table old-m))
(define new-routing-table (mux-routing-table new-m))
(define affected-pids
(let ((pids (compute-affected-pids old-routing-table delta-aggregate/no-echo)))
(tset-remove (tset-add pids label) 'meta))) ;; TODO: removing meta is weird
(tset-remove (tset-add (compute-affected-pids old-routing-table delta) label) 'meta))
(define (entry-for pid)
(cond [(equal? pid label)
(define feedback
(patch-union
(patch (biased-intersection new-routing-table (patch-added delta))
(biased-intersection old-routing-table (patch-removed delta)))
(patch (biased-intersection (patch-added delta-aggregate)
(mux-interests-of new-m label))
(biased-intersection (patch-removed delta-aggregate)
(mux-interests-of old-m label)))))
(cons label feedback)]
[else
(cons pid (view-patch delta-aggregate (mux-interests-of old-m pid)))]))
(values (for/list [(pid (tset->list affected-pids))]
(cond [(equal? pid label)
(define feedback
(patch-union
(patch (echo-cancelled-trie
(biased-intersection new-routing-table (patch-added delta)))
(echo-cancelled-trie
(biased-intersection old-routing-table (patch-removed delta))))
(patch (biased-intersection (patch-added delta-aggregate/no-echo)
(mux-interests-of new-m label))
(biased-intersection (patch-removed delta-aggregate/no-echo)
(mux-interests-of old-m label)))))
(cons label feedback)]
[else
(cons pid (view-patch delta-aggregate/no-echo (mux-interests-of old-m pid)))]))
(and (not (meta-label? label))
(drop-patch
(compute-aggregate-patch delta label old-routing-table #:remove-meta? #t)))))
(entry-for pid))
(cdr (entry-for 'meta))))
(define (compute-affected-pids routing-table delta)
(define cover (trie-union (patch-added delta) (patch-removed delta)))
@ -117,11 +100,13 @@
(define (mux-route-message m body)
(if (trie-lookup (mux-routing-table m) body #f #:wildcard-union (lambda (a b) (or a b)))
;; some other stream has declared body
'()
(tset->list (trie-lookup (mux-routing-table m)
(values '() #f)
(let ((pids (trie-lookup (mux-routing-table m)
(observe body)
datum-tset-empty
#:wildcard-union tset-union))))
#:wildcard-union tset-union)))
(values (tset->list (tset-remove pids 'meta))
(tset-member? pids 'meta)))))
(define (mux-interests-of m label)
(hash-ref (mux-interest-table m) label trie-empty))

View File

@ -3,18 +3,12 @@
(provide (struct-out patch)
(struct-out observe)
(struct-out at-meta)
(struct-out advertise)
observe-parenthesis
at-meta-parenthesis
patch-empty
patch-empty?
patch-non-empty?
patch/added?
patch/removed?
lift-patch
drop-interests
drop-patch
strip-interests
label-interests
strip-patch
@ -22,10 +16,8 @@
limit-patch
limit-patch/routing-table
patch-pruned-by
patch-without-at-meta
patch-step
patch-step*
only-meta-tset
compute-aggregate-patch
apply-patch
update-interests
@ -64,19 +56,14 @@
[(define (syndicate-pretty-print d [p (current-output-port)])
(pretty-print-patch d p))])
;; Claims, Interests, Locations, and Advertisements
;; Interests
(struct observe (claim) #:prefab)
(struct at-meta (claim) #:prefab)
(struct advertise (claim) #:prefab)
(define patch-empty (patch trie-empty trie-empty))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define observe-parenthesis (open-parenthesis 1 struct:observe))
(define at-meta-parenthesis (open-parenthesis 1 struct:at-meta))
(define at-meta-everything (pattern->trie #t (at-meta ?)))
(define (patch-empty? p)
(and (patch? p)
@ -91,19 +78,6 @@
(define (patch/added? p) (and (patch? p) (trie-non-empty? (patch-added p))))
(define (patch/removed? p) (and (patch? p) (trie-non-empty? (patch-removed p))))
(define (lift-patch p)
(match-define (patch in out) p)
(patch (pattern->trie '<lift-patch> (at-meta (embedded-trie in)))
(pattern->trie '<lift-patch> (at-meta (embedded-trie out)))))
(define (drop-interests pi)
(trie-step pi at-meta-parenthesis))
(define (drop-patch p)
(match-define (patch in out) p)
(patch (drop-interests in)
(drop-interests out)))
(define (strip-interests g)
(trie-relabel g (lambda (v) '<strip-interests>)))
@ -148,10 +122,6 @@
(patch (trie-subtract #:combiner (lambda (v1 v2) trie-empty) added t)
(trie-subtract #:combiner (lambda (v1 v2) trie-empty) removed t)))
;; Removes at-meta assertions from the given patch.
(define (patch-without-at-meta p)
(patch-pruned-by p at-meta-everything))
;; Steps both added and removes sets
(define (patch-step p key)
(match-define (patch added removed) p)
@ -161,8 +131,6 @@
(define (patch-step* p keys)
(foldl (lambda (key p) (patch-step p key)) p keys))
(define only-meta-tset (datum-tset 'meta))
;; Entries labelled with `label` may already exist in `base`; the
;; patch `p` MUST already have been limited to add only where no
;; `label`-labelled portions of `base` exist, and to remove only where
@ -180,13 +148,9 @@
;; from `label`'s own interests, but where interest remains from other
;; peers, the overall effect will be nil.
;;
;; If `remove-meta?` is true, then in addition to ignoring existing
;; `label` interests, we also ignore existing `'meta`-labelled
;; interests. This is used when computing an outbound/dropped patch.
;;
;; PRECONDITION: `p` is (set label)-labelled
;; PRECONDITION: `base` is (set ...)-labelled
(define (compute-aggregate-patch p label base #:remove-meta? [remove-meta? #f])
(define (compute-aggregate-patch p label base)
(define (add-combiner v1 v2)
;; Keep only points where `p` would add, where no `label` interest
;; is present*, and where no non-`label` interest is present. That
@ -196,36 +160,17 @@
;; furthermore, we know that a previous patch-limiting operation
;; has established that no `label` interest is present at these
;; points), we can always discard such points by returning a
;; constant #f.
;;
;; ...except when `remove-meta?` is true. In that case, we need to
;; keep the point in the case that the only interest present is
;; `'meta`-labeled interest.
(if (and remove-meta? (eq? v2 only-meta-tset)) ;; N.B. relies on canonicity of v2 !
(trie-success v1)
trie-empty))
;; constant trie-empty.
trie-empty)
(define (rem-combiner v1 v2)
;; Keep only points where `p` would remove, where `label` interest
;; is present, and where no non-`label` interest is present. We
;; know that a previous patch-limiting operation has ensured that
;; `label` interest is present, so we only need to check whether
;; any other interest exists at each point.
;;
;; ...and again, for `remove-meta?`, the condition is slightly
;; different. We need to keep the point in that case when either
;; only label interest exists (which by precondition is always the
;; case), or when exactly `label` and `'meta` interest exists, and
;; in no other case.
(if (= (tset-count v2) 1)
(trie-success v1) ;; only `label` interest (previously established) exists here.
(if (and remove-meta?
(= (tset-count v2) 2)
(tset-member? v2 'meta))
(trie-success v1)
;; ^ remove-meta? is true, and exactly `label` and `'meta` interest exists here.
trie-empty
;; ^ other interest exists here, so we should discard this removed-point.
)))
trie-empty))
(patch (trie-subtract (patch-added p) base #:combiner add-combiner)
(trie-subtract (patch-removed p) base #:combiner rem-combiner)))
@ -338,8 +283,6 @@
(define mab (trie-union ma mb))
(define mbc (trie-union mb mc))
(define m* (pattern->trie SP ?))
(define mA (pattern->trie SP (at-meta 'a)))
(define mAb (trie-union mA mb))
(printf "\nmab:\n")
(void (pretty-print-trie mab))
@ -389,12 +332,6 @@
(printf "\ncompute-aggregate-patch m0/m* P m*:\n")
(void (pretty-print-patch (compute-aggregate-patch (patch m0 m*) 'P m*)))
(printf "\nlift mc/mab:\n")
(void (pretty-print-patch (lift-patch (patch mc mab))))
(printf "\ndrop after lift mc/mab:\n")
(void (pretty-print-patch (drop-patch (lift-patch (patch mc mab)))))
(printf "\ncompose mbc/m0 after mc/mab:\n")
(void (pretty-print-patch (compose-patch (patch mbc m0) (patch mc mab))))
@ -407,20 +344,9 @@
(printf "\ncompose mbc/m0 after mc/m* (not disjoint):\n")
(void (pretty-print-patch (compose-patch (patch mbc m0) (patch mc m*))))
(printf "\ncompose mbc/m0 after lift mc/mab:\n")
(void (pretty-print-patch (compose-patch (patch mbc m0)
(lift-patch (patch mc mab)))))
(printf "\ndrop (compose mbc/m0 after lift mc/mab):\n")
(void (pretty-print-patch (drop-patch (compose-patch (patch mbc m0)
(lift-patch (patch mc mab))))))
(printf "\nstripped compose mc/m* (not disjoint) after mbc/m0:\n")
(void (pretty-print-patch (compose-patch (strip-patch (patch mc m*))
(strip-patch (patch mbc m0)))))
(printf "\ndrop mAb/m0:\n")
(void (pretty-print-patch (drop-patch (patch mAb m0))))
)
;; (sanity-check-examples)
@ -503,22 +429,6 @@
(check-equal? (compute-aggregate-patch p- 'a R5) p0)
(check-equal? (compute-aggregate-patch p- 'a R6) p0)
(check-equal? (compute-aggregate-patch p- 'a R7) p0)
(check-equal? (compute-aggregate-patch p0 'a R0 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p0 'a R1 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p0 'a R2 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p0 'a R3 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p0 'a R4 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p0 'a R5 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p0 'a R6 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p0 'a R7 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p+ 'a R0 #:remove-meta? #t) p+)
(check-equal? (compute-aggregate-patch p+ 'a R1 #:remove-meta? #t) p+)
(check-equal? (compute-aggregate-patch p+ 'a R2 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p+ 'a R3 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p- 'a R4 #:remove-meta? #t) p-)
(check-equal? (compute-aggregate-patch p- 'a R5 #:remove-meta? #t) p-)
(check-equal? (compute-aggregate-patch p- 'a R6 #:remove-meta? #t) p0)
(check-equal? (compute-aggregate-patch p- 'a R7 #:remove-meta? #t) p0)
)
(let ((m1 (set->trie '<m1> (set 1 2)))

View File

@ -0,0 +1,27 @@
#lang racket/base
;; Advertisement assertions.
(provide (struct-out advertise)
advertisement
pub
unpub)
(require racket/match)
(require "../core.rkt")
(require "../trie.rkt")
(require "standard-relay.rkt")
(struct advertise (claim) #:prefab)
(define (drop-advertisement a)
(match a
[(advertise (inbound x)) (outbound (advertise x))]
[_ a]))
;; Monolithic SCN
(define (advertisement pattern)
(pattern->trie '<advertisement> (drop-advertisement (advertise pattern))))
;; Incremental SCNs
(define (pub pattern) (assert (drop-advertisement (advertise pattern))))
(define (unpub pattern) (retract (drop-advertisement (advertise pattern))))

View File

@ -0,0 +1,28 @@
#lang racket/base
;; Relaying of inbound and outbound assertions between adjacent dataspaces.
(provide (struct-out inbound)
(struct-out outbound)
inbound*
outbound*
spawn-standard-relay)
(require "../trie.rkt")
(require "../relay.rkt")
(struct inbound (assertion) #:prefab)
(struct outbound (assertion) #:prefab)
(define (inbound* n x) (if (zero? n) x (inbound (inbound* (- n 1) x))))
(define (outbound* n x) (if (zero? n) x (outbound (outbound* (- n 1) x))))
(define inbound-parenthesis (open-parenthesis 1 struct:inbound))
(define outbound-parenthesis (open-parenthesis 1 struct:outbound))
(define (spawn-standard-relay inner-spawn)
(spawn-relay outbound?
outbound-assertion
outbound-parenthesis
inbound
inbound-parenthesis
inner-spawn))

110
racket/syndicate/relay.rkt Normal file
View File

@ -0,0 +1,110 @@
#lang racket/base
;; Rewrite assertions at a boundary between a containing dataspace and
;; a contained actor (or dataspace).
(provide (struct-out relay)
spawn-relay
relay-handle-event
pretty-print-relay)
(require racket/match)
(require (only-in racket/list filter-map))
(require "core.rkt")
(require "trie.rkt")
(require "pretty.rkt")
(require "hierarchy.rkt")
(struct relay (outbound? ;; Assertion -> Boolean
outbound-assertion ;; Assertion -> Assertion
outbound-parenthesis ;; OpenParenthesis/1
inbound-constructor ;; Assertion -> Assertion
inbound-parenthesis ;; OpenParenthesis/1
inner-behavior ;; Behavior
inner-state ;; Any
)
#:transparent
#:methods gen:syndicate-pretty-printable
[(define (syndicate-pretty-print r [p (current-output-port)])
(pretty-print-relay r p))])
(define (relay-lift-event e r)
(match e
[#f #f]
[(? targeted-event?) e]
[(message c) (message ((relay-inbound-constructor r) c))]
[(patch a d) (patch (trie-prepend (relay-inbound-parenthesis r) a)
(trie-prepend (relay-inbound-parenthesis r) d))]))
(define (relay-drop-interests t r)
(define interesting-inbound-assertions
(trie-step (trie-step t observe-parenthesis) (relay-inbound-parenthesis r)))
(define ordinary-outbound-assertions
(trie-step t (relay-outbound-parenthesis r)))
(define additional-outbound-assertions-of-interest
(trie-prepend observe-parenthesis interesting-inbound-assertions))
(trie-union ordinary-outbound-assertions
additional-outbound-assertions-of-interest))
(define (relay-drop-action ac r)
(match ac
[(message c)
(and ((relay-outbound? r) c)
(message ((relay-outbound-assertion r) c)))]
[(patch a d)
(define p (patch (relay-drop-interests a r) (relay-drop-interests d r)))
(and (patch-non-empty? p) p)]
[_
;; TODO: What should be done about spawn? Anything?
;; TODO: How about quit-dataspace? Could this be a better place for it than core.rkt?
(error 'relay-drop-action "Cannot drop action ~v" ac)]))
(define (relay-drop-actions acs r)
(filter-map (lambda (ac) (relay-drop-action ac r)) (clean-actions acs)))
(define (relay-transition t r)
(match t
[(<quit> exn actions)
(<quit> exn (relay-drop-actions actions r))]
[(transition st actions)
(transition (struct-copy relay r [inner-state st]) (relay-drop-actions actions r))]
[(or #f (? void?))
t]))
(define (relay-handle-event e r)
(relay-transition ((relay-inner-behavior r) (relay-lift-event e r) (relay-inner-state r)) r))
(define ((inject-relay-subscription r) initial-inner-state)
(define initial-patch
(patch-seq (patch (trie-prepend observe-parenthesis
(trie-prepend (relay-outbound-parenthesis r)
(pattern->trie '<relay> ?)))
trie-empty)
(sub (observe ((relay-inbound-constructor r) ?)))))
((relay-inner-behavior r) initial-patch initial-inner-state))
(define (spawn-relay outbound?
outbound-assertion
outbound-parenthesis
inbound-constructor
inbound-parenthesis
inner-spawn)
(<spawn> (lambda ()
(match-define (list inner-behavior initial-transition name) ((spawn-boot inner-spawn)))
(define initial-relay-state (relay outbound?
outbound-assertion
outbound-parenthesis
inbound-constructor
inbound-parenthesis
inner-behavior
'uninitialized:initial-inner-state))
(list relay-handle-event
(relay-transition (transition-bind (inject-relay-subscription initial-relay-state)
initial-transition)
initial-relay-state)
name))))
(define (pretty-print-relay r p)
(fprintf p "RELAY ~a/~a\n"
(open-parenthesis-type (relay-outbound-parenthesis r))
(open-parenthesis-type (relay-inbound-parenthesis r)))
(syndicate-pretty-print (relay-inner-state r) p))

View File

@ -8,6 +8,7 @@
(require racket/exn)
(require (only-in racket/string string-join))
(require "../core.rkt")
(require "../dataspace.rkt")
(require "../hierarchy.rkt")
(require "../trace.rkt")
(require "../mux.rkt")