Implement during and ad-hoc assertions
This commit is contained in:
parent
5377a486c9
commit
55f4b55784
|
@ -5,7 +5,9 @@
|
|||
bag-change!
|
||||
bag-ref
|
||||
bag-clear!
|
||||
in-bag)
|
||||
bag-member?
|
||||
in-bag
|
||||
in-bag/count)
|
||||
|
||||
;; A `(Bagof X)` is a `(MutableHash X Nat)`, where the `Nat` against
|
||||
;; an `X` is its replication count in the bag.
|
||||
|
@ -28,5 +30,7 @@
|
|||
(hash-ref b x 0))
|
||||
|
||||
(define bag-clear! hash-clear!)
|
||||
(define bag-member? hash-has-key?)
|
||||
|
||||
(define-syntax-rule (in-bag piece ...) (in-hash piece ...))
|
||||
(define-syntax-rule (in-bag piece ...) (in-hash-keys piece ...))
|
||||
(define-syntax-rule (in-bag/count piece ...) (in-hash piece ...))
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
|
||||
add-facet!
|
||||
stop-facet!
|
||||
add-stop-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
|
||||
add-endpoint!
|
||||
terminate-facet! ;; TODO: shouldn't be provided - inline syntax.rkt??
|
||||
schedule-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
|
||||
|
@ -41,6 +42,9 @@
|
|||
|
||||
spawn! ;; TODO: should this be provided?
|
||||
enqueue-send! ;; TODO: should this be provided?
|
||||
adhoc-retract! ;; TODO: should this be provided?
|
||||
adhoc-assert! ;; TODO: should this be provided?
|
||||
actor-adhoc-assertions ;; TODO: should this be provided?
|
||||
)
|
||||
|
||||
(require syndicate/functional-queue)
|
||||
|
@ -76,6 +80,9 @@
|
|||
(struct dataspace ([next-id #:mutable] ;; Nat
|
||||
routing-table ;; Skeleton
|
||||
actors ;; (MutableHash ActorID Actor)
|
||||
;; v TODO: Caches have to be bags, not sets; once
|
||||
;; this change is made, can I avoid keeping a bag
|
||||
;; of assertions in the dataspace as a whole?
|
||||
assertions ;; (Bagof Assertion)
|
||||
dataflow ;; DataflowGraph
|
||||
[runnable #:mutable] ;; (Listof Actor)
|
||||
|
@ -88,6 +95,8 @@
|
|||
[runnable? #:mutable] ;; Boolean
|
||||
pending-scripts ;; (MutableVectorof (Queueof (-> Any)))
|
||||
[pending-actions #:mutable] ;; (Queueof Action)
|
||||
;; TODO: consider using a bag, rather than set, of ad-hoc assertions.
|
||||
[adhoc-assertions #:mutable] ;; (Setof Assertion)
|
||||
)
|
||||
#:methods gen:custom-write
|
||||
[(define (write-proc a p mode)
|
||||
|
@ -212,7 +221,8 @@
|
|||
#f
|
||||
#f
|
||||
(make-vector priority-count (make-queue))
|
||||
(make-queue)))
|
||||
(make-queue)
|
||||
(set)))
|
||||
(hash-set! (dataspace-actors ds) the-actor-id the-actor)
|
||||
(for [(a initial-assertions)]
|
||||
(match (bag-change! (dataspace-assertions ds) a 1)
|
||||
|
@ -312,7 +322,7 @@
|
|||
;; (log-info "performing ~a" action)
|
||||
(match action
|
||||
[(patch delta)
|
||||
(for [((a count) (in-bag delta))]
|
||||
(for [((a count) (in-bag/count delta))]
|
||||
(match (bag-change! (dataspace-assertions ds) a count)
|
||||
['present->absent (remove-assertion! (dataspace-routing-table ds) a)]
|
||||
['absent->present (add-assertion! (dataspace-routing-table ds) a)]
|
||||
|
@ -388,6 +398,8 @@
|
|||
;; Abruptly terminates an entire actor, without running stop-scripts etc.
|
||||
(define (terminate-actor! ds the-actor)
|
||||
(hash-remove! (dataspace-actors ds) (actor-id the-actor))
|
||||
(push-script! ds the-actor
|
||||
(lambda () (for [(a (actor-adhoc-assertions the-actor))] (retract! the-actor a))))
|
||||
(let ((f (actor-root-facet the-actor)))
|
||||
(when f
|
||||
(let abort-facet! ((f f))
|
||||
|
@ -468,6 +480,16 @@
|
|||
(when (not (void? assertion))
|
||||
(bag-change! (ensure-patch-action! ac) assertion +1)))
|
||||
|
||||
(define (adhoc-retract! ac assertion)
|
||||
(when (not (void? assertion))
|
||||
(set-actor-adhoc-assertions! ac (set-remove (actor-adhoc-assertions ac) assertion))
|
||||
(retract! ac assertion)))
|
||||
|
||||
(define (adhoc-assert! ac assertion)
|
||||
(when (not (void? assertion))
|
||||
(set-actor-adhoc-assertions! ac (set-add (actor-adhoc-assertions ac) assertion))
|
||||
(assert! ac assertion)))
|
||||
|
||||
(define (dataspace-unsubscribe! ds h)
|
||||
(remove-interest! (dataspace-routing-table ds) h))
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
(struct-out capture)
|
||||
|
||||
(for-syntax analyse-pattern
|
||||
instantiate-pattern
|
||||
desc->key
|
||||
desc->skeleton-proj
|
||||
desc->skeleton-stx
|
||||
|
@ -76,7 +77,25 @@
|
|||
(list 'discard)]
|
||||
[_
|
||||
(list 'atom stx)]))
|
||||
)
|
||||
|
||||
(define (instantiate-pattern stx)
|
||||
(syntax-case stx ($)
|
||||
[(ctor piece ...)
|
||||
(struct-info? (id-value #'ctor))
|
||||
(quasisyntax/loc stx (ctor #,@(stx-map instantiate-pattern #'(piece ...))))]
|
||||
[(list piece ...)
|
||||
(list-id? #'list)
|
||||
(quasisyntax/loc stx (list #,@(stx-map instantiate-pattern #'(piece ...))))]
|
||||
[id
|
||||
(dollar-id? #'id)
|
||||
(undollar #'id)]
|
||||
[($ id p)
|
||||
#'id]
|
||||
[id
|
||||
(discard-id? #'id)
|
||||
#'id]
|
||||
[other
|
||||
#'other])))
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
(require racket/hash)
|
||||
(require racket/list)
|
||||
|
||||
(require "bag.rkt")
|
||||
|
||||
(module+ test (require rackunit))
|
||||
|
||||
;; A `Skeleton` is a structural guard on an assertion: essentially,
|
||||
|
@ -50,12 +52,12 @@
|
|||
;; (MutableSet Assertion)
|
||||
;; (MutableHash SkProj SkAcc))
|
||||
;; SkAcc = (skeleton-accumulator
|
||||
;; (Set SkKey)
|
||||
;; (MutableBag SkKey)
|
||||
;; (MutableSeteq (... -> Any)))
|
||||
;;
|
||||
(struct skeleton-continuation (cache table) #:transparent)
|
||||
(struct skeleton-matched-constant (cache table) #:transparent)
|
||||
(struct skeleton-accumulator ([cache #:mutable] handlers) #:transparent)
|
||||
(struct skeleton-accumulator (cache handlers) #:transparent)
|
||||
;;
|
||||
;; A `SkProj` is a *skeleton projection*, a specification of loci
|
||||
;; within a tree-shaped assertion to collect into a flat list.
|
||||
|
@ -111,12 +113,14 @@
|
|||
(define cvt (hash-ref! (skeleton-continuation-table c) cs make-hash))
|
||||
(define sc (hash-ref! cvt cv make-matched-constant))
|
||||
(define (make-accumulator)
|
||||
(skeleton-accumulator (for/set [(a (skeleton-matched-constant-cache sc))]
|
||||
(apply-projection a vs))
|
||||
(mutable-seteq)))
|
||||
(define cache (make-bag))
|
||||
(for [(a (skeleton-matched-constant-cache sc))]
|
||||
(define vars (apply-projection a vs))
|
||||
(bag-change! cache vars 1))
|
||||
(skeleton-accumulator cache (mutable-seteq)))
|
||||
(define acc (hash-ref! (skeleton-matched-constant-table sc) vs make-accumulator))
|
||||
(set-add! (skeleton-accumulator-handlers acc) h)
|
||||
(for [(vars (skeleton-accumulator-cache acc))] (apply-handler! h vars)))
|
||||
(for [(vars (in-bag (skeleton-accumulator-cache acc)))] (apply-handler! h vars)))
|
||||
|
||||
(define (skcont-remove! c i apply-handler!)
|
||||
(match-define (skeleton-interest _desc cs cv vs h) i)
|
||||
|
@ -126,7 +130,7 @@
|
|||
(when sc
|
||||
(define acc (hash-ref (skeleton-matched-constant-table sc) vs #f))
|
||||
(when acc
|
||||
(for [(vars (skeleton-accumulator-cache acc))] (apply-handler! h vars))
|
||||
(for [(vars (in-bag (skeleton-accumulator-cache acc)))] (apply-handler! h vars))
|
||||
(set-remove! (skeleton-accumulator-handlers acc) h)
|
||||
(when (set-empty? (skeleton-accumulator-handlers acc))
|
||||
(hash-remove! (skeleton-matched-constant-table sc) vs)))
|
||||
|
@ -229,9 +233,13 @@
|
|||
(define (add-term-to-skconst! skconst term)
|
||||
(set-add! (skeleton-matched-constant-cache skconst) term))
|
||||
(define (add-term-to-skacc! skacc vars _term)
|
||||
(set-skeleton-accumulator-cache! skacc (set-add (skeleton-accumulator-cache skacc) vars))
|
||||
(for [(handler (skeleton-accumulator-handlers skacc))]
|
||||
(apply handler '+ vars)))
|
||||
(match (bag-change! (skeleton-accumulator-cache skacc) vars 1)
|
||||
['absent->present
|
||||
(for [(handler (skeleton-accumulator-handlers skacc))]
|
||||
(apply handler '+ vars))]
|
||||
;; 'present->absent and 'absent->absent absurd
|
||||
['present->present
|
||||
(void)]))
|
||||
|
||||
(define (add-assertion! sk term)
|
||||
(skeleton-modify! sk
|
||||
|
@ -245,9 +253,16 @@
|
|||
(define (remove-term-from-skconst! skconst term)
|
||||
(set-remove! (skeleton-matched-constant-cache skconst) term))
|
||||
(define (remove-term-from-skacc! skacc vars _term)
|
||||
(set-skeleton-accumulator-cache! skacc (set-remove (skeleton-accumulator-cache skacc) vars))
|
||||
(for [(handler (skeleton-accumulator-handlers skacc))]
|
||||
(apply handler '- vars)))
|
||||
(define cache (skeleton-accumulator-cache skacc))
|
||||
(if (bag-member? cache vars)
|
||||
(match (bag-change! cache vars -1)
|
||||
['present->absent
|
||||
(for [(handler (skeleton-accumulator-handlers skacc))]
|
||||
(apply handler '- vars))]
|
||||
;; 'absent->absent and 'absent->present absurd
|
||||
['present->present
|
||||
(void)])
|
||||
(log-warning "Removing assertion not previously added: ~v" _term)))
|
||||
|
||||
(define (remove-assertion! sk term)
|
||||
(skeleton-modify! sk
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
(begin-for-syntax
|
||||
(define-splicing-syntax-class assertions
|
||||
(pattern (~seq #:assertions [exprs ...]))
|
||||
(pattern (~seq) #:attr (exprs 1) #'()))
|
||||
(pattern (~seq) #:attr (exprs 1) '()))
|
||||
|
||||
(define-splicing-syntax-class name
|
||||
(pattern (~seq #:name N))
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
on-start
|
||||
on-stop
|
||||
on
|
||||
;; during
|
||||
during
|
||||
;; during/spawn
|
||||
begin/dataflow
|
||||
define/dataflow
|
||||
|
@ -47,6 +47,9 @@
|
|||
|
||||
send!
|
||||
flush!
|
||||
assert!
|
||||
retract!
|
||||
current-adhoc-assertions
|
||||
|
||||
;;
|
||||
|
||||
|
@ -357,17 +360,14 @@
|
|||
(lambda ()
|
||||
#,script-stx))))))))))
|
||||
|
||||
;; (define-syntax (during stx)
|
||||
;; (syntax-parse stx
|
||||
;; [(_ P O ...)
|
||||
;; (define E-stx (syntax/loc #'P (asserted P)))
|
||||
;; (define-values (_proj _pat _bindings instantiated)
|
||||
;; (analyze-pattern E-stx #'P))
|
||||
;; (quasisyntax/loc stx
|
||||
;; (on #,E-stx
|
||||
;; (let ((p #,instantiated))
|
||||
;; (react (stop-when (retracted p))
|
||||
;; O ...))))]))
|
||||
(define-syntax (during stx)
|
||||
(syntax-parse stx
|
||||
[(_ P O ...)
|
||||
(define Q-stx (instantiate-pattern #'P))
|
||||
(quasisyntax/loc stx
|
||||
(on (asserted P)
|
||||
(react (stop-when (retracted #,Q-stx))
|
||||
O ...)))]))
|
||||
|
||||
;; (define-syntax (during/spawn stx)
|
||||
;; (syntax-parse stx
|
||||
|
@ -576,6 +576,17 @@
|
|||
(until (message ack)
|
||||
(on-start (send! ack))))
|
||||
|
||||
(define (assert! a)
|
||||
(ensure-in-script! 'assert!)
|
||||
(adhoc-assert! (current-actor) a))
|
||||
|
||||
(define (retract! a)
|
||||
(ensure-in-script! 'retract!)
|
||||
(adhoc-retract! (current-actor) a))
|
||||
|
||||
(define (current-adhoc-assertions)
|
||||
(actor-adhoc-assertions (current-actor)))
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
|
||||
(module+ test
|
||||
|
@ -589,6 +600,71 @@
|
|||
(schedule-script!
|
||||
(current-dataspace)
|
||||
(current-actor)
|
||||
|
||||
#;(lambda ()
|
||||
(struct foo (x y) #:prefab)
|
||||
|
||||
(spawn (field [x 123])
|
||||
(assert (foo (x) 999))
|
||||
(during (foo (x) $v)
|
||||
(log-info "x=~a v=~a" (x) v)
|
||||
(when (= (x) 123) (x 124))
|
||||
(on-stop
|
||||
(log-info "finally for x=~a v=~a" (x) v))))
|
||||
)
|
||||
|
||||
(lambda ()
|
||||
;; .../racket/syndicate/examples/actor/example-partial-retraction.rkt
|
||||
;;
|
||||
(struct ready (what) #:prefab)
|
||||
(struct entry (key val) #:prefab)
|
||||
|
||||
(spawn (assert (ready 'listener))
|
||||
(on (asserted (entry $key _))
|
||||
(log-info "key ~v asserted" key)
|
||||
(until (retracted (entry key _))
|
||||
(on (asserted (entry key $value))
|
||||
(log-info "add binding: ~v -> ~v" key value))
|
||||
(on (retracted (entry key $value))
|
||||
(log-info "del binding: ~v -> ~v" key value)))
|
||||
(log-info "key ~v retracted" key)))
|
||||
|
||||
(spawn (assert (ready 'other-listener))
|
||||
(during (entry $key _)
|
||||
(log-info "(other-listener) key ~v asserted" key)
|
||||
(on-stop (log-info "(other-listener) key ~v retracted" key))
|
||||
(during (entry key $value)
|
||||
(log-info "(other-listener) ~v ---> ~v" key value)
|
||||
(on-stop (log-info "(other-listener) ~v -/-> ~v" key value)))))
|
||||
|
||||
(define (pause)
|
||||
(log-info "pause")
|
||||
(define token (gensym 'pause)) ;; FIXME:: If we use the same token every time, need epochs!
|
||||
(until (asserted (ready token))
|
||||
(assert (ready token))))
|
||||
|
||||
(spawn* (until (asserted (ready 'listener)))
|
||||
(until (asserted (ready 'other-listener)))
|
||||
(assert! (entry 'a 1))
|
||||
(assert! (entry 'a 2))
|
||||
(assert! (entry 'b 3))
|
||||
(assert! (entry 'c 33))
|
||||
(assert! (entry 'a 4))
|
||||
(assert! (entry 'a 5))
|
||||
(pause)
|
||||
(retract! (entry 'a 2))
|
||||
(retract! (entry 'c 33))
|
||||
(assert! (entry 'a 9))
|
||||
(pause)
|
||||
(for [(a (current-adhoc-assertions))]
|
||||
(local-require racket/match)
|
||||
(match a
|
||||
[(entry 'a _) (retract! a)]
|
||||
[_ (void)]))
|
||||
;; ^ (retract! (entry 'a ?))
|
||||
(pause))
|
||||
)
|
||||
|
||||
#;(lambda ()
|
||||
(spawn (on (message $v)
|
||||
(if (= v 10000000)
|
||||
|
@ -597,7 +673,7 @@
|
|||
(on-start (send! 0)))
|
||||
)
|
||||
|
||||
(lambda ()
|
||||
#;(lambda ()
|
||||
|
||||
(message-struct stage (n))
|
||||
|
||||
|
|
Loading…
Reference in New Issue