diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index 61449c0..c8d3823 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -4,9 +4,11 @@ with-current-facet ;; TODO: shouldn't be provided with-non-script-context ;; TODO: shouldn't be provided run-scripts! ;; TODO: how to cleanly provide this? + apply-patch! ;; TODO: DEFINITELY SHOULDN'T BE PROVIDED - needed by relay.rkt dataspace? - dataspace-assertions ;; TODO: shouldn't be provided - needed by test.rkt + dataspace-assertions ;; TODO: shouldn't be provided - needed by various tests + dataspace-routing-table ;; TODO: shouldn't be provided - needed by relay.rkt generate-id! ;; TODO: shouldn't be provided - inline syntax.rkt?? actor? @@ -359,14 +361,14 @@ (push-script! ac k)]) (run-all-pending-scripts! ds)))) -(define (apply-patch! ds ac delta) +(define (apply-patch! ds ac delta [restriction-path #f]) (define ds-assertions (dataspace-assertions ds)) (define rt (dataspace-routing-table ds)) (define new-cleanup-changes (for/fold [(cleanup-changes (actor-cleanup-changes ac))] [((a count) (in-bag/count delta))] - (match (bag-change! ds-assertions a count) - ['present->absent (remove-assertion! rt a)] - ['absent->present (add-assertion! rt a)] + (match (bag-change! ds-assertions (cons a restriction-path) count) + ['present->absent (remove-assertion! rt a restriction-path)] + ['absent->present (add-assertion! rt a restriction-path)] ;; 'absent->absent absurd ['present->present (void)]) ;; i.e. no visible change (define-values (updated-bag _summary) (bag-change cleanup-changes a (- count))) diff --git a/syndicate/relay.rkt b/syndicate/relay.rkt index 62fe108..d645c36 100644 --- a/syndicate/relay.rkt +++ b/syndicate/relay.rkt @@ -66,18 +66,22 @@ (define inbound-endpoints (make-hash)) (define outbound-endpoints (make-hash)) - (define inner-actor (current-actor)) (define inner-facet (current-facet)) + (define inner-actor (current-actor)) + (define inner-ds (actor-dataspace inner-actor)) (on (asserted (observe (inbound $x))) ;; (log-info "~a (asserted (observe (inbound ~v)))" inner-actor x) (with-current-facet [outer-facet] (with-non-script-context + (define outer-capture-proj (term->capture-proj x)) + (define inner-capture-proj (map (lambda (p) (cons 0 p)) outer-capture-proj)) + ;; ^ inner-capture-proj accounts for the extra (inbound ...) layer around assertions (define i (skeleton-interest (term->skeleton x) (term->skeleton-proj x) (term->key x) - (term->capture-proj x) + outer-capture-proj (lambda (op . captured-values) (define term (inbound (instantiate-term->value x captured-values))) ;; (log-info "~a => ~a ~a ~v" @@ -85,22 +89,21 @@ ;; inner-facet ;; op ;; term) - (push-script! inner-actor - (lambda () - ;; (log-info "~a (~a) ~v" inner-actor op term) - (match op - ['+ (adhoc-assert! inner-actor term)] - ['- (adhoc-retract! inner-actor term)] - ['! (enqueue-send! inner-actor term)]))) + (match op + ['+ (apply-patch! inner-ds inner-actor (bag term +1) inner-capture-proj)] + ['- (apply-patch! inner-ds inner-actor (bag term -1) inner-capture-proj)] + ['! (send-assertion! (dataspace-routing-table inner-ds) + term + inner-capture-proj)]) (schedule-inner!)) (lambda (cache) - (push-script! inner-actor - (lambda () - (for [(captured-values (in-bag cache))] - (define term - (inbound (instantiate-term->value x captured-values))) - ;; (log-info "~a (cleanup) ~v" inner-actor term) - (adhoc-retract! inner-actor term)))) + (apply-patch! + inner-ds + inner-actor + (for/bag/count [(captured-values (in-bag cache))] + ;; (log-info "~a (cleanup) ~v" inner-actor term) + (values (inbound (instantiate-term->value x captured-values)) -1)) + inner-capture-proj) (schedule-inner!)))) (add-endpoint-if-live! outer-facet inbound-endpoints diff --git a/syndicate/skeleton.rkt b/syndicate/skeleton.rkt index 44eb615..b56fad3 100644 --- a/syndicate/skeleton.rkt +++ b/syndicate/skeleton.rkt @@ -215,7 +215,7 @@ (let ((sk (extend-skeleton! sk (skeleton-interest-desc i)))) (skcont-remove! (skeleton-node-continuation sk) i))) -(define (skeleton-modify! sk term0 modify-skcont! modify-skconst! modify-skacc!) +(define (skeleton-modify! sk term0 restriction-path modify-skcont! modify-skconst! modify-skacc!) (define (walk-node! sk term-stack) (match-define (skeleton-node continuation edges) sk) @@ -228,8 +228,10 @@ (modify-skconst! proj-handler term0) (hash-for-each (skeleton-matched-constant-table proj-handler) (lambda (variable-proj acc) - (define variables (apply-projection term0 variable-proj)) - (modify-skacc! acc variables term0)))))) + (when (or (not restriction-path) + (equal? restriction-path variable-proj)) + (define variables (apply-projection term0 variable-proj)) + (modify-skacc! acc variables term0))))))) (for [(edge (in-list edges))] (match-define (cons (skeleton-selector pop-count index) table) edge) @@ -255,7 +257,7 @@ (hash-set! (skeleton-continuation-cache skcont) term #t)) (define (add-term-to-skconst! skconst term) (hash-set! (skeleton-matched-constant-cache skconst) term #t)) -(define (add-term-to-skacc! skacc vars _term) +(define (add-term-to-skacc! skacc vars term) (match (bag-change! (skeleton-accumulator-cache skacc) vars 1) ['absent->present (hash-for-each (skeleton-accumulator-handlers skacc) @@ -264,9 +266,10 @@ ['present->present (void)])) -(define (add-assertion! sk term) +(define (add-assertion! sk term [restriction-path #f]) (skeleton-modify! sk term + restriction-path add-term-to-skcont! add-term-to-skconst! add-term-to-skacc!)) @@ -287,16 +290,18 @@ (void)]) (log-warning "Removing assertion not previously added: ~v" _term))) -(define (remove-assertion! sk term) +(define (remove-assertion! sk term [restriction-path #f]) (skeleton-modify! sk term + restriction-path remove-term-from-skcont! remove-term-from-skconst! remove-term-from-skacc!)) -(define (send-assertion! sk term) +(define (send-assertion! sk term [restriction-path #f]) (skeleton-modify! sk term + restriction-path void void (lambda (skacc vars _term) diff --git a/syndicate/test-implementation.rkt b/syndicate/test-implementation.rkt index 8a32e31..673332e 100644 --- a/syndicate/test-implementation.rkt +++ b/syndicate/test-implementation.rkt @@ -50,11 +50,12 @@ (define test-run-time (make-parameter 0)) (define test-gc-time (make-parameter 0)) -(define (asserted? v) - (bag-member? (dataspace-assertions (final-dataspace)) v)) +(define (asserted? v [restriction-path #f]) + (bag-member? (dataspace-assertions (final-dataspace)) (cons v restriction-path))) (define (final-assertions) - (bag->set (dataspace-assertions (final-dataspace)))) + (for/set [(assertion-and-restriction-path (in-bag (dataspace-assertions (final-dataspace))))] + (car assertion-and-restriction-path))) (define (emitted? v) (member v (collected-events))) diff --git a/syndicate/test/core/nesting-confusion.rkt b/syndicate/test/core/nesting-confusion.rkt index cbbe069..2deafa3 100644 --- a/syndicate/test/core/nesting-confusion.rkt +++ b/syndicate/test/core/nesting-confusion.rkt @@ -78,3 +78,43 @@ no-crashes no-mention-of-discard (correct-topics-and-researchers)) + +;;--------------------------------------------------------------------------- + +(assertion-struct claim (detail)) + +(define (asserts-then-retractions) + (and (equal? (length (collected-lines)) 4) + (equal? (list->set (take (collected-lines) 2)) (set "Specific claim asserted" + "Nonspecific claim 123 asserted")) + (equal? (list->set (drop (collected-lines) 2)) (set "Specific claim retracted" + "Nonspecific claim 123 retracted")))) + +(test-case + [(spawn #:name 'claimant + (assert (claim 123)) + (on-start (for [(i 5)] (flush!)) (stop-current-facet))) + (spawn #:name 'monitor + (during (claim 123) + (on-start (printf "Specific claim asserted\n")) + (on-stop (printf "Specific claim retracted\n"))) + (during (claim $detail) + (on-start (printf "Nonspecific claim ~v asserted\n" detail)) + (on-stop (printf "Nonspecific claim ~v retracted\n" detail))))] + no-crashes + asserts-then-retractions) + +(test-case + [(spawn #:name 'claimant + (assert (claim 123)) + (on-start (for [(i 5)] (flush!)) (stop-current-facet))) + (dataspace #:name 'inner-dataspace + (spawn #:name 'monitor + (during (inbound (claim 123)) + (on-start (printf "Specific claim asserted\n")) + (on-stop (printf "Specific claim retracted\n"))) + (during (inbound (claim $detail)) + (on-start (printf "Nonspecific claim ~v asserted\n" detail)) + (on-stop (printf "Nonspecific claim ~v retracted\n" detail)))))] + no-crashes + asserts-then-retractions)