diff --git a/syndicate/bag.rkt b/syndicate/bag.rkt index c374e0a..984a651 100644 --- a/syndicate/bag.rkt +++ b/syndicate/bag.rkt @@ -5,7 +5,9 @@ bag-change! bag-ref bag-clear! - in-bag) + bag-member? + in-bag + in-bag/count) ;; A `(Bagof X)` is a `(MutableHash X Nat)`, where the `Nat` against ;; an `X` is its replication count in the bag. @@ -28,5 +30,7 @@ (hash-ref b x 0)) (define bag-clear! hash-clear!) +(define bag-member? hash-has-key?) -(define-syntax-rule (in-bag piece ...) (in-hash piece ...)) +(define-syntax-rule (in-bag piece ...) (in-hash-keys piece ...)) +(define-syntax-rule (in-bag/count piece ...) (in-hash piece ...)) diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index 294465e..0407ab7 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -33,6 +33,7 @@ add-facet! stop-facet! + add-stop-script! ;; TODO: shouldn't be provided - inline syntax.rkt?? add-endpoint! terminate-facet! ;; TODO: shouldn't be provided - inline syntax.rkt?? schedule-script! ;; TODO: shouldn't be provided - inline syntax.rkt?? @@ -41,6 +42,9 @@ spawn! ;; TODO: should this be provided? enqueue-send! ;; TODO: should this be provided? + adhoc-retract! ;; TODO: should this be provided? + adhoc-assert! ;; TODO: should this be provided? + actor-adhoc-assertions ;; TODO: should this be provided? ) (require syndicate/functional-queue) @@ -76,6 +80,9 @@ (struct dataspace ([next-id #:mutable] ;; Nat routing-table ;; Skeleton actors ;; (MutableHash ActorID Actor) + ;; v TODO: Caches have to be bags, not sets; once + ;; this change is made, can I avoid keeping a bag + ;; of assertions in the dataspace as a whole? assertions ;; (Bagof Assertion) dataflow ;; DataflowGraph [runnable #:mutable] ;; (Listof Actor) @@ -88,6 +95,8 @@ [runnable? #:mutable] ;; Boolean pending-scripts ;; (MutableVectorof (Queueof (-> Any))) [pending-actions #:mutable] ;; (Queueof Action) + ;; TODO: consider using a bag, rather than set, of ad-hoc assertions. + [adhoc-assertions #:mutable] ;; (Setof Assertion) ) #:methods gen:custom-write [(define (write-proc a p mode) @@ -212,7 +221,8 @@ #f #f (make-vector priority-count (make-queue)) - (make-queue))) + (make-queue) + (set))) (hash-set! (dataspace-actors ds) the-actor-id the-actor) (for [(a initial-assertions)] (match (bag-change! (dataspace-assertions ds) a 1) @@ -312,7 +322,7 @@ ;; (log-info "performing ~a" action) (match action [(patch delta) - (for [((a count) (in-bag delta))] + (for [((a count) (in-bag/count delta))] (match (bag-change! (dataspace-assertions ds) a count) ['present->absent (remove-assertion! (dataspace-routing-table ds) a)] ['absent->present (add-assertion! (dataspace-routing-table ds) a)] @@ -388,6 +398,8 @@ ;; Abruptly terminates an entire actor, without running stop-scripts etc. (define (terminate-actor! ds the-actor) (hash-remove! (dataspace-actors ds) (actor-id the-actor)) + (push-script! ds the-actor + (lambda () (for [(a (actor-adhoc-assertions the-actor))] (retract! the-actor a)))) (let ((f (actor-root-facet the-actor))) (when f (let abort-facet! ((f f)) @@ -468,6 +480,16 @@ (when (not (void? assertion)) (bag-change! (ensure-patch-action! ac) assertion +1))) +(define (adhoc-retract! ac assertion) + (when (not (void? assertion)) + (set-actor-adhoc-assertions! ac (set-remove (actor-adhoc-assertions ac) assertion)) + (retract! ac assertion))) + +(define (adhoc-assert! ac assertion) + (when (not (void? assertion)) + (set-actor-adhoc-assertions! ac (set-add (actor-adhoc-assertions ac) assertion)) + (assert! ac assertion))) + (define (dataspace-unsubscribe! ds h) (remove-interest! (dataspace-routing-table ds) h)) diff --git a/syndicate/pattern.rkt b/syndicate/pattern.rkt index ddcdf13..cb6fd6e 100644 --- a/syndicate/pattern.rkt +++ b/syndicate/pattern.rkt @@ -4,6 +4,7 @@ (struct-out capture) (for-syntax analyse-pattern + instantiate-pattern desc->key desc->skeleton-proj desc->skeleton-stx @@ -76,7 +77,25 @@ (list 'discard)] [_ (list 'atom stx)])) - ) + + (define (instantiate-pattern stx) + (syntax-case stx ($) + [(ctor piece ...) + (struct-info? (id-value #'ctor)) + (quasisyntax/loc stx (ctor #,@(stx-map instantiate-pattern #'(piece ...))))] + [(list piece ...) + (list-id? #'list) + (quasisyntax/loc stx (list #,@(stx-map instantiate-pattern #'(piece ...))))] + [id + (dollar-id? #'id) + (undollar #'id)] + [($ id p) + #'id] + [id + (discard-id? #'id) + #'id] + [other + #'other]))) ;;--------------------------------------------------------------------------- diff --git a/syndicate/skeleton.rkt b/syndicate/skeleton.rkt index 0dcf77d..04fe719 100644 --- a/syndicate/skeleton.rkt +++ b/syndicate/skeleton.rkt @@ -18,6 +18,8 @@ (require racket/hash) (require racket/list) +(require "bag.rkt") + (module+ test (require rackunit)) ;; A `Skeleton` is a structural guard on an assertion: essentially, @@ -50,12 +52,12 @@ ;; (MutableSet Assertion) ;; (MutableHash SkProj SkAcc)) ;; SkAcc = (skeleton-accumulator -;; (Set SkKey) +;; (MutableBag SkKey) ;; (MutableSeteq (... -> Any))) ;; (struct skeleton-continuation (cache table) #:transparent) (struct skeleton-matched-constant (cache table) #:transparent) -(struct skeleton-accumulator ([cache #:mutable] handlers) #:transparent) +(struct skeleton-accumulator (cache handlers) #:transparent) ;; ;; A `SkProj` is a *skeleton projection*, a specification of loci ;; within a tree-shaped assertion to collect into a flat list. @@ -111,12 +113,14 @@ (define cvt (hash-ref! (skeleton-continuation-table c) cs make-hash)) (define sc (hash-ref! cvt cv make-matched-constant)) (define (make-accumulator) - (skeleton-accumulator (for/set [(a (skeleton-matched-constant-cache sc))] - (apply-projection a vs)) - (mutable-seteq))) + (define cache (make-bag)) + (for [(a (skeleton-matched-constant-cache sc))] + (define vars (apply-projection a vs)) + (bag-change! cache vars 1)) + (skeleton-accumulator cache (mutable-seteq))) (define acc (hash-ref! (skeleton-matched-constant-table sc) vs make-accumulator)) (set-add! (skeleton-accumulator-handlers acc) h) - (for [(vars (skeleton-accumulator-cache acc))] (apply-handler! h vars))) + (for [(vars (in-bag (skeleton-accumulator-cache acc)))] (apply-handler! h vars))) (define (skcont-remove! c i apply-handler!) (match-define (skeleton-interest _desc cs cv vs h) i) @@ -126,7 +130,7 @@ (when sc (define acc (hash-ref (skeleton-matched-constant-table sc) vs #f)) (when acc - (for [(vars (skeleton-accumulator-cache acc))] (apply-handler! h vars)) + (for [(vars (in-bag (skeleton-accumulator-cache acc)))] (apply-handler! h vars)) (set-remove! (skeleton-accumulator-handlers acc) h) (when (set-empty? (skeleton-accumulator-handlers acc)) (hash-remove! (skeleton-matched-constant-table sc) vs))) @@ -229,9 +233,13 @@ (define (add-term-to-skconst! skconst term) (set-add! (skeleton-matched-constant-cache skconst) term)) (define (add-term-to-skacc! skacc vars _term) - (set-skeleton-accumulator-cache! skacc (set-add (skeleton-accumulator-cache skacc) vars)) - (for [(handler (skeleton-accumulator-handlers skacc))] - (apply handler '+ vars))) + (match (bag-change! (skeleton-accumulator-cache skacc) vars 1) + ['absent->present + (for [(handler (skeleton-accumulator-handlers skacc))] + (apply handler '+ vars))] + ;; 'present->absent and 'absent->absent absurd + ['present->present + (void)])) (define (add-assertion! sk term) (skeleton-modify! sk @@ -245,9 +253,16 @@ (define (remove-term-from-skconst! skconst term) (set-remove! (skeleton-matched-constant-cache skconst) term)) (define (remove-term-from-skacc! skacc vars _term) - (set-skeleton-accumulator-cache! skacc (set-remove (skeleton-accumulator-cache skacc) vars)) - (for [(handler (skeleton-accumulator-handlers skacc))] - (apply handler '- vars))) + (define cache (skeleton-accumulator-cache skacc)) + (if (bag-member? cache vars) + (match (bag-change! cache vars -1) + ['present->absent + (for [(handler (skeleton-accumulator-handlers skacc))] + (apply handler '- vars))] + ;; 'absent->absent and 'absent->present absurd + ['present->present + (void)]) + (log-warning "Removing assertion not previously added: ~v" _term))) (define (remove-assertion! sk term) (skeleton-modify! sk diff --git a/syndicate/syntax-classes.rkt b/syndicate/syntax-classes.rkt index e67de9b..acddb82 100644 --- a/syndicate/syntax-classes.rkt +++ b/syndicate/syntax-classes.rkt @@ -13,7 +13,7 @@ (begin-for-syntax (define-splicing-syntax-class assertions (pattern (~seq #:assertions [exprs ...])) - (pattern (~seq) #:attr (exprs 1) #'())) + (pattern (~seq) #:attr (exprs 1) '())) (define-splicing-syntax-class name (pattern (~seq #:name N)) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index d4a57e6..7d24a8f 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -17,7 +17,7 @@ on-start on-stop on - ;; during + during ;; during/spawn begin/dataflow define/dataflow @@ -47,6 +47,9 @@ send! flush! + assert! + retract! + current-adhoc-assertions ;; @@ -357,17 +360,14 @@ (lambda () #,script-stx)))))))))) -;; (define-syntax (during stx) -;; (syntax-parse stx -;; [(_ P O ...) -;; (define E-stx (syntax/loc #'P (asserted P))) -;; (define-values (_proj _pat _bindings instantiated) -;; (analyze-pattern E-stx #'P)) -;; (quasisyntax/loc stx -;; (on #,E-stx -;; (let ((p #,instantiated)) -;; (react (stop-when (retracted p)) -;; O ...))))])) +(define-syntax (during stx) + (syntax-parse stx + [(_ P O ...) + (define Q-stx (instantiate-pattern #'P)) + (quasisyntax/loc stx + (on (asserted P) + (react (stop-when (retracted #,Q-stx)) + O ...)))])) ;; (define-syntax (during/spawn stx) ;; (syntax-parse stx @@ -576,6 +576,17 @@ (until (message ack) (on-start (send! ack)))) +(define (assert! a) + (ensure-in-script! 'assert!) + (adhoc-assert! (current-actor) a)) + +(define (retract! a) + (ensure-in-script! 'retract!) + (adhoc-retract! (current-actor) a)) + +(define (current-adhoc-assertions) + (actor-adhoc-assertions (current-actor))) + ;;--------------------------------------------------------------------------- (module+ test @@ -589,6 +600,71 @@ (schedule-script! (current-dataspace) (current-actor) + + #;(lambda () + (struct foo (x y) #:prefab) + + (spawn (field [x 123]) + (assert (foo (x) 999)) + (during (foo (x) $v) + (log-info "x=~a v=~a" (x) v) + (when (= (x) 123) (x 124)) + (on-stop + (log-info "finally for x=~a v=~a" (x) v)))) + ) + + (lambda () + ;; .../racket/syndicate/examples/actor/example-partial-retraction.rkt + ;; + (struct ready (what) #:prefab) + (struct entry (key val) #:prefab) + + (spawn (assert (ready 'listener)) + (on (asserted (entry $key _)) + (log-info "key ~v asserted" key) + (until (retracted (entry key _)) + (on (asserted (entry key $value)) + (log-info "add binding: ~v -> ~v" key value)) + (on (retracted (entry key $value)) + (log-info "del binding: ~v -> ~v" key value))) + (log-info "key ~v retracted" key))) + + (spawn (assert (ready 'other-listener)) + (during (entry $key _) + (log-info "(other-listener) key ~v asserted" key) + (on-stop (log-info "(other-listener) key ~v retracted" key)) + (during (entry key $value) + (log-info "(other-listener) ~v ---> ~v" key value) + (on-stop (log-info "(other-listener) ~v -/-> ~v" key value))))) + + (define (pause) + (log-info "pause") + (define token (gensym 'pause)) ;; FIXME:: If we use the same token every time, need epochs! + (until (asserted (ready token)) + (assert (ready token)))) + + (spawn* (until (asserted (ready 'listener))) + (until (asserted (ready 'other-listener))) + (assert! (entry 'a 1)) + (assert! (entry 'a 2)) + (assert! (entry 'b 3)) + (assert! (entry 'c 33)) + (assert! (entry 'a 4)) + (assert! (entry 'a 5)) + (pause) + (retract! (entry 'a 2)) + (retract! (entry 'c 33)) + (assert! (entry 'a 9)) + (pause) + (for [(a (current-adhoc-assertions))] + (local-require racket/match) + (match a + [(entry 'a _) (retract! a)] + [_ (void)])) + ;; ^ (retract! (entry 'a ?)) + (pause)) + ) + #;(lambda () (spawn (on (message $v) (if (= v 10000000) @@ -597,7 +673,7 @@ (on-start (send! 0))) ) - (lambda () + #;(lambda () (message-struct stage (n))