Nested dataspaces

This commit is contained in:
Tony Garnock-Jones 2018-04-29 14:54:14 +01:00
parent 204197c3eb
commit dd2cddb6a7
9 changed files with 328 additions and 43 deletions

View File

@ -1,6 +1,7 @@
#lang racket/base
(provide make-dataspace ;; TODO: how to cleanly provide this?
with-current-facet ;; TODO: shouldn't be provided
run-scripts! ;; TODO: how to cleanly provide this?
message-struct
@ -38,6 +39,7 @@
stop-facet!
add-stop-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
add-endpoint!
remove-endpoint!
terminate-facet! ;; TODO: shouldn't be provided - inline syntax.rkt??
schedule-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
push-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
@ -45,6 +47,7 @@
spawn! ;; TODO: should this be provided?
enqueue-send! ;; TODO: should this be provided?
enqueue-deferred-turn! ;; TODO: should this be provided?
adhoc-retract! ;; TODO: should this be provided?
adhoc-assert! ;; TODO: should this be provided?
actor-adhoc-assertions ;; TODO: should this be provided?
@ -78,11 +81,13 @@
;; - `(patch (MutableDeltaof Assertion))`
;; - `(message Assertion)`
;; - `(spawn Any BootProc (Set Assertion))`
;; - `(quit)`.
;; - `(quit)`
;; - `(deferred-turn (-> Any))`
(struct patch (changes) #:prefab)
(struct message (body) #:prefab)
(struct spawn (name boot-proc initial-assertions) #:prefab)
(struct quit () #:prefab)
(struct deferred-turn (continuation) #:prefab)
(struct dataspace ([next-id #:mutable] ;; Nat
routing-table ;; Skeleton
@ -342,7 +347,7 @@
(for [(group (in-list groups))]
(match-define (action-group ac actions) group)
(for [(action (in-list actions))]
;; (log-info "~a performing ~a" ac action)
;; (log-info "~a in ~a performing ~a" ac (eq-hash-code ds) action)
(match action
[(patch delta)
(apply-patch! ds ac delta)]
@ -351,7 +356,9 @@
[(spawn name boot-proc initial-assertions)
(add-actor! ds name boot-proc initial-assertions)]
[(quit)
(apply-patch! ds ac (actor-cleanup-changes ac))]))))
(apply-patch! ds ac (actor-cleanup-changes ac))]
[(deferred-turn k)
(push-script! ac k)]))))
(define (apply-patch! ds ac delta)
(define ds-assertions (dataspace-assertions ds))
@ -422,10 +429,7 @@
(define ds (actor-dataspace ac))
(push-script! ac (lambda ()
(for [((eid ep) (in-hash (facet-endpoints f)))]
(dataflow-forget-subject! (dataspace-dataflow ds) (list f eid))
(retract! ac (endpoint-assertion ep))
(define h (endpoint-handler ep))
(when h (dataspace-unsubscribe! ds h))))))
(destroy-endpoint! ds ac f ep)))))
(define (abandon-queued-work! ac)
(set-actor-pending-actions! ac (make-queue))
@ -496,7 +500,23 @@
(define ep (endpoint eid assertion assertion-fn handler))
(assert! (facet-actor f) assertion)
(when handler (dataspace-subscribe! ds handler))
(hash-set! (facet-endpoints f) eid ep))
(hash-set! (facet-endpoints f) eid ep)
eid)
(define (remove-endpoint! f eid)
(define eps (facet-endpoints f))
(define ep (hash-ref eps eid #f))
(when ep
(define ac (facet-actor f))
(define ds (actor-dataspace ac))
(hash-remove! eps eid)
(destroy-endpoint! ds ac f ep)))
(define (destroy-endpoint! ds ac f ep)
(match-define (endpoint eid assertion _assertion-fn handler) ep)
(dataflow-forget-subject! (dataspace-dataflow ds) (list f eid))
(retract! ac assertion)
(when handler (dataspace-unsubscribe! ds handler)))
(define (enqueue-action! ac action)
(set-actor-pending-actions! ac (enqueue (actor-pending-actions ac) action)))
@ -546,11 +566,12 @@
(error who "Attempt to perform action outside script; are you missing an (on ...)?")))
(define (enqueue-send! ac body)
(ensure-in-script! 'enqueue-send!)
(enqueue-action! ac (message body)))
(define (enqueue-deferred-turn! ac k)
(enqueue-action! ac (deferred-turn (capture-facet-context k))))
(define (spawn! ac name boot-proc initial-assertions)
(ensure-in-script! 'spawn!)
(enqueue-action! ac (spawn name boot-proc initial-assertions)))
;;---------------------------------------------------------------------------
@ -631,7 +652,8 @@
(current-actor)
(lambda ()
(log-info "box: taking on new-value ~v" new-value)
(current-value new-value)))))))))
(current-value new-value))))))
#f)))
(set))
(spawn!
(current-actor)
@ -650,7 +672,8 @@
(stop-facet!
(current-facet)
(lambda ()
(log-info "client: box has gone"))))))))
(log-info "client: box has gone"))))))
#f))
(add-endpoint! (current-facet)
'on-asserted-box-state
(lambda () (observe (box-state (capture (discard)))))
@ -666,7 +689,8 @@
(lambda ()
(log-info "client: learned that box's value is now ~v" v)
(enqueue-send! (current-actor)
(set-box (+ v 1)))))))))))
(set-box (+ v 1))))))))
#f)))
(set)))))))
(require racket/pretty)

View File

@ -0,0 +1,12 @@
#lang imperative-syndicate
(assertion-struct greeting (text))
(spawn #:name "A" (assert (greeting "Hi from outer space!")))
(spawn #:name "B" (on (asserted (greeting $t))
(printf "Outer dataspace: ~a\n" t)))
(dataspace #:name "C"
(spawn #:name "D" (assert (outbound (greeting "Hi from inner!"))))
(spawn #:name "E" (on (asserted (inbound (greeting $t)))
(printf "Inner dataspace: ~a\n" t))))

View File

@ -2,10 +2,12 @@
(provide (all-from-out "dataspace.rkt")
(all-from-out "syntax.rkt")
(all-from-out "ground.rkt"))
(all-from-out "ground.rkt")
(all-from-out "relay.rkt"))
(module reader syntax/module-reader imperative-syndicate/lang)
(require "dataspace.rkt")
(require "syntax.rkt")
(require "ground.rkt")
(require "relay.rkt")

102
syndicate/relay.rkt Normal file
View File

@ -0,0 +1,102 @@
#lang racket/base
;; Cross-layer relaying between adjacent dataspaces
;; TODO: protocol for shutdown of a dataspace
;; TODO: protocol for *clean* shutdown of a dataspace
(provide (struct-out inbound)
(struct-out outbound)
dataspace)
(require racket/match)
(require racket/set)
(require "dataspace.rkt")
(require "syntax.rkt")
(require "skeleton.rkt")
(require "term.rkt")
(require "bag.rkt")
(require (for-syntax racket/base))
(struct inbound (assertion) #:prefab)
(struct outbound (assertion) #:prefab)
(define-syntax (dataspace stx)
(syntax-case stx ()
[(_ form ...)
(syntax/loc stx
(spawn (define outer-actor (current-actor))
(define outer-facet (current-facet))
(define inner-ds (make-dataspace
(lambda ()
(schedule-script!
(current-actor)
(lambda ()
(spawn #:name 'dataspace-relay
(boot-relay outer-actor
outer-facet))
(spawn* form ...))))))
(on-start (schedule-turn! inner-ds))))]))
(define (schedule-turn! inner-ds)
(defer-turn! (lambda ()
(when (run-scripts! inner-ds)
(schedule-turn! inner-ds)))))
(define (boot-relay outer-actor outer-facet)
(define inbound-endpoints (make-hash))
(define outbound-endpoints (make-hash))
(define inner-actor (current-actor))
(define inner-facet (current-facet))
(on (asserted (observe (inbound $x)))
(with-current-facet [outer-actor outer-facet #f]
(define i (skeleton-interest
(term->skeleton x)
(term->skeleton-proj x)
(term->key x)
(term->capture-proj x)
(lambda (op . captured-values)
(define term (inbound (instantiate-term->value x captured-values)))
(push-script! inner-actor
(lambda ()
(match op
['+ (adhoc-assert! inner-actor term)]
['- (adhoc-retract! inner-actor term)]
['! (enqueue-send! inner-actor term)]))))
(lambda (cache)
(push-script! inner-actor
(lambda ()
(for [(captured-values (in-bag cache))]
(define term
(inbound (instantiate-term->value x captured-values)))
(adhoc-retract! inner-actor term)))))))
(hash-set! inbound-endpoints
x
(add-endpoint! outer-facet
"dataspace-relay (observe (inbound ...))"
(lambda () (observe x))
i))))
(on (retracted (observe (inbound $x)))
(with-current-facet [outer-actor outer-facet #f]
(remove-endpoint! outer-facet (hash-ref inbound-endpoints x))
(hash-remove! inbound-endpoints x)))
(on (asserted (outbound $x))
(with-current-facet [outer-actor outer-facet #f]
(hash-set! outbound-endpoints
x
(add-endpoint! outer-facet
"dataspace-relay (outbound ...)"
(lambda () x)
#f))))
(on (retracted (outbound $x))
(with-current-facet [outer-actor outer-facet #f]
(remove-endpoint! outer-facet (hash-ref outbound-endpoints x))
(hash-remove! outbound-endpoints x)))
(on (message (outbound $x))
(with-current-facet [outer-actor outer-facet #f]
(send! x))))

View File

@ -84,14 +84,25 @@
;; A `SkInterest` is a specification for an addition to or removal
;; from an existing `Skeleton`.
;;
;; SkInterest = (skeleton-interest SkDesc SkProj SkKey SkProj (... -> Any))
;; SkInterest = (skeleton-interest SkDesc
;; SkProj
;; SkKey
;; SkProj
;; (... -> Any)
;; (Option ((MutableBag SkKey) -> Any)))
;;
;; The `SkDesc` gives the silhouette. The first `SkProj` is the
;; constant-portion selector, to be matched against the `SkKey`. The
;; second `SkProj` is used on matching assertions to extract the
;; variable portions, to be passed to the handler function.
;;
(struct skeleton-interest (desc const-selector const-value var-selector handler) #:transparent)
(struct skeleton-interest (desc
const-selector
const-value
var-selector
handler
cleanup
) #:transparent)
;;---------------------------------------------------------------------------
@ -103,8 +114,8 @@
(define (make-empty-skeleton)
(make-empty-skeleton/cache (mutable-set)))
(define (skcont-add! c i apply-handler!)
(match-define (skeleton-interest _desc cs cv vs h) i)
(define (skcont-add! c i)
(match-define (skeleton-interest _desc cs cv vs h _cleanup) i)
(define (make-matched-constant)
(skeleton-matched-constant (for/mutable-set [(a (skeleton-continuation-cache c))
#:when (equal? (apply-projection a cs) cv)]
@ -120,16 +131,18 @@
(skeleton-accumulator cache (mutable-seteq)))
(define acc (hash-ref! (skeleton-matched-constant-table sc) vs make-accumulator))
(set-add! (skeleton-accumulator-handlers acc) h)
(for [(vars (in-bag (skeleton-accumulator-cache acc)))] (apply-handler! h vars)))
(for [(vars (in-bag (skeleton-accumulator-cache acc)))] (apply h '+ vars)))
(define (skcont-remove! c i)
(match-define (skeleton-interest _desc cs cv vs h) i)
(match-define (skeleton-interest _desc cs cv vs h cleanup) i)
(define cvt (hash-ref (skeleton-continuation-table c) cs #f))
(when cvt
(define sc (hash-ref cvt cv #f))
(when sc
(define acc (hash-ref (skeleton-matched-constant-table sc) vs #f))
(when acc
(when (and cleanup (set-member? (skeleton-accumulator-handlers acc) h))
(cleanup (skeleton-accumulator-cache acc)))
(set-remove! (skeleton-accumulator-handlers acc) h)
(when (set-empty? (skeleton-accumulator-handlers acc))
(hash-remove! (skeleton-matched-constant-table sc) vs)))
@ -185,9 +198,7 @@
(define (add-interest! sk i)
(let ((sk (extend-skeleton! sk (skeleton-interest-desc i))))
(skcont-add! (skeleton-node-continuation sk)
i
(lambda (h vars) (apply h '+ vars)))))
(skcont-add! (skeleton-node-continuation sk) i)))
(define (remove-interest! sk i)
(let ((sk (extend-skeleton! sk (skeleton-interest-desc i))))
@ -307,13 +318,17 @@
(b 'zot)
123)))
(add-interest! sk
(skeleton-interest (list struct:a (list struct:b #f) #f)
'((0 0 0))
'(foo)
'((0 1))
(lambda (op . bindings)
(printf "xAB HANDLER: ~v ~v\n" op bindings))))
(define i1
(skeleton-interest (list struct:a (list struct:b #f) #f)
'((0 0 0))
'(foo)
'((0 1))
(lambda (op . bindings)
(printf "xAB HANDLER: ~v ~v\n" op bindings))
(lambda (vars)
(printf "xAB CLEANUP: ~v\n" vars))))
(add-interest! sk i1)
(void (extend-skeleton! sk (list struct:a (list struct:b #f) #f)))
(void (extend-skeleton! sk (list struct:a #f (list struct:c #f))))
@ -333,7 +348,9 @@
'(DCZ)
'((0) (0 0) (0 0 0) (0 1))
(lambda (op . bindings)
(printf "DBC HANDLER: ~v ~v\n" op bindings))))
(printf "DBC HANDLER: ~v ~v\n" op bindings))
(lambda (vars)
(printf "DBC CLEANUP: ~v\n" vars))))
(remove-assertion! sk (a (b 'foo) (c 'bar)))
(remove-assertion! sk (d (b 'B1) (b 'DBY) (c 'DCZ)))
@ -350,7 +367,9 @@
'(DBY)
'((0 0) (0 2))
(lambda (op . bindings)
(printf "xDB HANDLER: ~v ~v\n" op bindings))))
(printf "xDB HANDLER: ~v ~v\n" op bindings))
(lambda (vars)
(printf "xDB CLEANUP: ~v\n" vars))))
(send-assertion! sk (d (b 'BX) (b 'DBY) (c 'DCZ)))
(send-assertion! sk (d (b 'BX) (b 'DBY) (c 'DCZ)))
@ -362,4 +381,6 @@
(remove-assertion! sk (d (b 'BX) (b 'DBY) (c 'DCZ)))
(remove-assertion! sk (d (b 'B1) (b 'DBY) (c 'CX)))
;; sk
(remove-interest! sk i1)
)

View File

@ -46,6 +46,7 @@
;; immediate-query
send!
defer-turn!
flush!
assert!
retract!
@ -120,11 +121,13 @@
(syntax-parse stx
[(_ name:name assertions:assertions script ...)
(quasisyntax/loc stx
(spawn!
(current-actor)
name.N
(lambda () (begin/void-default script ...))
(set assertions.exprs ...)))]))
(begin
(ensure-in-script! 'spawn!)
(spawn!
(current-actor)
name.N
(lambda () (begin/void-default script ...))
(set assertions.exprs ...))))]))
(define-syntax (begin/void-default stx)
(syntax-parse stx
@ -322,7 +325,8 @@
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx)))))))))]
#,script-stx))))))
#f)))]
[(asserted P)
(analyse-asserted/retracted outer-expr-stx when-pred-stx script-stx #t #'P priority-stx)]
[(retracted P)
@ -351,7 +355,8 @@
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx))))))))))
#,script-stx))))))
#f))))
(define-syntax (during stx)
(syntax-parse stx
@ -559,8 +564,13 @@
;; (on-start (flush!) (k (query-result) ...)))))]))
(define (send! m)
(ensure-in-script! 'send!)
(enqueue-send! (current-actor) m))
(define (defer-turn! k)
(ensure-in-script! 'defer-turn!)
(enqueue-deferred-turn! (current-actor) k))
(define (flush!)
(ensure-in-script! 'flush!)
(define ack (gensym 'flush!))

98
syndicate/term.rkt Normal file
View File

@ -0,0 +1,98 @@
#lang racket/base
;; Like pattern.rkt, but for dynamic use rather than compile-time use.
(provide term->skeleton
term->skeleton-proj
term->key
term->capture-proj
instantiate-term->value)
(require racket/match)
(require syndicate/support/struct)
(require "pattern.rkt")
(define (term->skeleton t)
(let walk ((t t))
(match t
[(capture detail)
(walk detail)]
[(discard)
#f]
[(? non-object-struct?)
(cons (struct->struct-type t) (map walk (cdr (vector->list (struct->vector t)))))]
[(? list?)
(cons 'list (map walk t))]
[atom
#f])))
(define (select-term-leaves t capture-fn atom-fn)
(define (walk-node key-rev t)
(match t
[(capture detail)
(append (capture-fn key-rev) (walk-node key-rev detail))]
[(discard)
(list)]
[(? non-object-struct?)
(walk-edge 0 key-rev (cdr (vector->list (struct->vector t))))]
[(? list?)
(walk-edge 0 key-rev t)]
[atom
(atom-fn key-rev atom)]))
(define (walk-edge index key-rev pieces)
(match pieces
['() '()]
[(cons p pieces) (append (walk-node (cons index key-rev) p)
(walk-edge (+ index 1) key-rev pieces))]))
(walk-node '(0) t))
(define (term->skeleton-proj t)
(select-term-leaves t
(lambda (key-rev) (list))
(lambda (key-rev atom) (list (reverse key-rev)))))
(define (term->key t)
(select-term-leaves t
(lambda (key-rev) (list))
(lambda (key-rev atom) (list atom))))
(define (term->capture-proj t)
(select-term-leaves t
(lambda (key-rev) (list (reverse key-rev)))
(lambda (key-rev atom) (list))))
(define (instantiate-term->value t actuals)
(define (pop-actual!)
(define v (car actuals))
(set! actuals (cdr actuals))
v)
(define (pop-captures! t)
(match t
[(capture detail)
(pop-actual!)
(pop-captures! detail)]
[(discard)
(void)]
[(? non-object-struct?)
(for-each pop-captures! (cdr (vector->list (struct->vector t))))]
[(? list?)
(for-each pop-captures! t)]
[_ (void)]))
(define (walk t)
(match t
[(capture detail)
(begin0 (pop-actual!)
(pop-captures! detail))] ;; to consume nested bindings
[(discard)
(discard)]
[(? non-object-struct?)
(apply (struct-type-make-constructor (struct->struct-type t))
(map walk (cdr (vector->list (struct->vector t)))))]
[(? list?)
(map walk t)]
[other other]))
(walk t))

View File

@ -23,8 +23,7 @@
log-test-result!
(all-from-out racket/base)
(all-from-out "dataspace.rkt")
(all-from-out "syntax.rkt"))
(all-from-out "main.rkt"))
(module reader syntax/module-reader imperative-syndicate/test-implementation)
@ -33,8 +32,7 @@
(require (only-in racket/string string-split string-join string-contains?))
(require "bag.rkt")
(require "dataspace.rkt")
(require "syntax.rkt")
(require "main.rkt")
(require (for-syntax racket/base))
(require (for-syntax syntax/srcloc))

View File

@ -0,0 +1,18 @@
#lang imperative-syndicate/test-implementation
(test-case
[(assertion-struct greeting (text))
(spawn #:name "A" (assert (greeting "Hi from outer space!")))
(spawn #:name "B" (on (asserted (greeting $t))
(printf "Outer dataspace: ~a\n" t)))
(dataspace #:name "C"
(spawn #:name "D" (assert (outbound (greeting "Hi from inner!"))))
(spawn #:name "E" (on (asserted (inbound (greeting $t)))
(printf "Inner dataspace: ~a\n" t))))]
no-crashes
(expected-output "Outer dataspace: Hi from outer space!"
"Inner dataspace: Hi from outer space!"
"Outer dataspace: Hi from inner!"
"Inner dataspace: Hi from inner!"))