syndicate-rkt/syndicate/dataspace.rkt

497 lines
19 KiB
Racket

#lang racket/base
(provide )
(require syndicate/functional-queue)
(require syndicate/dataflow)
(require racket/match)
(require racket/set)
(module+ test (require rackunit))
(require "skeleton.rkt")
(require "pattern.rkt")
(require "bag.rkt")
;; A `FID` is a Facet ID, uniquely identifying both a facet and its
;; ancestry in a `Dataspace`.
;;
;; FID = (Listof Nat)
;;
;; The first `Nat` in the list is the unique identifier for this
;; facet; the `cdr` of the list is the FID of its parent facet. No two
;; `FID`s in a `Dataspace` have the same first `Nat`.
;; A `Dataspace` is a ... TODO
;; An `Action` is either `(patch (Deltaof Assertion))` or `(message
;; Assertion)` or `(spawn Any BootProc (Set Assertion))`.
(struct patch (changes) #:prefab)
(struct message (body) #:prefab)
(struct spawn (name boot-proc initial-assertions) #:prefab)
(struct dataspace ([next-id #:mutable] ;; Nat
routing-table ;; Skeleton
facets ;; (MutableHash FID Facet)
actors ;; (MutableHash FID Any) ;; maps FID to actor name
assertions ;; (Bagof Assertion)
dataflow ;; DataflowGraph
pending-scripts ;; (MutableVectorof (Queueof (-> Any)))
[pending-actions #:mutable] ;; (Queueof Action)
)
#:transparent)
(struct facet (id ;; FID
endpoints ;; (MutableHash EID Endpoint)
[stop-scripts #:mutable] ;; (Listof Script) -- IN REVERSE ORDER
[children #:mutable] ;; (Setof FID)
)
#:transparent)
(struct endpoint (id ;; EID
[assertion #:mutable] ;; Assertion
assertion-fn ;; (-> Assertion)
handler ;; (Option SkInterest)
)
#:transparent)
;; TODO: field ownership: record actor (root facet) ID in field, check
;; it on access.
(struct field-handle (name ;; Symbol
id ;; Nat
owner ;; FID
[value #:mutable] ;; Any
)
#:methods gen:custom-write
[(define (write-proc f port mode)
(fprintf port "#<field-handle:~a:~a>" (field-handle-name f) (field-handle-id f)))]
#:property prop:procedure
(case-lambda
[(f)
(dataflow-record-observation! (dataspace-dataflow (current-dataspace)) f)
(field-handle-value f)]
[(f v)
(dataflow-record-damage! (dataspace-dataflow (current-dataspace)) f)
(set-field-handle-value! f v)]))
;; Parameterof Dataspace
(define current-dataspace (make-parameter #f))
;; Parameterof FID
(define current-facet-id (make-parameter #f))
;; Parameterof Boolean
(define in-script? (make-parameter #t))
;;---------------------------------------------------------------------------
;; Script priorities. These are used to ensure that the results of
;; some *side effects* are visible to certain pieces of code.
(module priorities racket/base
(require (for-syntax racket/base))
(define-syntax (define-priority-levels stx)
(let loop ((counter 0) (stx (syntax-case stx ()
[(_ level ...) #'(level ...)])))
(syntax-case stx ()
[()
#'(void)]
[(#:count c)
#`(begin (define c #,counter)
(provide c))]
[(this-level more ...)
#`(begin (define this-level #,counter)
(provide this-level)
#,(loop (+ counter 1) #'(more ...)))])))
(define-priority-levels ;; highest-priority to lowest-priority
*query-priority-high*
*query-priority*
*query-handler-priority*
*normal-priority*
*gc-priority*
*idle-priority*
#:count priority-count))
(require (submod "." priorities))
;;---------------------------------------------------------------------------
(define (make-dataspace name boot-proc)
(define ds (dataspace 0
(make-empty-skeleton)
(make-hash)
(make-hash)
(make-bag)
(make-dataflow-graph)
(make-vector priority-count (make-queue))
(make-queue)))
(dataspace-spawn! ds name boot-proc (set))
ds)
(define (generate-id! ds)
(let ((id (dataspace-next-id ds)))
(set-dataspace-next-id! ds (+ id 1))
id))
(define (fid-parent fid)
(cdr fid))
(define (generate-fid! ds parent-fid)
(cons (generate-id! ds) parent-fid))
(define (actor-fid? fid)
(null? (fid-parent fid)))
(define (fid->actor-fid fid)
(if (actor-fid? fid)
fid
(fid->actor-fid (fid-parent fid))))
(define (fid-ancestor? fid maybe-ancestor)
(and (pair? fid) ;; empty fid lists obviously no ancestors at all!
(or (equal? fid maybe-ancestor)
(fid-ancestor? (cdr fid) maybe-ancestor))))
(define (add-actor! ds name boot-proc initial-assertions)
(define actor-fid (generate-fid! ds '()))
(hash-set! (dataspace-actors ds) actor-fid name)
(for [(a initial-assertions)]
(match (bag-change! (dataspace-assertions ds) a 1)
['absent->present (add-assertion! (dataspace-routing-table ds) a)]
;; 'absent->absent and 'present->absent absurd
['present->present (void)])) ;; i.e. no visible change
(add-facet! ds #f actor-fid (lambda ()
(boot-proc)
(for [(a initial-assertions)] (dataspace-retract! ds a)))))
(define (lookup-facet ds fid)
(hash-ref (dataspace-facets ds) fid #f))
(define-syntax-rule (with-current-facet [ds0 fid0 script?] body ...)
(let ((ds ds0)
(fid fid0))
(parameterize ((current-dataspace ds)
(current-facet-id fid)
(in-script? script?))
(with-handlers ([(lambda (e) (not (exn:break? e)))
(lambda (e) (terminate-actor! ds (fid->actor-fid fid)))]) ;; TODO: tracing
body ...
(void)))))
(define (capture-facet-context proc)
(let ((ds (current-dataspace))
(fid (current-facet-id)))
(lambda args
(with-current-facet [ds fid #t]
(apply proc args)))))
(define (pop-next-script! ds)
(define priority-levels (dataspace-pending-scripts ds))
(let loop ((level 0))
(and (< level (vector-length priority-levels))
(let ((q (vector-ref priority-levels level)))
(if (queue-empty? q)
(loop (+ level 1))
(let-values (((script q) (dequeue q)))
(vector-set! priority-levels level q)
script))))))
(define (run-all-pending-scripts! ds)
(define script (pop-next-script! ds))
(and script
(begin (script)
(dataflow-repair-damage! (dataspace-dataflow ds)
(lambda (subject-id)
(match-define (list fid eid) subject-id)
(define f (lookup-facet ds fid))
(when f
(with-current-facet [ds fid #f]
(define ep (hash-ref (facet-endpoints f) eid))
(define old-assertion (endpoint-assertion ep))
(define new-assertion ((endpoint-assertion-fn ep)))
(when (not (equal? old-assertion new-assertion))
(set-endpoint-assertion! ep new-assertion)
(dataspace-retract! ds old-assertion)
(dataspace-assert! ds new-assertion)
(define h (endpoint-handler ep))
(when h
(dataspace-unsubscribe! ds h)
(dataspace-subscribe! ds h)))))))
(run-all-pending-scripts! ds))))
(define (perform-pending-actions! ds)
(define actions (queue->list (dataspace-pending-actions ds)))
(set-dataspace-pending-actions! ds (make-queue))
(for [(action actions)]
(match action
[(patch delta)
(for [((a count) (in-bag delta))]
(match (bag-change! (dataspace-assertions ds) a count)
['present->absent (remove-assertion! (dataspace-routing-table ds) a)]
['absent->present (add-assertion! (dataspace-routing-table ds) a)]
;; 'absent->absent absurd
['present->present (void)]))] ;; i.e. no visible change
[(message body)
(send-assertion! (dataspace-routing-table ds) body)]
[(spawn name boot-proc initial-assertions)
(add-actor! ds name boot-proc initial-assertions)]))
(not (null? actions)))
(define (run-scripts! ds)
(define ran-a-script (run-all-pending-scripts! ds))
(define performed-an-action (perform-pending-actions! ds))
;; TODO: figure out when a dataspace should quit itself. Given the
;; mutable nature of the implementation, maybe never? It might be
;; being held elsewhere!
(or ran-a-script performed-an-action))
(define (add-facet! ds where fid boot-proc)
(when (and (not (in-script?)) where)
(error 'add-facet!
"~a: Cannot add facet outside script; are you missing an (on ...)?"
where))
(define parent-fid (fid-parent fid))
(define f (facet fid
(make-hash)
'()
(set)))
(hash-set! (dataspace-facets ds) fid f)
(when (pair? parent-fid)
(define pf (lookup-facet ds parent-fid))
(when pf (set-facet-children! pf (set-add (facet-children pf) fid))))
(with-current-facet [ds fid #f]
(boot-proc))
(schedule-script!* ds (lambda ()
(when (and (facet-live? ds fid)
(or (and (pair? parent-fid) (not (facet-live? ds parent-fid)))
(facet-live-but-inert? ds fid)))
(terminate-facet! ds fid)))))
(define (facet-live? ds fid)
(hash-has-key? (dataspace-facets ds) fid))
(define (facet-live-but-inert? ds fid)
(define f (lookup-facet ds fid))
(and f
(hash-empty? (facet-endpoints f))
(set-empty? (facet-children f))))
(define (schedule-script! #:priority [priority *normal-priority*] ds thunk)
(schedule-script!* #:priority priority ds (capture-facet-context thunk)))
(define (schedule-script!* #:priority [priority *normal-priority*] ds thunk)
(define v (dataspace-pending-scripts ds))
(vector-set! v priority (enqueue (vector-ref v priority) thunk)))
;; Precondition: `f` is the `facet` struct that is/was associated with `fid` in `ds`
(define (retract-facet-assertions-and-subscriptions! ds fid f)
(schedule-script!* ds (lambda ()
(for [((eid ep) (in-hash (facet-endpoints f)))]
(dataflow-forget-subject! (dataspace-dataflow ds) (list fid eid))
(dataspace-retract! ds (endpoint-assertion ep))
(define h (endpoint-handler ep))
(when h (dataspace-unsubscribe! ds h))))))
;; Abruptly terminates an entire actor, without running stop-scripts etc.
(define (terminate-actor! ds actor-fid)
(when (not (actor-fid? actor-fid))
(error 'terminate-actor! "Attempt to terminate non-actor FID ~a" actor-fid))
(hash-remove! (dataspace-actors ds) actor-fid)
(let abort-facet! ((fid actor-fid))
(define f (lookup-facet ds fid))
(when f
(hash-remove! (dataspace-facets ds) fid)
(for [(child-fid (in-set (facet-children f)))] (abort-facet! child-fid))
(retract-facet-assertions-and-subscriptions! ds fid f))))
;; Cleanly terminates a facet and its children, running stop-scripts etc.
(define (terminate-facet! ds fid)
(define f (lookup-facet ds fid))
(when f
(define parent-fid (fid-parent fid))
(when (pair? parent-fid)
(define pf (lookup-facet ds parent-fid))
(when pf (set-facet-children! pf (set-remove (facet-children pf) fid))))
(hash-remove! (dataspace-facets ds) fid)
(for [(child-fid (in-set (facet-children f)))]
(terminate-facet! ds child-fid))
;; Run stop-scripts after terminating children. This means that
;; children's stop-scripts run before ours.
(for [(script (reverse (facet-stop-scripts f)))]
(schedule-script! ds
(lambda ()
(with-current-facet [ds fid #t]
(script)))))
(retract-facet-assertions-and-subscriptions! ds fid f)
(schedule-script!*
#:priority *gc-priority*
ds
(lambda ()
(if (pair? parent-fid)
(when (facet-live-but-inert? ds parent-fid) (terminate-facet! ds parent-fid))
(terminate-actor! ds fid))))))
(define (stop-facet! ds fid stop-script)
(with-current-facet [ds (fid-parent fid) #t] ;; run in parent context wrt terminating facet
(schedule-script! ds (lambda ()
(terminate-facet! ds fid)
(schedule-script! ds stop-script)))))
(define (add-stop-script! ds script-proc)
(define f (lookup-facet ds (current-facet-id)))
(when f (set-facet-stop-scripts! f (cons script-proc (facet-stop-scripts f)))))
(define (add-endpoint! ds where assertion-fn handler)
(when (in-script?)
(error 'add-endpoint!
"~a: Cannot add endpoint in script; are you missing a (react ...)?"
where))
(define fid (current-facet-id))
(define eid (generate-id! ds))
(define assertion
(parameterize ((current-dataflow-subject-id (list fid eid)))
(assertion-fn)))
(define ep (endpoint eid assertion assertion-fn handler))
(dataspace-assert! ds assertion)
(when handler (dataspace-subscribe! ds handler))
(hash-set! (facet-endpoints (lookup-facet ds fid)) eid ep))
(define (enqueue-action! ds action)
(set-dataspace-pending-actions! ds (enqueue (dataspace-pending-actions ds) action)))
(define (ensure-patch-action! ds)
(let ((q (dataspace-pending-actions ds)))
(when (or (queue-empty? q) (not (patch? (queue-last q))))
(enqueue-action! ds (patch (make-bag)))))
(patch-changes (queue-last (dataspace-pending-actions ds))))
(define (dataspace-retract! ds assertion)
(when (not (void? assertion))
(bag-change! (ensure-patch-action! ds) assertion -1)))
(define (dataspace-assert! ds assertion)
(when (not (void? assertion))
(bag-change! (ensure-patch-action! ds) assertion +1)))
(define (dataspace-unsubscribe! ds h)
(remove-interest! (dataspace-routing-table ds) h))
(define (dataspace-subscribe! ds h)
(add-interest! (dataspace-routing-table ds) h))
(define (dataspace-send! ds body)
(enqueue-action! ds (message body)))
(define (dataspace-spawn! ds name boot-proc initial-assertions)
(enqueue-action! ds (spawn name boot-proc initial-assertions)))
(module+ test
;; TODO: move somewhere sensible
;; Thin veneers over `struct` for declaring intent.
(define-syntax-rule (assertion-struct item ...) (struct item ... #:prefab))
(define-syntax-rule (message-struct item ...) (struct item ... #:prefab))
(message-struct set-box (new-value))
(assertion-struct box-state (value))
;; TODO: move somewhere sensible
(assertion-struct observe (specification))
(define ds
(make-dataspace
'ground
(lambda ()
(dataspace-spawn!
ds
'box
(lambda ()
(define current-value (field-handle 'current-value
(generate-id! (current-dataspace))
(fid->actor-fid (current-facet-id))
0))
(add-endpoint! (current-dataspace)
'stop-when-ten
(lambda ()
(when (= (current-value) 10)
(stop-facet! (current-dataspace)
(current-facet-id)
(lambda ()
(log-info "box: terminating"))))
(void))
#f)
(add-endpoint! (current-dataspace)
'assert-box-state
(lambda () (box-state (current-value)))
#f)
(add-endpoint! (current-dataspace)
'on-message-set-box
(lambda () (observe (set-box (capture (discard)))))
(skeleton-interest (list struct:set-box #f)
'()
'()
'((0 0))
(capture-facet-context
(lambda (op new-value)
(when (eq? '! op)
(schedule-script!
(current-dataspace)
(lambda ()
(log-info "box: taking on new-value ~v" new-value)
(current-value new-value)))))))))
(set))
(dataspace-spawn!
ds
'client
(lambda ()
(add-endpoint! (current-dataspace)
'stop-when-retracted-observe-set-box
(lambda () (observe (observe (set-box (discard)))))
(skeleton-interest (list struct:observe (list struct:set-box #f))
'()
'()
'()
(capture-facet-context
(lambda (op)
(when (eq? '- op)
(stop-facet!
(current-dataspace)
(current-facet-id)
(lambda ()
(log-info "client: box has gone"))))))))
(add-endpoint! (current-dataspace)
'on-asserted-box-state
(lambda () (observe (box-state (capture (discard)))))
(skeleton-interest (list struct:box-state #f)
'()
'()
'((0 0))
(capture-facet-context
(lambda (op v)
(when (eq? '+ op)
(schedule-script!
(current-dataspace)
(lambda ()
(log-info "client: learned that box's value is now ~v" v)
(dataspace-send! (current-dataspace)
(set-box (+ v 1)))))))))))
(set)))))
(require racket/pretty)
;; (pretty-print ds)
(#;time values
(let loop ((i 0))
;; (printf "--- i = ~v\n" i)
(when (run-scripts! ds)
;; (pretty-print ds)
(loop (+ i 1)))))
;; (pretty-print ds)
)