syndicate-rkt/OLD-syndicate/dataspace.rkt

639 lines
25 KiB
Racket

#lang racket/base
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2010-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide make-dataspace ;; TODO: how to cleanly provide this?
with-current-facet ;; TODO: shouldn't be provided
with-non-script-context ;; TODO: shouldn't be provided
run-scripts! ;; TODO: how to cleanly provide this?
apply-patch! ;; TODO: DEFINITELY SHOULDN'T BE PROVIDED - needed by relay.rkt
dataspace?
dataspace-assertions ;; TODO: shouldn't be provided - needed by various tests
dataspace-routing-table ;; TODO: shouldn't be provided - needed by relay.rkt
generate-id! ;; TODO: shouldn't be provided - inline syntax.rkt??
actor?
actor-id
actor-name
actor-dataspace ;; TODO: should this be provided?
facet?
facet-actor
facet-live?
field-handle ;; TODO: shouldn't be provided - inline syntax.rkt??
field-handle?
field-handle-name
field-handle-id
field-handle-owner
field-handle-value
current-actor-crash-logger
current-actor
current-facet
in-script? ;; TODO: shouldn't be provided - inline syntax.rkt??
capture-facet-context ;; TODO: shouldn't be provided - inline syntax.rkt??
suspend-script* ;; TODO: shouldn't be provided - inline syntax.rkt??
add-facet!
stop-facet!
add-stop-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
add-endpoint!
remove-endpoint!
terminate-facet! ;; TODO: shouldn't be provided - inline syntax.rkt??
schedule-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
push-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
ensure-in-script! ;; TODO: shouldn't be provided - inline syntax.rkt??
spawn! ;; TODO: should this be provided?
enqueue-send! ;; TODO: should this be provided?
enqueue-deferred-turn! ;; TODO: should this be provided?
adhoc-retract! ;; TODO: should this be provided?
adhoc-assert! ;; TODO: should this be provided?
actor-adhoc-assertions ;; TODO: should this be provided?
)
(require syndicate/dataflow)
(require racket/match)
(require racket/set)
(require (only-in racket/exn exn->string))
(require "functional-queue.rkt")
(require "skeleton.rkt")
(require "pattern.rkt")
(require "bag.rkt")
(require "reflection.rkt")
;; An `ActorID` uniquely identifies an actor in a `Dataspace`.
;; A `FID` is a Facet ID, uniquely identifying a facet in a `Dataspace`.
;; A `Dataspace` is a ... TODO
;; An `Action` is one of
;; - `(patch (MutableDeltaof Assertion))`
;; - `(message Assertion)`
;; - `(spawn Any BootProc (Set Assertion))`
;; - `(quit)`
;; - `(deferred-turn (-> Any))`
;; Prefab struct representations of the `Action` variants. See
;; `perform-pending-actions!` for how the dataspace interprets each one.
;; `patch`: a bag of assertion count changes, applied with `apply-patch!`.
(struct patch (changes) #:prefab)
;; `message`: a body routed through the dataspace's routing table.
(struct message (body) #:prefab)
;; `spawn`: a request to create a new actor via `add-actor!`.
(struct spawn (name boot-proc initial-assertions) #:prefab)
;; `quit`: applies the quitting actor's recorded `cleanup-changes`.
(struct quit () #:prefab)
;; `deferred-turn`: a continuation that is pushed back onto the actor as a script.
(struct deferred-turn (continuation) #:prefab)
;; A `Dataspace` record holds the state shared by all actors running in it.
;; - `next-id` is the counter consumed by `generate-id!`.
;; - `runnable` lists actors that `push-script!` has marked runnable; it is
;;   drained by `run-all-pending-scripts!`.
;; - `pending-actions` holds the action groups queued by `commit-actions!`
;;   until `perform-pending-actions!` interprets them.
(struct dataspace ([next-id #:mutable] ;; Nat
routing-table ;; Skeleton
;; v TODO: Caches have to be bags, not sets; once
;; this change is made, can I avoid keeping a bag
;; of assertions in the dataspace as a whole?
assertions ;; (MutableBagof Assertion)
dataflow ;; DataflowGraph
[runnable #:mutable] ;; (Listof Actor)
[pending-actions #:mutable] ;; (Queueof ActionGroup)
) #:transparent)
;; An `Actor` record: identity, owning dataspace, facet tree root, and the
;; per-actor queues of scripts and actions awaiting execution.
;; Prints as `#<actor ID NAME>`.
(struct actor
  (id                           ;; ActorID
   dataspace                    ;; Dataspace
   name                         ;; Any
   [root-facet #:mutable]       ;; (Option Facet)
   [runnable? #:mutable]        ;; Boolean
   pending-scripts              ;; (MutableVectorof (Queueof (-> Any)))
   [pending-actions #:mutable]  ;; (Queueof Action)
   [adhoc-assertions #:mutable] ;; (Bagof Assertion)
   [cleanup-changes #:mutable]) ;; (Deltaof Assertion)
  #:methods gen:custom-write
  [(define (write-proc self out _mode)
     ;; The same rendering is used for every print mode.
     (fprintf out "#<actor ~a ~v>" (actor-id self) (actor-name self)))])
;; An `ActionGroup` pairs a batch of actions with the actor that queued them.
;; `commit-actions!` builds one whenever an actor has pending actions;
;; `make-dataspace` builds the initial one with the symbol 'meta in place of
;; an actor.
(struct action-group (actor ;; (U Actor 'meta)
actions ;; (Listof Action)
)
#:transparent)
;; A `Facet` is a node in an actor's facet tree. It owns a table of endpoints,
;; a list of stop-scripts and a set of child facets.
;; Prints as `#<facet ACTOR-ID ACTOR-NAME FID:PARENT-FID:...:ROOT-FID>`.
(struct facet
  (id                       ;; FID
   [live? #:mutable]        ;; Boolean
   actor                    ;; Actor
   parent                   ;; (Option Facet)
   endpoints                ;; (MutableHash EID Endpoint)
   [stop-scripts #:mutable] ;; (Listof Script) -- IN REVERSE ORDER
   [children #:mutable])    ;; (Seteqof Facet)
  #:methods gen:custom-write
  [(define (write-proc self out _mode)
     (local-require (only-in racket/string string-join))
     ;; Facet ids from `start` outward to the root, as strings.
     (define (ids-to-root start)
       (let walk ([cursor start] [collected '()])
         (if cursor
             (walk (facet-parent cursor)
                   (cons (number->string (facet-id cursor)) collected))
             (reverse collected))))
     (define owner (facet-actor self))
     (fprintf out "#<facet ~a ~v ~a>"
              (actor-id owner)
              (actor-name owner)
              (string-join (ids-to-root self) ":")))])
;; An `Endpoint` records the assertion and optional interest handler currently
;; published for one endpoint of a facet, together with the function used to
;; recompute both. Prints as `#<endpoint EID>`.
(struct endpoint
  (id                   ;; EID
   [assertion #:mutable] ;; Assertion
   [handler #:mutable]   ;; (Option SkInterest)
   update-fn)            ;; (-> (Values Assertion (Option SkInterest)))
  #:methods gen:custom-write
  [(define (write-proc self out _mode)
     (fprintf out "#<endpoint ~a>" (endpoint-id self)))])
;; TODO: the field ownership checks during field-ref/field-set! might
;; be quite expensive. Are they worth it?
;; A `FieldHandle` is a mutable cell owned by a single actor and tracked by the
;; dataspace's dataflow graph. The handle is itself a procedure:
;;  - `(f)` reads the value and records an observation of `f`;
;;  - `(f v)` writes `v` and records damage to `f`, but only when `v` is not
;;    `equal?` to the stored value.
;; Both forms raise (via `field-scope-error`) when the current actor is not
;; the handle's owner. Prints as `#<field-handle:NAME:ID>`.
(struct field-handle
  (name              ;; Symbol
   id                ;; Nat
   owner             ;; Actor
   [value #:mutable]) ;; Any
  #:methods gen:custom-write
  [(define (write-proc f port mode)
     (fprintf port "#<field-handle:~a:~a>" (field-handle-name f) (field-handle-id f)))]
  #:property prop:procedure
  (case-lambda
    [(f)
     (define ac (current-actor))
     (unless (eq? (field-handle-owner f) ac) (field-scope-error 'field-ref f))
     (dataflow-record-observation! (dataspace-dataflow (actor-dataspace ac)) f)
     (field-handle-value f)]
    [(f v)
     ;; Look up the current actor once and reuse it for both the ownership
     ;; check and the dataflow update, mirroring the getter above.
     (define ac (current-actor))
     (unless (eq? (field-handle-owner f) ac) (field-scope-error 'field-set! f))
     ;; Writing an `equal?` value is a no-op, so no damage is recorded.
     (unless (equal? (field-handle-value f) v)
       (dataflow-record-damage! (dataspace-dataflow (actor-dataspace ac)) f)
       (set-field-handle-value! f v))]))
;; Raises an error, attributed to `who`, reporting that field handle `f` was
;; used by an actor other than its owner.
(define (field-scope-error who f)
  (let ([owner (field-handle-owner f)]
        [active (current-actor)])
    (error who
           "Field ~a used out-of-scope; owner = ~a, current = ~a"
           f
           owner
           active)))
;; Parameterof (Actor Exn -> Void)
;; Called by `with-current-facet` when an actor's code raises. The default
;; logs the actor and the rendered exception at error level.
(define current-actor-crash-logger
  (make-parameter
   (lambda (crashed-actor raised)
     (log-error "Actor ~a died with exception:\n~a"
                crashed-actor
                (exn->string raised)))))
;; Returns the actor that owns the current facet.
;; NOTE(review): `current-facet` defaults to #f, in which case `facet-actor`
;; is applied to #f. Presumably callers only use this inside a facet context;
;; confirm.
(define (current-actor) (facet-actor (current-facet)))
;; Parameterof Facet
;; The facet on whose behalf code is currently running; #f by default. It is
;; set by `with-current-facet`.
(define current-facet (make-parameter #f))
;; Parameterof Boolean
;; True by default; set to #f within `with-non-script-context`. Several
;; operations check it, e.g. `add-endpoint!` refuses to run when it is true
;; and `ensure-in-script!` refuses to run when it is false.
(define in-script? (make-parameter #t))
;;---------------------------------------------------------------------------
;; Script priorities. These are used to ensure that the results of
;; some *side effects* are visible to certain pieces of code.
;; Submodule defining and providing one integer constant per priority level.
;; Levels are numbered from 0 in the order listed, so a smaller number means
;; a higher priority. The identifier following `#:count` is bound to the
;; number of levels listed before it.
(module priorities racket/base
(require (for-syntax racket/base))
;; (define-priority-levels level ... #:count count-id)
;; Expands, by recursion over the remaining levels at expansion time, into
;; nested `begin` forms that `define` and `provide` each level with the
;; current counter value.
(define-syntax (define-priority-levels stx)
(let loop ((counter 0) (stx (syntax-case stx ()
[(_ level ...) #'(level ...)])))
(syntax-case stx ()
;; No levels left and no `#:count` clause: nothing more to define.
[()
#'(void)]
;; Final clause: bind the count identifier to the number of levels seen.
[(#:count c)
#`(begin (define c #,counter)
(provide c))]
;; Bind the next level to the current counter, then recur on the rest.
[(this-level more ...)
#`(begin (define this-level #,counter)
(provide this-level)
#,(loop (+ counter 1) #'(more ...)))])))
;; With the list below, *query-priority-high* is 0, *idle-priority* is 5 and
;; priority-count is 6.
(define-priority-levels ;; highest-priority to lowest-priority
*query-priority-high*
*query-priority*
*query-handler-priority*
*normal-priority*
*gc-priority*
*idle-priority*
#:count priority-count))
(require (submod "." priorities))
;;---------------------------------------------------------------------------
;; Creates a fresh dataspace whose only pending work is a 'meta action group
;; that spawns an unnamed actor running `boot-proc` with no initial assertions.
(define (make-dataspace boot-proc)
  (define boot-action (spawn #f boot-proc (set)))
  (define boot-group (action-group 'meta (list boot-action)))
  (dataspace 0                       ; next-id
             (make-empty-skeleton)   ; routing-table
             (make-bag)              ; assertions
             (make-dataflow-graph)   ; dataflow
             '()                     ; runnable
             (enqueue (make-queue) boot-group))) ; pending-actions
;; Returns the dataspace's next unused id and advances its counter by one.
(define (generate-id! ds)
  (define fresh-id (dataspace-next-id ds))
  (set-dataspace-next-id! ds (add1 fresh-id))
  fresh-id)
;; Creates an actor named `name` in `ds`, publishes its initial assertions
;; immediately, and boots it.
;; The initial assertions (minus any void value) are recorded as the actor's
;; adhoc assertions and applied directly as a patch. After the user's
;; `boot-proc` has been run inside a fresh user-visible root facet, each
;; initial assertion is retracted again via `adhoc-retract!`.
(define (add-actor! ds name boot-proc initial-assertions)
  (define new-id (generate-id! ds))
  (define real-assertions (set-remove initial-assertions (void)))
  (define seed-delta (set->bag real-assertions +1))
  (define new-actor
    (actor new-id
           ds
           name
           #f                                        ; root-facet
           #f                                        ; runnable?
           (make-vector priority-count (make-queue)) ; pending-scripts
           (make-queue)                              ; pending-actions
           seed-delta                                ; adhoc-assertions
           (bag)))                                   ; cleanup-changes
  ;; Boot procedure of the dummy "system" root facet.
  (define (boot-system-root!)
    ;; The "true root", user-visible facet.
    (add-facet! #f new-actor (current-facet) (lambda () (boot-proc)))
    (for ([assertion real-assertions])
      (adhoc-retract! new-actor assertion)))
  (apply-patch! ds new-actor seed-delta)
  ;; Root facet is a dummy "system" facet that exists to hold one-or-more
  ;; "user" "root" facets.
  (add-facet! #f new-actor #f boot-system-root!))
;; (with-current-facet [facet-expr] body ...)
;; Evaluates `body ...` with `current-facet` set to the value of `facet-expr`,
;; inside a syndicate continuation prompt. The form always produces void.
;; Any raised value other than a break is treated as an actor crash: it is
;; passed to the crash logger, the actor's queued work is discarded, and the
;; actor is terminated without emitting patches.
(define-syntax-rule (with-current-facet [f0] body ...)
(let ((f f0))
;; (when (not f)
;; (error 'with-current-facet "Cannot use with-current-facet this way"))
(parameterize ((current-facet f))
;; The handler predicate accepts everything except `exn:break`, so breaks
;; propagate to the caller.
(with-handlers ([(lambda (e) (not (exn:break? e)))
(lambda (e)
(define a (current-actor))
((current-actor-crash-logger) a e)
(abandon-queued-work! a)
;; v Supply #f for `emit-patches?` here
;; because we are in an uncertain limbo after
;; discarding previously-queued actions.
;; Instead of emitting patches to orderly
;; tear down assertions from endpoints, we
;; rely on the recorded `cleanup-changes`.
(terminate-actor! a #f e))]) ;; TODO: tracing
(call-with-syndicate-prompt
(lambda ()
body ...))
(void)))))
;; (with-non-script-context body ...)
;; Evaluates `body ...` with the `in-script?` parameter set to #f and returns
;; the result of the last body form.
(define-syntax-rule (with-non-script-context body ...)
(parameterize ((in-script? #f))
body ...))
;; Wraps `proc` in a procedure that, whenever it is called, applies `proc` to
;; the same arguments under `with-current-facet` for the facet that was
;; current at the time of capture.
(define (capture-facet-context proc)
  (define captured (current-facet))
  ;; (when (not captured)
  ;;   (error 'capture-facet-context "Cannot capture non-facet"))
  (lambda args
    (with-current-facet [captured]
      (apply proc args))))
;; Removes and returns the next script for actor `ac`, scanning the priority
;; levels from index 0 upward and taking from the first non-empty queue.
;; Returns #f when every level is empty.
(define (pop-next-script! ac)
  (define levels (actor-pending-scripts ac))
  (define level-count (vector-length levels))
  (let scan ([idx 0])
    (cond
      [(>= idx level-count) #f]
      [(queue-empty? (vector-ref levels idx)) (scan (add1 idx))]
      [else
       (let-values ([(next-script remaining) (dequeue (vector-ref levels idx))])
         (vector-set! levels idx remaining)
         next-script)])))
;; Runs actor `ac`'s pending scripts one at a time until none remain,
;; refreshing dataflow-dependent endpoint assertions after each script.
;; Returns #f.
(define (run-actor-pending-scripts! ds ac)
  (let drain ()
    (define next-script (pop-next-script! ac))
    (cond
      [next-script
       (next-script)
       (refresh-facet-assertions! ds)
       (drain)]
      [else #f])))
;; Repairs dataflow damage in `ds`: for each damaged subject (a facet and
;; endpoint id), recomputes the endpoint's assertion and handler and, if the
;; assertion changed, swaps the old assertion/subscription for the new one.
;; Runs with `in-script?` set to #f.
(define (refresh-facet-assertions! ds)
  (define (repair-endpoint! subject-id)
    (match-define (list f eid) subject-id)
    (when (facet-live? f) ;; TODO: necessary test, or tautological?
      (define ac (facet-actor f))
      (with-current-facet [f]
        (define ep (hash-ref (facet-endpoints f) eid))
        (match-define (endpoint _ stale-assertion stale-handler recompute) ep)
        (define-values (fresh-assertion fresh-handler) (recompute))
        ;; Nothing is updated, not even the handler, unless the assertion
        ;; itself differs.
        (unless (equal? stale-assertion fresh-assertion)
          (retract! ac stale-assertion)
          (when stale-handler (dataspace-unsubscribe! ds stale-handler))
          (set-endpoint-assertion! ep fresh-assertion)
          (set-endpoint-handler! ep fresh-handler)
          (assert! ac fresh-assertion)
          (when fresh-handler (dataspace-subscribe! ds fresh-handler))))))
  (with-non-script-context
    (dataflow-repair-damage! (dataspace-dataflow ds) repair-endpoint!)))
;; Moves all of actor `ac`'s pending actions, as one action group, onto the
;; end of the dataspace's pending-action queue. Does nothing when the actor
;; has no pending actions.
(define (commit-actions! ds ac)
  (define batch (queue->list (actor-pending-actions ac)))
  ;; (log-info "commit-actions!: ~a actions ~a" ac batch)
  (when (pair? batch)
    (set-actor-pending-actions! ac (make-queue))
    (let ([backlog (dataspace-pending-actions ds)]
          [group (action-group ac batch)])
      (set-dataspace-pending-actions! ds (enqueue backlog group)))))
;; Takes a snapshot of the runnable-actor list, clears it, and then for each
;; actor in the snapshot runs its scripts to exhaustion, marks it
;; not-runnable, and commits its queued actions to the dataspace.
(define (run-all-pending-scripts! ds)
  (define batch (dataspace-runnable ds))
  (set-dataspace-runnable! ds '())
  (for-each (lambda (ac)
              (run-actor-pending-scripts! ds ac)
              (set-actor-runnable?! ac #f)
              (commit-actions! ds ac))
            batch))
;; Takes a snapshot of the dataspace's pending action groups, clears the
;; queue, and interprets each group's actions in order. After each group, all
;; scripts that became pending are run.
(define (perform-pending-actions! ds)
  ;; Interprets a single action on behalf of `ac`.
  (define (perform! ac action)
    ;; (log-info "~a in ~a performing ~a" ac (eq-hash-code ds) action)
    (match action
      [(patch delta)
       (apply-patch! ds ac delta)]
      [(message body)
       (send-assertion! (dataspace-routing-table ds) body)]
      [(spawn name boot-proc initial-assertions)
       (add-actor! ds name boot-proc initial-assertions)]
      [(quit)
       (apply-patch! ds ac (actor-cleanup-changes ac))]
      [(deferred-turn k)
       (push-script! ac k)]))
  (define groups (queue->list (dataspace-pending-actions ds)))
  (set-dataspace-pending-actions! ds (make-queue))
  (for ([group (in-list groups)])
    (match-define (action-group ac actions) group)
    (for ([action (in-list actions)])
      (perform! ac action))
    (run-all-pending-scripts! ds)))
;; Applies `delta` (a bag of assertion count changes) to the dataspace on
;; behalf of actor `ac`. Does nothing when `delta` is empty.
;; For each assertion in the delta:
;;  - the dataspace-wide assertion bag is updated in place;
;;  - an assertion that becomes present is added to the routing table at once;
;;  - an assertion that becomes absent is only removed from the routing table
;;    after the whole delta has been processed;
;;  - the negated count is folded into the actor's `cleanup-changes`, which
;;    the `quit` action later applies as a patch.
(define (apply-patch! ds ac delta)
(when (not (bag-empty? delta))
(define ds-assertions (dataspace-assertions ds))
;; (log-info "apply-patch! ~a ~v" ac delta)
;; (for [((a c) (in-bag/count ds-assertions))] (log-info " . ~v = ~v" a c))
;; (for [((a c) (in-bag/count delta))] (log-info " → ~v = ~v" a c))
(define rt (dataspace-routing-table ds))
;; Assertions whose removal from the routing table is deferred to the end.
;; NOTE(review): presumably so that additions in the same patch are seen
;; before removals; confirm against skeleton.rkt.
(define pending-removals '())
(define new-cleanup-changes
(for/fold [(cleanup-changes (actor-cleanup-changes ac))] [((a count) (in-bag/count delta))]
(match (bag-change! ds-assertions a count)
['present->absent (set! pending-removals (cons a pending-removals))]
['absent->present (add-assertion! rt a)]
;; 'absent->absent absurd
['present->present (void)]) ;; i.e. no visible change
;; Functional update: record the inverse of this change for later cleanup.
(define-values (updated-bag _summary) (bag-change cleanup-changes a (- count)))
updated-bag))
(for [(a (in-list pending-removals))]
(remove-assertion! rt a))
(set-actor-cleanup-changes! ac new-cleanup-changes)))
;; Runs one round of the dataspace: all pending scripts, then all pending
;; actions. Returns #t when runnable actors or pending actions remain
;; afterwards, #f otherwise.
(define (run-scripts! ds)
  (run-all-pending-scripts! ds)
  (perform-pending-actions! ds)
  ;; TODO: figure out when a dataspace should quit itself. Given the
  ;; mutable nature of the implementation, maybe never? It might be
  ;; being held elsewhere!
  (or (not (null? (dataspace-runnable ds)))
      (not (queue-empty? (dataspace-pending-actions ds)))))
;; Creates a live facet for `actor` under `parent` (or as the actor's root
;; facet when `parent` is #f), runs `boot-proc` in its context with
;; `in-script?` set to #f, and queues a script that terminates the new facet
;; if its parent has died or it has no endpoints and no children.
;; When `where` is not #f, raises an error if called outside a script.
;; Raises an internal error on an attempt to install a second root facet.
(define (add-facet! where actor parent boot-proc)
  (when (and where (not (in-script?)))
    (error 'add-facet!
           "~a: Cannot add facet outside script; are you missing an (on ...)?"
           where))
  (define new-facet
    (facet (generate-id! (actor-dataspace actor))
           #t          ; live?
           actor
           parent
           (make-hash) ; endpoints
           '()         ; stop-scripts
           (seteq)))   ; children
  (cond
    [parent
     (set-facet-children! parent (set-add (facet-children parent) new-facet))]
    [(actor-root-facet actor)
     ;; This should never happen. We deliberately create an
     ;; otherwise-dummy root facet for each actor specifically to
     ;; hold user facets, and there should be no way for the user
     ;; to stop that root facet explicitly, which means user code
     ;; can't start any replacements for it at all, let alone
     ;; more than one!
     (error 'add-facet! "INTERNAL ERROR: Attempt to add second root facet")]
    [else
     (set-actor-root-facet! actor new-facet)])
  (with-current-facet [new-facet]
    (with-non-script-context
      (boot-proc)))
  (push-script! actor
                (lambda ()
                  (define orphaned? (and parent (not (facet-live? parent))))
                  (when (or orphaned? (facet-inert? new-facet))
                    (terminate-facet! new-facet)))))
;; True when facet `f` has no endpoints and no child facets.
(define (facet-inert? f)
  (cond
    [(not (hash-empty? (facet-endpoints f))) #f]
    [else (set-empty? (facet-children f))]))
;; Queues `thunk` on actor `ac` at `priority`, arranging for it to run in the
;; facet context that is current at the time of this call.
(define (schedule-script! #:priority [priority *normal-priority*] ac thunk)
  (let ([contextual-thunk (capture-facet-context thunk)])
    (push-script! ac contextual-thunk #:priority priority)))
;; Appends `thunk-with-context` to actor `ac`'s script queue for `priority`.
;; If the actor is not already marked runnable, marks it and adds it to the
;; front of its dataspace's runnable list.
(define (push-script! #:priority [priority *normal-priority*] ac thunk-with-context)
  (unless (actor-runnable? ac)
    (set-actor-runnable?! ac #t)
    (define ds (actor-dataspace ac))
    (set-dataspace-runnable! ds (cons ac (dataspace-runnable ds))))
  (let* ([levels (actor-pending-scripts ac)]
         [queue-at-level (vector-ref levels priority)])
    (vector-set! levels priority (enqueue queue-at-level thunk-with-context))))
;; Queues a script on facet `f`'s actor that destroys every endpoint of `f`
;; (passing `emit-patches?` through to `destroy-endpoint!`) and then empties
;; the facet's endpoint table.
(define (retract-facet-assertions-and-subscriptions! f emit-patches?)
  (define ac (facet-actor f))
  (define ds (actor-dataspace ac))
  (define (tear-down-endpoints!)
    (define eps (facet-endpoints f))
    (for ([ep (in-hash-values eps)])
      (destroy-endpoint! ds ac f ep emit-patches?))
    (hash-clear! eps))
  (push-script! ac tear-down-endpoints!))
;; Discards all of actor `ac`'s queued actions and queued scripts at every
;; priority level.
(define (abandon-queued-work! ac)
  (set-actor-pending-actions! ac (make-queue))
  (define levels (actor-pending-scripts ac))
  (let clear ([idx 0])
    (when (< idx (vector-length levels))
      (vector-set! levels idx (make-queue))
      (clear (add1 idx)))))
;; Abruptly terminates an entire actor, without running stop-scripts etc.
;; When `emit-patches?` is true, a script retracting the actor's adhoc
;; assertions is queued first. Every facet in the tree is then marked dead
;; and has its endpoints torn down. Finally a script is queued that sends a
;; `terminated` message (only for named actors, carrying `maybe-exn`) and
;; enqueues the `quit` action.
(define (terminate-actor! the-actor emit-patches? maybe-exn)
  (define (retract-adhoc-assertions!)
    (for ([a (in-bag (actor-adhoc-assertions the-actor))])
      (retract! the-actor a)))
  ;; Marks `f` dead, recurs into its children, then queues endpoint teardown.
  (define (abort-facet! f)
    (set-facet-live?! f #f)
    (for ([child (in-set (facet-children f))])
      (abort-facet! child))
    (retract-facet-assertions-and-subscriptions! f emit-patches?))
  (define (announce-and-quit!)
    (define name (actor-name the-actor))
    (when name
      (enqueue-send! the-actor (terminated name maybe-exn)))
    (enqueue-action! the-actor (quit)))
  (when emit-patches?
    (push-script! the-actor retract-adhoc-assertions!))
  (cond
    [(actor-root-facet the-actor) => abort-facet!])
  (push-script! the-actor announce-and-quit!))
;; Cleanly terminates a facet and its children, running stop-scripts etc.
;; Does nothing if `f` is already dead. Otherwise it detaches `f` from its
;; parent (or clears the actor's root facet), marks it dead, terminates its
;; children, and queues its stop-scripts and then its endpoint teardown.
;; Last, at GC priority, it queues a check that terminates the parent if the
;; parent has become inert, or terminates the whole actor if `f` had no
;; parent.
(define (terminate-facet! f)
  (when (facet-live? f)
    (define ac (facet-actor f))
    (define parent (facet-parent f))
    (cond
      [parent (set-facet-children! parent (set-remove (facet-children parent) f))]
      [else (set-actor-root-facet! ac #f)])
    (set-facet-live?! f #f)
    (for ([child (in-set (facet-children f))])
      (terminate-facet! child))
    ;; Run stop-scripts after terminating children. This means that
    ;; children's stop-scripts run before ours.
    (push-script! ac
                  (lambda ()
                    (with-current-facet [f]
                      ;; Stop-scripts are stored newest-first; run oldest-first.
                      (for-each (lambda (stop-script) (stop-script))
                                (reverse (facet-stop-scripts f))))))
    (retract-facet-assertions-and-subscriptions! f #t)
    (push-script! #:priority *gc-priority* ac
                  (lambda ()
                    (cond
                      [(not parent) (terminate-actor! ac #t #f)]
                      [(facet-inert? parent) (terminate-facet! parent)])))))
;; Schedules termination of facet `f`, followed by `stop-script`. Both are
;; scheduled with the parent of `f` as the current facet.
(define (stop-facet! f stop-script)
  (define ac (facet-actor f))
  (define (terminate-then-run-stop-script!)
    (terminate-facet! f)
    (schedule-script! ac stop-script))
  (with-current-facet [(facet-parent f)] ;; run in parent context wrt terminating facet
    (schedule-script! ac terminate-then-run-stop-script!)))
;; Registers `script-proc` as a stop-script of facet `f`. Scripts are kept
;; newest-first.
(define (add-stop-script! f script-proc)
  (let ([existing (facet-stop-scripts f)])
    (set-facet-stop-scripts! f (cons script-proc existing))))
;; Adds an endpoint to facet `f` and returns its freshly generated id.
;; `update-fn` is called once, inside a syndicate prompt, to compute the
;; endpoint's assertion and optional handler. When `dynamic?` is true the
;; call runs with the dataflow subject id set to `(list f eid)`; otherwise
;; the subject id is #f. The assertion is asserted and the handler (if any)
;; is subscribed. Raises an error if called from within a script.
(define (add-endpoint! f where dynamic? update-fn)
  (when (in-script?)
    (error 'add-endpoint!
           "~a: Cannot add endpoint in script; are you missing a (react ...)?"
           where))
  (define owner (facet-actor f))
  (define ds (actor-dataspace owner))
  (define eid (generate-id! ds))
  (define subject (and dynamic? (list f eid)))
  (define-values (assertion handler)
    (parameterize ([current-dataflow-subject-id subject])
      (call-with-syndicate-prompt update-fn)))
  (define ep (endpoint eid assertion handler update-fn))
  (assert! owner assertion)
  (when handler
    (dataspace-subscribe! ds handler))
  (hash-set! (facet-endpoints f) eid ep)
  eid)
;; Destroys the endpoint with id `eid` on facet `f` (emitting a retraction)
;; and removes it from the facet's endpoint table. Does nothing when there is
;; no such endpoint.
(define (remove-endpoint! f eid)
  (define eps (facet-endpoints f))
  (cond
    [(hash-ref eps eid #f)
     => (lambda (ep)
          (define ac (facet-actor f))
          (destroy-endpoint! (actor-dataspace ac) ac f ep #t)
          (hash-remove! eps eid))]))
;; Tears down endpoint `ep` of facet `f`: forgets its dataflow subject,
;; retracts its assertion when `emit-patches?` is true, and unsubscribes its
;; handler if it has one. The facet's endpoint table is not touched.
(define (destroy-endpoint! ds ac f ep emit-patches?)
  (match-define (endpoint eid published-assertion interest _update-fn) ep)
  (dataflow-forget-subject! (dataspace-dataflow ds) (list f eid))
  (when emit-patches?
    (retract! ac published-assertion))
  (when interest
    (dataspace-unsubscribe! ds interest)))
;; Appends `action` to actor `ac`'s queue of pending actions.
(define (enqueue-action! ac action)
  (let ([extended (enqueue (actor-pending-actions ac) action)])
    (set-actor-pending-actions! ac extended)))
;; Returns the change bag of the patch at the tail of actor `ac`'s pending
;; actions, first enqueueing a new empty patch if the queue is empty or its
;; last action is not a patch.
(define (ensure-patch-action! ac)
  (define q (actor-pending-actions ac))
  (define tail-is-patch?
    (and (not (queue-empty? q))
         (patch? (queue-last q))))
  (unless tail-is-patch?
    (enqueue-action! ac (patch (make-bag))))
  (patch-changes (queue-last (actor-pending-actions ac))))
;; Records a -1 change for `assertion` in actor `ac`'s current pending patch.
;; Void assertions are ignored.
(define (retract! ac assertion)
  (unless (void? assertion)
    (bag-change! (ensure-patch-action! ac) assertion -1)))
;; Records a +1 change for `assertion` in actor `ac`'s current pending patch.
;; Void assertions are ignored.
(define (assert! ac assertion)
  (unless (void? assertion)
    (bag-change! (ensure-patch-action! ac) assertion +1)))
;; Lowers the count of `assertion` in actor `ac`'s adhoc-assertion bag by
;; `count` (clamped), and queues a retraction when the assertion thereby
;; becomes absent. Void assertions are ignored.
(define (adhoc-retract! ac assertion [count 1])
  (unless (void? assertion)
    (define-values (remaining transition)
      (bag-change (actor-adhoc-assertions ac) assertion (- count) #:clamp? #t))
    (set-actor-adhoc-assertions! ac remaining)
    (match transition
      ;; 'absent->present absurd (if the call to `adhoc-retract!`
      ;; matches a previous `adhoc-assert!`)
      ['present->absent (retract! ac assertion)]
      ;; 'absent->absent can happen if we're exploiting the clamping
      [(or 'present->present 'absent->absent) (void)])))
;; Raises the count of `assertion` in actor `ac`'s adhoc-assertion bag by
;; `count`, and queues an assertion when it thereby becomes present.
;; Void assertions are ignored.
(define (adhoc-assert! ac assertion [count 1])
  (unless (void? assertion)
    (define-values (grown transition)
      (bag-change (actor-adhoc-assertions ac) assertion count))
    (set-actor-adhoc-assertions! ac grown)
    (match transition
      ;; 'absent->absent and 'present->absent absurd (assuming there
      ;; haven't been too many calls to `adhoc-retract!` in the past)
      ['present->present (void)]
      ['absent->present (assert! ac assertion)])))
;; Removes interest `h` from the routing table of `ds`.
(define (dataspace-unsubscribe! ds h)
  (let ([table (dataspace-routing-table ds)])
    (remove-interest! table h)))
;; Adds interest `h` to the routing table of `ds`.
(define (dataspace-subscribe! ds h)
  (let ([table (dataspace-routing-table ds)])
    (add-interest! table h)))
;; Raises an error attributed to `who` unless `in-script?` is currently true.
(define (ensure-in-script! who)
  (unless (in-script?)
    (error who
           "Attempt to perform action outside script; are you missing an (on ...)?")))
;; Queues a `message` action carrying `body` on actor `ac`.
(define (enqueue-send! ac body)
  (let ([outgoing (message body)])
    (enqueue-action! ac outgoing)))
;; Queues a `deferred-turn` action on actor `ac` whose continuation is `k`,
;; wrapped so that it runs in the facet context current at the time of this
;; call.
(define (enqueue-deferred-turn! ac k)
  (let ([contextual-k (capture-facet-context k)])
    (enqueue-action! ac (deferred-turn contextual-k))))
;; Queues a `spawn` action on actor `ac` requesting a new actor called `name`
;; that boots with `boot-proc` and starts with `initial-assertions`.
(define (spawn! ac name boot-proc initial-assertions)
  (let ([request (spawn name boot-proc initial-assertions)])
    (enqueue-action! ac request)))
;;---------------------------------------------------------------------------
;; Script suspend-and-resume.
;; Prompt tag delimiting the part of a script's continuation that
;; `suspend-script*` captures and aborts.
(define prompt-tag (make-continuation-prompt-tag 'syndicate))
;; Calls `thunk` inside a continuation prompt tagged with `prompt-tag`.
(define call-with-syndicate-prompt
  (lambda (thunk)
    (call-with-continuation-prompt thunk prompt-tag)))
;; Suspends the running script, handing `proc` a procedure that resumes it.
;; The composable continuation `k` of the script, up to the nearest
;; `prompt-tag` prompt, is captured, and control then aborts to that prompt
;; with a thunk that calls `(proc resume-parent)`.
;; Calling `resume-parent` with some results pushes a script onto the then
;; current actor. That script re-enters `k` with those results, in the facet
;; context captured when the abort thunk ran, and with `in-script?` restored
;; to the value read at that same moment.
;; Raises an error, mentioning `where`, if called outside a script.
(define (suspend-script* where proc)
(when (not (in-script?))
(error 'suspend-script
"~a: Cannot suspend script outside script; are you missing an (on ...)?"
where))
(call-with-composable-continuation
(lambda (k)
(abort-current-continuation
prompt-tag
(lambda ()
;; NOTE(review): `in?` is read when this abort thunk runs, not at the
;; point of suspension; confirm this is intended.
(define in? (in-script?))
;; Re-enters the suspended continuation under the captured facet.
(define raw-resume-parent
(capture-facet-context
(lambda results
(parameterize ((in-script? in?))
(apply k results)))))
;; Resumption is not immediate: it is queued as a script on the actor
;; that is current when `resume-parent` is called.
(define resume-parent
(lambda results
(push-script! (current-actor)
(lambda () (apply raw-resume-parent results)))))
(proc resume-parent))))
prompt-tag))