449 lines
18 KiB
Racket
449 lines
18 KiB
Racket
|
#lang racket/base
|
||
|
|
||
|
(require syndicate/functional-queue)
|
||
|
(require syndicate/dataflow)
|
||
|
(require racket/match)
|
||
|
(require racket/set)
|
||
|
|
||
|
(module+ test (require rackunit))
|
||
|
|
||
|
(require "skeleton.rkt")
|
||
|
(require "pattern.rkt")
|
||
|
(require "bag.rkt")
|
||
|
|
||
|
;; A `FID` is a Facet ID, uniquely identifying both a facet and its
|
||
|
;; ancestry in a `Dataspace`.
|
||
|
;;
|
||
|
;; FID = (Listof Nat)
|
||
|
;;
|
||
|
;; The first `Nat` in the list is the unique identifier for this
|
||
|
;; facet; the `cdr` of the list is the FID of its parent facet. No two
|
||
|
;; `FID`s in a `Dataspace` have the same first `Nat`.
|
||
|
|
||
|
;; A `Dataspace` is a ... TODO
|
||
|
|
||
|
;; An `Action` is either `(patch (Deltaof Assertion))` or `(message
|
||
|
;; Assertion)` or `(spawn BootProc)`.
|
||
|
;; `patch` action: `changes` is a bag of per-assertion count deltas that
;; `perform-pending-actions!` folds into the dataspace's assertion bag.
(struct patch (changes) #:prefab)
|
||
|
;; `message` action: `body` is handed to `send-assertion!` on the routing
;; table when pending actions are performed.
(struct message (body) #:prefab)
|
||
|
;; `spawn` action: `boot-proc` is passed to `add-actor!` when pending
;; actions are performed.
(struct spawn (boot-proc) #:prefab)
|
||
|
|
||
|
;; The complete mutable state of one dataspace. Only `next-id` and
;; `pending-actions` are replaced wholesale via struct mutators; the
;; other fields hold containers that are updated in place (hash, set,
;; bag, vector) or passed to routing/dataflow helpers.
(struct dataspace ([next-id #:mutable] ;; Nat -- next value handed out by generate-id!
                   routing-table ;; Skeleton
                   facets ;; (MutableHash FID Facet)
                   actors ;; (MutableSetof FID)
                   assertions ;; (Bagof Assertion)
                   dataflow ;; DataflowGraph
                   pending-scripts ;; (MutableVectorof (Queueof (-> Any))) -- one slot per priority level
                   [pending-actions #:mutable] ;; (Queueof Action)
                   )
  #:transparent)
|
||
|
|
||
|
;; One facet of an actor. `stop-scripts` is accumulated in reverse
;; order (terminate-facet! reverses it before scheduling); `children`
;; is an immutable set that is swapped out through the mutator.
(struct facet (id ;; FID
               endpoints ;; (MutableHash EID Endpoint)
               [stop-scripts #:mutable] ;; (Listof Script) -- IN REVERSE ORDER
               [children #:mutable] ;; (Setof FID)
               )
  #:transparent)
|
||
|
|
||
|
;; One endpoint of a facet. `assertion` caches the most recent result
;; of calling `assertion-fn`; it is refreshed when dataflow damage is
;; repaired. `handler` is #f for endpoints that only assert.
(struct endpoint (id ;; EID
                  [assertion #:mutable] ;; Assertion
                  assertion-fn ;; (-> Assertion)
                  handler ;; (Option SkInterest)
                  )
  #:transparent)
|
||
|
|
||
|
;; TODO: field ownership: record actor (root facet) ID in field, check
|
||
|
;; it on access.
|
||
|
;; A named mutable cell that is also a procedure:
;;
;;   (fh)    reads the value, first recording an observation of `fh`
;;           in the current dataspace's dataflow graph;
;;   (fh v)  records damage to `fh` in that graph, then stores `v`.
;;
;; Both forms read `(current-dataspace)`, so they must be called while
;; that parameter holds a dataspace. Prints as
;; #<field-handle:NAME:ID>.
(struct field-handle (name ;; Symbol
                      id ;; Nat
                      owner ;; FID
                      [value #:mutable] ;; Any
                      )
  #:methods gen:custom-write
  [(define (write-proc f port mode)
     (fprintf port "#<field-handle:~a:~a>" (field-handle-name f) (field-handle-id f)))]
  #:property prop:procedure
  (case-lambda
    [(f)
     (dataflow-record-observation! (dataspace-dataflow (current-dataspace)) f)
     (field-handle-value f)]
    [(f v)
     (dataflow-record-damage! (dataspace-dataflow (current-dataspace)) f)
     (set-field-handle-value! f v)]))
|
||
|
|
||
|
;; Parameterof Dataspace
|
||
|
;; Defaults to #f; bound by `with-current-facet` and read by
;; field-handle access.
(define current-dataspace (make-parameter #f))
|
||
|
|
||
|
;; Parameterof FID
|
||
|
;; Defaults to #f; bound by `with-current-facet` to the actor's FID.
(define current-actor (make-parameter #f))
|
||
|
|
||
|
;; Parameterof FID
|
||
|
;; Defaults to #f; bound by `with-current-facet`. `add-endpoint!` uses
;; it to build the dataflow subject id for a new endpoint.
(define current-facet-id (make-parameter #f))
|
||
|
|
||
|
;; Parameterof Facet
|
||
|
;; Defaults to #f; bound by `with-current-facet`. `add-endpoint!`
;; stores new endpoints in this facet's endpoint table.
(define current-facet (make-parameter #f))
|
||
|
|
||
|
;; Parameterof Boolean
|
||
|
;; Defaults to #t. `add-endpoint!` raises an error while this is true;
;; `add-facet!` binds it to #f around the facet's boot procedure.
(define in-script? (make-parameter #t))
|
||
|
|
||
|
;;---------------------------------------------------------------------------
|
||
|
|
||
|
;; Script priorities. These are used to ensure that the results of
|
||
|
;; some *side effects* are visible to certain pieces of code.
|
||
|
|
||
|
(module priorities racket/base
  (require (for-syntax racket/base))

  ;; (define-priority-levels name ... #:count count-name)
  ;;
  ;; Defines and provides each `name` as consecutive integers starting
  ;; at 0, in the order written, and finally defines and provides
  ;; `count-name` as the number of names. The `#:count` clause must be
  ;; tried before the general clause, since the general pattern would
  ;; otherwise also match the keyword.
  (define-syntax (define-priority-levels stx)
    (let loop ((counter 0) (stx (syntax-case stx ()
                                  [(_ level ...) #'(level ...)])))
      (syntax-case stx ()
        [()
         #'(void)]
        [(#:count c)
         #`(begin (define c #,counter)
                  (provide c))]
        [(this-level more ...)
         #`(begin (define this-level #,counter)
                  (provide this-level)
                  #,(loop (+ counter 1) #'(more ...)))])))

  (define-priority-levels ;; highest-priority to lowest-priority
    *query-priority-high*
    *query-priority*
    *query-handler-priority*
    *normal-priority*
    *gc-priority*
    *idle-priority*
    #:count priority-count))
|
||
|
|
||
|
(require (submod "." priorities))
|
||
|
|
||
|
;;---------------------------------------------------------------------------
|
||
|
|
||
|
;; -> Dataspace
;; Builds an empty dataspace: id counter at 0, empty routing table,
;; no facets, actors or assertions, a fresh dataflow graph, one empty
;; script queue per priority level, and no pending actions.
(define (make-dataspace)
  (dataspace 0
             (make-empty-skeleton)
             (make-hash)
             (mutable-set)
             (make-bag)
             (make-dataflow-graph)
             ;; Every slot starts out holding the same empty queue value;
             ;; slots are only ever updated with `vector-set!` (see
             ;; schedule-script! and pop-next-script!), never by mutating
             ;; the queue itself.
             (make-vector priority-count (make-queue))
             (make-queue)))
|
||
|
|
||
|
;; Dataspace -> Nat
;; Returns the dataspace's current `next-id` and advances the counter
;; by one, so successive calls on the same dataspace never repeat.
(define (generate-id! ds)
  (let ((id (dataspace-next-id ds)))
    (set-dataspace-next-id! ds (+ id 1))
    id))
|
||
|
|
||
|
;; FID -> FID
;; The parent's FID is everything after the facet's own leading id;
;; it is '() for a root (actor) facet.
(define (fid-parent fid)
  (cdr fid))
|
||
|
|
||
|
;; Dataspace FID -> FID
;; Allocates a fresh id from `ds` and prepends it to `parent-fid`.
(define (generate-fid! ds parent-fid)
  (cons (generate-id! ds) parent-fid))
|
||
|
|
||
|
;; FID -> Boolean
;; True when `fid` has no parent, i.e. it names an actor's root facet.
(define (actor-fid? fid)
  (null? (fid-parent fid)))
|
||
|
|
||
|
;; Dataspace BootProc -> Void
;; Creates a new actor: allocates a parentless FID, records it in the
;; dataspace's actor set, and installs the actor's root facet, which
;; runs `boot-proc`. The root facet's FID doubles as the actor's FID.
(define (add-actor! ds boot-proc)
  (define actor-fid (generate-fid! ds '()))
  (set-add! (dataspace-actors ds) actor-fid)
  (add-facet! ds actor-fid actor-fid boot-proc))
|
||
|
|
||
|
;; Dataspace FID -> (Option Facet)
;; The facet registered under `fid`, or #f if there is none.
(define (lookup-facet ds fid)
  (hash-ref (dataspace-facets ds) fid #f))
|
||
|
|
||
|
;; (with-current-facet [ds actor-fid fid f script?] body ...)
;;
;; Evaluates `body ...` with the five context parameters
;; (current-dataspace, current-actor, current-facet-id, current-facet,
;; in-script?) bound to the given values.
(define-syntax-rule (with-current-facet [ds actor-fid fid f script?] body ...)
  (parameterize ((current-dataspace ds)
                 (current-actor actor-fid)
                 (current-facet-id fid)
                 (current-facet f)
                 (in-script? script?))
    body ...))
|
||
|
|
||
|
;; (X ... -> Y) -> (X ... -> Y)
;; Snapshots the dataspace, actor, facet id and facet that are current
;; at the time of this call, and returns a procedure that reinstates
;; them -- with `in-script?` set to #t -- around each application of
;; `proc`.
(define (capture-facet-context proc)
  (let ((ds (current-dataspace))
        (actor-fid (current-actor))
        (fid (current-facet-id))
        (f (current-facet)))
    (lambda args
      (with-current-facet [ds actor-fid fid f #t]
        (apply proc args)))))
|
||
|
|
||
|
;; Dataspace -> (Option (-> Any))
;; Removes and returns the first script from the lowest-numbered
;; (i.e. highest-priority) non-empty queue, or returns #f when every
;; priority level is empty.
(define (pop-next-script! ds)
  (define priority-levels (dataspace-pending-scripts ds))
  (let loop ((level 0))
    (and (< level (vector-length priority-levels))
         (let ((q (vector-ref priority-levels level)))
           (if (queue-empty? q)
               (loop (+ level 1))
               (let-values (((script q) (dequeue q)))
                 ;; Store the remainder of the queue back in its slot.
                 (vector-set! priority-levels level q)
                 script))))))
|
||
|
|
||
|
;; Dataspace -> Boolean
;; Runs pending scripts, highest priority first, until none remain.
;; After each script, dataflow damage is repaired: every affected
;; endpoint has its assertion recomputed and, if it changed, the old
;; assertion is retracted, the new one asserted, and the endpoint's
;; handler (if any) is unsubscribed and resubscribed.
;;
;; Returns #t if at least one script ran, #f otherwise. (Previously
;; the result was that of the final recursive call, which is always
;; #f, so callers could never tell that a script had run.)
(define (run-all-pending-scripts! ds)
  ;; Repair callback: `subject-id` is the (list fid eid) pair that
  ;; add-endpoint! registers as the dataflow subject.
  (define (repair-endpoint! subject-id)
    (match-define (list fid eid) subject-id)
    (define f (lookup-facet ds fid))
    (when f ;; the facet may have been terminated in the meantime
      (define ep (hash-ref (facet-endpoints f) eid))
      (define old-assertion (endpoint-assertion ep))
      (define new-assertion ((endpoint-assertion-fn ep)))
      (unless (equal? old-assertion new-assertion)
        (set-endpoint-assertion! ep new-assertion)
        (dataspace-retract! ds old-assertion)
        (dataspace-assert! ds new-assertion)
        (define h (endpoint-handler ep))
        (when h
          (dataspace-unsubscribe! ds h)
          (dataspace-subscribe! ds h)))))
  (let loop ((ran-any? #f))
    (define script (pop-next-script! ds))
    (cond
      [script
       (script)
       (dataflow-repair-damage! (dataspace-dataflow ds) repair-endpoint!)
       (loop #t)]
      [else ran-any?])))
|
||
|
|
||
|
;; Dataspace -> Boolean
;; Takes the current queue of pending actions (replacing it with an
;; empty queue first, so actions enqueued while processing wait for
;; the next call) and performs each in order:
;;
;;  - patch:   applies each count delta to the assertion bag, and
;;             updates the routing table only when an assertion
;;             appears or disappears;
;;  - message: sends the body through the routing table;
;;  - spawn:   adds a new actor.
;;
;; Returns #t if there was at least one action to perform.
(define (perform-pending-actions! ds)
  (define actions (queue->list (dataspace-pending-actions ds)))
  (set-dataspace-pending-actions! ds (make-queue))
  (for [(action actions)]
    (match action
      [(patch delta)
       (for [((a count) (in-bag delta))]
         (match (bag-change! (dataspace-assertions ds) a count)
           ['present->absent (remove-assertion! (dataspace-routing-table ds) a)]
           ['absent->present (add-assertion! (dataspace-routing-table ds) a)]
           ;; 'absent->absent absurd
           ['present->present (void)]))] ;; i.e. no visible change
      [(message body)
       (send-assertion! (dataspace-routing-table ds) body)]
      [(spawn boot-proc)
       (add-actor! ds boot-proc)]))
  (not (null? actions)))
|
||
|
|
||
|
;; Dataspace -> Boolean
;; One turn of the dataspace: drain the script queues, then perform
;; the pending actions. Both steps always run. The result is true if
;; either step did any work.
(define (run-scripts! ds)
  (define ran-a-script (run-all-pending-scripts! ds))
  (define performed-an-action (perform-pending-actions! ds))
  ;; TODO: figure out when a dataspace should quit itself. Given the
  ;; mutable nature of the implementation, maybe never? It might be
  ;; being held elsewhere!
  (or ran-a-script performed-an-action))
|
||
|
|
||
|
;; Dataspace FID FID BootProc -> Void
;; Installs a new facet `fid` belonging to actor `actor-fid`:
;;
;;  1. registers an empty facet record under `fid`;
;;  2. if the FID has a parent that is still registered, adds `fid`
;;     to the parent's children;
;;  3. runs `boot-proc` with the new facet as context and
;;     `in-script?` set to #f, so that it may add endpoints;
;;  4. schedules a follow-up script that terminates the facet if, by
;;     then, it is still live but either its parent has gone or it
;;     has no endpoints and no children.
(define (add-facet! ds actor-fid fid boot-proc)
  (define parent-fid (fid-parent fid))
  (define f (facet fid
                   (make-hash)
                   '()
                   (set)))
  (hash-set! (dataspace-facets ds) fid f)
  (when (pair? parent-fid)
    (define pf (lookup-facet ds parent-fid))
    (when pf (set-facet-children! pf (set-add (facet-children pf) fid))))
  (with-current-facet [ds actor-fid fid f #f]
    (boot-proc))
  (schedule-script! ds (lambda ()
                         (when (and (facet-live? ds fid)
                                    (or (and (pair? parent-fid) (not (facet-live? ds parent-fid)))
                                        (facet-live-but-inert? ds fid)))
                           (terminate-facet! ds actor-fid fid)))))
|
||
|
|
||
|
;; Dataspace FID -> Boolean
;; A facet is live for as long as it is registered in the dataspace's
;; facet table.
(define (facet-live? ds fid)
  (hash-has-key? (dataspace-facets ds) fid))
|
||
|
|
||
|
;; Dataspace FID -> Boolean
;; True when the facet is registered but has neither endpoints nor
;; children. False when the facet is not registered at all.
(define (facet-live-but-inert? ds fid)
  (define f (lookup-facet ds fid))
  (and f
       (hash-empty? (facet-endpoints f))
       (set-empty? (facet-children f))))
|
||
|
|
||
|
;; [#:priority Nat] Dataspace (-> Any) -> Void
;; Appends `thunk` to the script queue for `priority` (default
;; *normal-priority*). `priority` indexes the dataspace's
;; pending-scripts vector directly.
(define (schedule-script! #:priority [priority *normal-priority*] ds thunk)
  (define v (dataspace-pending-scripts ds))
  (vector-set! v priority (enqueue (vector-ref v priority) thunk)))
|
||
|
|
||
|
;; Dataspace FID FID -> Void
;; Tears down facet `fid` of actor `actor-fid`. Does nothing if the
;; facet is not (or no longer) registered. Otherwise:
;;
;;  1. unlinks the facet from its parent's children, if the parent is
;;     still registered;
;;  2. unregisters the facet;
;;  3. recursively terminates each child;
;;  4. schedules the facet's stop-scripts, in the order they were
;;     added;
;;  5. schedules a script that forgets each endpoint's dataflow
;;     subject, retracts its assertion and unsubscribes its handler;
;;  6. schedules a *gc-priority* script that terminates the parent if
;;     it has become inert.
(define (terminate-facet! ds actor-fid fid)
  (define f (lookup-facet ds fid))
  (when f
    (define parent-fid (fid-parent fid))

    (when (pair? parent-fid)
      (define pf (lookup-facet ds parent-fid))
      ;; The mutator takes the facet as well as the new set; the
      ;; facet argument was previously missing, which raised an
      ;; arity error whenever a facet with a live parent terminated.
      (when pf (set-facet-children! pf (set-remove (facet-children pf) fid))))

    (hash-remove! (dataspace-facets ds) fid)

    (for [(child-fid (in-set (facet-children f)))]
      (terminate-facet! ds actor-fid child-fid))

    ;; Run stop-scripts after terminating children. This means that
    ;; children's stop-scripts run before ours.
    (for [(script (reverse (facet-stop-scripts f)))]
      (schedule-script! ds script))

    (schedule-script!
     ds
     (lambda ()
       (for [((eid ep) (in-hash (facet-endpoints f)))]
         (dataflow-forget-subject! (dataspace-dataflow ds) (list fid eid))
         (dataspace-retract! ds (endpoint-assertion ep))
         (define h (endpoint-handler ep))
         (when h (dataspace-unsubscribe! ds h)))))

    (schedule-script!
     #:priority *gc-priority*
     ds
     (lambda ()
       (when (and (pair? parent-fid) (facet-live-but-inert? ds parent-fid))
         (log-info "terminating ~v because it's dead and child ~v terminated" parent-fid fid)
         (terminate-facet! ds actor-fid parent-fid))))))
|
||
|
|
||
|
;; Dataspace Any (-> Assertion) (Option SkInterest) -> Void
;; Adds an endpoint to the current facet. `where` is used only to
;; label the error message.
;;
;; Raises an error if called while `in-script?` is true. Otherwise
;; allocates an endpoint id, computes the initial assertion by calling
;; `assertion-fn` with (list current-facet-id eid) as the dataflow
;; subject, asserts it, subscribes `handler` when one is given, and
;; stores the endpoint in the current facet's endpoint table.
(define (add-endpoint! ds where assertion-fn handler)
  (when (in-script?)
    (error 'add-endpoint!
           "~a: Cannot add endpoint in script; are you missing a (react ...)?"
           where))
  (define eid (generate-id! ds))
  (define assertion
    (parameterize ((current-dataflow-subject-id (list (current-facet-id) eid)))
      (assertion-fn)))
  (define ep (endpoint eid assertion assertion-fn handler))
  (dataspace-assert! ds assertion)
  (when handler (dataspace-subscribe! ds handler))
  (hash-set! (facet-endpoints (current-facet)) eid ep))
|
||
|
|
||
|
;; Dataspace -> (Bagof Assertion)
;; Returns the `changes` bag of the last pending action, first
;; appending a fresh empty patch if the queue is empty or its last
;; action is not a patch. This lets consecutive assert/retract calls
;; accumulate into a single patch.
(define (ensure-patch-action! ds)
  (define old-q (dataspace-pending-actions ds))
  (when (or (queue-empty? old-q) (not (patch? (queue-last old-q))))
    (set-dataspace-pending-actions! ds (enqueue old-q (patch (make-bag)))))
  (patch-changes (queue-last (dataspace-pending-actions ds))))
|
||
|
|
||
|
;; Dataspace Assertion -> Void
;; Records a -1 change for `assertion` in the trailing pending patch.
;; A void assertion is ignored.
(define (dataspace-retract! ds assertion)
  (unless (void? assertion)
    (bag-change! (ensure-patch-action! ds) assertion -1)))
|
||
|
|
||
|
;; Dataspace Assertion -> Void
;; Records a +1 change for `assertion` in the trailing pending patch.
;; A void assertion is ignored.
(define (dataspace-assert! ds assertion)
  (unless (void? assertion)
    (bag-change! (ensure-patch-action! ds) assertion +1)))
|
||
|
|
||
|
;; Dataspace SkInterest -> Void
;; Removes interest `h` from the routing table. Unlike assertions,
;; this takes effect immediately rather than via a pending action.
(define (dataspace-unsubscribe! ds h)
  (remove-interest! (dataspace-routing-table ds) h))
|
||
|
|
||
|
;; Dataspace SkInterest -> Void
;; Adds interest `h` to the routing table. Unlike assertions, this
;; takes effect immediately rather than via a pending action.
(define (dataspace-subscribe! ds h)
  (add-interest! (dataspace-routing-table ds) h))
|
||
|
|
||
|
;; Dataspace Assertion -> Void
;; Enqueues a `message` action carrying `body`; it is delivered the
;; next time pending actions are performed.
(define (dataspace-send! ds body)
  (set-dataspace-pending-actions! ds (enqueue (dataspace-pending-actions ds) (message body))))
|
||
|
|
||
|
;; Smoke test: a "box" actor and a "client" actor driven to quiescence.
;;
;;  - The box keeps a `current-value` field, asserts
;;    (box-state current-value), updates the field on each `set-box`
;;    message, and terminates its facet once the value reaches 10.
;;  - The client answers every asserted box-state `v` by sending
;;    (set-box (+ v 1)), and terminates when interest in `set-box`
;;    is retracted.
;;
;; The dataspace is pretty-printed before the run, after every turn
;; that did work, and once more at the end.
(module+ test
  ;; TODO: move somewhere sensible
  ;; Thin veneers over `struct` for declaring intent.
  (define-syntax-rule (assertion-struct item ...) (struct item ... #:prefab))
  (define-syntax-rule (message-struct item ...) (struct item ... #:prefab))

  (message-struct set-box (new-value))
  (assertion-struct box-state (value))

  ;; TODO: move somewhere sensible
  (assertion-struct observe (specification))

  (define ds (make-dataspace))

  ;; The box actor.
  (add-actor! ds
              (lambda ()
                (define current-value (field-handle 'current-value
                                                    (generate-id! (current-dataspace))
                                                    (current-facet-id)
                                                    0))
                ;; This endpoint's assertion is always void (so nothing
                ;; is asserted); its assertion-fn is re-run whenever
                ;; `current-value` is damaged, which is how the value is
                ;; monitored.
                (add-endpoint! (current-dataspace)
                               'stop-when-ten
                               (capture-facet-context
                                (lambda ()
                                  (when (= (current-value) 10)
                                    (schedule-script! (current-dataspace)
                                                      (capture-facet-context
                                                       (lambda ()
                                                         (schedule-script!
                                                          (current-dataspace)
                                                          (capture-facet-context
                                                           (lambda ()
                                                             (printf "box: terminating\n"))))
                                                         (terminate-facet! (current-dataspace)
                                                                           (current-actor)
                                                                           (current-facet-id))))))
                                  (void)))
                               #f)
                (add-endpoint! (current-dataspace)
                               'assert-box-state
                               (capture-facet-context
                                (lambda () (box-state (current-value))))
                               #f)
                (add-endpoint! (current-dataspace)
                               'on-message-set-box
                               (capture-facet-context
                                (lambda () (observe (set-box (capture (discard))))))
                               (skeleton-interest (list struct:set-box #f)
                                                  '()
                                                  '()
                                                  '((0 0))
                                                  (capture-facet-context
                                                   (lambda (op new-value)
                                                     (when (eq? '! op)
                                                       (schedule-script!
                                                        (current-dataspace)
                                                        (capture-facet-context
                                                         (lambda ()
                                                           (printf "new-value ~a ~v\n" op new-value)
                                                           (current-value new-value)))))))))))

  ;; The client actor.
  (add-actor! ds
              (lambda ()
                (add-endpoint! (current-dataspace)
                               'stop-when-retracted-observe-set-box
                               (capture-facet-context
                                (lambda () (observe (observe (set-box (discard))))))
                               (skeleton-interest (list struct:observe (list struct:set-box #f))
                                                  '()
                                                  '()
                                                  '()
                                                  (capture-facet-context
                                                   (lambda (op)
                                                     (when (eq? '- op)
                                                       (schedule-script!
                                                        (current-dataspace)
                                                        (capture-facet-context
                                                         (lambda ()
                                                           (schedule-script!
                                                            (current-dataspace)
                                                            (capture-facet-context
                                                             (lambda ()
                                                               (printf "client: box has gone\n"))))
                                                           (terminate-facet! (current-dataspace)
                                                                             (current-actor)
                                                                             (current-facet-id))))))))))
                (add-endpoint! (current-dataspace)
                               'on-asserted-box-state
                               (capture-facet-context
                                (lambda () (observe (box-state (capture (discard))))))
                               (skeleton-interest (list struct:box-state #f)
                                                  '()
                                                  '()
                                                  '((0 0))
                                                  (capture-facet-context
                                                   (lambda (op v)
                                                     (when (eq? '+ op)
                                                       (schedule-script!
                                                        (current-dataspace)
                                                        (capture-facet-context
                                                         (lambda ()
                                                           (printf "v ~a ~v\n" op v)
                                                           (dataspace-send! (current-dataspace)
                                                                            (set-box (+ v 1)))))))))))))

  (require racket/pretty)
  (pretty-print ds)
  ;; Take turns until one does no work.
  (let loop ((i 0))
    (printf "--- i = ~v\n" i)
    (when (run-scripts! ds)
      (pretty-print ds)
      (loop (+ i 1))))
  (pretty-print ds)
  )
|