Add timer/timestate support; this prompted a change to endpoint registration.

The `add-endpoint!` call is changed in two ways:

 - the old `assertion-fn` has become `update-fn`, yielding both
   an assertion *and* an optional handler, because if the handler
   depends on a field which changes, previously the handler wasn't
   being updated

 - a new parameter, `dynamic?`, can be set to #f (it's usually #t)
   to ensure that the assertion and skeleton-interest are calculated
   only once ever, and are not connected to the dataflow machinery.

The first change makes it possible for the `(later-than (deadline))`
pattern, where `deadline` is a field, to work; the second change makes
`during` and `during/spawn` work correctly in the face of field
updates.
This commit is contained in:
Tony Garnock-Jones 2018-04-29 22:27:55 +01:00
parent 91596b066f
commit a6811f2ba5
9 changed files with 280 additions and 123 deletions

View File

@ -147,8 +147,8 @@
(struct endpoint (id ;; EID
[assertion #:mutable] ;; Assertion
assertion-fn ;; (-> Assertion)
handler ;; (Option SkInterest)
[handler #:mutable] ;; (Option SkInterest)
update-fn ;; (-> (Values Assertion (Option SkInterest)))
)
#:methods gen:custom-write
[(define (write-proc e p mode)
@ -322,16 +322,15 @@
(define ac (facet-actor f))
(with-current-facet [ac f #f]
(define ep (hash-ref (facet-endpoints f) eid))
(define old-assertion (endpoint-assertion ep))
(define new-assertion ((endpoint-assertion-fn ep)))
(match-define (endpoint _ old-assertion old-handler update-fn) ep)
(define-values (new-assertion new-handler) (update-fn))
(when (not (equal? old-assertion new-assertion))
(set-endpoint-assertion! ep new-assertion)
(retract! ac old-assertion)
(when old-handler (dataspace-unsubscribe! ds old-handler))
(set-endpoint-assertion! ep new-assertion)
(set-endpoint-handler! ep new-handler)
(assert! ac new-assertion)
(define h (endpoint-handler ep))
(when h
(dataspace-unsubscribe! ds h)
(dataspace-subscribe! ds h))))))))
(when new-handler (dataspace-subscribe! ds new-handler))))))))
(define (commit-actions! ds ac)
(define pending (actor-pending-actions ac))
@ -494,17 +493,17 @@
(define (add-stop-script! f script-proc)
(set-facet-stop-scripts! f (cons script-proc (facet-stop-scripts f))))
(define (add-endpoint! f where assertion-fn handler)
(define (add-endpoint! f where dynamic? update-fn)
(when (in-script?)
(error 'add-endpoint!
"~a: Cannot add endpoint in script; are you missing a (react ...)?"
where))
(define ds (actor-dataspace (facet-actor f)))
(define eid (generate-id! ds))
(define assertion
(parameterize ((current-dataflow-subject-id (list f eid)))
(call-with-syndicate-prompt assertion-fn)))
(define ep (endpoint eid assertion assertion-fn handler))
(define-values (assertion handler)
(parameterize ((current-dataflow-subject-id (if dynamic? (list f eid) #f)))
(call-with-syndicate-prompt update-fn)))
(define ep (endpoint eid assertion handler update-fn))
(assert! (facet-actor f) assertion)
(when handler (dataspace-subscribe! ds handler))
(hash-set! (facet-endpoints f) eid ep)
@ -520,7 +519,7 @@
(destroy-endpoint! ds ac f ep)))
(define (destroy-endpoint! ds ac f ep)
(match-define (endpoint eid assertion _assertion-fn handler) ep)
(match-define (endpoint eid assertion handler _update-fn) ep)
(dataflow-forget-subject! (dataspace-dataflow ds) (list f eid))
(retract! ac assertion)
(when handler (dataspace-unsubscribe! ds handler)))

View File

@ -0,0 +1,99 @@
#lang imperative-syndicate
;; Timer driver.
;; Uses mutable state internally, but because the scope of the
;; mutation is limited to each timer process alone, it's easy to show
;; correct linear use of the various pointers.
(provide (struct-out set-timer)
(struct-out timer-expired)
(struct-out later-than)
on-timeout
stop-when-timeout
sleep)
(define-logger syndicate/drivers/timer)
(require racket/set)
(require data/heap)
(message-struct set-timer (label msecs kind))
(message-struct timer-expired (label msecs))
(assertion-struct later-than (msecs))
(spawn #:name 'drivers/timer
(define control-ch (make-channel))
(thread (lambda ()
(struct pending-timer (deadline label) #:transparent)
(define heap
(make-heap (lambda (t1 t2) (<= (pending-timer-deadline t1)
(pending-timer-deadline t2)))))
(define (next-timer)
(and (positive? (heap-count heap))
(heap-min heap)))
(define (fire-timers! now)
(define count-fired 0)
(let loop ()
(when (positive? (heap-count heap))
(let ((m (heap-min heap)))
(when (<= (pending-timer-deadline m) now)
(begin (heap-remove-min! heap)
(log-syndicate/drivers/timer-debug "expired timer ~a"
(pending-timer-label m))
(ground-send! (timer-expired (pending-timer-label m) now))
(set! count-fired (+ count-fired 1))
(loop))))))
(signal-background-activity! (- count-fired)))
(define (install-timer! label deadline)
(heap-add! heap (pending-timer deadline label))
(signal-background-activity! +1))
(let loop ()
(sync (match (next-timer)
[#f never-evt]
[t (handle-evt (alarm-evt (pending-timer-deadline t))
(lambda (_dummy)
(define now (current-inexact-milliseconds))
(fire-timers! now)
(loop)))])
(handle-evt control-ch
(match-lambda
[(set-timer label msecs 'relative)
(define deadline (+ (current-inexact-milliseconds) msecs))
(install-timer! label deadline)
(loop)]
[(set-timer label deadline 'absolute)
(install-timer! label deadline)
(loop)]))))))
(on (message ($ instruction (set-timer _ _ _)))
(log-syndicate/drivers/timer-debug "received instruction ~a" instruction)
(channel-put control-ch instruction))
(during (observe (later-than $msecs))
(log-syndicate/drivers/timer-debug "observing (later-than ~a) at ~a"
msecs
(current-inexact-milliseconds))
(define timer-id (gensym 'timestate))
(on-start (send! (set-timer timer-id msecs 'absolute)))
(on (message (timer-expired timer-id _))
(react (assert (later-than msecs))))))
(define-syntax-rule (on-timeout relative-msecs body ...)
(let ((timer-id (gensym 'timeout)))
(on-start (send! (set-timer timer-id relative-msecs 'relative)))
(on (message (timer-expired timer-id _)) body ...)))
(define-syntax-rule (stop-when-timeout relative-msecs body ...)
(on-timeout relative-msecs (stop-current-facet body ...)))
(define (sleep sec)
(define timer-id (gensym 'sleep))
(until (message (timer-expired timer-id _))
(on-start (send! (set-timer timer-id (* sec 1000.0) 'relative)))))

View File

@ -0,0 +1,21 @@
#lang imperative-syndicate
(require/activate imperative-syndicate/drivers/timer)
(spawn #:name 'plain-timer-demo
(field [count 0])
(on-start (send! (set-timer 'main-timer 0 'relative)))
(on (message (timer-expired 'main-timer $now))
(log-info "main-timer expired at ~a" now)
(count (+ (count) 1))
(when (< (count) 5)
(send! (set-timer 'main-timer 500 'relative)))))
(spawn #:name 'later-than-demo
(field [deadline (current-inexact-milliseconds)]
[count 0])
(on (asserted (later-than (deadline)))
(log-info "later-than ticked for deadline ~a" (deadline))
(count (+ (count) 1))
(when (< (count) 5)
(deadline (+ (deadline) 500)))))

View File

@ -62,9 +62,9 @@
(let loop ()
(define work-remaining? (run-scripts! ds))
(define events-expected? (positive? background-activity-count))
(log-info "GROUND: ~a; ~a background activities"
(if work-remaining? "busy" "idle")
background-activity-count)
(log-syndicate/ground-debug "GROUND: ~a; ~a background activities"
(if work-remaining? "busy" "idle")
background-activity-count)
(cond
[events-expected?
(sync ground-event-relay-evt (if work-remaining? (system-idle-evt) never-evt))

View File

@ -90,8 +90,8 @@
x
(add-endpoint! outer-facet
"dataspace-relay (observe (inbound ...))"
(lambda () (observe x))
i))))
#t
(lambda () (values (observe x) i))))))
(on (retracted (observe (inbound $x)))
;; (log-info "~a (retracted (observe (inbound ~v)))" inner-actor x)
@ -106,8 +106,8 @@
x
(add-endpoint! outer-facet
"dataspace-relay (outbound ...)"
(lambda () x)
#f))))
#t
(lambda () (values x #f))))))
(on (retracted (outbound $x))
;; (log-info "~a (retracted (outbound ~v))" inner-actor x)

View File

@ -2,7 +2,8 @@
;; Common syntax classes.
(provide (for-syntax assertions
name))
name
snapshot))
(require racket/set)
@ -17,4 +18,8 @@
(define-splicing-syntax-class name
(pattern (~seq #:name N))
(pattern (~seq) #:attr N #'#f)))
(pattern (~seq) #:attr N #'#f))
(define-splicing-syntax-class snapshot
(pattern (~seq #:snapshot) #:attr dynamic? #'#f)
(pattern (~seq) #:attr dynamic? #'#t)))

View File

@ -182,12 +182,12 @@
(define-syntax (assert stx)
(syntax-parse stx
[(_ w:when-pred P)
[(_ w:when-pred snapshot:snapshot P)
(quasisyntax/loc stx
(add-endpoint! (current-facet)
#,(source-location->string stx)
(lambda () (when w.Pred P))
#f))]))
snapshot.dynamic?
(lambda () (values (when w.Pred P) #f))))]))
(define-syntax (stop-facet stx)
(syntax-parse stx
@ -245,6 +245,7 @@
(let ()
(add-endpoint! (current-facet)
#,(source-location->string stx)
#t
(lambda ()
(define subject-id (current-dataflow-subject-id))
(schedule-script!
@ -253,8 +254,7 @@
(lambda ()
(parameterize ((current-dataflow-subject-id subject-id))
expr ...)))
(void))
#f)))]))
(values (void) #f)))))]))
(define-syntax (define/dataflow stx)
(syntax-parse stx
@ -307,32 +307,49 @@
(syntax-rearm result event-stx)
script-stx
priority-stx)))]
[(message P)
[(message snapshot:snapshot P)
(define desc (analyse-pattern #'P))
(quasisyntax/loc outer-expr-stx
(add-endpoint! (current-facet)
#,(source-location->string outer-expr-stx)
(lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc))))
(skeleton-interest #,(desc->skeleton-stx desc)
'#,(desc->skeleton-proj desc)
(list #,@(desc->key desc))
'#,(desc->capture-proj desc)
(capture-facet-context
(lambda (op #,@(desc->capture-names desc))
(when (eq? op '!)
(schedule-script!
#:priority #,priority-stx
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx))))))
#f)))]
[(asserted P)
(analyse-asserted/retracted outer-expr-stx when-pred-stx script-stx #t #'P priority-stx)]
[(retracted P)
(analyse-asserted/retracted outer-expr-stx when-pred-stx script-stx #f #'P priority-stx)]))
snapshot.dynamic?
(lambda ()
(if #,when-pred-stx
(values (observe #,(desc->assertion-stx desc))
(skeleton-interest #,(desc->skeleton-stx desc)
'#,(desc->skeleton-proj desc)
(list #,@(desc->key desc))
'#,(desc->capture-proj desc)
(capture-facet-context
(lambda (op #,@(desc->capture-names desc))
(when (eq? op '!)
(schedule-script!
#:priority #,priority-stx
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx))))))
#f))
(values (void) #f)))))]
[(asserted snapshot:snapshot P)
(analyse-asserted/retracted outer-expr-stx
#'snapshot.dynamic?
when-pred-stx
script-stx
#t
#'P
priority-stx)]
[(retracted snapshot:snapshot P)
(analyse-asserted/retracted outer-expr-stx
#'snapshot.dynamic?
when-pred-stx
script-stx
#f
#'P
priority-stx)]))
(define-for-syntax (analyse-asserted/retracted outer-expr-stx
snapshot-dynamic?-stx
when-pred-stx
script-stx
asserted?
@ -342,28 +359,32 @@
(quasisyntax/loc outer-expr-stx
(add-endpoint! (current-facet)
#,(source-location->string outer-expr-stx)
(lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc))))
(skeleton-interest #,(desc->skeleton-stx desc)
'#,(desc->skeleton-proj desc)
(list #,@(desc->key desc))
'#,(desc->capture-proj desc)
(capture-facet-context
(lambda (op #,@(desc->capture-names desc))
(when (eq? op #,(if asserted? #''+ #''-))
(schedule-script!
#:priority #,priority-stx
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx))))))
#f))))
#,snapshot-dynamic?-stx
(lambda ()
(if #,when-pred-stx
(values (observe #,(desc->assertion-stx desc))
(skeleton-interest #,(desc->skeleton-stx desc)
'#,(desc->skeleton-proj desc)
(list #,@(desc->key desc))
'#,(desc->capture-proj desc)
(capture-facet-context
(lambda (op #,@(desc->capture-names desc))
(when (eq? op #,(if asserted? #''+ #''-))
(schedule-script!
#:priority #,priority-stx
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx))))))
#f))
(values (void) #f))))))
(define-syntax (during stx)
(syntax-parse stx
[(_ P O ...)
(quasisyntax/loc stx
(on (asserted P)
(react (stop-when (retracted #,(instantiate-pattern->pattern #'P)))
(react (stop-when (retracted #:snapshot #,(instantiate-pattern->pattern #'P)))
O ...)))]))
(define-syntax (during/spawn stx)
@ -392,8 +413,8 @@
#,@(if (attribute oncrash.expr)
#'(oncrash.expr)
#'()))
(stop-when (retracted #,Q-stx))))
(stop-when (retracted #,Q-stx)
(stop-when (retracted #:snapshot #,Q-stx))))
(stop-when (retracted #:snapshot #,Q-stx)
;; Demand (p) retracted before supply (inst) appeared. We
;; MUST wait for the supply to fully appear so that we can
;; reliably tell it to shut down. We must maintain interest

View File

@ -6,10 +6,12 @@
(spawn (field [x 123])
(assert (foo (x) 999))
(during (foo (x) $v)
(define x0 (x))
(printf "x=~a v=~a\n" (x) v)
(when (= (x) 123) (x 124))
(on-stop (printf "finally for x=~a v=~a\n" (x) v))))]
(on-stop
(printf "finally for x0=~a x=~a v=~a\n" x0 (x) v))))]
no-crashes
(expected-output "x=123 v=999"
"x=124 v=999"
"finally for x=124 v=999"))
"finally for x0=123 x=124 v=999"))

View File

@ -20,73 +20,83 @@
0))
(add-endpoint! (current-facet)
'stop-when-ten
#t
(lambda ()
(when (= (current-value) 3)
(stop-facet! (current-facet)
(lambda ()
(printf "box: terminating\n"))))
(void))
#f)
(values (void) #f)))
(add-endpoint! (current-facet)
'assert-box-state
(lambda () (box-state (current-value)))
#f)
(add-endpoint! (current-facet)
'on-message-set-box
(lambda () (observe (set-box (capture (discard)))))
(skeleton-interest (list struct:set-box #f)
'()
'()
'((0 0))
(capture-facet-context
(lambda (op new-value)
(when (eq? '! op)
(schedule-script!
(current-actor)
(lambda ()
(printf "box: taking on new-value ~v\n" new-value)
(current-value new-value))))))
#f)))
#t
(lambda () (values (box-state (current-value)) #f)))
(add-endpoint!
(current-facet)
'on-message-set-box
#t
(lambda ()
(values (observe (set-box (capture (discard))))
(skeleton-interest (list struct:set-box #f)
'()
'()
'((0 0))
(capture-facet-context
(lambda (op new-value)
(when (eq? '! op)
(schedule-script!
(current-actor)
(lambda ()
(printf "box: taking on new-value ~v\n"
new-value)
(current-value new-value))))))
#f)))))
(set))
(spawn!
(current-actor)
'client
(lambda ()
(add-endpoint! (current-facet)
'stop-when-retracted-observe-set-box
(lambda () (observe (observe (set-box (discard)))))
(skeleton-interest (list struct:observe (list struct:set-box #f))
'()
'()
'()
(capture-facet-context
(lambda (op)
(when (eq? '- op)
(stop-facet!
(current-facet)
(lambda ()
(printf "client: box has gone\n"))))))
#f))
(add-endpoint! (current-facet)
'on-asserted-box-state
(lambda () (observe (box-state (capture (discard)))))
(skeleton-interest (list struct:box-state #f)
'()
'()
'((0 0))
(capture-facet-context
(lambda (op v)
(when (eq? '+ op)
(schedule-script!
(current-actor)
(lambda ()
(printf
"client: learned that box's value is now ~v\n"
v)
(enqueue-send! (current-actor)
(set-box (+ v 1))))))))
#f)))
(add-endpoint!
(current-facet)
'stop-when-retracted-observe-set-box
#t
(lambda ()
(values (observe (observe (set-box (discard))))
(skeleton-interest (list struct:observe (list struct:set-box #f))
'()
'()
'()
(capture-facet-context
(lambda (op)
(when (eq? '- op)
(stop-facet!
(current-facet)
(lambda ()
(printf "client: box has gone\n"))))))
#f))))
(add-endpoint!
(current-facet)
'on-asserted-box-state
#t
(lambda ()
(values (observe (box-state (capture (discard))))
(skeleton-interest (list struct:box-state #f)
'()
'()
'((0 0))
(capture-facet-context
(lambda (op v)
(when (eq? '+ op)
(schedule-script!
(current-actor)
(lambda ()
(printf
"client: learned that box's value is now ~v\n"
v)
(enqueue-send! (current-actor)
(set-box (+ v 1))))))))
#f)))))
(set))]
no-crashes
(expected-output "client: learned that box's value is now 0"