From dd2cddb6a75a85d6f5873142762d2ded2bbcd65d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 29 Apr 2018 14:54:14 +0100 Subject: [PATCH] Nested dataspaces --- syndicate/dataspace.rkt | 50 +++++++--- syndicate/examples/simple-cross-layer.rkt | 12 +++ syndicate/main.rkt | 4 +- syndicate/relay.rkt | 102 +++++++++++++++++++++ syndicate/skeleton.rkt | 57 ++++++++---- syndicate/syntax.rkt | 24 +++-- syndicate/term.rkt | 98 ++++++++++++++++++++ syndicate/test-implementation.rkt | 6 +- syndicate/test/core/simple-cross-layer.rkt | 18 ++++ 9 files changed, 328 insertions(+), 43 deletions(-) create mode 100644 syndicate/examples/simple-cross-layer.rkt create mode 100644 syndicate/relay.rkt create mode 100644 syndicate/term.rkt create mode 100644 syndicate/test/core/simple-cross-layer.rkt diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index fd0169c..853c561 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -1,6 +1,7 @@ #lang racket/base (provide make-dataspace ;; TODO: how to cleanly provide this? + with-current-facet ;; TODO: shouldn't be provided run-scripts! ;; TODO: how to cleanly provide this? message-struct @@ -38,6 +39,7 @@ stop-facet! add-stop-script! ;; TODO: shouldn't be provided - inline syntax.rkt?? add-endpoint! + remove-endpoint! terminate-facet! ;; TODO: shouldn't be provided - inline syntax.rkt?? schedule-script! ;; TODO: shouldn't be provided - inline syntax.rkt?? push-script! ;; TODO: shouldn't be provided - inline syntax.rkt?? @@ -45,6 +47,7 @@ spawn! ;; TODO: should this be provided? enqueue-send! ;; TODO: should this be provided? + enqueue-deferred-turn! ;; TODO: should this be provided? adhoc-retract! ;; TODO: should this be provided? adhoc-assert! ;; TODO: should this be provided? actor-adhoc-assertions ;; TODO: should this be provided? @@ -78,11 +81,13 @@ ;; - `(patch (MutableDeltaof Assertion))` ;; - `(message Assertion)` ;; - `(spawn Any BootProc (Set Assertion))` -;; - `(quit)`. +;; - `(quit)` +;; - `(deferred-turn (-> Any))` (struct patch (changes) #:prefab) (struct message (body) #:prefab) (struct spawn (name boot-proc initial-assertions) #:prefab) (struct quit () #:prefab) +(struct deferred-turn (continuation) #:prefab) (struct dataspace ([next-id #:mutable] ;; Nat routing-table ;; Skeleton @@ -342,7 +347,7 @@ (for [(group (in-list groups))] (match-define (action-group ac actions) group) (for [(action (in-list actions))] - ;; (log-info "~a performing ~a" ac action) + ;; (log-info "~a in ~a performing ~a" ac (eq-hash-code ds) action) (match action [(patch delta) (apply-patch! ds ac delta)] @@ -351,7 +356,9 @@ [(spawn name boot-proc initial-assertions) (add-actor! ds name boot-proc initial-assertions)] [(quit) - (apply-patch! ds ac (actor-cleanup-changes ac))])))) + (apply-patch! ds ac (actor-cleanup-changes ac))] + [(deferred-turn k) + (push-script! ac k)])))) (define (apply-patch! ds ac delta) (define ds-assertions (dataspace-assertions ds)) @@ -422,10 +429,7 @@ (define ds (actor-dataspace ac)) (push-script! ac (lambda () (for [((eid ep) (in-hash (facet-endpoints f)))] - (dataflow-forget-subject! (dataspace-dataflow ds) (list f eid)) - (retract! ac (endpoint-assertion ep)) - (define h (endpoint-handler ep)) - (when h (dataspace-unsubscribe! ds h)))))) + (destroy-endpoint! ds ac f ep))))) (define (abandon-queued-work! ac) (set-actor-pending-actions! ac (make-queue)) @@ -496,7 +500,23 @@ (define ep (endpoint eid assertion assertion-fn handler)) (assert! (facet-actor f) assertion) (when handler (dataspace-subscribe! ds handler)) - (hash-set! (facet-endpoints f) eid ep)) + (hash-set! (facet-endpoints f) eid ep) + eid) + +(define (remove-endpoint! f eid) + (define eps (facet-endpoints f)) + (define ep (hash-ref eps eid #f)) + (when ep + (define ac (facet-actor f)) + (define ds (actor-dataspace ac)) + (hash-remove! eps eid) + (destroy-endpoint! ds ac f ep))) + +(define (destroy-endpoint! ds ac f ep) + (match-define (endpoint eid assertion _assertion-fn handler) ep) + (dataflow-forget-subject! (dataspace-dataflow ds) (list f eid)) + (retract! ac assertion) + (when handler (dataspace-unsubscribe! ds handler))) (define (enqueue-action! ac action) (set-actor-pending-actions! ac (enqueue (actor-pending-actions ac) action))) @@ -546,11 +566,12 @@ (error who "Attempt to perform action outside script; are you missing an (on ...)?"))) (define (enqueue-send! ac body) - (ensure-in-script! 'enqueue-send!) (enqueue-action! ac (message body))) +(define (enqueue-deferred-turn! ac k) + (enqueue-action! ac (deferred-turn (capture-facet-context k)))) + (define (spawn! ac name boot-proc initial-assertions) - (ensure-in-script! 'spawn!) (enqueue-action! ac (spawn name boot-proc initial-assertions))) ;;--------------------------------------------------------------------------- @@ -631,7 +652,8 @@ (current-actor) (lambda () (log-info "box: taking on new-value ~v" new-value) - (current-value new-value))))))))) + (current-value new-value)))))) + #f))) (set)) (spawn! (current-actor) @@ -650,7 +672,8 @@ (stop-facet! (current-facet) (lambda () - (log-info "client: box has gone")))))))) + (log-info "client: box has gone")))))) + #f)) (add-endpoint! (current-facet) 'on-asserted-box-state (lambda () (observe (box-state (capture (discard))))) @@ -666,7 +689,8 @@ (lambda () (log-info "client: learned that box's value is now ~v" v) (enqueue-send! (current-actor) - (set-box (+ v 1))))))))))) + (set-box (+ v 1)))))))) + #f))) (set))))))) (require racket/pretty) diff --git a/syndicate/examples/simple-cross-layer.rkt b/syndicate/examples/simple-cross-layer.rkt new file mode 100644 index 0000000..c8f10ff --- /dev/null +++ b/syndicate/examples/simple-cross-layer.rkt @@ -0,0 +1,12 @@ +#lang imperative-syndicate + +(assertion-struct greeting (text)) + +(spawn #:name "A" (assert (greeting "Hi from outer space!"))) +(spawn #:name "B" (on (asserted (greeting $t)) + (printf "Outer dataspace: ~a\n" t))) + +(dataspace #:name "C" + (spawn #:name "D" (assert (outbound (greeting "Hi from inner!")))) + (spawn #:name "E" (on (asserted (inbound (greeting $t))) + (printf "Inner dataspace: ~a\n" t)))) diff --git a/syndicate/main.rkt b/syndicate/main.rkt index 24a5b68..0f02183 100644 --- a/syndicate/main.rkt +++ b/syndicate/main.rkt @@ -2,10 +2,12 @@ (provide (all-from-out "dataspace.rkt") (all-from-out "syntax.rkt") - (all-from-out "ground.rkt")) + (all-from-out "ground.rkt") + (all-from-out "relay.rkt")) (module reader syntax/module-reader imperative-syndicate/lang) (require "dataspace.rkt") (require "syntax.rkt") (require "ground.rkt") +(require "relay.rkt") diff --git a/syndicate/relay.rkt b/syndicate/relay.rkt new file mode 100644 index 0000000..054141f --- /dev/null +++ b/syndicate/relay.rkt @@ -0,0 +1,102 @@ +#lang racket/base +;; Cross-layer relaying between adjacent dataspaces +;; TODO: protocol for shutdown of a dataspace +;; TODO: protocol for *clean* shutdown of a dataspace + +(provide (struct-out inbound) + (struct-out outbound) + dataspace) + +(require racket/match) +(require racket/set) +(require "dataspace.rkt") +(require "syntax.rkt") +(require "skeleton.rkt") +(require "term.rkt") +(require "bag.rkt") + +(require (for-syntax racket/base)) + +(struct inbound (assertion) #:prefab) +(struct outbound (assertion) #:prefab) + +(define-syntax (dataspace stx) + (syntax-case stx () + [(_ form ...) + (syntax/loc stx + (spawn (define outer-actor (current-actor)) + (define outer-facet (current-facet)) + (define inner-ds (make-dataspace + (lambda () + (schedule-script! + (current-actor) + (lambda () + (spawn #:name 'dataspace-relay + (boot-relay outer-actor + outer-facet)) + (spawn* form ...)))))) + (on-start (schedule-turn! inner-ds))))])) + +(define (schedule-turn! inner-ds) + (defer-turn! (lambda () + (when (run-scripts! inner-ds) + (schedule-turn! inner-ds))))) + +(define (boot-relay outer-actor outer-facet) + (define inbound-endpoints (make-hash)) + (define outbound-endpoints (make-hash)) + + (define inner-actor (current-actor)) + (define inner-facet (current-facet)) + + (on (asserted (observe (inbound $x))) + (with-current-facet [outer-actor outer-facet #f] + (define i (skeleton-interest + (term->skeleton x) + (term->skeleton-proj x) + (term->key x) + (term->capture-proj x) + (lambda (op . captured-values) + (define term (inbound (instantiate-term->value x captured-values))) + (push-script! inner-actor + (lambda () + (match op + ['+ (adhoc-assert! inner-actor term)] + ['- (adhoc-retract! inner-actor term)] + ['! (enqueue-send! inner-actor term)])))) + (lambda (cache) + (push-script! inner-actor + (lambda () + (for [(captured-values (in-bag cache))] + (define term + (inbound (instantiate-term->value x captured-values))) + (adhoc-retract! inner-actor term))))))) + (hash-set! inbound-endpoints + x + (add-endpoint! outer-facet + "dataspace-relay (observe (inbound ...))" + (lambda () (observe x)) + i)))) + + (on (retracted (observe (inbound $x))) + (with-current-facet [outer-actor outer-facet #f] + (remove-endpoint! outer-facet (hash-ref inbound-endpoints x)) + (hash-remove! inbound-endpoints x))) + + (on (asserted (outbound $x)) + (with-current-facet [outer-actor outer-facet #f] + (hash-set! outbound-endpoints + x + (add-endpoint! outer-facet + "dataspace-relay (outbound ...)" + (lambda () x) + #f)))) + + (on (retracted (outbound $x)) + (with-current-facet [outer-actor outer-facet #f] + (remove-endpoint! outer-facet (hash-ref outbound-endpoints x)) + (hash-remove! outbound-endpoints x))) + + (on (message (outbound $x)) + (with-current-facet [outer-actor outer-facet #f] + (send! x)))) diff --git a/syndicate/skeleton.rkt b/syndicate/skeleton.rkt index 62ce116..c130c4f 100644 --- a/syndicate/skeleton.rkt +++ b/syndicate/skeleton.rkt @@ -84,14 +84,25 @@ ;; A `SkInterest` is a specification for an addition to or removal ;; from an existing `Skeleton`. ;; -;; SkInterest = (skeleton-interest SkDesc SkProj SkKey SkProj (... -> Any)) +;; SkInterest = (skeleton-interest SkDesc +;; SkProj +;; SkKey +;; SkProj +;; (... -> Any) +;; (Option ((MutableBag SkKey) -> Any))) ;; ;; The `SkDesc` gives the silhouette. The first `SkProj` is the ;; constant-portion selector, to be matched against the `SkKey`. The ;; second `SkProj` is used on matching assertions to extract the ;; variable portions, to be passed to the handler function. ;; -(struct skeleton-interest (desc const-selector const-value var-selector handler) #:transparent) +(struct skeleton-interest (desc + const-selector + const-value + var-selector + handler + cleanup + ) #:transparent) ;;--------------------------------------------------------------------------- @@ -103,8 +114,8 @@ (define (make-empty-skeleton) (make-empty-skeleton/cache (mutable-set))) -(define (skcont-add! c i apply-handler!) - (match-define (skeleton-interest _desc cs cv vs h) i) +(define (skcont-add! c i) + (match-define (skeleton-interest _desc cs cv vs h _cleanup) i) (define (make-matched-constant) (skeleton-matched-constant (for/mutable-set [(a (skeleton-continuation-cache c)) #:when (equal? (apply-projection a cs) cv)] @@ -120,16 +131,18 @@ (skeleton-accumulator cache (mutable-seteq))) (define acc (hash-ref! (skeleton-matched-constant-table sc) vs make-accumulator)) (set-add! (skeleton-accumulator-handlers acc) h) - (for [(vars (in-bag (skeleton-accumulator-cache acc)))] (apply-handler! h vars))) + (for [(vars (in-bag (skeleton-accumulator-cache acc)))] (apply h '+ vars))) (define (skcont-remove! c i) - (match-define (skeleton-interest _desc cs cv vs h) i) + (match-define (skeleton-interest _desc cs cv vs h cleanup) i) (define cvt (hash-ref (skeleton-continuation-table c) cs #f)) (when cvt (define sc (hash-ref cvt cv #f)) (when sc (define acc (hash-ref (skeleton-matched-constant-table sc) vs #f)) (when acc + (when (and cleanup (set-member? (skeleton-accumulator-handlers acc) h)) + (cleanup (skeleton-accumulator-cache acc))) (set-remove! (skeleton-accumulator-handlers acc) h) (when (set-empty? (skeleton-accumulator-handlers acc)) (hash-remove! (skeleton-matched-constant-table sc) vs))) @@ -185,9 +198,7 @@ (define (add-interest! sk i) (let ((sk (extend-skeleton! sk (skeleton-interest-desc i)))) - (skcont-add! (skeleton-node-continuation sk) - i - (lambda (h vars) (apply h '+ vars))))) + (skcont-add! (skeleton-node-continuation sk) i))) (define (remove-interest! sk i) (let ((sk (extend-skeleton! sk (skeleton-interest-desc i)))) @@ -307,13 +318,17 @@ (b 'zot) 123))) - (add-interest! sk - (skeleton-interest (list struct:a (list struct:b #f) #f) - '((0 0 0)) - '(foo) - '((0 1)) - (lambda (op . bindings) - (printf "xAB HANDLER: ~v ~v\n" op bindings)))) + (define i1 + (skeleton-interest (list struct:a (list struct:b #f) #f) + '((0 0 0)) + '(foo) + '((0 1)) + (lambda (op . bindings) + (printf "xAB HANDLER: ~v ~v\n" op bindings)) + (lambda (vars) + (printf "xAB CLEANUP: ~v\n" vars)))) + + (add-interest! sk i1) (void (extend-skeleton! sk (list struct:a (list struct:b #f) #f))) (void (extend-skeleton! sk (list struct:a #f (list struct:c #f)))) @@ -333,7 +348,9 @@ '(DCZ) '((0) (0 0) (0 0 0) (0 1)) (lambda (op . bindings) - (printf "DBC HANDLER: ~v ~v\n" op bindings)))) + (printf "DBC HANDLER: ~v ~v\n" op bindings)) + (lambda (vars) + (printf "DBC CLEANUP: ~v\n" vars)))) (remove-assertion! sk (a (b 'foo) (c 'bar))) (remove-assertion! sk (d (b 'B1) (b 'DBY) (c 'DCZ))) @@ -350,7 +367,9 @@ '(DBY) '((0 0) (0 2)) (lambda (op . bindings) - (printf "xDB HANDLER: ~v ~v\n" op bindings)))) + (printf "xDB HANDLER: ~v ~v\n" op bindings)) + (lambda (vars) + (printf "xDB CLEANUP: ~v\n" vars)))) (send-assertion! sk (d (b 'BX) (b 'DBY) (c 'DCZ))) (send-assertion! sk (d (b 'BX) (b 'DBY) (c 'DCZ))) @@ -362,4 +381,6 @@ (remove-assertion! sk (d (b 'BX) (b 'DBY) (c 'DCZ))) (remove-assertion! sk (d (b 'B1) (b 'DBY) (c 'CX))) ;; sk + + (remove-interest! sk i1) ) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index c6b1ee3..5f54ffa 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -46,6 +46,7 @@ ;; immediate-query send! + defer-turn! flush! assert! retract! @@ -120,11 +121,13 @@ (syntax-parse stx [(_ name:name assertions:assertions script ...) (quasisyntax/loc stx - (spawn! - (current-actor) - name.N - (lambda () (begin/void-default script ...)) - (set assertions.exprs ...)))])) + (begin + (ensure-in-script! 'spawn!) + (spawn! + (current-actor) + name.N + (lambda () (begin/void-default script ...)) + (set assertions.exprs ...))))])) (define-syntax (begin/void-default stx) (syntax-parse stx @@ -322,7 +325,8 @@ (current-actor) #,(quasisyntax/loc script-stx (lambda () - #,script-stx)))))))))] + #,script-stx)))))) + #f)))] [(asserted P) (analyse-asserted/retracted outer-expr-stx when-pred-stx script-stx #t #'P priority-stx)] [(retracted P) @@ -351,7 +355,8 @@ (current-actor) #,(quasisyntax/loc script-stx (lambda () - #,script-stx)))))))))) + #,script-stx)))))) + #f)))) (define-syntax (during stx) (syntax-parse stx @@ -559,8 +564,13 @@ ;; (on-start (flush!) (k (query-result) ...)))))])) (define (send! m) + (ensure-in-script! 'send!) (enqueue-send! (current-actor) m)) +(define (defer-turn! k) + (ensure-in-script! 'defer-turn!) + (enqueue-deferred-turn! (current-actor) k)) + (define (flush!) (ensure-in-script! 'flush!) (define ack (gensym 'flush!)) diff --git a/syndicate/term.rkt b/syndicate/term.rkt new file mode 100644 index 0000000..3017152 --- /dev/null +++ b/syndicate/term.rkt @@ -0,0 +1,98 @@ +#lang racket/base +;; Like pattern.rkt, but for dynamic use rather than compile-time use. + +(provide term->skeleton + term->skeleton-proj + term->key + term->capture-proj + instantiate-term->value) + +(require racket/match) +(require syndicate/support/struct) +(require "pattern.rkt") + +(define (term->skeleton t) + (let walk ((t t)) + (match t + [(capture detail) + (walk detail)] + [(discard) + #f] + [(? non-object-struct?) + (cons (struct->struct-type t) (map walk (cdr (vector->list (struct->vector t)))))] + [(? list?) + (cons 'list (map walk t))] + [atom + #f]))) + +(define (select-term-leaves t capture-fn atom-fn) + (define (walk-node key-rev t) + (match t + [(capture detail) + (append (capture-fn key-rev) (walk-node key-rev detail))] + [(discard) + (list)] + [(? non-object-struct?) + (walk-edge 0 key-rev (cdr (vector->list (struct->vector t))))] + [(? list?) + (walk-edge 0 key-rev t)] + [atom + (atom-fn key-rev atom)])) + + (define (walk-edge index key-rev pieces) + (match pieces + ['() '()] + [(cons p pieces) (append (walk-node (cons index key-rev) p) + (walk-edge (+ index 1) key-rev pieces))])) + + (walk-node '(0) t)) + +(define (term->skeleton-proj t) + (select-term-leaves t + (lambda (key-rev) (list)) + (lambda (key-rev atom) (list (reverse key-rev))))) + +(define (term->key t) + (select-term-leaves t + (lambda (key-rev) (list)) + (lambda (key-rev atom) (list atom)))) + +(define (term->capture-proj t) + (select-term-leaves t + (lambda (key-rev) (list (reverse key-rev))) + (lambda (key-rev atom) (list)))) + +(define (instantiate-term->value t actuals) + (define (pop-actual!) + (define v (car actuals)) + (set! actuals (cdr actuals)) + v) + + (define (pop-captures! t) + (match t + [(capture detail) + (pop-actual!) + (pop-captures! detail)] + [(discard) + (void)] + [(? non-object-struct?) + (for-each pop-captures! (cdr (vector->list (struct->vector t))))] + [(? list?) + (for-each pop-captures! t)] + [_ (void)])) + + (define (walk t) + (match t + [(capture detail) + (begin0 (pop-actual!) + (pop-captures! detail))] ;; to consume nested bindings + [(discard) + (discard)] + [(? non-object-struct?) + (apply (struct-type-make-constructor (struct->struct-type t)) + (map walk (cdr (vector->list (struct->vector t)))))] + [(? list?) + (map walk t)] + [other other])) + + (walk t)) diff --git a/syndicate/test-implementation.rkt b/syndicate/test-implementation.rkt index c1c4bc6..f2a0b79 100644 --- a/syndicate/test-implementation.rkt +++ b/syndicate/test-implementation.rkt @@ -23,8 +23,7 @@ log-test-result! (all-from-out racket/base) - (all-from-out "dataspace.rkt") - (all-from-out "syntax.rkt")) + (all-from-out "main.rkt")) (module reader syntax/module-reader imperative-syndicate/test-implementation) @@ -33,8 +32,7 @@ (require (only-in racket/string string-split string-join string-contains?)) (require "bag.rkt") -(require "dataspace.rkt") -(require "syntax.rkt") +(require "main.rkt") (require (for-syntax racket/base)) (require (for-syntax syntax/srcloc)) diff --git a/syndicate/test/core/simple-cross-layer.rkt b/syndicate/test/core/simple-cross-layer.rkt new file mode 100644 index 0000000..56a7d2a --- /dev/null +++ b/syndicate/test/core/simple-cross-layer.rkt @@ -0,0 +1,18 @@ +#lang imperative-syndicate/test-implementation + +(test-case + [(assertion-struct greeting (text)) + + (spawn #:name "A" (assert (greeting "Hi from outer space!"))) + (spawn #:name "B" (on (asserted (greeting $t)) + (printf "Outer dataspace: ~a\n" t))) + + (dataspace #:name "C" + (spawn #:name "D" (assert (outbound (greeting "Hi from inner!")))) + (spawn #:name "E" (on (asserted (inbound (greeting $t))) + (printf "Inner dataspace: ~a\n" t))))] + no-crashes + (expected-output "Outer dataspace: Hi from outer space!" + "Inner dataspace: Hi from outer space!" + "Outer dataspace: Hi from inner!" + "Inner dataspace: Hi from inner!"))