diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index 5f9e272..4ddbed0 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -13,6 +13,7 @@ actor? actor-id actor-name + actor-dataspace ;; TODO: should this be provided? facet? facet-actor @@ -24,7 +25,6 @@ field-handle-owner field-handle-value - current-dataspace current-actor current-facet in-script? ;; TODO: shouldn't be provided - inline syntax.rkt?? @@ -93,6 +93,7 @@ ) #:transparent) (struct actor (id ;; ActorID + dataspace ;; Dataspace name ;; Any [root-facet #:mutable] ;; (Option Facet) [runnable? #:mutable] ;; Boolean @@ -152,12 +153,14 @@ #:property prop:procedure (case-lambda [(f) - (when (not (eq? (field-handle-owner f) (current-actor))) (field-scope-error 'field-ref f)) - (dataflow-record-observation! (dataspace-dataflow (current-dataspace)) f) + (define ac (current-actor)) + (when (not (eq? (field-handle-owner f) ac)) (field-scope-error 'field-ref f)) + (dataflow-record-observation! (dataspace-dataflow (actor-dataspace ac)) f) (field-handle-value f)] [(f v) + (define ac (current-actor)) (when (not (eq? (field-handle-owner f) (current-actor))) (field-scope-error 'field-set! f)) - (dataflow-record-damage! (dataspace-dataflow (current-dataspace)) f) + (dataflow-record-damage! (dataspace-dataflow (actor-dataspace ac)) f) (set-field-handle-value! f v)])) (define (field-scope-error who f) @@ -166,9 +169,6 @@ (field-handle-owner f) (current-actor))) -;; Parameterof Dataspace -(define current-dataspace (make-parameter #f)) - ;; Parameterof Actor (define current-actor (make-parameter #f)) @@ -230,6 +230,7 @@ (define the-actor-id (generate-id! ds)) (define filtered-initial-assertions (set-remove initial-assertions (void))) (define the-actor (actor the-actor-id + ds name #f #f @@ -242,8 +243,7 @@ ['absent->present (add-assertion! (dataspace-routing-table ds) a)] ;; 'absent->absent and 'present->absent absurd ['present->present (void)])) ;; i.e. no visible change - (add-facet! ds - #f + (add-facet! #f the-actor #f (lambda () @@ -251,30 +251,27 @@ (for [(a filtered-initial-assertions)] (adhoc-retract! the-actor a))))) -(define-syntax-rule (with-current-facet [ds0 a0 f0 script?] body ...) - (let ((ds ds0) - (a a0) +(define-syntax-rule (with-current-facet [a0 f0 script?] body ...) + (let ((a a0) (f f0)) - (parameterize ((current-dataspace ds) - (current-actor a) + (parameterize ((current-actor a) (current-facet f) (in-script? script?)) (with-handlers ([(lambda (e) (not (exn:break? e))) (lambda (e) (log-error "Actor ~a died with exception:\n~a" a (exn->string e)) (abandon-queued-work! a) - (terminate-actor! ds a))]) ;; TODO: tracing + (terminate-actor! a))]) ;; TODO: tracing (call-with-syndicate-prompt (lambda () body ...)) (void))))) (define (capture-facet-context proc) - (let ((ds (current-dataspace)) - (a (current-actor)) + (let ((a (current-actor)) (f (current-facet))) (lambda args - (with-current-facet [ds a f #t] + (with-current-facet [a f #t] (apply proc args))))) (define (pop-next-script! ac) @@ -302,7 +299,7 @@ (match-define (list f eid) subject-id) (when (facet-live? f) ;; TODO: necessary test, or tautological? (define ac (facet-actor f)) - (with-current-facet [ds ac f #f] + (with-current-facet [ac f #f] (define ep (hash-ref (facet-endpoints f) eid)) (define old-assertion (endpoint-assertion ep)) (define new-assertion ((endpoint-assertion-fn ep))) @@ -369,12 +366,12 @@ ;; being held elsewhere! (not (null? (dataspace-runnable ds)))) -(define (add-facet! ds where actor parent boot-proc) +(define (add-facet! where actor parent boot-proc) (when (and (not (in-script?)) where) (error 'add-facet! "~a: Cannot add facet outside script; are you missing an (on ...)?" where)) - (define f (facet (generate-id! ds) + (define f (facet (generate-id! (actor-dataspace actor)) #t actor parent @@ -390,37 +387,36 @@ ;; root facet should be allowed? (error 'add-facet! "INTERNAL ERROR: Attempt to add second root facet")) (set-actor-root-facet! actor f))) - (with-current-facet [ds actor f #f] + (with-current-facet [actor f #f] (boot-proc)) - (push-script! ds actor (lambda () - (when (or (and parent (not (facet-live? parent))) - (facet-inert? ds f)) - (terminate-facet! ds f))))) + (push-script! actor (lambda () + (when (or (and parent (not (facet-live? parent))) (facet-inert? f)) + (terminate-facet! f))))) -(define (facet-inert? ds f) +(define (facet-inert? f) (and (hash-empty? (facet-endpoints f)) (set-empty? (facet-children f)))) -(define (schedule-script! #:priority [priority *normal-priority*] ds ac thunk) - (push-script! #:priority priority ds ac (capture-facet-context thunk))) +(define (schedule-script! #:priority [priority *normal-priority*] ac thunk) + (push-script! #:priority priority ac (capture-facet-context thunk))) -(define (push-script! #:priority [priority *normal-priority*] ds ac thunk-with-context) +(define (push-script! #:priority [priority *normal-priority*] ac thunk-with-context) (when (not (actor-runnable? ac)) (set-actor-runnable?! ac #t) - (set-dataspace-runnable! ds (cons ac (dataspace-runnable ds)))) + (let ((ds (actor-dataspace ac))) + (set-dataspace-runnable! ds (cons ac (dataspace-runnable ds))))) (define v (actor-pending-scripts ac)) (vector-set! v priority (enqueue (vector-ref v priority) thunk-with-context))) -(define (retract-facet-assertions-and-subscriptions! ds f) +(define (retract-facet-assertions-and-subscriptions! f) (define ac (facet-actor f)) - (push-script! ds - ac - (lambda () - (for [((eid ep) (in-hash (facet-endpoints f)))] - (dataflow-forget-subject! (dataspace-dataflow ds) (list f eid)) - (retract! ac (endpoint-assertion ep)) - (define h (endpoint-handler ep)) - (when h (dataspace-unsubscribe! ds h)))))) + (define ds (actor-dataspace ac)) + (push-script! ac (lambda () + (for [((eid ep) (in-hash (facet-endpoints f)))] + (dataflow-forget-subject! (dataspace-dataflow ds) (list f eid)) + (retract! ac (endpoint-assertion ep)) + (define h (endpoint-handler ep)) + (when h (dataspace-unsubscribe! ds h)))))) (define (abandon-queued-work! ac) (set-actor-pending-actions! ac (make-queue)) @@ -429,20 +425,19 @@ (vector-set! scripts i (make-queue))))) ;; Abruptly terminates an entire actor, without running stop-scripts etc. -(define (terminate-actor! ds the-actor) - (push-script! ds the-actor - (lambda () (for [(a (in-bag (actor-adhoc-assertions the-actor)))] - (retract! the-actor a)))) +(define (terminate-actor! the-actor) + (push-script! the-actor (lambda () (for [(a (in-bag (actor-adhoc-assertions the-actor)))] + (retract! the-actor a)))) (let ((f (actor-root-facet the-actor))) (when f (let abort-facet! ((f f)) (set-facet-live?! f #f) (for [(child (in-set (facet-children f)))] (abort-facet! child)) - (retract-facet-assertions-and-subscriptions! ds f)))) - (push-script! ds the-actor (lambda () (enqueue-action! the-actor (quit))))) + (retract-facet-assertions-and-subscriptions! f)))) + (push-script! the-actor (lambda () (enqueue-action! the-actor (quit))))) ;; Cleanly terminates a facet and its children, running stop-scripts etc. -(define (terminate-facet! ds f) +(define (terminate-facet! f) (when (facet-live? f) (define ac (facet-actor f)) (define parent (facet-parent f)) @@ -452,40 +447,39 @@ (set-facet-live?! f #f) - (for [(child (in-set (facet-children f)))] (terminate-facet! ds child)) + (for [(child (in-set (facet-children f)))] (terminate-facet! child)) ;; Run stop-scripts after terminating children. This means that ;; children's stop-scripts run before ours. - (schedule-script! ds ac (lambda () - (with-current-facet [ds ac f #t] - (for [(script (in-list (reverse (facet-stop-scripts f))))] - (script))))) + (schedule-script! ac (lambda () + (with-current-facet [ac f #t] + (for [(script (in-list (reverse (facet-stop-scripts f))))] + (script))))) - (retract-facet-assertions-and-subscriptions! ds f) + (retract-facet-assertions-and-subscriptions! f) - (push-script! #:priority *gc-priority* ds ac + (push-script! #:priority *gc-priority* ac (lambda () (if parent - (when (facet-inert? ds parent) (terminate-facet! ds parent)) - (terminate-actor! ds ac)))))) + (when (facet-inert? parent) (terminate-facet! parent)) + (terminate-actor! ac)))))) -(define (stop-facet! ds f stop-script) +(define (stop-facet! f stop-script) (define ac (facet-actor f)) - (with-current-facet [ds ac (facet-parent f) #t] ;; run in parent context wrt terminating facet - (schedule-script! ds ac (lambda () - (terminate-facet! ds f) - (schedule-script! ds ac stop-script))))) + (with-current-facet [ac (facet-parent f) #t] ;; run in parent context wrt terminating facet + (schedule-script! ac (lambda () + (terminate-facet! f) + (schedule-script! ac stop-script))))) -(define (add-stop-script! ds script-proc) - (define f (current-facet)) +(define (add-stop-script! f script-proc) (set-facet-stop-scripts! f (cons script-proc (facet-stop-scripts f)))) -(define (add-endpoint! ds where assertion-fn handler) +(define (add-endpoint! f where assertion-fn handler) (when (in-script?) (error 'add-endpoint! "~a: Cannot add endpoint in script; are you missing a (react ...)?" where)) - (define f (current-facet)) + (define ds (actor-dataspace (facet-actor f))) (define eid (generate-id! ds)) (define assertion (parameterize ((current-dataflow-subject-id (list f eid))) @@ -576,8 +570,7 @@ (apply k results))))) (define resume-parent (lambda results - (push-script! (current-dataspace) - (current-actor) + (push-script! (current-actor) (lambda () (apply raw-resume-parent results))))) (proc resume-parent)))) prompt-tag)) @@ -592,7 +585,6 @@ (make-dataspace (lambda () (schedule-script! - (current-dataspace) (current-actor) (lambda () (spawn! @@ -600,24 +592,23 @@ 'box (lambda () (define current-value (field-handle 'current-value - (generate-id! (current-dataspace)) + (generate-id! (actor-dataspace (current-actor))) (current-actor) 0)) - (add-endpoint! (current-dataspace) + (add-endpoint! (current-facet) 'stop-when-ten (lambda () (when (= (current-value) 10) - (stop-facet! (current-dataspace) - (current-facet) + (stop-facet! (current-facet) (lambda () (log-info "box: terminating")))) (void)) #f) - (add-endpoint! (current-dataspace) + (add-endpoint! (current-facet) 'assert-box-state (lambda () (box-state (current-value))) #f) - (add-endpoint! (current-dataspace) + (add-endpoint! (current-facet) 'on-message-set-box (lambda () (observe (set-box (capture (discard))))) (skeleton-interest (list struct:set-box #f) @@ -628,7 +619,6 @@ (lambda (op new-value) (when (eq? '! op) (schedule-script! - (current-dataspace) (current-actor) (lambda () (log-info "box: taking on new-value ~v" new-value) @@ -638,7 +628,7 @@ (current-actor) 'client (lambda () - (add-endpoint! (current-dataspace) + (add-endpoint! (current-facet) 'stop-when-retracted-observe-set-box (lambda () (observe (observe (set-box (discard))))) (skeleton-interest (list struct:observe (list struct:set-box #f)) @@ -649,11 +639,10 @@ (lambda (op) (when (eq? '- op) (stop-facet! - (current-dataspace) (current-facet) (lambda () (log-info "client: box has gone")))))))) - (add-endpoint! (current-dataspace) + (add-endpoint! (current-facet) 'on-asserted-box-state (lambda () (observe (box-state (capture (discard))))) (skeleton-interest (list struct:box-state #f) @@ -664,7 +653,6 @@ (lambda (op v) (when (eq? '+ op) (schedule-script! - (current-dataspace) (current-actor) (lambda () (log-info "client: learned that box's value is now ~v" v) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index 48c8ef6..5a35026 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -132,9 +132,7 @@ [(_ expr0 expr ...) (syntax/loc stx (begin expr0 expr ...))])) (define (react* where boot-proc) - (define ds (current-dataspace)) - (add-facet! ds - where + (add-facet! where (current-actor) (current-facet) boot-proc)) @@ -164,10 +162,8 @@ O ...))])) (define (make-field name init) - (field-handle name - (generate-id! (current-dataspace)) - (current-actor) - init)) + (let ((ac (current-actor))) + (field-handle name (generate-id! (actor-dataspace ac)) ac init))) (define-syntax (define-field stx) (syntax-parse stx @@ -185,7 +181,7 @@ (syntax-parse stx [(_ w:when-pred P) (quasisyntax/loc stx - (add-endpoint! (current-dataspace) + (add-endpoint! (current-facet) #,(source-location->string stx) (lambda () (when w.Pred P)) #f))])) @@ -197,7 +193,7 @@ (let ((f f-expr)) (when (not (equal? (facet-actor f) (current-actor))) (error 'stop-facet "Attempt to stop unrelated facet ~a from actor ~a" f (current-actor))) - (stop-facet! (current-dataspace) f (lambda () (begin/void-default script ...)))))])) + (stop-facet! f (lambda () (begin/void-default script ...)))))])) (define-syntax-rule (stop-current-facet script ...) (stop-facet (current-facet) script ...)) @@ -211,15 +207,14 @@ (syntax-parse stx [(_ script ...) (quasisyntax/loc stx - (schedule-script! (current-dataspace) - (current-actor) + (schedule-script! (current-actor) (lambda () (begin/void-default script ...))))])) (define-syntax (on-stop stx) (syntax-parse stx [(_ script ...) (quasisyntax/loc stx - (add-stop-script! (current-dataspace) + (add-stop-script! (current-facet) (lambda () (begin/void-default script ...))))])) (define-syntax (stop-when stx) @@ -245,13 +240,12 @@ [(_ prio:priority expr ...) (quasisyntax/loc stx (let () - (add-endpoint! (current-dataspace) + (add-endpoint! (current-facet) #,(source-location->string stx) (lambda () (define subject-id (current-dataflow-subject-id)) (schedule-script! #:priority prio.level - (current-dataspace) (current-actor) (lambda () (parameterize ((current-dataflow-subject-id subject-id)) @@ -313,7 +307,7 @@ [(message P) (define desc (analyse-pattern #'P)) (quasisyntax/loc outer-expr-stx - (add-endpoint! (current-dataspace) + (add-endpoint! (current-facet) #,(source-location->string outer-expr-stx) (lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc)))) (skeleton-interest #,(desc->skeleton-stx desc) @@ -325,7 +319,6 @@ (when (eq? op '!) (schedule-script! #:priority #,priority-stx - (current-dataspace) (current-actor) #,(quasisyntax/loc script-stx (lambda () @@ -343,7 +336,7 @@ priority-stx) (define desc (analyse-pattern P-stx)) (quasisyntax/loc outer-expr-stx - (add-endpoint! (current-dataspace) + (add-endpoint! (current-facet) #,(source-location->string outer-expr-stx) (lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc)))) (skeleton-interest #,(desc->skeleton-stx desc) @@ -355,7 +348,6 @@ (when (eq? op #,(if asserted? #''+ #''-)) (schedule-script! #:priority #,priority-stx - (current-dataspace) (current-actor) #,(quasisyntax/loc script-stx (lambda () @@ -596,7 +588,6 @@ (make-dataspace (lambda () (schedule-script! - (current-dataspace) (current-actor) #;(lambda () @@ -611,7 +602,7 @@ (log-info "finally for x=~a v=~a" (x) v)))) ) - (lambda () + #;(lambda () ;; Goal: no matter the circumstances (e.g. exception in a ;; stop script), we will never retract an assertion more or ;; fewer than the correct number of times. @@ -825,7 +816,7 @@ (send! (stage 2)))) ) - #;(lambda () + (lambda () (spawn (field [current-value 0]) (assert (box-state (current-value))) (stop-when-true (= (current-value) 10)