Better Isolated Turn Principle

This commit is contained in:
Tony Garnock-Jones 2018-04-11 12:28:09 +01:00
parent 533d6f9a24
commit 5377a486c9
2 changed files with 170 additions and 101 deletions

View File

@ -39,8 +39,8 @@
push-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
ensure-in-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
dataspace-spawn! ;; TODO: should this be provided?
dataspace-send! ;; TODO: should this be provided?
spawn! ;; TODO: should this be provided?
enqueue-send! ;; TODO: should this be provided?
)
(require syndicate/functional-queue)
@ -78,13 +78,16 @@
actors ;; (MutableHash ActorID Actor)
assertions ;; (Bagof Assertion)
dataflow ;; DataflowGraph
pending-scripts ;; (MutableVectorof (Queueof (-> Any)))
[runnable #:mutable] ;; (Listof Actor)
[pending-actions #:mutable] ;; (Queueof Action)
))
) #:transparent)
(struct actor (id ;; ActorID
name ;; Any
[root-facet #:mutable] ;; (Option Facet)
[runnable? #:mutable] ;; Boolean
pending-scripts ;; (MutableVectorof (Queueof (-> Any)))
[pending-actions #:mutable] ;; (Queueof Action)
)
#:methods gen:custom-write
[(define (write-proc a p mode)
@ -189,15 +192,13 @@
;;---------------------------------------------------------------------------
(define (make-dataspace name boot-proc)
(define ds (dataspace 0
(make-empty-skeleton)
(make-hash)
(make-bag)
(make-dataflow-graph)
(make-vector priority-count (make-queue))
(make-queue)))
(dataspace-spawn! ds name boot-proc (set))
ds)
(dataspace 0
(make-empty-skeleton)
(make-hash)
(make-bag)
(make-dataflow-graph)
'()
(enqueue (make-queue) (spawn name boot-proc (set)))))
(define (generate-id! ds)
(let ((id (dataspace-next-id ds)))
@ -206,7 +207,12 @@
(define (add-actor! ds name boot-proc initial-assertions)
(define the-actor-id (generate-id! ds))
(define the-actor (actor the-actor-id name #f))
(define the-actor (actor the-actor-id
name
#f
#f
(make-vector priority-count (make-queue))
(make-queue)))
(hash-set! (dataspace-actors ds) the-actor-id the-actor)
(for [(a initial-assertions)]
(match (bag-change! (dataspace-assertions ds) a 1)
@ -219,7 +225,7 @@
#f
(lambda ()
(boot-proc)
(for [(a initial-assertions)] (dataspace-retract! ds a)))))
(for [(a initial-assertions)] (retract! the-actor a)))))
(define-syntax-rule (with-current-facet [ds0 a0 f0 script?] body ...)
(let ((ds ds0)
@ -231,9 +237,8 @@
(in-script? script?))
(with-handlers ([(lambda (e) (not (exn:break? e)))
(lambda (e)
(log-error "Actor ~a died with exception:\n~a"
(current-actor)
(exn->string e))
(log-error "Actor ~a died with exception:\n~a" a (exn->string e))
(abandon-queued-work! a)
(terminate-actor! ds a))]) ;; TODO: tracing
(call-with-syndicate-prompt
(lambda ()
@ -248,8 +253,8 @@
(with-current-facet [ds a f #t]
(apply proc args)))))
(define (pop-next-script! ds)
(define priority-levels (dataspace-pending-scripts ds))
(define (pop-next-script! ac)
(define priority-levels (actor-pending-scripts ac))
(let loop ((level 0))
(and (< level (vector-length priority-levels))
(let ((q (vector-ref priority-levels level)))
@ -259,32 +264,52 @@
(vector-set! priority-levels level q)
script))))))
(define (run-actor-pending-scripts! ds ac)
(let loop ()
(let ((script (pop-next-script! ac)))
(and script
(begin (script)
(refresh-facet-assertions! ds)
(loop))))))
(define (refresh-facet-assertions! ds)
(dataflow-repair-damage! (dataspace-dataflow ds)
(lambda (subject-id)
(match-define (list f eid) subject-id)
(when (facet-live? f) ;; TODO: necessary test, or tautological?
(define ac (facet-actor f))
(with-current-facet [ds ac f #f]
(define ep (hash-ref (facet-endpoints f) eid))
(define old-assertion (endpoint-assertion ep))
(define new-assertion ((endpoint-assertion-fn ep)))
(when (not (equal? old-assertion new-assertion))
(set-endpoint-assertion! ep new-assertion)
(retract! ac old-assertion)
(assert! ac new-assertion)
(define h (endpoint-handler ep))
(when h
(dataspace-unsubscribe! ds h)
(dataspace-subscribe! ds h))))))))
(define (commit-actions! ds ac)
(define pending (actor-pending-actions ac))
;; (log-info "commit-actions!: ~a actions ~a" ac (queue->list pending))
(set-dataspace-pending-actions! ds (queue-append (dataspace-pending-actions ds) pending))
(set-actor-pending-actions! ac (make-queue)))
(define (run-all-pending-scripts! ds)
(define script (pop-next-script! ds))
(and script
(begin (script)
(dataflow-repair-damage! (dataspace-dataflow ds)
(lambda (subject-id)
(match-define (list f eid) subject-id)
(when (facet-live? f) ;; TODO: necessary test, or tautological?
(with-current-facet [ds (facet-actor f) f #f]
(define ep (hash-ref (facet-endpoints f) eid))
(define old-assertion (endpoint-assertion ep))
(define new-assertion ((endpoint-assertion-fn ep)))
(when (not (equal? old-assertion new-assertion))
(set-endpoint-assertion! ep new-assertion)
(dataspace-retract! ds old-assertion)
(dataspace-assert! ds new-assertion)
(define h (endpoint-handler ep))
(when h
(dataspace-unsubscribe! ds h)
(dataspace-subscribe! ds h)))))))
(run-all-pending-scripts! ds))))
(define runnable (dataspace-runnable ds))
(set-dataspace-runnable! ds '())
(for [(ac (in-list runnable))]
(run-actor-pending-scripts! ds ac)
(set-actor-runnable?! ac #f)
(commit-actions! ds ac)))
(define (perform-pending-actions! ds)
(define actions (queue->list (dataspace-pending-actions ds)))
(set-dataspace-pending-actions! ds (make-queue))
(for [(action actions)]
;; (log-info "performing ~a" action)
(match action
[(patch delta)
(for [((a count) (in-bag delta))]
@ -296,16 +321,15 @@
[(message body)
(send-assertion! (dataspace-routing-table ds) body)]
[(spawn name boot-proc initial-assertions)
(add-actor! ds name boot-proc initial-assertions)]))
(not (null? actions)))
(add-actor! ds name boot-proc initial-assertions)])))
(define (run-scripts! ds)
(define ran-a-script (run-all-pending-scripts! ds))
(define performed-an-action (perform-pending-actions! ds))
(run-all-pending-scripts! ds)
(perform-pending-actions! ds)
;; TODO: figure out when a dataspace should quit itself. Given the
;; mutable nature of the implementation, maybe never? It might be
;; being held elsewhere!
(or ran-a-script performed-an-action))
(not (null? (dataspace-runnable ds))))
(define (add-facet! ds where actor parent boot-proc)
(when (and (not (in-script?)) where)
@ -324,30 +348,42 @@
(set-actor-root-facet! actor f))
(with-current-facet [ds actor f #f]
(boot-proc))
(push-script! ds (lambda ()
(when (and (facet-live? f)
(or (and parent (not (facet-live? parent)))
(facet-inert? ds f)))
(terminate-facet! ds f)))))
(push-script! ds actor (lambda ()
(when (and (facet-live? f)
(or (and parent (not (facet-live? parent)))
(facet-inert? ds f)))
(terminate-facet! ds f)))))
(define (facet-inert? ds f)
(and (hash-empty? (facet-endpoints f))
(set-empty? (facet-children f))))
(define (schedule-script! #:priority [priority *normal-priority*] ds thunk)
(push-script! #:priority priority ds (capture-facet-context thunk)))
(define (schedule-script! #:priority [priority *normal-priority*] ds ac thunk)
(push-script! #:priority priority ds ac (capture-facet-context thunk)))
(define (push-script! #:priority [priority *normal-priority*] ds thunk-with-context)
(define v (dataspace-pending-scripts ds))
(define (push-script! #:priority [priority *normal-priority*] ds ac thunk-with-context)
(when (not (actor-runnable? ac))
(set-actor-runnable?! ac #t)
(set-dataspace-runnable! ds (cons ac (dataspace-runnable ds))))
(define v (actor-pending-scripts ac))
(vector-set! v priority (enqueue (vector-ref v priority) thunk-with-context)))
(define (retract-facet-assertions-and-subscriptions! ds f)
(push-script! ds (lambda ()
(for [((eid ep) (in-hash (facet-endpoints f)))]
(dataflow-forget-subject! (dataspace-dataflow ds) (list f eid))
(dataspace-retract! ds (endpoint-assertion ep))
(define h (endpoint-handler ep))
(when h (dataspace-unsubscribe! ds h))))))
(define ac (facet-actor f))
(push-script! ds
ac
(lambda ()
(for [((eid ep) (in-hash (facet-endpoints f)))]
(dataflow-forget-subject! (dataspace-dataflow ds) (list f eid))
(retract! ac (endpoint-assertion ep))
(define h (endpoint-handler ep))
(when h (dataspace-unsubscribe! ds h))))))
(define (abandon-queued-work! ac)
(set-actor-pending-actions! ac (make-queue))
(let ((scripts (actor-pending-scripts ac)))
(for [(i (in-range (vector-length scripts)))]
(vector-set! scripts i (make-queue)))))
;; Abruptly terminates an entire actor, without running stop-scripts etc.
(define (terminate-actor! ds the-actor)
@ -362,10 +398,11 @@
;; Cleanly terminates a facet and its children, running stop-scripts etc.
(define (terminate-facet! ds f)
(when (facet-live? f)
(define ac (facet-actor f))
(define parent (facet-parent f))
(if parent
(set-facet-children! parent (set-remove (facet-children parent) f))
(set-actor-root-facet! (facet-actor f) #f))
(set-actor-root-facet! ac #f))
(set-facet-live?! f #f)
@ -375,25 +412,25 @@
;; children's stop-scripts run before ours.
(for [(script (reverse (facet-stop-scripts f)))]
(schedule-script! ds
ac
(lambda ()
(with-current-facet [ds (facet-actor f) f #t]
(with-current-facet [ds ac f #t]
(script)))))
(retract-facet-assertions-and-subscriptions! ds f)
(push-script!
#:priority *gc-priority*
ds
(lambda ()
(if parent
(when (facet-inert? ds parent) (terminate-facet! ds parent))
(terminate-actor! ds (facet-actor f)))))))
(push-script! #:priority *gc-priority* ds ac
(lambda ()
(if parent
(when (facet-inert? ds parent) (terminate-facet! ds parent))
(terminate-actor! ds ac))))))
(define (stop-facet! ds f stop-script)
(with-current-facet [ds (facet-actor f) f #t] ;; run in parent context wrt terminating facet
(schedule-script! ds (lambda ()
(terminate-facet! ds f)
(schedule-script! ds stop-script)))))
(define ac (facet-actor f))
(with-current-facet [ds ac f #t] ;; run in parent context wrt terminating facet
(schedule-script! ds ac (lambda ()
(terminate-facet! ds f)
(schedule-script! ds ac stop-script)))))
(define (add-stop-script! ds script-proc)
(define f (current-facet))
@ -410,26 +447,26 @@
(parameterize ((current-dataflow-subject-id (list f eid)))
(call-with-syndicate-prompt assertion-fn)))
(define ep (endpoint eid assertion assertion-fn handler))
(dataspace-assert! ds assertion)
(assert! (facet-actor f) assertion)
(when handler (dataspace-subscribe! ds handler))
(hash-set! (facet-endpoints f) eid ep))
(define (enqueue-action! ds action)
(set-dataspace-pending-actions! ds (enqueue (dataspace-pending-actions ds) action)))
(define (enqueue-action! ac action)
(set-actor-pending-actions! ac (enqueue (actor-pending-actions ac) action)))
(define (ensure-patch-action! ds)
(let ((q (dataspace-pending-actions ds)))
(define (ensure-patch-action! ac)
(let ((q (actor-pending-actions ac)))
(when (or (queue-empty? q) (not (patch? (queue-last q))))
(enqueue-action! ds (patch (make-bag)))))
(patch-changes (queue-last (dataspace-pending-actions ds))))
(enqueue-action! ac (patch (make-bag)))))
(patch-changes (queue-last (actor-pending-actions ac))))
(define (dataspace-retract! ds assertion)
(define (retract! ac assertion)
(when (not (void? assertion))
(bag-change! (ensure-patch-action! ds) assertion -1)))
(bag-change! (ensure-patch-action! ac) assertion -1)))
(define (dataspace-assert! ds assertion)
(define (assert! ac assertion)
(when (not (void? assertion))
(bag-change! (ensure-patch-action! ds) assertion +1)))
(bag-change! (ensure-patch-action! ac) assertion +1)))
(define (dataspace-unsubscribe! ds h)
(remove-interest! (dataspace-routing-table ds) h))
@ -441,13 +478,13 @@
(when (not (in-script?))
(error who "Attempt to perform action outside script; are you missing an (on ...)?")))
(define (dataspace-send! ds body)
(ensure-in-script! 'dataspace-send!)
(enqueue-action! ds (message body)))
(define (enqueue-send! ac body)
(ensure-in-script! 'enqueue-send!)
(enqueue-action! ac (message body)))
(define (dataspace-spawn! ds name boot-proc initial-assertions)
(ensure-in-script! 'dataspace-spawn!)
(enqueue-action! ds (spawn name boot-proc initial-assertions)))
(define (spawn! ac name boot-proc initial-assertions)
(ensure-in-script! 'spawn!)
(enqueue-action! ac (spawn name boot-proc initial-assertions)))
;;---------------------------------------------------------------------------
;; Script suspend-and-resume.
@ -476,8 +513,8 @@
(define resume-parent
(lambda results
(push-script! (current-dataspace)
(lambda ()
(apply raw-resume-parent results)))))
(current-actor)
(lambda () (apply raw-resume-parent results)))))
(proc resume-parent))))
prompt-tag))
@ -493,9 +530,10 @@
(lambda ()
(schedule-script!
(current-dataspace)
(current-actor)
(lambda ()
(dataspace-spawn!
ds
(spawn!
(current-actor)
'box
(lambda ()
(define current-value (field-handle 'current-value
@ -528,12 +566,13 @@
(when (eq? '! op)
(schedule-script!
(current-dataspace)
(current-actor)
(lambda ()
(log-info "box: taking on new-value ~v" new-value)
(current-value new-value)))))))))
(set))
(dataspace-spawn!
ds
(spawn!
(current-actor)
'client
(lambda ()
(add-endpoint! (current-dataspace)
@ -563,10 +602,11 @@
(when (eq? '+ op)
(schedule-script!
(current-dataspace)
(current-actor)
(lambda ()
(log-info "client: learned that box's value is now ~v" v)
(dataspace-send! (current-dataspace)
(set-box (+ v 1)))))))))))
(enqueue-send! (current-actor)
(set-box (+ v 1)))))))))))
(set)))))))
(require racket/pretty)

View File

@ -116,8 +116,8 @@
(syntax-parse stx
[(_ name:name assertions:assertions script ...)
(quasisyntax/loc stx
(dataspace-spawn!
(current-dataspace)
(spawn!
(current-actor)
name.N
(lambda () (begin/void-default script ...))
(set assertions.exprs ...)))]))
@ -208,6 +208,7 @@
[(_ script ...)
(quasisyntax/loc stx
(schedule-script! (current-dataspace)
(current-actor)
(lambda () (begin/void-default script ...))))]))
(define-syntax (on-stop stx)
@ -247,6 +248,7 @@
(schedule-script!
#:priority prio.level
(current-dataspace)
(current-actor)
(lambda ()
(parameterize ((current-dataflow-subject-id subject-id))
expr ...)))
@ -320,6 +322,7 @@
(schedule-script!
#:priority #,priority-stx
(current-dataspace)
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx)))))))))]
@ -349,6 +352,7 @@
(schedule-script!
#:priority #,priority-stx
(current-dataspace)
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx))))))))))
@ -564,7 +568,7 @@
;; (on-start (flush!) (k (query-result) ...)))))]))
(define (send! m)
(dataspace-send! (current-dataspace) m))
(enqueue-send! (current-actor) m))
(define (flush!)
(ensure-in-script! 'flush!)
@ -584,6 +588,7 @@
(lambda ()
(schedule-script!
(current-dataspace)
(current-actor)
#;(lambda ()
(spawn (on (message $v)
(if (= v 10000000)
@ -591,7 +596,31 @@
(send! (+ v 1))))
(on-start (send! 0)))
)
(lambda ()
(message-struct stage (n))
(spawn #:name 'actor0
(on (message (stage 0))
(send! (stage 1)))
(on (message (stage 2))
(send! (stage 3))
(/ 1 0)
(send! (stage 3))))
(spawn #:name 'main
(on (message (stage $v))
(printf "Got message ~v\n" v))
(on-start
(until (asserted (observe (stage 0))))
(send! (stage 0))
(until (message (stage 1)))
(send! (stage 2))))
)
#;(lambda ()
(spawn (field [current-value 0])
(assert (box-state (current-value)))
(stop-when-true (= (current-value) 10)