Create an actor-internal event system oriented around assertions and

messges.

internal form        ~ external form
(know v)             ~ (assert v)
(on (know p) ...)    ~ (on (asserted p) ...)
(on (forget p) ...)  ~ (on (retracted p) ...)
(realize! v)         ~ (send! v)
(on (realize v) ...) ~ (on (message v) ...)
This commit is contained in:
Sam Caldwell 2019-06-12 16:53:11 -04:00
parent a1ca2372a5
commit d8516060c4
4 changed files with 311 additions and 75 deletions

View File

@ -31,6 +31,9 @@
retracted
rising-edge
(rename-out [core:message message])
know
forget
realize
let-event
@ -58,6 +61,7 @@
perform-actions!
flush!
quit-dataspace!
realize!
syndicate-effects-available?
@ -80,6 +84,7 @@
(require racket/set)
(require racket/match)
(require racket/contract)
(require (only-in racket/list flatten))
(require (for-syntax racket/base))
(require (for-syntax syntax/parse))
@ -198,10 +203,15 @@
endpoints ;; (Hash EID Endpoint)
stop-scripts ;; (Listof Script) -- IN REVERSE ORDER
children ;; (Setof FID)
previous-knowledge ;; AssertionSet of internal knowledge
knowledge ;; AssertionSet of internal knowledge
) #:prefab)
(struct endpoint (id patch-fn handler-fn) #:prefab)
(struct internal-knowledge (v) #:prefab)
(define internal-knowledge-parenthesis (open-parenthesis 1 struct:internal-knowledge))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Script priorities. These are used to ensure that the results of
;; some *side effects* are visible to certain pieces of code.
@ -249,6 +259,12 @@
;; Storeof (Constreeof Action)
(define current-pending-actions (make-store))
;; Storeof Patch
(define current-pending-internal-patch (make-store))
;; Storeof (Constreeof Action)
(define current-pending-internal-events (make-store))
;; Storeof (Vector (Queue Script) ...)
;; Mutates the vector!
(define current-pending-scripts (make-store))
@ -407,6 +423,7 @@
(analyze-pattern stx #'P))
(quasisyntax/loc stx
(add-endpoint! #,(source-location->string stx)
#f
(lambda ()
#,(let ((patch-stx #`(core:assert #,pat)))
(if #'w.Pred
@ -414,6 +431,22 @@
patch-stx)))
void))]))
(define-syntax (know stx)
(syntax-parse stx
[(_ w:when-pred P)
(define-values (proj pat bindings _instantiated)
(analyze-pattern stx #'P))
(quasisyntax/loc stx
(add-endpoint!
#,(source-location->string stx)
#t
(lambda ()
#,(let ((patch-stx #`(core:assert (internal-knowledge #,pat))))
(if #'w.Pred
#`(if w.Pred #,patch-stx patch-empty)
patch-stx)))
void))]))
(define (fid-ancestor? fid maybe-ancestor)
(and (pair? fid) ;; empty fid lists obviously no ancestors at all!
(or (equal? fid maybe-ancestor)
@ -474,6 +507,7 @@
(define (on-event* where proc #:priority [priority *normal-priority*])
(add-endpoint! where
#f
(lambda () patch-empty)
(lambda (e _current-interests _synthetic?)
(schedule-script! #:priority priority (lambda () (proc e))))))
@ -547,6 +581,7 @@
(quasisyntax/loc stx
(let ()
(add-endpoint! #,(source-location->string stx)
#f
(lambda ()
(define subject-id (current-dataflow-subject-id))
(schedule-script!
@ -570,6 +605,8 @@
(define-syntax (asserted stx) (raise-syntax-error #f "asserted: Used outside event spec" stx))
(define-syntax (retracted stx) (raise-syntax-error #f "retracted: Used outside event spec" stx))
(define-syntax (rising-edge stx) (raise-syntax-error #f "rising-edge: Used outside event spec" stx))
(define-syntax (forget stx) (raise-syntax-error #f "forget: Used outside event spec" stx))
(define-syntax (realize stx) (raise-syntax-error #f "realize: Used outside event spec" stx))
(define-syntax (suspend-script stx)
(syntax-parse stx
@ -772,62 +809,119 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Syntax-time support
(define (interests-pre-and-post-patch pat synthetic?)
(define (interests-pre-and-post-patch pat synthetic? retrieve-knowledge)
(define (or* x y) (or x y))
(define a (current-actor-state))
(define previous-knowledge (if synthetic? trie-empty (actor-state-previous-knowledge a)))
(define old (trie-lookup previous-knowledge pat #f #:wildcard-union or*))
(define new (trie-lookup (actor-state-knowledge a) pat #f #:wildcard-union or*))
(define-values (prev current) (retrieve-knowledge synthetic?))
(define old (trie-lookup prev pat #f #:wildcard-union or*))
(define new (trie-lookup current pat #f #:wildcard-union or*))
(values old new))
(define (interest-just-appeared-matching? pat synthetic?)
(define-values (old new) (interests-pre-and-post-patch pat synthetic?))
(define (interest-just-appeared-matching? pat synthetic? retrieve-knowledge)
(define-values (old new) (interests-pre-and-post-patch pat synthetic? retrieve-knowledge))
(and (not old) new))
(define (interest-just-disappeared-matching? pat synthetic?)
(define-values (old new) (interests-pre-and-post-patch pat synthetic?))
(define (interest-just-disappeared-matching? pat synthetic? retrieve-knowledge)
(define-values (old new) (interests-pre-and-post-patch pat synthetic? retrieve-knowledge))
(and old (not new)))
(define-for-syntax (analyze-asserted/retracted outer-expr-stx
when-pred-stx
event-stx
script-stx
asserted?
P-stx
priority-stx)
;; Bool -> (Values AssertionSet AssertionSet)
;; retrieve the previous and current knowledge fields from the current actor state
(define (current-actor-state-knowledges synthetic?)
(define a (current-actor-state))
(define previous-knowledge (if synthetic? trie-empty (actor-state-previous-knowledge a)))
(define current-knowledge (actor-state-knowledge a))
(values previous-knowledge current-knowledge))
;; Bool -> (Values AssertionSet AssertionSet)
;; retrieve the previous and current knowledge fields from the current facet
(define (current-facet-knowledges synthetic?)
(define f (lookup-facet (current-facet-id)))
(define previous-knowledge (if synthetic? trie-empty (facet-previous-knowledge f)))
(define current-knowledge (facet-knowledge f))
(values previous-knowledge current-knowledge))
(define-for-syntax (analyze-appear/disappear outer-expr-stx
when-pred-stx
event-stx
script-stx
asserted?
P-stx
priority-stx
internal?)
(define P+
(if internal? #`(internal-knowledge #,P-stx) P-stx))
(define-values (proj-stx pat bindings _instantiated)
(analyze-pattern event-stx P-stx))
(analyze-pattern event-stx P+))
(define event-predicate-stx (if asserted? #'patch/added? #'patch/removed?))
(define patch-accessor-stx (if asserted? #'patch-added #'patch-removed))
(define change-detector-stx
(if asserted? #'interest-just-appeared-matching? #'interest-just-disappeared-matching?))
(define knowledge-retriever
(if internal? #'current-facet-knowledges #'current-actor-state-knowledges))
(quasisyntax/loc outer-expr-stx
(add-endpoint! #,(source-location->string outer-expr-stx)
(lambda () (if #,when-pred-stx
(core:sub #,pat)
patch-empty))
(lambda (e current-interests synthetic?)
(when (not (trie-empty? current-interests))
(core:match-event e
[(? #,event-predicate-stx p)
(define proj #,proj-stx)
(define proj-arity (projection-arity proj))
(define entry-set (trie-project/set #:take proj-arity
(#,patch-accessor-stx p)
proj))
(when (not entry-set)
(error 'asserted
"Wildcard interest discovered while projecting by ~v at ~a"
proj
#,(source-location->string P-stx)))
(for [(entry (in-set entry-set))]
(let ((instantiated (instantiate-projection proj entry)))
(and (#,change-detector-stx instantiated synthetic?)
(schedule-script!
#:priority #,priority-stx
(lambda ()
(match-define (list #,@bindings) entry)
#,script-stx)))))]))))))
(add-endpoint!
#,(source-location->string outer-expr-stx)
#,internal?
(lambda () (if #,when-pred-stx
(core:sub #,pat)
patch-empty))
(lambda (e current-interests synthetic?)
(when (not (trie-empty? current-interests))
(core:match-event e
[(? #,event-predicate-stx p)
(define proj #,proj-stx)
(define proj-arity (projection-arity proj))
(define entry-set (trie-project/set #:take proj-arity
(#,patch-accessor-stx p)
proj))
(when (not entry-set)
(error 'asserted
"Wildcard interest discovered while projecting by ~v at ~a"
proj
#,(source-location->string P-stx)))
(for [(entry (in-set entry-set))]
(let ((instantiated (instantiate-projection proj entry)))
(and (#,change-detector-stx instantiated synthetic? #,knowledge-retriever)
(schedule-script!
#:priority #,priority-stx
(lambda ()
(match-define (list #,@bindings) entry)
#,script-stx)))))]))))))
(define-for-syntax (analyze-message outer-expr-stx
when-pred-stx
event-stx
script-stx
P-stx
priority-stx
internal?)
(define-values (proj pat bindings _instantiated)
(analyze-pattern event-stx P-stx))
(define sub
(if internal? #`(internal-knowledge #,pat) pat))
(define matchp
(if internal? #'(internal-knowledge body) #'body))
(quasisyntax/loc outer-expr-stx
(add-endpoint!
#,(source-location->string outer-expr-stx)
#,internal?
(lambda () (if #,when-pred-stx
(core:sub #,sub)
patch-empty))
(lambda (e current-interests _synthetic?)
(when (not (trie-empty? current-interests))
(core:match-event e
[(core:message #,matchp)
(define capture-vals
(match-value/captures
body
#,proj))
(and capture-vals
(schedule-script!
#:priority #,priority-stx
(lambda ()
(apply (lambda #,bindings #,script-stx)
capture-vals))))]))))))
(define-for-syntax orig-insp
(variable-reference->module-declaration-inspector (#%variable-reference)))
@ -839,7 +933,7 @@
priority-stx)
(define event-stx (syntax-disarm armed-event-stx orig-insp))
(syntax-parse event-stx
#:literals [core:message asserted retracted rising-edge]
#:literals [core:message asserted retracted rising-edge know forget realize]
[(expander args ...)
#:when (event-expander-id? #'expander)
(event-expander-transform
@ -851,33 +945,23 @@
script-stx
priority-stx)))]
[(core:message P)
(define-values (proj pat bindings _instantiated)
(analyze-pattern event-stx #'P))
(quasisyntax/loc outer-expr-stx
(add-endpoint! #,(source-location->string outer-expr-stx)
(lambda () (if #,when-pred-stx
(core:sub #,pat)
patch-empty))
(lambda (e current-interests _synthetic?)
(when (not (trie-empty? current-interests))
(core:match-event e
[(core:message body)
(define capture-vals
(match-value/captures
body
#,proj))
(and capture-vals
(schedule-script!
#:priority #,priority-stx
(lambda ()
(apply (lambda #,bindings #,script-stx)
capture-vals))))])))))]
(analyze-message outer-expr-stx when-pred-stx event-stx script-stx
#'P priority-stx #f)]
[(asserted P)
(analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx script-stx
#t #'P priority-stx)]
(analyze-appear/disappear outer-expr-stx when-pred-stx event-stx script-stx
#t #'P priority-stx #f)]
[(retracted P)
(analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx script-stx
#f #'P priority-stx)]
(analyze-appear/disappear outer-expr-stx when-pred-stx event-stx script-stx
#f #'P priority-stx #f)]
[(realize P)
(analyze-message outer-expr-stx when-pred-stx event-stx script-stx
#'P priority-stx #t)]
[(know P)
(analyze-appear/disappear outer-expr-stx when-pred-stx event-stx script-stx
#t #'P priority-stx #t)]
[(forget P)
(analyze-appear/disappear outer-expr-stx when-pred-stx event-stx script-stx
#f #'P priority-stx #t)]
[(rising-edge Pred)
(define field-name
(datum->syntax event-stx
@ -887,6 +971,7 @@
(let ()
(field [#,field-name #f])
(add-endpoint! #,(source-location->string outer-expr-stx)
#f
(lambda ()
(when #,when-pred-stx
(define old-val (#,field-name))
@ -1003,10 +1088,25 @@
(current-pending-actions (list (current-pending-actions)
((current-action-transformer) p)))))
(define (schedule-internal-event! ac)
(if (patch? ac)
(when (patch-non-empty? ac)
(current-pending-internal-patch (compose-patch ac (current-pending-internal-patch))))
(begin (flush-pending-internal-patch!)
(current-pending-internal-events (list (current-pending-internal-events)
((current-action-transformer) ac))))))
(define (flush-pending-internal-patch!)
(define p (current-pending-internal-patch))
(when (patch-non-empty? p)
(current-pending-internal-patch patch-empty)
(current-pending-internal-events (list (current-pending-internal-events)
((current-action-transformer) p)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Endpoint Creation
(define (add-endpoint! where patch-fn handler-fn)
(define (add-endpoint! where internal? patch-fn handler-fn)
(when (in-script?)
(error 'add-endpoint!
"~a: Cannot add endpoint in script; are you missing a (react ...)?"
@ -1030,7 +1130,9 @@
(hash-set (facet-endpoints f)
new-eid
(endpoint new-eid patch-fn handler-fn))]))))
(schedule-action! delta-aggregate))
(if internal?
(schedule-internal-event! delta-aggregate)
(schedule-action! delta-aggregate)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Facet Lifecycle
@ -1045,7 +1147,7 @@
(define fid-uid next-fid-uid)
(define fid (cons fid-uid parent-fid))
(set! next-fid-uid (+ next-fid-uid 1))
(update-facet! fid (lambda (_f) (facet fid (hasheqv) '() (set))))
(update-facet! fid (lambda (_f) (facet fid (hasheqv) '() (set) trie-empty trie-empty)))
(update-facet! parent-fid
(lambda (pf)
(and pf (struct-copy facet pf
@ -1094,8 +1196,24 @@
(dataflow-forget-subject! (actor-state-field-dataflow a) (list fid eid))
(define-values (new-mux _eid _delta delta-aggregate)
(mux-remove-stream (actor-state-mux a) eid))
(current-actor-state (struct-copy actor-state a [mux new-mux]))
(schedule-action! delta-aggregate))))
(define internal (patch-step delta-aggregate internal-knowledge-parenthesis))
(define external (patch (trie-subtract (patch-added delta-aggregate) (patch-added internal))
(trie-subtract (patch-removed delta-aggregate) (patch-removed internal))))
(current-actor-state (struct-copy actor-state a
[mux new-mux]))
(define internal-aggregate (patch-prepend internal-knowledge-parenthesis internal))
(schedule-script!
#:priority *gc-priority*
;; need to do this later for the forget change detector
(lambda ()
(define a (current-actor-state))
(define new-knowledge
(update-interests (actor-state-knowledge a) internal))
(current-actor-state (struct-copy actor-state a
[knowledge new-knowledge]))))
(schedule-internal-event! internal-aggregate)
(schedule-action! external))))
(schedule-script!
#:priority *gc-priority*
@ -1124,6 +1242,8 @@
(make-dataflow-graph)))
(current-pending-patch patch-empty)
(current-pending-actions '())
(current-pending-internal-patch patch-empty)
(current-pending-internal-events '())
(current-pending-scripts (make-empty-pending-scripts))
(current-action-transformer values)]
(with-current-facet '() #f
@ -1151,6 +1271,7 @@
(when script
(script)
(refresh-facet-assertions!)
(dispatch-internal-events!)
(run-all-pending-scripts!)))
(define (run-scripts!)
@ -1162,6 +1283,16 @@
(core:quit pending-actions)
(core:transition (current-actor-state) pending-actions)))
;; dispatch the internal events that have accumulated during script execution
(define (dispatch-internal-events!)
(flush-pending-internal-patch!)
(define pending (flatten (current-pending-internal-events)))
(current-pending-internal-events '())
(define a (current-actor-state))
(for* ([e (in-list pending)]
[(fid f) (in-hash (actor-state-facets a))])
(facet-handle-event! fid f e #f)))
(define (refresh-facet-assertions!)
(dataflow-repair-damage! (actor-state-field-dataflow (current-actor-state))
(lambda (subject-id)
@ -1201,6 +1332,8 @@
a))
(current-pending-patch patch-empty)
(current-pending-actions '())
(current-pending-internal-patch patch-empty)
(current-pending-internal-events '())
(current-pending-scripts (make-empty-pending-scripts))
(current-action-transformer values)]
(for [((fid f) (in-hash (actor-state-facets a)))]
@ -1210,6 +1343,16 @@
(define (facet-handle-event! fid f e synthetic?)
(define mux (actor-state-mux (current-actor-state)))
(with-current-facet fid #f
(when (patch? e)
;; quick-and-dirty intersection with (internal-knowledge ?)
(define internal (patch-prepend internal-knowledge-parenthesis
(patch-step e internal-knowledge-parenthesis)))
(update-facet! fid
(lambda (f)
(and f
(struct-copy facet f
[previous-knowledge (facet-knowledge f)]
[knowledge (apply-patch (facet-knowledge f) internal)])))))
(for [(ep (in-hash-values (facet-endpoints f)))]
((endpoint-handler-fn ep) e (mux-interests-of mux (endpoint-id ep)) synthetic?))))
@ -1306,6 +1449,10 @@
(ensure-in-script! 'send!)
(schedule-action! (core:message M)))
(define (realize! M)
(ensure-in-script! 'realize!)
(schedule-internal-event! (core:message (internal-knowledge M))))
(define *adhoc-label* -1)
(define (assert! P)
@ -1352,7 +1499,7 @@
(fprintf p " - Knowledge:\n ~a" (trie->pretty-string knowledge #:indent 3))
(fprintf p " - Facets:\n")
(for ([(fid f) (in-hash facets)])
(match-define (facet _fid endpoints _ children) f)
(match-define (facet _fid endpoints _ children _ _) f)
(fprintf p " ---- facet ~a, children=~a" fid (set->list children))
(when (not (hash-empty? endpoints))
(fprintf p ", endpoints=~a" (hash-keys endpoints)))

View File

@ -0,0 +1,62 @@
#lang syndicate
;; Expected Output:
#|
balance = 0
balance = 5
balance = 0
JEEPERS
know overdraft!
balance = -1
balance = -2
no longer in overdraft
balance = 8
|#
(assertion-struct balance (v))
(message-struct deposit (v))
(spawn
;; Internal Events
(message-struct new-transaction (old new))
(assertion-struct overdraft ())
(field [account 0])
(assert (balance (account)))
(on (message (deposit $v))
(define prev (account))
(account (+ v (account)))
(realize! (new-transaction prev (account))))
(on (realize (new-transaction $old $new))
(when (and (negative? new)
(not (negative? old)))
(react
;; (this print is to make sure only one of these facets is created)
(printf "JEEPERS\n")
(know (overdraft))
(on (realize (new-transaction $old $new))
(when (not (negative? new))
(stop-current-facet))))))
(on (know (overdraft))
(printf "know overdraft!\n"))
(on (forget (overdraft))
(printf "no longer in overdraft\n")))
(spawn
(on (asserted (balance $v))
(printf "balance = ~a\n" v)))
(spawn*
(send! (deposit 5))
(flush!)
(send! (deposit -5))
(flush!)
(send! (deposit -1))
(flush!)
(send! (deposit -1))
(flush!)
(send! (deposit 10)))

View File

@ -0,0 +1,19 @@
#lang syndicate
;; Expected Output:
#|
received message bad
realized good
|#
(message-struct ping (v))
(spawn
(on (realize (ping $v))
(printf "realized ~a\n" v))
(on (message (ping $v))
(printf "received message ~a\n" v)
(realize! (ping 'good))))
(spawn*
(send! (ping 'bad)))

View File

@ -14,6 +14,7 @@
limit-patch
patch-step
patch-step*
patch-prepend
compute-aggregate-patch
apply-patch
update-interests
@ -125,6 +126,13 @@
(define (patch-step* p keys)
(foldl (lambda (key p) (patch-step p key)) p keys))
;; (U Sigma OpenParenthesis) Trie -> Trie
;; Prepend both added and removed sets
(define (patch-prepend key p)
(match-define (patch added removed) p)
(patch (trie-prepend key added)
(trie-prepend key removed)))
;; Entries labelled with `label` may already exist in `base`; the
;; patch `p` MUST already have been limited to add only where no
;; `label`-labelled portions of `base` exist, and to remove only where