More robust approach to cleanup of assertions on actor termination.
We now explicitly track *committed* assertions of each actor in a new
field, `actor-cleanup-changes`. Each time a patch action is
*performed*, `actor-cleanup-changes` is updated. When an actor quits,
it enqueues a special new kind of action, a `quit` action.
When a `quit` action is performed, any remaining contents of
`actor-cleanup-changes` are processed in order to fully remove any
leftover assertions. (Leftover assertions will only arise in
exceptional cases: when some stop-script or facet boot-script raises
an uncaught exception.)
As part of this commit, we undo the effect of commit 8624047
.
This commit is contained in:
parent
65a221ae68
commit
edb8e719f7
|
@ -19,8 +19,8 @@
|
||||||
;;
|
;;
|
||||||
;; A `(Bagof X)` is similar, but immutable.
|
;; A `(Bagof X)` is similar, but immutable.
|
||||||
;;
|
;;
|
||||||
;; A `(Deltaof X)` is a `(MutableHash X Int)`, just like a
|
;; `MutableDeltaof` and `Deltaof` are like `MutableBagof` and `Bagof`,
|
||||||
;; `(MutableBagof X)` except the replication counts can be negative.
|
;; respectively, except the replication counts can be negative.
|
||||||
|
|
||||||
(define make-bag make-hash)
|
(define make-bag make-hash)
|
||||||
(define bag hash)
|
(define bag hash)
|
||||||
|
@ -52,6 +52,6 @@
|
||||||
(define-syntax-rule (in-bag piece ...) (in-hash-keys piece ...))
|
(define-syntax-rule (in-bag piece ...) (in-hash-keys piece ...))
|
||||||
(define-syntax-rule (in-bag/count piece ...) (in-hash piece ...))
|
(define-syntax-rule (in-bag/count piece ...) (in-hash piece ...))
|
||||||
|
|
||||||
(define (set->bag s)
|
(define (set->bag s [count 1])
|
||||||
(for/hash [(e (in-set s))]
|
(for/hash [(e (in-set s))]
|
||||||
(values e 1)))
|
(values e count)))
|
||||||
|
|
|
@ -71,11 +71,15 @@
|
||||||
|
|
||||||
;; A `Dataspace` is a ... TODO
|
;; A `Dataspace` is a ... TODO
|
||||||
|
|
||||||
;; An `Action` is either `(patch (Deltaof Assertion))` or `(message
|
;; An `Action` is one of
|
||||||
;; Assertion)` or `(spawn Any BootProc (Set Assertion))`.
|
;; - `(patch (MutableDeltaof Assertion))`
|
||||||
|
;; - `(message Assertion)`
|
||||||
|
;; - `(spawn Any BootProc (Set Assertion))`
|
||||||
|
;; - `(quit)`.
|
||||||
(struct patch (changes) #:prefab)
|
(struct patch (changes) #:prefab)
|
||||||
(struct message (body) #:prefab)
|
(struct message (body) #:prefab)
|
||||||
(struct spawn (name boot-proc initial-assertions) #:prefab)
|
(struct spawn (name boot-proc initial-assertions) #:prefab)
|
||||||
|
(struct quit () #:prefab)
|
||||||
|
|
||||||
(struct dataspace ([next-id #:mutable] ;; Nat
|
(struct dataspace ([next-id #:mutable] ;; Nat
|
||||||
routing-table ;; Skeleton
|
routing-table ;; Skeleton
|
||||||
|
@ -95,6 +99,7 @@
|
||||||
pending-scripts ;; (MutableVectorof (Queueof (-> Any)))
|
pending-scripts ;; (MutableVectorof (Queueof (-> Any)))
|
||||||
[pending-actions #:mutable] ;; (Queueof Action)
|
[pending-actions #:mutable] ;; (Queueof Action)
|
||||||
[adhoc-assertions #:mutable] ;; (Bagof Assertion)
|
[adhoc-assertions #:mutable] ;; (Bagof Assertion)
|
||||||
|
[cleanup-changes #:mutable] ;; (Deltaof Assertion)
|
||||||
)
|
)
|
||||||
#:methods gen:custom-write
|
#:methods gen:custom-write
|
||||||
[(define (write-proc a p mode)
|
[(define (write-proc a p mode)
|
||||||
|
@ -230,7 +235,8 @@
|
||||||
#f
|
#f
|
||||||
(make-vector priority-count (make-queue))
|
(make-vector priority-count (make-queue))
|
||||||
(make-queue)
|
(make-queue)
|
||||||
(set->bag filtered-initial-assertions)))
|
(set->bag filtered-initial-assertions +1)
|
||||||
|
(set->bag filtered-initial-assertions -1)))
|
||||||
(for [(a filtered-initial-assertions)]
|
(for [(a filtered-initial-assertions)]
|
||||||
(match (bag-change! (dataspace-assertions ds) a +1)
|
(match (bag-change! (dataspace-assertions ds) a +1)
|
||||||
['absent->present (add-assertion! (dataspace-routing-table ds) a)]
|
['absent->present (add-assertion! (dataspace-routing-table ds) a)]
|
||||||
|
@ -333,16 +339,27 @@
|
||||||
;; (log-info "~a performing ~a" ac action)
|
;; (log-info "~a performing ~a" ac action)
|
||||||
(match action
|
(match action
|
||||||
[(patch delta)
|
[(patch delta)
|
||||||
(for [((a count) (in-bag/count delta))]
|
(apply-patch! ds ac delta)]
|
||||||
(match (bag-change! (dataspace-assertions ds) a count)
|
|
||||||
['present->absent (remove-assertion! (dataspace-routing-table ds) a)]
|
|
||||||
['absent->present (add-assertion! (dataspace-routing-table ds) a)]
|
|
||||||
;; 'absent->absent absurd
|
|
||||||
['present->present (void)]))] ;; i.e. no visible change
|
|
||||||
[(message body)
|
[(message body)
|
||||||
(send-assertion! (dataspace-routing-table ds) body)]
|
(send-assertion! (dataspace-routing-table ds) body)]
|
||||||
[(spawn name boot-proc initial-assertions)
|
[(spawn name boot-proc initial-assertions)
|
||||||
(add-actor! ds name boot-proc initial-assertions)]))))
|
(add-actor! ds name boot-proc initial-assertions)]
|
||||||
|
[(quit)
|
||||||
|
(apply-patch! ds ac (actor-cleanup-changes ac))]))))
|
||||||
|
|
||||||
|
(define (apply-patch! ds ac delta)
|
||||||
|
(define ds-assertions (dataspace-assertions ds))
|
||||||
|
(define rt (dataspace-routing-table ds))
|
||||||
|
(define new-cleanup-changes
|
||||||
|
(for/fold [(cleanup-changes (actor-cleanup-changes ac))] [((a count) (in-bag/count delta))]
|
||||||
|
(match (bag-change! ds-assertions a count)
|
||||||
|
['present->absent (remove-assertion! rt a)]
|
||||||
|
['absent->present (add-assertion! rt a)]
|
||||||
|
;; 'absent->absent absurd
|
||||||
|
['present->present (void)]) ;; i.e. no visible change
|
||||||
|
(define-values (updated-bag _summary) (bag-change cleanup-changes a (- count)))
|
||||||
|
updated-bag))
|
||||||
|
(set-actor-cleanup-changes! ac new-cleanup-changes))
|
||||||
|
|
||||||
(define (run-scripts! ds)
|
(define (run-scripts! ds)
|
||||||
(run-all-pending-scripts! ds)
|
(run-all-pending-scripts! ds)
|
||||||
|
@ -421,13 +438,17 @@
|
||||||
(let abort-facet! ((f f))
|
(let abort-facet! ((f f))
|
||||||
(set-facet-live?! f #f)
|
(set-facet-live?! f #f)
|
||||||
(for [(child (in-set (facet-children f)))] (abort-facet! child))
|
(for [(child (in-set (facet-children f)))] (abort-facet! child))
|
||||||
(retract-facet-assertions-and-subscriptions! ds f)))))
|
(retract-facet-assertions-and-subscriptions! ds f))))
|
||||||
|
(push-script! ds the-actor (lambda () (enqueue-action! the-actor (quit)))))
|
||||||
|
|
||||||
;; Cleanly terminates a facet and its children, running stop-scripts etc.
|
;; Cleanly terminates a facet and its children, running stop-scripts etc.
|
||||||
(define (terminate-facet! ds f)
|
(define (terminate-facet! ds f)
|
||||||
(when (facet-live? f)
|
(when (facet-live? f)
|
||||||
(define ac (facet-actor f))
|
(define ac (facet-actor f))
|
||||||
(define parent (facet-parent f))
|
(define parent (facet-parent f))
|
||||||
|
(if parent
|
||||||
|
(set-facet-children! parent (set-remove (facet-children parent) f))
|
||||||
|
(set-actor-root-facet! ac #f))
|
||||||
|
|
||||||
(set-facet-live?! f #f)
|
(set-facet-live?! f #f)
|
||||||
|
|
||||||
|
@ -444,13 +465,9 @@
|
||||||
|
|
||||||
(push-script! #:priority *gc-priority* ds ac
|
(push-script! #:priority *gc-priority* ds ac
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(cond
|
(if parent
|
||||||
[parent
|
(when (facet-inert? ds parent) (terminate-facet! ds parent))
|
||||||
(set-facet-children! parent (set-remove (facet-children parent) f))
|
(terminate-actor! ds ac))))))
|
||||||
(when (facet-inert? ds parent) (terminate-facet! ds parent))]
|
|
||||||
[else
|
|
||||||
(set-actor-root-facet! ac #f)
|
|
||||||
(terminate-actor! ds ac)])))))
|
|
||||||
|
|
||||||
(define (stop-facet! ds f stop-script)
|
(define (stop-facet! ds f stop-script)
|
||||||
(define ac (facet-actor f))
|
(define ac (facet-actor f))
|
||||||
|
|
Loading…
Reference in New Issue