diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index cd27500..9084c59 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -147,8 +147,8 @@ (struct endpoint (id ;; EID [assertion #:mutable] ;; Assertion - assertion-fn ;; (-> Assertion) - handler ;; (Option SkInterest) + [handler #:mutable] ;; (Option SkInterest) + update-fn ;; (-> (Values Assertion (Option SkInterest))) ) #:methods gen:custom-write [(define (write-proc e p mode) @@ -322,16 +322,15 @@ (define ac (facet-actor f)) (with-current-facet [ac f #f] (define ep (hash-ref (facet-endpoints f) eid)) - (define old-assertion (endpoint-assertion ep)) - (define new-assertion ((endpoint-assertion-fn ep))) + (match-define (endpoint _ old-assertion old-handler update-fn) ep) + (define-values (new-assertion new-handler) (update-fn)) (when (not (equal? old-assertion new-assertion)) - (set-endpoint-assertion! ep new-assertion) (retract! ac old-assertion) + (when old-handler (dataspace-unsubscribe! ds old-handler)) + (set-endpoint-assertion! ep new-assertion) + (set-endpoint-handler! ep new-handler) (assert! ac new-assertion) - (define h (endpoint-handler ep)) - (when h - (dataspace-unsubscribe! ds h) - (dataspace-subscribe! ds h)))))))) + (when new-handler (dataspace-subscribe! ds new-handler)))))))) (define (commit-actions! ds ac) (define pending (actor-pending-actions ac)) @@ -494,17 +493,17 @@ (define (add-stop-script! f script-proc) (set-facet-stop-scripts! f (cons script-proc (facet-stop-scripts f)))) -(define (add-endpoint! f where assertion-fn handler) +(define (add-endpoint! f where dynamic? update-fn) (when (in-script?) (error 'add-endpoint! "~a: Cannot add endpoint in script; are you missing a (react ...)?" where)) (define ds (actor-dataspace (facet-actor f))) (define eid (generate-id! ds)) - (define assertion - (parameterize ((current-dataflow-subject-id (list f eid))) - (call-with-syndicate-prompt assertion-fn))) - (define ep (endpoint eid assertion assertion-fn handler)) + (define-values (assertion handler) + (parameterize ((current-dataflow-subject-id (if dynamic? (list f eid) #f))) + (call-with-syndicate-prompt update-fn))) + (define ep (endpoint eid assertion handler update-fn)) (assert! (facet-actor f) assertion) (when handler (dataspace-subscribe! ds handler)) (hash-set! (facet-endpoints f) eid ep) @@ -520,7 +519,7 @@ (destroy-endpoint! ds ac f ep))) (define (destroy-endpoint! ds ac f ep) - (match-define (endpoint eid assertion _assertion-fn handler) ep) + (match-define (endpoint eid assertion handler _update-fn) ep) (dataflow-forget-subject! (dataspace-dataflow ds) (list f eid)) (retract! ac assertion) (when handler (dataspace-unsubscribe! ds handler))) diff --git a/syndicate/drivers/timer.rkt b/syndicate/drivers/timer.rkt new file mode 100644 index 0000000..97152e9 --- /dev/null +++ b/syndicate/drivers/timer.rkt @@ -0,0 +1,99 @@ +#lang imperative-syndicate +;; Timer driver. + +;; Uses mutable state internally, but because the scope of the +;; mutation is limited to each timer process alone, it's easy to show +;; correct linear use of the various pointers. + +(provide (struct-out set-timer) + (struct-out timer-expired) + (struct-out later-than) + on-timeout + stop-when-timeout + sleep) + +(define-logger syndicate/drivers/timer) + +(require racket/set) +(require data/heap) + +(message-struct set-timer (label msecs kind)) +(message-struct timer-expired (label msecs)) + +(assertion-struct later-than (msecs)) + +(spawn #:name 'drivers/timer + (define control-ch (make-channel)) + + (thread (lambda () + (struct pending-timer (deadline label) #:transparent) + + (define heap + (make-heap (lambda (t1 t2) (<= (pending-timer-deadline t1) + (pending-timer-deadline t2))))) + + (define (next-timer) + (and (positive? (heap-count heap)) + (heap-min heap))) + + (define (fire-timers! now) + (define count-fired 0) + (let loop () + (when (positive? (heap-count heap)) + (let ((m (heap-min heap))) + (when (<= (pending-timer-deadline m) now) + (begin (heap-remove-min! heap) + (log-syndicate/drivers/timer-debug "expired timer ~a" + (pending-timer-label m)) + (ground-send! (timer-expired (pending-timer-label m) now)) + (set! count-fired (+ count-fired 1)) + (loop)))))) + (signal-background-activity! (- count-fired))) + + (define (install-timer! label deadline) + (heap-add! heap (pending-timer deadline label)) + (signal-background-activity! +1)) + + (let loop () + (sync (match (next-timer) + [#f never-evt] + [t (handle-evt (alarm-evt (pending-timer-deadline t)) + (lambda (_dummy) + (define now (current-inexact-milliseconds)) + (fire-timers! now) + (loop)))]) + (handle-evt control-ch + (match-lambda + [(set-timer label msecs 'relative) + (define deadline (+ (current-inexact-milliseconds) msecs)) + (install-timer! label deadline) + (loop)] + [(set-timer label deadline 'absolute) + (install-timer! label deadline) + (loop)])))))) + + (on (message ($ instruction (set-timer _ _ _))) + (log-syndicate/drivers/timer-debug "received instruction ~a" instruction) + (channel-put control-ch instruction)) + + (during (observe (later-than $msecs)) + (log-syndicate/drivers/timer-debug "observing (later-than ~a) at ~a" + msecs + (current-inexact-milliseconds)) + (define timer-id (gensym 'timestate)) + (on-start (send! (set-timer timer-id msecs 'absolute))) + (on (message (timer-expired timer-id _)) + (react (assert (later-than msecs)))))) + +(define-syntax-rule (on-timeout relative-msecs body ...) + (let ((timer-id (gensym 'timeout))) + (on-start (send! (set-timer timer-id relative-msecs 'relative))) + (on (message (timer-expired timer-id _)) body ...))) + +(define-syntax-rule (stop-when-timeout relative-msecs body ...) + (on-timeout relative-msecs (stop-current-facet body ...))) + +(define (sleep sec) + (define timer-id (gensym 'sleep)) + (until (message (timer-expired timer-id _)) + (on-start (send! (set-timer timer-id (* sec 1000.0) 'relative))))) diff --git a/syndicate/examples/time.rkt b/syndicate/examples/time.rkt new file mode 100644 index 0000000..3bf8934 --- /dev/null +++ b/syndicate/examples/time.rkt @@ -0,0 +1,21 @@ +#lang imperative-syndicate + +(require/activate imperative-syndicate/drivers/timer) + +(spawn #:name 'plain-timer-demo + (field [count 0]) + (on-start (send! (set-timer 'main-timer 0 'relative))) + (on (message (timer-expired 'main-timer $now)) + (log-info "main-timer expired at ~a" now) + (count (+ (count) 1)) + (when (< (count) 5) + (send! (set-timer 'main-timer 500 'relative))))) + +(spawn #:name 'later-than-demo + (field [deadline (current-inexact-milliseconds)] + [count 0]) + (on (asserted (later-than (deadline))) + (log-info "later-than ticked for deadline ~a" (deadline)) + (count (+ (count) 1)) + (when (< (count) 5) + (deadline (+ (deadline) 500))))) diff --git a/syndicate/ground.rkt b/syndicate/ground.rkt index 812b193..98b1d87 100644 --- a/syndicate/ground.rkt +++ b/syndicate/ground.rkt @@ -62,9 +62,9 @@ (let loop () (define work-remaining? (run-scripts! ds)) (define events-expected? (positive? background-activity-count)) - (log-info "GROUND: ~a; ~a background activities" - (if work-remaining? "busy" "idle") - background-activity-count) + (log-syndicate/ground-debug "GROUND: ~a; ~a background activities" + (if work-remaining? "busy" "idle") + background-activity-count) (cond [events-expected? (sync ground-event-relay-evt (if work-remaining? (system-idle-evt) never-evt)) diff --git a/syndicate/relay.rkt b/syndicate/relay.rkt index 8ffc27c..f837cca 100644 --- a/syndicate/relay.rkt +++ b/syndicate/relay.rkt @@ -90,8 +90,8 @@ x (add-endpoint! outer-facet "dataspace-relay (observe (inbound ...))" - (lambda () (observe x)) - i)))) + #t + (lambda () (values (observe x) i)))))) (on (retracted (observe (inbound $x))) ;; (log-info "~a (retracted (observe (inbound ~v)))" inner-actor x) @@ -106,8 +106,8 @@ x (add-endpoint! outer-facet "dataspace-relay (outbound ...)" - (lambda () x) - #f)))) + #t + (lambda () (values x #f)))))) (on (retracted (outbound $x)) ;; (log-info "~a (retracted (outbound ~v))" inner-actor x) diff --git a/syndicate/syntax-classes.rkt b/syndicate/syntax-classes.rkt index acddb82..31c0255 100644 --- a/syndicate/syntax-classes.rkt +++ b/syndicate/syntax-classes.rkt @@ -2,7 +2,8 @@ ;; Common syntax classes. (provide (for-syntax assertions - name)) + name + snapshot)) (require racket/set) @@ -17,4 +18,8 @@ (define-splicing-syntax-class name (pattern (~seq #:name N)) - (pattern (~seq) #:attr N #'#f))) + (pattern (~seq) #:attr N #'#f)) + + (define-splicing-syntax-class snapshot + (pattern (~seq #:snapshot) #:attr dynamic? #'#f) + (pattern (~seq) #:attr dynamic? #'#t))) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index 5f54ffa..e5ef75c 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -182,12 +182,12 @@ (define-syntax (assert stx) (syntax-parse stx - [(_ w:when-pred P) + [(_ w:when-pred snapshot:snapshot P) (quasisyntax/loc stx (add-endpoint! (current-facet) #,(source-location->string stx) - (lambda () (when w.Pred P)) - #f))])) + snapshot.dynamic? + (lambda () (values (when w.Pred P) #f))))])) (define-syntax (stop-facet stx) (syntax-parse stx @@ -245,6 +245,7 @@ (let () (add-endpoint! (current-facet) #,(source-location->string stx) + #t (lambda () (define subject-id (current-dataflow-subject-id)) (schedule-script! @@ -253,8 +254,7 @@ (lambda () (parameterize ((current-dataflow-subject-id subject-id)) expr ...))) - (void)) - #f)))])) + (values (void) #f)))))])) (define-syntax (define/dataflow stx) (syntax-parse stx @@ -307,32 +307,49 @@ (syntax-rearm result event-stx) script-stx priority-stx)))] - [(message P) + [(message snapshot:snapshot P) (define desc (analyse-pattern #'P)) (quasisyntax/loc outer-expr-stx (add-endpoint! (current-facet) #,(source-location->string outer-expr-stx) - (lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc)))) - (skeleton-interest #,(desc->skeleton-stx desc) - '#,(desc->skeleton-proj desc) - (list #,@(desc->key desc)) - '#,(desc->capture-proj desc) - (capture-facet-context - (lambda (op #,@(desc->capture-names desc)) - (when (eq? op '!) - (schedule-script! - #:priority #,priority-stx - (current-actor) - #,(quasisyntax/loc script-stx - (lambda () - #,script-stx)))))) - #f)))] - [(asserted P) - (analyse-asserted/retracted outer-expr-stx when-pred-stx script-stx #t #'P priority-stx)] - [(retracted P) - (analyse-asserted/retracted outer-expr-stx when-pred-stx script-stx #f #'P priority-stx)])) + snapshot.dynamic? + (lambda () + (if #,when-pred-stx + (values (observe #,(desc->assertion-stx desc)) + (skeleton-interest #,(desc->skeleton-stx desc) + '#,(desc->skeleton-proj desc) + (list #,@(desc->key desc)) + '#,(desc->capture-proj desc) + (capture-facet-context + (lambda (op #,@(desc->capture-names desc)) + (when (eq? op '!) + (schedule-script! + #:priority #,priority-stx + (current-actor) + #,(quasisyntax/loc script-stx + (lambda () + #,script-stx)))))) + #f)) + (values (void) #f)))))] + [(asserted snapshot:snapshot P) + (analyse-asserted/retracted outer-expr-stx + #'snapshot.dynamic? + when-pred-stx + script-stx + #t + #'P + priority-stx)] + [(retracted snapshot:snapshot P) + (analyse-asserted/retracted outer-expr-stx + #'snapshot.dynamic? + when-pred-stx + script-stx + #f + #'P + priority-stx)])) (define-for-syntax (analyse-asserted/retracted outer-expr-stx + snapshot-dynamic?-stx when-pred-stx script-stx asserted? @@ -342,28 +359,32 @@ (quasisyntax/loc outer-expr-stx (add-endpoint! (current-facet) #,(source-location->string outer-expr-stx) - (lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc)))) - (skeleton-interest #,(desc->skeleton-stx desc) - '#,(desc->skeleton-proj desc) - (list #,@(desc->key desc)) - '#,(desc->capture-proj desc) - (capture-facet-context - (lambda (op #,@(desc->capture-names desc)) - (when (eq? op #,(if asserted? #''+ #''-)) - (schedule-script! - #:priority #,priority-stx - (current-actor) - #,(quasisyntax/loc script-stx - (lambda () - #,script-stx)))))) - #f)))) + #,snapshot-dynamic?-stx + (lambda () + (if #,when-pred-stx + (values (observe #,(desc->assertion-stx desc)) + (skeleton-interest #,(desc->skeleton-stx desc) + '#,(desc->skeleton-proj desc) + (list #,@(desc->key desc)) + '#,(desc->capture-proj desc) + (capture-facet-context + (lambda (op #,@(desc->capture-names desc)) + (when (eq? op #,(if asserted? #''+ #''-)) + (schedule-script! + #:priority #,priority-stx + (current-actor) + #,(quasisyntax/loc script-stx + (lambda () + #,script-stx)))))) + #f)) + (values (void) #f)))))) (define-syntax (during stx) (syntax-parse stx [(_ P O ...) (quasisyntax/loc stx (on (asserted P) - (react (stop-when (retracted #,(instantiate-pattern->pattern #'P))) + (react (stop-when (retracted #:snapshot #,(instantiate-pattern->pattern #'P))) O ...)))])) (define-syntax (during/spawn stx) @@ -392,8 +413,8 @@ #,@(if (attribute oncrash.expr) #'(oncrash.expr) #'())) - (stop-when (retracted #,Q-stx)))) - (stop-when (retracted #,Q-stx) + (stop-when (retracted #:snapshot #,Q-stx)))) + (stop-when (retracted #:snapshot #,Q-stx) ;; Demand (p) retracted before supply (inst) appeared. We ;; MUST wait for the supply to fully appear so that we can ;; reliably tell it to shut down. We must maintain interest diff --git a/syndicate/test/core/during-criterion-snapshotting.rkt b/syndicate/test/core/during-criterion-snapshotting.rkt index ad83397..12a1af2 100644 --- a/syndicate/test/core/during-criterion-snapshotting.rkt +++ b/syndicate/test/core/during-criterion-snapshotting.rkt @@ -6,10 +6,12 @@ (spawn (field [x 123]) (assert (foo (x) 999)) (during (foo (x) $v) + (define x0 (x)) (printf "x=~a v=~a\n" (x) v) (when (= (x) 123) (x 124)) - (on-stop (printf "finally for x=~a v=~a\n" (x) v))))] + (on-stop + (printf "finally for x0=~a x=~a v=~a\n" x0 (x) v))))] no-crashes (expected-output "x=123 v=999" "x=124 v=999" - "finally for x=124 v=999")) + "finally for x0=123 x=124 v=999")) diff --git a/syndicate/test/raw-dataspace.rkt b/syndicate/test/raw-dataspace.rkt index b62610d..9975e08 100644 --- a/syndicate/test/raw-dataspace.rkt +++ b/syndicate/test/raw-dataspace.rkt @@ -20,73 +20,83 @@ 0)) (add-endpoint! (current-facet) 'stop-when-ten + #t (lambda () (when (= (current-value) 3) (stop-facet! (current-facet) (lambda () (printf "box: terminating\n")))) - (void)) - #f) + (values (void) #f))) (add-endpoint! (current-facet) 'assert-box-state - (lambda () (box-state (current-value))) - #f) - (add-endpoint! (current-facet) - 'on-message-set-box - (lambda () (observe (set-box (capture (discard))))) - (skeleton-interest (list struct:set-box #f) - '() - '() - '((0 0)) - (capture-facet-context - (lambda (op new-value) - (when (eq? '! op) - (schedule-script! - (current-actor) - (lambda () - (printf "box: taking on new-value ~v\n" new-value) - (current-value new-value)))))) - #f))) + #t + (lambda () (values (box-state (current-value)) #f))) + (add-endpoint! + (current-facet) + 'on-message-set-box + #t + (lambda () + (values (observe (set-box (capture (discard)))) + (skeleton-interest (list struct:set-box #f) + '() + '() + '((0 0)) + (capture-facet-context + (lambda (op new-value) + (when (eq? '! op) + (schedule-script! + (current-actor) + (lambda () + (printf "box: taking on new-value ~v\n" + new-value) + (current-value new-value)))))) + #f))))) (set)) (spawn! (current-actor) 'client (lambda () - (add-endpoint! (current-facet) - 'stop-when-retracted-observe-set-box - (lambda () (observe (observe (set-box (discard))))) - (skeleton-interest (list struct:observe (list struct:set-box #f)) - '() - '() - '() - (capture-facet-context - (lambda (op) - (when (eq? '- op) - (stop-facet! - (current-facet) - (lambda () - (printf "client: box has gone\n")))))) - #f)) - (add-endpoint! (current-facet) - 'on-asserted-box-state - (lambda () (observe (box-state (capture (discard))))) - (skeleton-interest (list struct:box-state #f) - '() - '() - '((0 0)) - (capture-facet-context - (lambda (op v) - (when (eq? '+ op) - (schedule-script! - (current-actor) - (lambda () - (printf - "client: learned that box's value is now ~v\n" - v) - (enqueue-send! (current-actor) - (set-box (+ v 1)))))))) - #f))) + (add-endpoint! + (current-facet) + 'stop-when-retracted-observe-set-box + #t + (lambda () + (values (observe (observe (set-box (discard)))) + (skeleton-interest (list struct:observe (list struct:set-box #f)) + '() + '() + '() + (capture-facet-context + (lambda (op) + (when (eq? '- op) + (stop-facet! + (current-facet) + (lambda () + (printf "client: box has gone\n")))))) + #f)))) + (add-endpoint! + (current-facet) + 'on-asserted-box-state + #t + (lambda () + (values (observe (box-state (capture (discard)))) + (skeleton-interest (list struct:box-state #f) + '() + '() + '((0 0)) + (capture-facet-context + (lambda (op v) + (when (eq? '+ op) + (schedule-script! + (current-actor) + (lambda () + (printf + "client: learned that box's value is now ~v\n" + v) + (enqueue-send! (current-actor) + (set-box (+ v 1)))))))) + #f))))) (set))] no-crashes (expected-output "client: learned that box's value is now 0"