diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index 61aae59..294465e 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -39,8 +39,8 @@ push-script! ;; TODO: shouldn't be provided - inline syntax.rkt?? ensure-in-script! ;; TODO: shouldn't be provided - inline syntax.rkt?? - dataspace-spawn! ;; TODO: should this be provided? - dataspace-send! ;; TODO: should this be provided? + spawn! ;; TODO: should this be provided? + enqueue-send! ;; TODO: should this be provided? ) (require syndicate/functional-queue) @@ -78,13 +78,16 @@ actors ;; (MutableHash ActorID Actor) assertions ;; (Bagof Assertion) dataflow ;; DataflowGraph - pending-scripts ;; (MutableVectorof (Queueof (-> Any))) + [runnable #:mutable] ;; (Listof Actor) [pending-actions #:mutable] ;; (Queueof Action) - )) + ) #:transparent) (struct actor (id ;; ActorID name ;; Any [root-facet #:mutable] ;; (Option Facet) + [runnable? #:mutable] ;; Boolean + pending-scripts ;; (MutableVectorof (Queueof (-> Any))) + [pending-actions #:mutable] ;; (Queueof Action) ) #:methods gen:custom-write [(define (write-proc a p mode) @@ -189,15 +192,13 @@ ;;--------------------------------------------------------------------------- (define (make-dataspace name boot-proc) - (define ds (dataspace 0 - (make-empty-skeleton) - (make-hash) - (make-bag) - (make-dataflow-graph) - (make-vector priority-count (make-queue)) - (make-queue))) - (dataspace-spawn! ds name boot-proc (set)) - ds) + (dataspace 0 + (make-empty-skeleton) + (make-hash) + (make-bag) + (make-dataflow-graph) + '() + (enqueue (make-queue) (spawn name boot-proc (set))))) (define (generate-id! ds) (let ((id (dataspace-next-id ds))) @@ -206,7 +207,12 @@ (define (add-actor! ds name boot-proc initial-assertions) (define the-actor-id (generate-id! ds)) - (define the-actor (actor the-actor-id name #f)) + (define the-actor (actor the-actor-id + name + #f + #f + (make-vector priority-count (make-queue)) + (make-queue))) (hash-set! (dataspace-actors ds) the-actor-id the-actor) (for [(a initial-assertions)] (match (bag-change! (dataspace-assertions ds) a 1) @@ -219,7 +225,7 @@ #f (lambda () (boot-proc) - (for [(a initial-assertions)] (dataspace-retract! ds a))))) + (for [(a initial-assertions)] (retract! the-actor a))))) (define-syntax-rule (with-current-facet [ds0 a0 f0 script?] body ...) (let ((ds ds0) @@ -231,9 +237,8 @@ (in-script? script?)) (with-handlers ([(lambda (e) (not (exn:break? e))) (lambda (e) - (log-error "Actor ~a died with exception:\n~a" - (current-actor) - (exn->string e)) + (log-error "Actor ~a died with exception:\n~a" a (exn->string e)) + (abandon-queued-work! a) (terminate-actor! ds a))]) ;; TODO: tracing (call-with-syndicate-prompt (lambda () @@ -248,8 +253,8 @@ (with-current-facet [ds a f #t] (apply proc args))))) -(define (pop-next-script! ds) - (define priority-levels (dataspace-pending-scripts ds)) +(define (pop-next-script! ac) + (define priority-levels (actor-pending-scripts ac)) (let loop ((level 0)) (and (< level (vector-length priority-levels)) (let ((q (vector-ref priority-levels level))) @@ -259,32 +264,52 @@ (vector-set! priority-levels level q) script)))))) +(define (run-actor-pending-scripts! ds ac) + (let loop () + (let ((script (pop-next-script! ac))) + (and script + (begin (script) + (refresh-facet-assertions! ds) + (loop)))))) + +(define (refresh-facet-assertions! ds) + (dataflow-repair-damage! (dataspace-dataflow ds) + (lambda (subject-id) + (match-define (list f eid) subject-id) + (when (facet-live? f) ;; TODO: necessary test, or tautological? + (define ac (facet-actor f)) + (with-current-facet [ds ac f #f] + (define ep (hash-ref (facet-endpoints f) eid)) + (define old-assertion (endpoint-assertion ep)) + (define new-assertion ((endpoint-assertion-fn ep))) + (when (not (equal? old-assertion new-assertion)) + (set-endpoint-assertion! ep new-assertion) + (retract! ac old-assertion) + (assert! ac new-assertion) + (define h (endpoint-handler ep)) + (when h + (dataspace-unsubscribe! ds h) + (dataspace-subscribe! ds h)))))))) + +(define (commit-actions! ds ac) + (define pending (actor-pending-actions ac)) + ;; (log-info "commit-actions!: ~a actions ~a" ac (queue->list pending)) + (set-dataspace-pending-actions! ds (queue-append (dataspace-pending-actions ds) pending)) + (set-actor-pending-actions! ac (make-queue))) + (define (run-all-pending-scripts! ds) - (define script (pop-next-script! ds)) - (and script - (begin (script) - (dataflow-repair-damage! (dataspace-dataflow ds) - (lambda (subject-id) - (match-define (list f eid) subject-id) - (when (facet-live? f) ;; TODO: necessary test, or tautological? - (with-current-facet [ds (facet-actor f) f #f] - (define ep (hash-ref (facet-endpoints f) eid)) - (define old-assertion (endpoint-assertion ep)) - (define new-assertion ((endpoint-assertion-fn ep))) - (when (not (equal? old-assertion new-assertion)) - (set-endpoint-assertion! ep new-assertion) - (dataspace-retract! ds old-assertion) - (dataspace-assert! ds new-assertion) - (define h (endpoint-handler ep)) - (when h - (dataspace-unsubscribe! ds h) - (dataspace-subscribe! ds h))))))) - (run-all-pending-scripts! ds)))) + (define runnable (dataspace-runnable ds)) + (set-dataspace-runnable! ds '()) + (for [(ac (in-list runnable))] + (run-actor-pending-scripts! ds ac) + (set-actor-runnable?! ac #f) + (commit-actions! ds ac))) (define (perform-pending-actions! ds) (define actions (queue->list (dataspace-pending-actions ds))) (set-dataspace-pending-actions! ds (make-queue)) (for [(action actions)] + ;; (log-info "performing ~a" action) (match action [(patch delta) (for [((a count) (in-bag delta))] @@ -296,16 +321,15 @@ [(message body) (send-assertion! (dataspace-routing-table ds) body)] [(spawn name boot-proc initial-assertions) - (add-actor! ds name boot-proc initial-assertions)])) - (not (null? actions))) + (add-actor! ds name boot-proc initial-assertions)]))) (define (run-scripts! ds) - (define ran-a-script (run-all-pending-scripts! ds)) - (define performed-an-action (perform-pending-actions! ds)) + (run-all-pending-scripts! ds) + (perform-pending-actions! ds) ;; TODO: figure out when a dataspace should quit itself. Given the ;; mutable nature of the implementation, maybe never? It might be ;; being held elsewhere! - (or ran-a-script performed-an-action)) + (not (null? (dataspace-runnable ds)))) (define (add-facet! ds where actor parent boot-proc) (when (and (not (in-script?)) where) @@ -324,30 +348,42 @@ (set-actor-root-facet! actor f)) (with-current-facet [ds actor f #f] (boot-proc)) - (push-script! ds (lambda () - (when (and (facet-live? f) - (or (and parent (not (facet-live? parent))) - (facet-inert? ds f))) - (terminate-facet! ds f))))) + (push-script! ds actor (lambda () + (when (and (facet-live? f) + (or (and parent (not (facet-live? parent))) + (facet-inert? ds f))) + (terminate-facet! ds f))))) (define (facet-inert? ds f) (and (hash-empty? (facet-endpoints f)) (set-empty? (facet-children f)))) -(define (schedule-script! #:priority [priority *normal-priority*] ds thunk) - (push-script! #:priority priority ds (capture-facet-context thunk))) +(define (schedule-script! #:priority [priority *normal-priority*] ds ac thunk) + (push-script! #:priority priority ds ac (capture-facet-context thunk))) -(define (push-script! #:priority [priority *normal-priority*] ds thunk-with-context) - (define v (dataspace-pending-scripts ds)) +(define (push-script! #:priority [priority *normal-priority*] ds ac thunk-with-context) + (when (not (actor-runnable? ac)) + (set-actor-runnable?! ac #t) + (set-dataspace-runnable! ds (cons ac (dataspace-runnable ds)))) + (define v (actor-pending-scripts ac)) (vector-set! v priority (enqueue (vector-ref v priority) thunk-with-context))) (define (retract-facet-assertions-and-subscriptions! ds f) - (push-script! ds (lambda () - (for [((eid ep) (in-hash (facet-endpoints f)))] - (dataflow-forget-subject! (dataspace-dataflow ds) (list f eid)) - (dataspace-retract! ds (endpoint-assertion ep)) - (define h (endpoint-handler ep)) - (when h (dataspace-unsubscribe! ds h)))))) + (define ac (facet-actor f)) + (push-script! ds + ac + (lambda () + (for [((eid ep) (in-hash (facet-endpoints f)))] + (dataflow-forget-subject! (dataspace-dataflow ds) (list f eid)) + (retract! ac (endpoint-assertion ep)) + (define h (endpoint-handler ep)) + (when h (dataspace-unsubscribe! ds h)))))) + +(define (abandon-queued-work! ac) + (set-actor-pending-actions! ac (make-queue)) + (let ((scripts (actor-pending-scripts ac))) + (for [(i (in-range (vector-length scripts)))] + (vector-set! scripts i (make-queue))))) ;; Abruptly terminates an entire actor, without running stop-scripts etc. (define (terminate-actor! ds the-actor) @@ -362,10 +398,11 @@ ;; Cleanly terminates a facet and its children, running stop-scripts etc. (define (terminate-facet! ds f) (when (facet-live? f) + (define ac (facet-actor f)) (define parent (facet-parent f)) (if parent (set-facet-children! parent (set-remove (facet-children parent) f)) - (set-actor-root-facet! (facet-actor f) #f)) + (set-actor-root-facet! ac #f)) (set-facet-live?! f #f) @@ -375,25 +412,25 @@ ;; children's stop-scripts run before ours. (for [(script (reverse (facet-stop-scripts f)))] (schedule-script! ds + ac (lambda () - (with-current-facet [ds (facet-actor f) f #t] + (with-current-facet [ds ac f #t] (script))))) (retract-facet-assertions-and-subscriptions! ds f) - (push-script! - #:priority *gc-priority* - ds - (lambda () - (if parent - (when (facet-inert? ds parent) (terminate-facet! ds parent)) - (terminate-actor! ds (facet-actor f))))))) + (push-script! #:priority *gc-priority* ds ac + (lambda () + (if parent + (when (facet-inert? ds parent) (terminate-facet! ds parent)) + (terminate-actor! ds ac)))))) (define (stop-facet! ds f stop-script) - (with-current-facet [ds (facet-actor f) f #t] ;; run in parent context wrt terminating facet - (schedule-script! ds (lambda () - (terminate-facet! ds f) - (schedule-script! ds stop-script))))) + (define ac (facet-actor f)) + (with-current-facet [ds ac f #t] ;; run in parent context wrt terminating facet + (schedule-script! ds ac (lambda () + (terminate-facet! ds f) + (schedule-script! ds ac stop-script))))) (define (add-stop-script! ds script-proc) (define f (current-facet)) @@ -410,26 +447,26 @@ (parameterize ((current-dataflow-subject-id (list f eid))) (call-with-syndicate-prompt assertion-fn))) (define ep (endpoint eid assertion assertion-fn handler)) - (dataspace-assert! ds assertion) + (assert! (facet-actor f) assertion) (when handler (dataspace-subscribe! ds handler)) (hash-set! (facet-endpoints f) eid ep)) -(define (enqueue-action! ds action) - (set-dataspace-pending-actions! ds (enqueue (dataspace-pending-actions ds) action))) +(define (enqueue-action! ac action) + (set-actor-pending-actions! ac (enqueue (actor-pending-actions ac) action))) -(define (ensure-patch-action! ds) - (let ((q (dataspace-pending-actions ds))) +(define (ensure-patch-action! ac) + (let ((q (actor-pending-actions ac))) (when (or (queue-empty? q) (not (patch? (queue-last q)))) - (enqueue-action! ds (patch (make-bag))))) - (patch-changes (queue-last (dataspace-pending-actions ds)))) + (enqueue-action! ac (patch (make-bag))))) + (patch-changes (queue-last (actor-pending-actions ac)))) -(define (dataspace-retract! ds assertion) +(define (retract! ac assertion) (when (not (void? assertion)) - (bag-change! (ensure-patch-action! ds) assertion -1))) + (bag-change! (ensure-patch-action! ac) assertion -1))) -(define (dataspace-assert! ds assertion) +(define (assert! ac assertion) (when (not (void? assertion)) - (bag-change! (ensure-patch-action! ds) assertion +1))) + (bag-change! (ensure-patch-action! ac) assertion +1))) (define (dataspace-unsubscribe! ds h) (remove-interest! (dataspace-routing-table ds) h)) @@ -441,13 +478,13 @@ (when (not (in-script?)) (error who "Attempt to perform action outside script; are you missing an (on ...)?"))) -(define (dataspace-send! ds body) - (ensure-in-script! 'dataspace-send!) - (enqueue-action! ds (message body))) +(define (enqueue-send! ac body) + (ensure-in-script! 'enqueue-send!) + (enqueue-action! ac (message body))) -(define (dataspace-spawn! ds name boot-proc initial-assertions) - (ensure-in-script! 'dataspace-spawn!) - (enqueue-action! ds (spawn name boot-proc initial-assertions))) +(define (spawn! ac name boot-proc initial-assertions) + (ensure-in-script! 'spawn!) + (enqueue-action! ac (spawn name boot-proc initial-assertions))) ;;--------------------------------------------------------------------------- ;; Script suspend-and-resume. @@ -476,8 +513,8 @@ (define resume-parent (lambda results (push-script! (current-dataspace) - (lambda () - (apply raw-resume-parent results))))) + (current-actor) + (lambda () (apply raw-resume-parent results))))) (proc resume-parent)))) prompt-tag)) @@ -493,9 +530,10 @@ (lambda () (schedule-script! (current-dataspace) + (current-actor) (lambda () - (dataspace-spawn! - ds + (spawn! + (current-actor) 'box (lambda () (define current-value (field-handle 'current-value @@ -528,12 +566,13 @@ (when (eq? '! op) (schedule-script! (current-dataspace) + (current-actor) (lambda () (log-info "box: taking on new-value ~v" new-value) (current-value new-value))))))))) (set)) - (dataspace-spawn! - ds + (spawn! + (current-actor) 'client (lambda () (add-endpoint! (current-dataspace) @@ -563,10 +602,11 @@ (when (eq? '+ op) (schedule-script! (current-dataspace) + (current-actor) (lambda () (log-info "client: learned that box's value is now ~v" v) - (dataspace-send! (current-dataspace) - (set-box (+ v 1))))))))))) + (enqueue-send! (current-actor) + (set-box (+ v 1))))))))))) (set))))))) (require racket/pretty) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index 7c3f869..d4a57e6 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -116,8 +116,8 @@ (syntax-parse stx [(_ name:name assertions:assertions script ...) (quasisyntax/loc stx - (dataspace-spawn! - (current-dataspace) + (spawn! + (current-actor) name.N (lambda () (begin/void-default script ...)) (set assertions.exprs ...)))])) @@ -208,6 +208,7 @@ [(_ script ...) (quasisyntax/loc stx (schedule-script! (current-dataspace) + (current-actor) (lambda () (begin/void-default script ...))))])) (define-syntax (on-stop stx) @@ -247,6 +248,7 @@ (schedule-script! #:priority prio.level (current-dataspace) + (current-actor) (lambda () (parameterize ((current-dataflow-subject-id subject-id)) expr ...))) @@ -320,6 +322,7 @@ (schedule-script! #:priority #,priority-stx (current-dataspace) + (current-actor) #,(quasisyntax/loc script-stx (lambda () #,script-stx)))))))))] @@ -349,6 +352,7 @@ (schedule-script! #:priority #,priority-stx (current-dataspace) + (current-actor) #,(quasisyntax/loc script-stx (lambda () #,script-stx)))))))))) @@ -564,7 +568,7 @@ ;; (on-start (flush!) (k (query-result) ...)))))])) (define (send! m) - (dataspace-send! (current-dataspace) m)) + (enqueue-send! (current-actor) m)) (define (flush!) (ensure-in-script! 'flush!) @@ -584,6 +588,7 @@ (lambda () (schedule-script! (current-dataspace) + (current-actor) #;(lambda () (spawn (on (message $v) (if (= v 10000000) @@ -591,7 +596,31 @@ (send! (+ v 1)))) (on-start (send! 0))) ) + (lambda () + + (message-struct stage (n)) + + (spawn #:name 'actor0 + (on (message (stage 0)) + (send! (stage 1))) + + (on (message (stage 2)) + (send! (stage 3)) + (/ 1 0) + (send! (stage 3)))) + + (spawn #:name 'main + (on (message (stage $v)) + (printf "Got message ~v\n" v)) + (on-start + (until (asserted (observe (stage 0)))) + (send! (stage 0)) + (until (message (stage 1))) + (send! (stage 2)))) + ) + + #;(lambda () (spawn (field [current-value 0]) (assert (box-state (current-value))) (stop-when-true (= (current-value) 10)