syndicate-rkt/syndicate/syntax.rkt

666 lines
26 KiB
Racket

#lang racket/base
;; DSL syntax over the API of dataspace.rkt
(provide spawn
spawn*
react
react/suspend
until
field
assert
stop-facet
stop-current-facet
stop-when
stop-when-true
on-start
on-stop
on
add-raw-observer-endpoint!
add-observer-endpoint!
during
during/spawn
begin/dataflow
define/dataflow
asserted
retracted
message
let-event
query-value
query-set
query-hash
;; query-hash-set
query-count
query-value*
query-set*
query-hash*
;; query-hash-set*
query-count*
define/query-value
define/query-set
define/query-hash
;; define/query-hash-set
define/query-count
immediate-query
send!
defer-turn!
flush!
assert!
retract!
current-adhoc-assertions
;;
;; current-action-transformer
)
(require (for-syntax racket/base))
(require (for-syntax syntax/parse))
(require (for-syntax syntax/srcloc))
(require "syntax-classes.rkt")
(require "assertions.rkt")
(require "dataspace.rkt")
(require (submod "dataspace.rkt" priorities))
(require "event-expander.rkt")
(require "skeleton.rkt")
(require "pattern.rkt")
(require "term.rkt")
(require racket/match)
(require racket/set)
(require syndicate/dataflow)
(require syndicate/protocol/instance)
;; Phase-1 splicing syntax classes for the optional keyword arguments accepted
;; by the macros in this file. Each class first tries the keyword followed by
;; one term and otherwise matches the empty sequence, binding its attribute to
;; a default value.
(begin-for-syntax
;; Optional `#:spawn wrapper`; `wrapper` defaults to the identifier `spawn`.
(define-splicing-syntax-class actor-wrapper
(pattern (~seq #:spawn wrapper))
(pattern (~seq) #:attr wrapper #'spawn))
;; Optional `#:on-crash expr`; the attribute value is #f (not syntax) when the
;; keyword is absent, so callers can test it with `(attribute ....expr)`.
(define-splicing-syntax-class on-crash-option
(pattern (~seq #:on-crash expr))
(pattern (~seq) #:attr expr #f))
;; Optional `#:let clauses`; defaults to an empty clause list.
(define-splicing-syntax-class let-option
(pattern (~seq #:let clauses))
(pattern (~seq) #:attr clauses #'()))
;; Optional `#:when Pred`; defaults to the literal #t.
(define-splicing-syntax-class when-pred
(pattern (~seq #:when Pred))
(pattern (~seq) #:attr Pred #'#t))
;; Optional `#:priority level`; defaults to the identifier *normal-priority*.
(define-splicing-syntax-class priority
(pattern (~seq #:priority level))
(pattern (~seq) #:attr level #'*normal-priority*)))
;; (spawn [#:name e] [#:assertions [e ...]] [#:linkage [form ...]] body ...)
;;
;; Front end for `spawn**`. The three keyword options may appear in any order,
;; each at most once. The name defaults to #f and the assertion list to empty;
;; linkage forms, if any, are placed in front of the body forms.
(define-syntax (spawn stx)
  (syntax-parse stx
    [(_ (~or (~optional (~seq #:name actor-name)
                        #:defaults ([actor-name #'#f])
                        #:name "#:name")
             (~optional (~seq #:assertions [initial-assertion ...])
                        #:name "#:assertions")
             (~optional (~seq #:linkage [linkage-form ...])
                        #:defaults ([(linkage-form 1) '()])
                        #:name "#:linkage"))
        ...
        body ...)
     (quasisyntax/loc stx
       (spawn** #:name actor-name
                ;; The attribute is #f only when #:assertions was not given.
                #:assertions #,(if (attribute initial-assertion)
                                   #'[initial-assertion ...]
                                   #'[])
                linkage-form ... body ...))]))
;; (spawn* name-option assertions-option script ...)
;;
;; Like `spawn**`, except that the given script forms are wrapped in a single
;; `on-start` form, which becomes the only body form of the new actor.
(define-syntax (spawn* stx)
  (syntax-parse stx
    [(_ n:name a:assertions body ...)
     (syntax/loc stx
       (spawn** #:name n.N
                #:assertions [a.exprs ...]
                (on-start body ...)))]))
;; (spawn** name-option assertions-option script ...)
;;
;; Core spawning form. Checks (via `ensure-in-script!`) that it is running
;; inside a script, then calls `spawn!` with the current actor, the name, a
;; thunk running the script forms, and a set built from the assertion
;; expressions.
(define-syntax (spawn** stx)
  (syntax-parse stx
    [(_ n:name a:assertions body ...)
     (syntax/loc stx
       (begin
         (ensure-in-script! 'spawn!)
         (spawn! (current-actor)
                 n.N
                 (lambda () (begin/void-default body ...))
                 (set a.exprs ...))))]))
;; (begin/void-default form ...)
;;
;; Expands to `(begin form ...)` when at least one form is supplied, and to
;; `(void)` when none is.
(define-syntax (begin/void-default stx)
  (syntax-parse stx
    [(_ first-form more-forms ...)
     (syntax/loc stx (begin first-form more-forms ...))]
    [(_)
     (syntax/loc stx (void))]))
;; Runtime support for `react`: calls `add-facet!` with the location string
;; `where`, the current actor, the current facet and `boot-proc`.
(define (react* where boot-proc)
  (let* ([actor (current-actor)]
         [parent (current-facet)])
    (add-facet! where actor parent boot-proc)))
;; (react body ...)
;;
;; Calls `react*` with the source location of this form (as a string) and a
;; thunk that runs the body forms.
(define-syntax (react stx)
  (syntax-parse stx
    [(_ body ...)
     (define where (source-location->string stx))
     (quasisyntax/loc stx
       (react* #,where
               (lambda () (begin/void-default body ...))))]))
;; (react/suspend (resume-id) body ...)
;;
;; Calls `suspend-script*` with a procedure of one argument, bound to
;; `resume-id`, which in turn calls `react*` with a thunk running the body
;; forms. The same source-location string labels both calls.
(define-syntax (react/suspend stx)
  (syntax-parse stx
    [(_ (resume-id) body ...)
     (define where (source-location->string stx))
     (quasisyntax/loc stx
       (suspend-script*
        #,where
        (lambda (resume-id)
          (react* #,where
                  (lambda () (begin/void-default body ...))))))]))
;; (until event body ...)
;;
;; A `react/suspend` whose facet contains `body ...` plus a `stop-when` on
;; `event`; when that event fires, the continuation is invoked with (void).
(define-syntax (until stx)
  (syntax-parse stx
    [(_ event body ...)
     (syntax/loc stx
       (react/suspend (resume)
         (stop-when event (resume (void)))
         body ...))]))
;; Builds a `field-handle` for the current actor from `name`, an id obtained
;; via `generate-id!` on the actor's dataspace, the actor itself and `init`.
(define (make-field name init)
  (define owner (current-actor))
  (define id (generate-id! (actor-dataspace owner)))
  (field-handle name id owner init))
;; (define-field id init)
;;
;; Defines `id` as the result of `make-field`, using the quoted identifier as
;; the field's name.
(define-syntax (define-field stx)
  (syntax-parse stx
    [(_ field-id init-expr)
     #'(define field-id
         (make-field 'field-id init-expr))]))
;; (field [id init] ...)
;;
;; Expands to one `define-field` per clause, in order. Each `id` must be an
;; identifier.
(define-syntax (field stx)
  (syntax-parse stx
    [(_ [field-id:id init-expr] ...)
     (syntax/loc stx
       (begin
         (define-field field-id init-expr)
         ...))]))
;; (assert [#:when pred] snapshot-option P)
;;
;; Adds an endpoint to the current facet, labelled with this form's source
;; location. The endpoint thunk returns two values: `P` when the guard holds
;; (void otherwise), and #f.
(define-syntax (assert stx)
  (syntax-parse stx
    [(_ guard:when-pred snap:snapshot assertion)
     (define where (source-location->string stx))
     (quasisyntax/loc stx
       (add-endpoint! (current-facet)
                      #,where
                      snap.dynamic?
                      (lambda ()
                        (values (when guard.Pred assertion)
                                #f))))]))
;; (stop-facet facet-expr script ...)
;;
;; Evaluates `facet-expr` once and raises an error unless the resulting
;; facet's actor is `equal?` to the current actor. Otherwise calls
;; `stop-facet!` with the facet and a thunk running the script forms.
(define-syntax (stop-facet stx)
  (syntax-parse stx
    [(_ facet-expr body ...)
     (syntax/loc stx
       (let ([target facet-expr])
         (unless (equal? (facet-actor target) (current-actor))
           (error 'stop-facet
                  "Attempt to stop unrelated facet ~a from actor ~a"
                  target
                  (current-actor)))
         (stop-facet! target
                      (lambda () (begin/void-default body ...)))))]))
;; (stop-current-facet script ...)
;; Shorthand for `stop-facet` applied to `(current-facet)`.
(define-syntax-rule (stop-current-facet body ...)
  (stop-facet (current-facet)
              body ...))
;; (stop-when-true condition script ...)
;;
;; Installs a `begin/dataflow` block that stops the current facet, running the
;; script forms, whenever the block runs and `condition` is true.
(define-syntax-rule (stop-when-true test body ...)
  (begin/dataflow
    (when test
      (stop-current-facet body ...))))
;; (on-start script ...)
;;
;; Passes a thunk running the script forms to `schedule-script!` for the
;; current actor.
(define-syntax (on-start stx)
  (syntax-parse stx
    [(_ body ...)
     (syntax/loc stx
       (schedule-script! (current-actor)
                         (lambda ()
                           (begin/void-default body ...))))]))
;; (on-stop script ...)
;;
;; Registers a thunk running the script forms with `add-stop-script!` on the
;; current facet.
(define-syntax (on-stop stx)
  (syntax-parse stx
    [(_ body ...)
     (syntax/loc stx
       (add-stop-script! (current-facet)
                         (lambda ()
                           (begin/void-default body ...))))]))
;; (stop-when [#:when pred] event [#:priority level] script ...)
;;
;; Builds an endpoint for `event` via `analyse-event`; the script it schedules
;; is `(stop-current-facet script ...)`.
(define-syntax (stop-when stx)
  (syntax-parse stx
    [(_ guard:when-pred event p:priority body ...)
     (define handler-stx
       (syntax/loc stx (stop-current-facet body ...)))
     (analyse-event stx
                    #'guard.Pred
                    #'event
                    handler-stx
                    #'p.level)]))
;; (on [#:when pred] event [#:priority level] script ...)
;;
;; Builds an endpoint for `event` via `analyse-event`; the script it schedules
;; runs the script forms (or yields void when there are none).
(define-syntax (on stx)
  (syntax-parse stx
    [(_ guard:when-pred event p:priority body ...)
     (define handler-stx
       (syntax/loc stx (begin/void-default body ...)))
     (analyse-event stx
                    #'guard.Pred
                    #'event
                    handler-stx
                    #'p.level)]))
;; Adds an endpoint to the current facet whose thunk calls `spec-thunk` every
;; time it runs.
;;
;; - If `spec-thunk` returns void, the thunk yields (values (void) #f).
;; - Otherwise it yields `(observe spec)` together with the result of
;;   `term->skeleton-interest` for `spec`. The interest's handler receives an
;;   operation symbol followed by the captured values, and passes the captured
;;   values (as a list) to `on-add` for '+, `on-remove` for '- and
;;   `on-message` for '!.
;;
;; `cleanup` is forwarded unchanged as the #:cleanup argument of
;; `term->skeleton-interest`.
(define (add-raw-observer-endpoint! spec-thunk
                                    #:on-add [on-add void]
                                    #:on-remove [on-remove void]
                                    #:on-message [on-message void]
                                    #:cleanup [cleanup #f])
  (define (refresh)
    (define spec (spec-thunk))
    (cond
      [(void? spec)
       (values (void) #f)]
      [else
       ;; A fresh handler closure is made on every refresh, as before.
       (define (dispatch op . captured-values)
         (match op
           ['+ (on-add captured-values)]
           ['- (on-remove captured-values)]
           ['! (on-message captured-values)]))
       (values (observe spec)
               (term->skeleton-interest spec
                                        dispatch
                                        #:cleanup cleanup))]))
  (add-endpoint! (current-facet)
                 "add-observer-endpoint!/add-raw-observer-endpoint!"
                 #t
                 refresh))
;; Like `add-raw-observer-endpoint!`, except that each handler other than the
;; default `void` is wrapped so that, instead of running directly, it is
;; handed to `schedule-script!` for the current actor. The wrapper is created
;; through `capture-facet-context`.
(define (add-observer-endpoint! spec-thunk
                                #:on-add [on-add void]
                                #:on-remove [on-remove void]
                                #:on-message [on-message void]
                                #:cleanup [cleanup #f])
  ;; Leaves `void` untouched; anything else becomes a scheduling wrapper.
  (define (scriptify handler)
    (cond
      [(eq? handler void) void]
      [else
       (capture-facet-context
        (lambda (captured-values)
          (schedule-script! (current-actor)
                            (lambda () (handler captured-values)))))]))
  (define add-script (scriptify on-add))
  (define remove-script (scriptify on-remove))
  (define message-script (scriptify on-message))
  (add-raw-observer-endpoint! spec-thunk
                              #:on-add add-script
                              #:on-remove remove-script
                              #:on-message message-script
                              #:cleanup cleanup))
;; (begin/dataflow [#:priority level] expr ...)
;;
;; Adds an endpoint to the current facet. Each time the endpoint thunk runs it
;; records the current dataflow subject id and schedules a script, at the
;; given priority, that evaluates `expr ...` with that id reinstated through
;; `parameterize`. The thunk itself always yields (values (void) #f).
(define-syntax (begin/dataflow stx)
  (syntax-parse stx
    [(_ p:priority body ...)
     (define where (source-location->string stx))
     (quasisyntax/loc stx
       (let ()
         (add-endpoint!
          (current-facet)
          #,where
          #t
          (lambda ()
            (define subject (current-dataflow-subject-id))
            (schedule-script!
             #:priority p.level
             (current-actor)
             (lambda ()
               (parameterize ([current-dataflow-subject-id subject])
                 body ...)))
            (values (void) #f)))))]))
;; (define/dataflow fieldname expr [#:default default-expr])
;;
;; Declares the field `fieldname`, initialised to `default-expr`, and installs
;; a `begin/dataflow` block that stores the value of `expr` into it. Without
;; #:default, the form re-expands with a default of #f.
(define-syntax (define/dataflow stx)
  (syntax-parse stx
    [(_ fieldname expr #:default default-expr)
     (syntax/loc stx
       (begin
         (field [fieldname default-expr])
         (begin/dataflow (fieldname expr))))]
    [(_ fieldname expr)
     (syntax/loc stx
       (define/dataflow fieldname expr #:default #f))]))
;; `asserted`, `retracted` and `message` are only meaningful as the head of an
;; event specification, where `analyse-event` recognises them as literals.
;; Expanding any of them anywhere else is a syntax error.
(define-syntax asserted
  (lambda (stx)
    (raise-syntax-error #f "asserted: Used outside event spec" stx)))
(define-syntax retracted
  (lambda (stx)
    (raise-syntax-error #f "retracted: Used outside event spec" stx)))
(define-syntax message
  (lambda (stx)
    (raise-syntax-error #f "message: Used outside event spec" stx)))
;; (suspend-script proc)
;;
;; Calls `suspend-script*` with this form's source location (as a string) and
;; `proc`.
(define-syntax (suspend-script stx)
  (syntax-parse stx
    [(_ proc)
     (define where (source-location->string stx))
     (quasisyntax/loc stx
       (suspend-script* #,where proc))]))
;; (let-event [event ...] body ...)
;;
;; Suspends the script with `react/suspend`. On start, the new facet runs the
;; chain built by `-let-event`, whose final step stops the current facet and
;; passes a thunk of `body ...` to the continuation. The whole expansion is an
;; application of the value `react/suspend` yields, i.e. of that thunk.
(define-syntax (let-event stx)
  (syntax-parse stx
    [(_ [event ...] body ...)
     (syntax/loc stx
       ((react/suspend (resume)
          (on-start
           (-let-event [event ...]
                       (stop-current-facet
                        (resume (lambda () body ...))))))))]))
;; (-let-event [event ...] expr)
;;
;; Helper for `let-event`. With no events left it is just `expr`; otherwise it
;; is a `react` facet that stops on the first event and then continues with
;; the remaining ones. The generated `react` form takes its source location
;; from the first event.
(define-syntax (-let-event stx)
  (syntax-parse stx
    [(_ [first-event later-event ...] final-expr)
     (syntax/loc #'first-event
       (react
        (stop-when first-event
                   (-let-event [later-event ...] final-expr))))]
    [(_ [] final-expr)
     #'final-expr]))
;; Phase-1 binding holding the module-declaration inspector obtained from this
;; module's variable reference; analyse-event passes it to syntax-disarm
;; before parsing an event specification.
(define-for-syntax orig-insp
(variable-reference->module-declaration-inspector (#%variable-reference)))
;; Phase-1 helper behind `on` and `stop-when`: turns an event specification
;; into the `add-endpoint!` expression that installs the matching endpoint.
;;
;;   outer-expr-stx  - the whole macro use; provides the source location of
;;                     the generated code and the endpoint's location string
;;   when-pred-stx   - guard expression evaluated inside the endpoint thunk
;;   armed-event-stx - the event specification; disarmed here before parsing
;;   script-stx      - expression that becomes the body of the scheduled script
;;   priority-stx    - expression passed as #:priority to schedule-script!
;;
;; An event form matching none of the clauses below is rejected by
;; syntax-parse.
(define-for-syntax (analyse-event outer-expr-stx
when-pred-stx
armed-event-stx
script-stx
priority-stx)
(define event-stx (syntax-disarm armed-event-stx orig-insp))
(syntax-parse event-stx
#:literals [message asserted retracted]
;; The head identifier satisfies event-expander-id?: hand the form to
;; event-expander-transform and analyse whatever it produces, with all other
;; arguments unchanged.
[(expander args ...) #:when (event-expander-id? #'expander)
(event-expander-transform event-stx
(lambda (result)
(analyse-event outer-expr-stx
when-pred-stx
(syntax-rearm result event-stx)
script-stx
priority-stx)))]
;; (message P): the handler reacts only to the '! operation.
[(message snapshot:snapshot P)
(define desc (analyse-pattern #'P))
(quasisyntax/loc outer-expr-stx
(add-endpoint! (current-facet)
#,(source-location->string outer-expr-stx)
snapshot.dynamic?
(lambda ()
;; Guard true: yield the observation assertion and a skeleton interest.
;; Guard false: yield (values (void) #f).
(if #,when-pred-stx
(values (observe #,(desc->assertion-stx desc))
(skeleton-interest #,(desc->skeleton-stx desc)
'#,(desc->skeleton-proj desc)
(list #,@(desc->key desc))
'#,(desc->capture-proj desc)
(capture-facet-context
;; The handler takes the operation followed by one argument per
;; capture name of the pattern; those names are in scope in the script.
(lambda (op #,@(desc->capture-names desc))
(when (eq? op '!)
;; (log-info "~a ~a ~v"
;; (current-facet)
;; op
;; (list #,@(desc->capture-names desc)))
(schedule-script!
#:priority #,priority-stx
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx))))))
#f))
(values (void) #f)))))]
;; (asserted P) and (retracted P) share one code generator and differ only in
;; the boolean passed as its `asserted?` argument.
[(asserted snapshot:snapshot P)
(analyse-asserted/retracted outer-expr-stx
#'snapshot.dynamic?
when-pred-stx
script-stx
#t
#'P
priority-stx)]
[(retracted snapshot:snapshot P)
(analyse-asserted/retracted outer-expr-stx
#'snapshot.dynamic?
when-pred-stx
script-stx
#f
#'P
priority-stx)]))
;; Phase-1 code generator for `(asserted P)` and `(retracted P)` events.
;;
;; Produces an `add-endpoint!` expression for the current facet, labelled with
;; the source location of outer-expr-stx. The endpoint thunk yields the
;; observation assertion and a skeleton interest for pattern P-stx when the
;; guard when-pred-stx holds, and (values (void) #f) otherwise. The interest's
;; handler schedules script-stx at priority priority-stx when the operation is
;; '+ (asserted? true) or '- (asserted? false), and ignores other operations.
(define-for-syntax (analyse-asserted/retracted outer-expr-stx
snapshot-dynamic?-stx
when-pred-stx
script-stx
asserted?
P-stx
priority-stx)
(define desc (analyse-pattern P-stx))
(quasisyntax/loc outer-expr-stx
(add-endpoint! (current-facet)
#,(source-location->string outer-expr-stx)
#,snapshot-dynamic?-stx
(lambda ()
(if #,when-pred-stx
(values (observe #,(desc->assertion-stx desc))
(skeleton-interest #,(desc->skeleton-stx desc)
'#,(desc->skeleton-proj desc)
(list #,@(desc->key desc))
'#,(desc->capture-proj desc)
(capture-facet-context
;; The handler takes the operation followed by one argument per capture
;; name of the pattern; those names are in scope in the script.
(lambda (op #,@(desc->capture-names desc))
;; The operation symbol to react to is chosen at expansion time.
(when (eq? op #,(if asserted? #''+ #''-))
;; (log-info "~a ~a ~v"
;; (current-facet)
;; op
;; (list #,@(desc->capture-names desc)))
(schedule-script!
#:priority #,priority-stx
(current-actor)
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx))))))
#f))
(values (void) #f))))))
;; (during P body ...)
;;
;; On each assertion matching `P`, creates a facet containing `body ...` that
;; stops on retraction of the pattern produced by applying
;; `instantiate-pattern->pattern` to `P`. The retraction event is given the
;; #:snapshot option.
(define-syntax (during stx)
  (syntax-parse stx
    [(_ pat body ...)
     (define retraction-pat (instantiate-pattern->pattern #'pat))
     (quasisyntax/loc stx
       (on (asserted pat)
           (react
            (stop-when (retracted #:snapshot #,retraction-pat))
            body ...)))]))
;; (during/spawn P [#:spawn wrapper] name-option assertions-option
;;               [#:let clauses] [#:on-crash expr] O ...)
;;
;; On each assertion matching P ("demand"), this form:
;;  1. builds `inst`, an `instance` record made from a fresh gensym and the
;;     value obtained from P via instantiate-pattern->value;
;;  2. creates a monitoring facet in the current actor, whose two `stop-when`
;;     clauses are explained by the comments inside the template;
;;  3. inside `(let clauses ...)`, invokes `wrapper` (default `spawn`) with
;;     linkage forms that assert `inst` and stop when `(observe inst)` is
;;     retracted, the given name, initial assertions `inst` and
;;     `(observe (observe inst))` followed by the user's assertions, and the
;;     body forms O.
(define-syntax (during/spawn stx)
(syntax-parse stx
[(_ P w:actor-wrapper name:name assertions:assertions parent-let:let-option
oncrash:on-crash-option
O ...)
;; Pattern used to detect retraction of the specific demand that matched P.
(define Q-stx (instantiate-pattern->pattern #'P))
(quasisyntax/loc stx
(on (asserted P)
(let* ((id (gensym 'during/spawn))
(inst (instance id #,(instantiate-pattern->value #'P)))
;; ^ this is the assertion representing supply
)
(react (stop-when (asserted inst)
;; Supply (inst) appeared before demand (p) retracted.
;; Transition to a state where we monitor demand, but also
;; express interest in supply: this latter acts as a signal
;; to the supply that it should stick around. We react to
;; retraction of supply before retraction of demand by
;; invoking the on-crash expression, if supplied. Once
;; demand is retracted, this facet terminates, retracting
;; its interest in supply, thereby signalling to the supply
;; that it is no longer wanted.
(react (stop-when (retracted inst) ;; NOT OPTIONAL
#,@(if (attribute oncrash.expr)
#'(oncrash.expr)
#'()))
(stop-when (retracted #:snapshot #,Q-stx))))
(stop-when (retracted #:snapshot #,Q-stx)
;; Demand (p) retracted before supply (inst) appeared. We
;; MUST wait for the supply to fully appear so that we can
;; reliably tell it to shut down. We must maintain interest
;; in supply until we see supply, and then terminate, thus
;; signalling to supply that it is no longer wanted.
(react (stop-when (asserted inst)))))
(let parent-let.clauses
(w.wrapper #:linkage [(assert inst)
(stop-when (retracted (observe inst)))]
#:name name.N
#:assertions [inst
(observe (observe inst))
assertions.exprs ...]
O ...)))))]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Queries
;; Phase-1 support for the query macros.
(begin-for-syntax
  ;; Optional `#:on-add expr`; the attribute is #f when the keyword is absent.
  (define-splicing-syntax-class on-add
    (pattern (~optional (~seq #:on-add expr) #:defaults ([expr #f]))))

  ;; Optional `#:on-remove expr`; the attribute is #f when the keyword is
  ;; absent.
  (define-splicing-syntax-class on-remove
    (pattern (~optional (~seq #:on-remove expr) #:defaults ([expr #f]))))

  ;; Returns a syntax list meant to be spliced into a template: empty when
  ;; `maybe-expr-stx` is #f, otherwise a single form scheduling a script that
  ;; evaluates the expression at *query-handler-priority*.
  (define (schedule-query-handler-stxs maybe-expr-stx)
    (cond
      [(not maybe-expr-stx) #'()]
      [else
       (quasisyntax/loc maybe-expr-stx
         ((schedule-script! #:priority *query-handler-priority*
                            (current-actor)
                            (lambda () #,maybe-expr-stx))))])))
;; (query-value field-name absent-expr P expr [#:on-add e] [#:on-remove e])
;;
;; Declares a fresh field initialised to `absent-expr` inside a local scope
;; and delegates to `query-value*`, whose result is the result of this form.
(define-syntax (query-value stx)
  (syntax-parse stx
    [(_ field-name absent-expr query-arg ...)
     (syntax/loc stx
       (let ()
         (field [field-name absent-expr])
         (query-value* field-name absent-expr query-arg ...)))]))
;; (query-value* field absent-expr P expr [#:on-add e] [#:on-remove e])
;;
;; Maintains an existing field: it is set to `expr` when an assertion
;; matching `P` appears and to `absent-expr` when one is retracted. Optional
;; handlers are scheduled before the field is updated. Evaluates to the field.
(define-syntax (query-value* stx)
  (syntax-parse stx
    [(_ field-name absent-expr pat value-expr on-add:on-add on-remove:on-remove)
     (define add-handler-stxs
       (schedule-query-handler-stxs (attribute on-add.expr)))
     (define remove-handler-stxs
       (schedule-query-handler-stxs (attribute on-remove.expr)))
     (quasisyntax/loc stx
       (let ([fld field-name])
         (on (asserted pat) #:priority *query-priority*
             #,@add-handler-stxs
             (fld value-expr))
         (on (retracted pat) #:priority *query-priority-high*
             #,@remove-handler-stxs
             (fld absent-expr))
         fld))]))
;; (query-set field-name P expr [#:on-add e] [#:on-remove e])
;;
;; Declares a fresh field holding an empty set inside a local scope and
;; delegates to `query-set*`.
(define-syntax (query-set stx)
  (syntax-parse stx
    [(_ field-name query-arg ...)
     (syntax/loc stx
       (let ()
         (field [field-name (set)])
         (query-set* field-name query-arg ...)))]))
;; (query-set* field P expr [#:on-add e] [#:on-remove e])
;;
;; Maintains a set-valued field. On assertion of `P` the value of `expr` is
;; added unless already present; on retraction it is removed if present. The
;; optional handlers are scheduled only when the set actually changes.
;; Evaluates to the field.
(define-syntax (query-set* stx)
  (syntax-parse stx
    [(_ field-name pat elem-expr on-add:on-add on-remove:on-remove)
     (define add-handler-stxs
       (schedule-query-handler-stxs (attribute on-add.expr)))
     (define remove-handler-stxs
       (schedule-query-handler-stxs (attribute on-remove.expr)))
     (quasisyntax/loc stx
       (let ([fld field-name])
         (on (asserted pat) #:priority *query-priority*
             (let ([elem elem-expr])
               (unless (set-member? (fld) elem)
                 #,@add-handler-stxs
                 (fld (set-add (fld) elem)))))
         (on (retracted pat) #:priority *query-priority-high*
             (let ([elem elem-expr])
               (when (set-member? (fld) elem)
                 #,@remove-handler-stxs
                 (fld (set-remove (fld) elem)))))
         fld))]))
;; (query-hash field-name P key-expr value-expr [#:on-add e] [#:on-remove e])
;;
;; Declares a fresh field holding an empty hash inside a local scope and
;; delegates to `query-hash*`.
(define-syntax (query-hash stx)
  (syntax-parse stx
    [(_ field-name query-arg ...)
     (syntax/loc stx
       (let ()
         (field [field-name (hash)])
         (query-hash* field-name query-arg ...)))]))
;; (query-hash* field P key-expr value-expr [#:on-add e] [#:on-remove e])
;;
;; Maintains a hash-valued field. On assertion of `P` the key is mapped to the
;; value, with a warning logged first if the key is already present. On
;; retraction the key is removed. Evaluates to the field.
(define-syntax (query-hash* stx)
  (syntax-parse stx
    [(_ field-name pat key-expr value-expr on-add:on-add on-remove:on-remove)
     (define add-handler-stxs
       (schedule-query-handler-stxs (attribute on-add.expr)))
     (define remove-handler-stxs
       (schedule-query-handler-stxs (attribute on-remove.expr)))
     (quasisyntax/loc stx
       (let ([fld field-name])
         (on (asserted pat) #:priority *query-priority*
             (let ([k key-expr])
               (when (hash-has-key? (fld) k)
                 (log-warning "query-hash: field ~v with pattern ~v: overwriting existing entry ~v"
                              'field-name
                              'pat
                              k))
               #,@add-handler-stxs
               (fld (hash-set (fld) k value-expr))))
         (on (retracted pat) #:priority *query-priority-high*
             #,@remove-handler-stxs
             (fld (hash-remove (fld) key-expr)))
         fld))]))
;; (define-syntax (query-hash-set stx)
;; (syntax-parse stx
;; [(_ field-name args ...)
;; (quasisyntax/loc stx
;; (let ()
;; (field [field-name (hash)])
;; (query-hash-set* field-name args ...)))]))
;; (define-syntax (query-hash-set* stx)
;; (syntax-parse stx
;; [(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove)
;; (quasisyntax/loc stx
;; (let ((F field-name))
;; (on (asserted P) #:priority *query-priority*
;; (let ((K key-expr) (V value-expr))
;; (when (not (hashset-member? (F) K V))
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
;; (F (hashset-add (F) K V)))))
;; (on (retracted P) #:priority *query-priority-high*
;; (let ((K key-expr) (V value-expr))
;; (when (hashset-member? (F) K V)
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
;; (F (hashset-remove (F) K V)))))
;; F))]))
;; (query-count field-name P expr [#:on-add e] [#:on-remove e])
;;
;; Declares a fresh field holding an empty hash inside a local scope and
;; delegates to `query-count*`.
(define-syntax (query-count stx)
  (syntax-parse stx
    [(_ field-name query-arg ...)
     (syntax/loc stx
       (let ()
         (field [field-name (hash)])
         (query-count* field-name query-arg ...)))]))
;; (query-count* field P expr [#:on-add e] [#:on-remove e])
;;
;; Maintains a hash-valued field mapping each value of `expr` to the number of
;; matching assertions seen for it. Assertion increments the count, starting
;; from 0; retraction decrements it and removes the entry when the count was
;; 1. Evaluates to the field.
(define-syntax (query-count* stx)
  (syntax-parse stx
    [(_ field-name pat item-expr on-add:on-add on-remove:on-remove)
     (define add-handler-stxs
       (schedule-query-handler-stxs (attribute on-add.expr)))
     (define remove-handler-stxs
       (schedule-query-handler-stxs (attribute on-remove.expr)))
     (quasisyntax/loc stx
       (let ([fld field-name])
         (on (asserted pat) #:priority *query-priority*
             (let ([item item-expr])
               #,@add-handler-stxs
               (fld (hash-set (fld) item (add1 (hash-ref (fld) item 0))))))
         (on (retracted pat) #:priority *query-priority-high*
             (let ([item item-expr])
               #,@remove-handler-stxs
               (let ([counts (fld)])
                 (fld (match (hash-ref counts item 0)
                        ;; Retraction of an item with no recorded count:
                        ;; leave the table unchanged.
                        [0 counts]
                        [1 (hash-remove counts item)]
                        [remaining (hash-set counts item (sub1 remaining))])))))
         fld))]))
;; Definition forms for the query macros: each binds `name` to the field
;; produced by the corresponding query form, using `name` as the field name.
(define-syntax-rule (define/query-value name absent pat arg ...)
  (define name (query-value name absent pat arg ...)))
(define-syntax-rule (define/query-set name pat arg ...)
  (define name (query-set name pat arg ...)))
(define-syntax-rule (define/query-hash name pat arg ...)
  (define name (query-hash name pat arg ...)))
;; (define-syntax-rule (define/query-hash-set id P x ...) (define id (query-hash-set id P x ...)))
(define-syntax-rule (define/query-count name pat arg ...)
  (define name (query-count name pat arg ...)))
;; (immediate-query [op args ...] ...)
;;
;; Suspends the script with `react/suspend`. In the new facet, each clause
;; `[op args ...]` becomes `(op tmp args ...)` with a generated temporary as
;; the field name, and the result is bound to that temporary. On start the
;; facet calls `flush!` and then passes the current value of every result
;; field to the continuation.
(define-syntax (immediate-query stx)
  (syntax-case stx ()
    [(_ [query-op query-arg ...] ...)
     (with-syntax ([(result-field ...) (generate-temporaries #'(query-op ...))])
       (syntax/loc stx
         (react/suspend (resume)
           (define result-field (query-op result-field query-arg ...)) ...
           (on-start (flush!)
                     (resume (result-field) ...)))))]))
;; Checks (via `ensure-in-script!`) that a script is running, then passes `m`
;; to `enqueue-send!` for the current actor.
(define (send! m)
  (ensure-in-script! 'send!)
  (let ([sender (current-actor)])
    (enqueue-send! sender m)))
;; Checks (via `ensure-in-script!`) that a script is running, then passes `k`
;; to `enqueue-deferred-turn!` for the current actor.
(define (defer-turn! k)
  (ensure-in-script! 'defer-turn!)
  (let ([owner (current-actor)])
    (enqueue-deferred-turn! owner k)))
;; Sends a freshly generated symbol as a message and suspends the calling
;; script (through `until`) until that same message is observed.
(define (flush!)
  (ensure-in-script! 'flush!)
  (let ([ack (gensym 'flush!)])
    (until (message ack)
      (on-start (send! ack)))))
;; Checks (via `ensure-in-script!`) that a script is running, then calls
;; `adhoc-assert!` for the current actor with `a` and `count` (default 1).
(define (assert! a [count 1])
  (ensure-in-script! 'assert!)
  (let ([owner (current-actor)])
    (adhoc-assert! owner a count)))
;; Checks (via `ensure-in-script!`) that a script is running, then calls
;; `adhoc-retract!` for the current actor with `a` and `count` (default 1).
(define (retract! a [count 1])
  (ensure-in-script! 'retract!)
  (let ([owner (current-actor)])
    (adhoc-retract! owner a count)))
;; Returns the result of `actor-adhoc-assertions` for the current actor.
(define (current-adhoc-assertions)
  (let ([self (current-actor)])
    (actor-adhoc-assertions self)))