syndicate-rkt/syndicate/syntax.rkt

621 lines
23 KiB
Racket
Raw Normal View History

#lang racket/base
;; DSL syntax over the API of dataspace.rkt
(provide spawn
spawn*
react
react/suspend
until
field
assert
stop-facet
stop-current-facet
stop-when
stop-when-true
on-start
on-stop
on
2018-04-08 10:44:32 +00:00
;; during
;; during/spawn
begin/dataflow
define/dataflow
asserted
retracted
message
let-event
2018-04-08 10:44:32 +00:00
;; query-value
;; query-set
;; query-hash
;; query-hash-set
;; query-count
;; query-value*
;; query-set*
;; query-hash*
;; query-hash-set*
;; query-count*
;; define/query-value
;; define/query-set
;; define/query-hash
;; define/query-hash-set
;; define/query-count
;; immediate-query
send!
flush!
;;
2018-04-08 10:44:32 +00:00
;; current-action-transformer
)
(require (for-syntax racket/base))
(require (for-syntax syntax/parse))
(require (for-syntax syntax/srcloc))
(require "syntax-classes.rkt")
(require "dataspace.rkt")
(require (submod "dataspace.rkt" priorities))
2018-04-08 10:44:32 +00:00
(require "event-expander.rkt")
(require "skeleton.rkt")
(require "pattern.rkt")
(require racket/set)
(require syndicate/dataflow)
(begin-for-syntax
(define-splicing-syntax-class actor-wrapper
(pattern (~seq #:spawn wrapper))
(pattern (~seq) #:attr wrapper #'spawn))
(define-splicing-syntax-class on-crash-option
(pattern (~seq #:on-crash expr))
(pattern (~seq) #:attr expr #f))
(define-splicing-syntax-class let-option
(pattern (~seq #:let clauses))
(pattern (~seq) #:attr clauses #'()))
(define-splicing-syntax-class when-pred
(pattern (~seq #:when Pred))
(pattern (~seq) #:attr Pred #'#t))
(define-splicing-syntax-class priority
(pattern (~seq #:priority level))
(pattern (~seq) #:attr level #'*normal-priority*)))
(define-syntax (spawn stx)
(syntax-parse stx
[(_ (~or (~optional (~seq #:name name-expr) #:defaults ([name-expr #'#f])
#:name "#:name")
(~optional (~seq #:assertions [assertion-exprs ...])
#:name "#:assertions")
(~optional (~seq #:linkage [linkage-expr ...]) #:defaults ([(linkage-expr 1) '()])
#:name "#:linkage"))
...
O ...)
(quasisyntax/loc stx
(spawn** #:name name-expr
#:assertions #,(cond [(attribute assertion-exprs) #'[assertion-exprs ...]]
[else #'[]])
linkage-expr ... O ...))]))
(define-syntax (spawn* stx)
(syntax-parse stx
[(_ name:name assertions:assertions script ...)
(quasisyntax/loc stx
(spawn** #:name name.N
#:assertions [assertions.exprs ...]
(on-start script ...)))]))
(define-syntax (spawn** stx)
(syntax-parse stx
[(_ name:name assertions:assertions script ...)
(quasisyntax/loc stx
(dataspace-spawn!
(current-dataspace)
name.N
(lambda () (begin/void-default script ...))
(set assertions.exprs ...)))]))
2018-04-08 10:44:32 +00:00
(define-syntax (begin/void-default stx)
(syntax-parse stx
[(_) (syntax/loc stx (void))]
[(_ expr0 expr ...) (syntax/loc stx (begin expr0 expr ...))]))
(define (react* where boot-proc)
(define ds (current-dataspace))
(add-facet! ds
where
(current-actor)
(current-facet)
boot-proc))
(define-syntax (react stx)
(syntax-parse stx
[(_ O ...)
(quasisyntax/loc stx
(react* #,(source-location->string stx)
(lambda () (begin/void-default O ...))))]))
(define-syntax (react/suspend stx)
(syntax-parse stx
[(_ (resume-parent) O ...)
(quasisyntax/loc stx
(suspend-script* #,(source-location->string stx)
(lambda (resume-parent)
(react* #,(source-location->string stx)
(lambda () (begin/void-default O ...))))))]))
(define-syntax (until stx)
(syntax-parse stx
[(_ E O ...)
(syntax/loc stx
(react/suspend (continue)
(stop-when E (continue (void)))
O ...))]))
(define (make-field name init)
(field-handle name
(generate-id! (current-dataspace))
2018-04-08 10:44:32 +00:00
(current-actor)
init))
(define-syntax (define-field stx)
(syntax-parse stx
[(_ id init)
#'(define id (make-field 'id init))]))
(define-syntax (field stx)
(syntax-parse stx
[(_ [id:id init] ...)
(quasisyntax/loc stx
(begin (define-field id init)
...))]))
(define-syntax (assert stx)
(syntax-parse stx
[(_ w:when-pred P)
(quasisyntax/loc stx
(add-endpoint! (current-dataspace)
#,(source-location->string stx)
2018-04-08 10:44:32 +00:00
(lambda () (when w.Pred P))
#f))]))
(define-syntax (stop-facet stx)
(syntax-parse stx
2018-04-08 10:44:32 +00:00
[(_ f-expr script ...)
(quasisyntax/loc stx
2018-04-08 10:44:32 +00:00
(let ((f f-expr))
(when (not (equal? (facet-actor f) (current-actor)))
(error 'stop-facet "Attempt to stop unrelated facet ~a from actor ~a" f (current-actor)))
(stop-facet! (current-dataspace) f (lambda () (begin/void-default script ...)))))]))
(define-syntax-rule (stop-current-facet script ...)
2018-04-08 10:44:32 +00:00
(stop-facet (current-facet) script ...))
(define-syntax-rule (stop-when-true condition script ...)
(begin/dataflow
(when condition
2018-04-08 10:44:32 +00:00
(stop-facet (current-facet) script ...))))
(define-syntax (on-start stx)
(syntax-parse stx
[(_ script ...)
(quasisyntax/loc stx
(schedule-script! (current-dataspace)
(lambda () (begin/void-default script ...))))]))
(define-syntax (on-stop stx)
(syntax-parse stx
[(_ script ...)
(quasisyntax/loc stx
(add-stop-script! (current-dataspace)
(lambda () (begin/void-default script ...))))]))
(define-syntax (stop-when stx)
(syntax-parse stx
[(_ w:when-pred E prio:priority script ...)
2018-04-08 10:44:32 +00:00
(analyse-event stx
#'w.Pred
#'E
(syntax/loc stx (stop-current-facet script ...))
#'prio.level)]))
(define-syntax (on stx)
(syntax-parse stx
[(_ w:when-pred E prio:priority script ...)
2018-04-08 10:44:32 +00:00
(analyse-event stx
#'w.Pred
#'E
(syntax/loc stx (begin/void-default script ...))
#'prio.level)]))
(define-syntax (begin/dataflow stx)
(syntax-parse stx
[(_ prio:priority expr ...)
(quasisyntax/loc stx
(let ()
(add-endpoint! (current-dataspace)
#,(source-location->string stx)
(lambda ()
(define subject-id (current-dataflow-subject-id))
(schedule-script!
#:priority prio.level
2018-04-08 10:44:32 +00:00
(current-dataspace)
(lambda ()
(parameterize ((current-dataflow-subject-id subject-id))
expr ...)))
(void))
#f)))]))
(define-syntax (define/dataflow stx)
(syntax-parse stx
[(_ fieldname expr)
(quasisyntax/loc stx (define/dataflow fieldname expr #:default #f))]
[(_ fieldname expr #:default default-expr)
(quasisyntax/loc stx
(begin
(field [fieldname default-expr])
(begin/dataflow (fieldname expr))))]))
(define-syntax (asserted stx) (raise-syntax-error #f "asserted: Used outside event spec" stx))
(define-syntax (retracted stx) (raise-syntax-error #f "retracted: Used outside event spec" stx))
2018-04-08 10:44:32 +00:00
(define-syntax (message stx) (raise-syntax-error #f "message: Used outside event spec" stx))
(define-syntax (suspend-script stx)
(syntax-parse stx
[(_ proc)
(quasisyntax/loc stx
(suspend-script* #,(source-location->string stx) proc))]))
(define-syntax (let-event stx)
(syntax-parse stx
[(_ [e ...] body ...)
(syntax/loc stx
((react/suspend (k)
(on-start (-let-event [e ...] (stop-current-facet (k (lambda () body ...))))))))]))
(define-syntax (-let-event stx)
(syntax-parse stx
[(_ [] expr) #'expr]
[(_ [e es ...] expr) (quasisyntax/loc #'e (react (stop-when e (-let-event [es ...] expr))))]))
(define-for-syntax orig-insp
(variable-reference->module-declaration-inspector (#%variable-reference)))
2018-04-08 10:44:32 +00:00
(define-for-syntax (analyse-event outer-expr-stx
when-pred-stx
armed-event-stx
script-stx
priority-stx)
(define event-stx (syntax-disarm armed-event-stx orig-insp))
(syntax-parse event-stx
2018-04-08 10:44:32 +00:00
#:literals [message asserted retracted]
[(expander args ...) #:when (event-expander-id? #'expander)
(event-expander-transform event-stx
(lambda (result)
(analyse-event outer-expr-stx
when-pred-stx
(syntax-rearm result event-stx)
script-stx
priority-stx)))]
[(message P)
(define desc (analyse-pattern #'P))
(quasisyntax/loc outer-expr-stx
2018-04-08 10:44:32 +00:00
(add-endpoint! (current-dataspace)
#,(source-location->string outer-expr-stx)
(lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc))))
(skeleton-interest #,(desc->skeleton-stx desc)
'#,(desc->skeleton-proj desc)
(list #,@(desc->key desc))
'#,(desc->capture-proj desc)
(capture-facet-context
(lambda (op #,@(desc->capture-names desc))
(when (eq? op '!)
(schedule-script!
#:priority #,priority-stx
(current-dataspace)
2018-04-09 09:29:14 +00:00
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx)))))))))]
[(asserted P)
2018-04-08 10:44:32 +00:00
(analyze-asserted/retracted outer-expr-stx when-pred-stx script-stx #t #'P priority-stx)]
[(retracted P)
2018-04-08 10:44:32 +00:00
(analyze-asserted/retracted outer-expr-stx when-pred-stx script-stx #f #'P priority-stx)]))
2018-04-08 10:44:32 +00:00
(define-for-syntax (analyze-asserted/retracted outer-expr-stx
when-pred-stx
script-stx
asserted?
P-stx
priority-stx)
(define desc (analyse-pattern P-stx))
(quasisyntax/loc outer-expr-stx
(add-endpoint! (current-dataspace)
#,(source-location->string outer-expr-stx)
(lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc))))
(skeleton-interest #,(desc->skeleton-stx desc)
'#,(desc->skeleton-proj desc)
(list #,@(desc->key desc))
'#,(desc->capture-proj desc)
(capture-facet-context
(lambda (op #,@(desc->capture-names desc))
(when (eq? op #,(if asserted? #''+ #''-))
(schedule-script!
#:priority #,priority-stx
(current-dataspace)
2018-04-09 09:29:14 +00:00
#,(quasisyntax/loc script-stx
(lambda ()
#,script-stx))))))))))
2018-04-08 10:44:32 +00:00
;; (define-syntax (during stx)
;; (syntax-parse stx
;; [(_ P O ...)
;; (define E-stx (syntax/loc #'P (asserted P)))
;; (define-values (_proj _pat _bindings instantiated)
;; (analyze-pattern E-stx #'P))
;; (quasisyntax/loc stx
;; (on #,E-stx
;; (let ((p #,instantiated))
;; (react (stop-when (retracted p))
;; O ...))))]))
;; (define-syntax (during/spawn stx)
;; (syntax-parse stx
;; [(_ P w:actor-wrapper name:name assertions:assertions parent-let:let-option
;; oncrash:on-crash-option
;; O ...)
;; (define E-stx (syntax/loc #'P (asserted P)))
;; (define-values (_proj _pat _bindings instantiated)
;; (analyze-pattern E-stx #'P))
;; (quasisyntax/loc stx
;; (on #,E-stx
;; (let* ((id (gensym 'during/spawn))
;; (p #,instantiated) ;; this is the concrete assertion corresponding to demand
;; (inst (instance id p))) ;; this is the assertion representing supply
;; (react (stop-when (asserted inst)
;; ;; Supply (inst) appeared before demand (p) retracted.
;; ;; Transition to a state where we monitor demand, but also
;; ;; express interest in supply: this latter acts as a signal
;; ;; to the supply that it should stick around. We react to
;; ;; retraction of supply before retraction of demand by
;; ;; invoking the on-crash expression, if supplied. Once
;; ;; demand is retracted, this facet terminates, retracting
;; ;; its interest in supply, thereby signalling to the supply
;; ;; that it is no longer wanted.
;; (react (stop-when (retracted inst) ;; NOT OPTIONAL
;; #,@(if (attribute oncrash.expr)
;; #'(oncrash.expr)
;; #'()))
;; (stop-when (retracted p))))
;; (stop-when (retracted p)
;; ;; Demand (p) retracted before supply (inst) appeared. We
;; ;; MUST wait for the supply to fully appear so that we can
;; ;; reliably tell it to shut down. We must maintain interest
;; ;; in supply until we see supply, and then terminate, thus
;; ;; signalling to supply that it is no longer wanted.
;; (react (stop-when (asserted inst)))))
;; (let parent-let.clauses
;; (w.wrapper #:linkage [(assert inst)
;; (stop-when (retracted (observe inst)))]
;; #:name name.N
;; #:assertions* assertions.P
;; O ...)))))]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
2018-04-08 10:44:32 +00:00
;; Queries
2018-04-08 10:44:32 +00:00
;; (begin-for-syntax
;; (define-splicing-syntax-class on-add
;; (pattern (~optional (~seq #:on-add expr) #:defaults ([expr #f]))))
;; (define-splicing-syntax-class on-remove
;; (pattern (~optional (~seq #:on-remove expr) #:defaults ([expr #f]))))
;; (define (schedule-query-handler-stxs maybe-expr-stx)
;; (if maybe-expr-stx
;; (quasisyntax/loc maybe-expr-stx
;; ((schedule-script! #:priority *query-handler-priority*
;; (lambda () #,maybe-expr-stx))))
;; #'())))
;; (define-syntax (query-value stx)
;; (syntax-parse stx
;; [(_ field-name absent-expr args ...)
;; (quasisyntax/loc stx
;; (let ()
;; (field [field-name absent-expr])
;; (query-value* field-name absent-expr args ...)))]))
;; (define-syntax (query-value* stx)
;; (syntax-parse stx
;; [(_ field-name absent-expr P expr on-add:on-add on-remove:on-remove)
;; (quasisyntax/loc stx
;; (let ((F field-name))
;; (on (asserted P) #:priority *query-priority*
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
;; (F expr))
;; (on (retracted P) #:priority *query-priority-high*
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
;; (F absent-expr))
;; F))]))
;; (define-syntax (query-set stx)
;; (syntax-parse stx
;; [(_ field-name args ...)
;; (quasisyntax/loc stx
;; (let ()
;; (field [field-name (set)])
;; (query-set* field-name args ...)))]))
;; (define-syntax (query-set* stx)
;; (syntax-parse stx
;; [(_ field-name P expr on-add:on-add on-remove:on-remove)
;; (quasisyntax/loc stx
;; (let ((F field-name))
;; (on (asserted P) #:priority *query-priority*
;; (let ((V expr))
;; (when (not (set-member? (F) V))
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
;; (F (set-add (F) V)))))
;; (on (retracted P) #:priority *query-priority-high*
;; (let ((V expr))
;; (when (set-member? (F) V)
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
;; (F (set-remove (F) V)))))
;; F))]))
;; (define-syntax (query-hash stx)
;; (syntax-parse stx
;; [(_ field-name args ...)
;; (quasisyntax/loc stx
;; (let ()
;; (field [field-name (hash)])
;; (query-hash* field-name args ...)))]))
;; (define-syntax (query-hash* stx)
;; (syntax-parse stx
;; [(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove)
;; (quasisyntax/loc stx
;; (let ((F field-name))
;; (on (asserted P) #:priority *query-priority*
;; (let ((key key-expr))
;; (when (hash-has-key? (F) key)
;; (log-warning "query-hash: field ~v with pattern ~v: overwriting existing entry ~v"
;; 'field-name
;; 'P
;; key))
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
;; (F (hash-set (F) key value-expr))))
;; (on (retracted P) #:priority *query-priority-high*
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
;; (F (hash-remove (F) key-expr)))
;; F))]))
;; (define-syntax (query-hash-set stx)
;; (syntax-parse stx
;; [(_ field-name args ...)
;; (quasisyntax/loc stx
;; (let ()
;; (field [field-name (hash)])
;; (query-hash-set* field-name args ...)))]))
;; (define-syntax (query-hash-set* stx)
;; (syntax-parse stx
;; [(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove)
;; (quasisyntax/loc stx
;; (let ((F field-name))
;; (on (asserted P) #:priority *query-priority*
;; (let ((K key-expr) (V value-expr))
;; (when (not (hashset-member? (F) K V))
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
;; (F (hashset-add (F) K V)))))
;; (on (retracted P) #:priority *query-priority-high*
;; (let ((K key-expr) (V value-expr))
;; (when (hashset-member? (F) K V)
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
;; (F (hashset-remove (F) K V)))))
;; F))]))
;; (define-syntax (query-count stx)
;; (syntax-parse stx
;; [(_ field-name args ...)
;; (quasisyntax/loc stx
;; (let ()
;; (field [field-name (hash)])
;; (query-count* field-name args ...)))]))
;; (define-syntax (query-count* stx)
;; (syntax-parse stx
;; [(_ field-name P expr on-add:on-add on-remove:on-remove)
;; (quasisyntax/loc stx
;; (let ((F field-name))
;; (on (asserted P) #:priority *query-priority*
;; (let ((E expr))
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
;; (F (hash-set (F) E (+ 1 (hash-ref (F) E 0))))))
;; (on (retracted P) #:priority *query-priority-high*
;; (let ((E expr))
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
;; (let ((F0 (F)))
;; (F (match (hash-ref F0 E 0)
;; [0 F0] ;; huh
;; [1 (hash-remove F0 E)]
;; [n (hash-set F0 E (- n 1))])))))
;; F))]))
;; (define-syntax-rule (define/query-value id ae P x ...) (define id (query-value id ae P x ...)))
;; (define-syntax-rule (define/query-set id P x ...) (define id (query-set id P x ...)))
;; (define-syntax-rule (define/query-hash id P x ...) (define id (query-hash id P x ...)))
;; (define-syntax-rule (define/query-hash-set id P x ...) (define id (query-hash-set id P x ...)))
;; (define-syntax-rule (define/query-count id P x ...) (define id (query-count id P x ...)))
;; (define-syntax (immediate-query stx)
;; (syntax-case stx ()
;; [(_ [op args ...] ...)
;; (with-syntax [((query-result ...) (generate-temporaries #'(op ...)))]
;; (syntax/loc stx
;; (react/suspend (k)
;; (define query-result (op query-result args ...)) ...
;; (on-start (flush!) (k (query-result) ...)))))]))
2018-04-08 10:44:32 +00:00
(define (send! m)
(dataspace-send! (current-dataspace) m))
(define (flush!)
(ensure-in-script! 'flush!)
(define ack (gensym 'flush!))
2018-04-08 10:44:32 +00:00
(until (message ack)
(on-start (send! ack))))
2018-04-08 10:44:32 +00:00
;;---------------------------------------------------------------------------
2018-04-08 10:44:32 +00:00
(module+ test
(message-struct set-box (new-value))
(assertion-struct box-state (value))
2018-04-08 10:44:32 +00:00
(define ds
(make-dataspace
'ground
(lambda ()
(schedule-script!
(current-dataspace)
#;(lambda ()
(spawn (on (message $v)
(if (= v 10000000)
(stop-current-facet)
(send! (+ v 1))))
(on-start (send! 0)))
)
2018-04-08 10:44:32 +00:00
(lambda ()
(spawn (field [current-value 0])
(assert (box-state (current-value)))
(stop-when-true (= (current-value) 10)
(log-info "box: terminating"))
(on (message (set-box $new-value))
(log-info "box: taking on new-value ~v" new-value)
(current-value new-value)))
(spawn (stop-when (retracted (observe (set-box _)))
(log-info "client: box has gone"))
(on (asserted (box-state $v))
(log-info "client: learned that box's value is now ~v" v)
(send! (set-box (+ v 1)))))
)
))))
2018-04-08 10:44:32 +00:00
(require racket/pretty)
;; (pretty-print ds)
(#;time values
(let loop ((i 0))
;; (printf "--- i = ~v\n" i)
(when (run-scripts! ds)
;; (pretty-print ds)
(loop (+ i 1)))))
;; (pretty-print ds)
)