532 lines
21 KiB
Racket
532 lines
21 KiB
Racket
#lang racket/base
|
|
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
|
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
|
|
|
(provide (struct-out <entity>)
|
|
entity
|
|
|
|
current-turn
|
|
|
|
make-actor-system
|
|
make-actor-group
|
|
actor-system-wait
|
|
actor-system-shutdown!
|
|
escape-pod
|
|
|
|
actor?
|
|
actor-id
|
|
actor-name
|
|
actor-engine
|
|
actor-daemon?
|
|
actor-exit-reason
|
|
actor-add-exit-hook!
|
|
actor-remove-exit-hook!
|
|
actor-daemon!
|
|
*dead-actor*
|
|
|
|
facet?
|
|
facet-id
|
|
facet-actor
|
|
facet-parent
|
|
facet-live?
|
|
facet-inert?
|
|
facet-on-end-of-turn!
|
|
facet-on-stop!
|
|
facet-prevent-inert-check!
|
|
*dead-facet*
|
|
|
|
turn?
|
|
turn-id
|
|
turn-active-facet
|
|
turn-committed?
|
|
turn!
|
|
turn-freshen
|
|
turn-ref
|
|
turn-facet!
|
|
turn-stop!
|
|
turn-spawn!
|
|
turn-stop-actor-system!
|
|
turn-stop-actor!
|
|
turn-crash!
|
|
turn-field!
|
|
turn-dataflow!
|
|
turn-assert/dataflow!
|
|
turn-assert!
|
|
turn-assert!*
|
|
turn-retract!
|
|
turn-replace!
|
|
turn-retract!*
|
|
turn-sync!
|
|
turn-sync!*
|
|
turn-message!)
|
|
|
|
(module+ internals
|
|
(provide make-actor
|
|
actor-terminate!))
|
|
|
|
(require (only-in preserves preserve=?))
|
|
(require racket/match)
|
|
(require (only-in racket/exn exn->string))
|
|
|
|
(require "rewrite.rkt")
|
|
(require "engine.rkt")
|
|
(require "entity-ref.rkt")
|
|
(require "dataflow.rkt")
|
|
(require "field.rkt")
|
|
(require "support/counter.rkt")
|
|
|
|
;; Bookkeeping record for one assertion a facet has placed at `peer` under `handle`.
;; `established?` starts out as supplied by the creator and is flipped by the queued
;; delivery actions: set to #t once the assert has been delivered to the peer, and
;; back to #f when the matching retraction is delivered.
(struct outbound-assertion (handle peer [established? #:mutable]))
|
|
|
|
;; A turn: `id` identifies it, `active-facet` is the facet on whose behalf it runs,
;; and `queues` is a mutable table of pending actions. `queues` is replaced by #f
;; once the turn has been committed (see `turn-committed?`).
;; Printed as #<turn:ID> or #<turn:ID:committed>.
(struct turn (id
              active-facet
              [queues #:mutable])
  #:methods gen:custom-write
  [(define (write-proc self out mode)
     (define suffix (if (turn-committed? self) ":committed" ""))
     (fprintf out "#<turn:~a~a>" (turn-id self) suffix))])
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
;; Parameter holding the turn currently being executed; #f when no turn is active.
;; It is rebound with `parameterize` while a turn's action runs.
(define current-turn (make-parameter #f))
|
|
|
|
;; Three independent id sources, each obtained from its own `make-counter` call
;; (presumably each yields a fresh value per call -- confirm in support/counter.rkt).
;; `generate-actor-id` is used for both actor ids and facet ids in this file;
;; `generate-turn-id` for turns; `generate-handle` for assertion handles.
(define-values (generate-actor-id generate-turn-id generate-handle)
  (values (make-counter) (make-counter) (make-counter)))
|
|
|
|
;; Declares the `syndicate/actor` log topic; this is what provides the
;; `log-syndicate/actor-debug`, `-info` and `-error` forms used throughout this file.
(define-logger syndicate/actor)
|
|
|
|
;; Placeholder actor that is already terminated. The positional arguments mirror
;; those used by `make-actor` (presumably: id, name, engine, daemon?, dataflow
;; graph, root facet, exit reason, exit hooks -- the struct definition is not
;; visible here). The seventh argument is #t where `make-actor` passes #f, so
;; `actor-terminate!` treats this actor as having exited already.
(define *dead-actor*
  (let ((id (generate-actor-id))
        (dataflow (make-dataflow-graph)))
    (actor id
           '*dead-actor*
           *dead-engine*
           #t
           dataflow
           'uninitialized ;; replaced below via set-actor-root!
           #t
           '())))
|
|
|
|
;; Placeholder facet belonging to *dead-actor*: no parent, empty child and outbound
;; tables, no pending actions, and the eighth argument is #f where `make-facet`
;; passes #t (presumably the `live?` flag -- the struct definition is not visible).
(define *dead-facet*
  (let ((id (generate-actor-id))
        (children (make-hasheq))
        (outbound (make-hash)))
    (facet id
           *dead-actor*
           #f
           children
           outbound
           '()
           '()
           #f
           0)))
|
|
|
|
;; Tie the knot: *dead-actor* was created with a placeholder root ('uninitialized)
;; because *dead-facet* itself needs the actor to exist first.
(set-actor-root! *dead-actor* *dead-facet*)
|
|
|
|
;;--------------------------------------------------------------------------
|
|
|
|
;; Creates a fresh engine named `name`, queues a task that boots a daemon actor
;; running `boot-proc`, and then blocks until the engine's thread finishes.
;;
;; The procedure handed to `make-engine` shuts the whole system down and then calls
;; its `restart` argument with `void`. (When the engine invokes it is decided by
;; engine.rkt, which is not visible here.)
;; The engine is created with a count of 1, which the boot task drops by one after
;; the first actor has been made.
(define (make-actor-system boot-proc #:name [name 'actor-system])
  (define (shutdown-then-restart restart)
    (actor-system-shutdown! e)
    (restart void))
  (define e (make-engine 1 name shutdown-then-restart))
  (define (boot!)
    (make-actor name e #t boot-proc (make-hash))
    (adjust-inhabitant-count! e -1))
  (queue-task! e boot!)
  (actor-system-wait e))
|
|
|
|
;; Like `make-actor-system`, but must be called inside a turn and returns the new
;; engine immediately instead of waiting for it.
;;
;; With `#:link? #t` the group and the calling facet are tied together:
;;  - when the engine calls the procedure given to `make-engine`, the owning facet
;;    is asked to stop (in a fresh turn) before the group is shut down;
;;  - when the owning facet stops, the group is shut down.
(define (make-actor-group boot-proc
                          #:name [name 'actor-group]
                          #:link? [link? #f])
  (define owning-facet (turn-active-facet (current-turn)))
  (define (shutdown-then-restart restart)
    (when link? (turn! owning-facet (lambda () (turn-stop!))))
    (actor-system-shutdown! group-engine)
    (restart void))
  (define group-engine (make-engine 1 name shutdown-then-restart))
  (when link?
    (facet-on-stop! owning-facet (lambda () (actor-system-shutdown! group-engine))))
  (queue-task! group-engine
               (lambda ()
                 (make-actor name group-engine #t boot-proc (make-hash))
                 (adjust-inhabitant-count! group-engine -1)))
  group-engine)
|
|
|
|
;; Blocks the calling thread until the thread of engine `e` has terminated.
(define (actor-system-wait e)
  (define engine-thd (engine-thread e))
  (thread-wait engine-thd))
|
|
|
|
;; Shuts engine `e` down and terminates, with the "OK" reason #t, every actor
;; that `engine-shutdown!` returns.
(define (actor-system-shutdown! e)
  (for-each (lambda (ac) (actor-terminate! ac #t))
            (engine-shutdown! e)))
|
|
|
|
;; Must be called inside a turn. Queues, on the current actor's engine, a task
;; that boots a new daemon actor running `boot-proc`. The new actor is created
;; from an engine task rather than via `turn-spawn!`, so it is not enqueued on
;; the current turn and inherits no assertions.
(define (escape-pod boot-proc #:name [name 'escape-pod])
  (define here (turn-active-facet (current-turn)))
  (define e (actor-engine (facet-actor here)))
  (define (boot!)
    (make-actor name e #t boot-proc (make-hash)))
  (queue-task! e boot!))
|
|
|
|
;; Creates and boots a new actor on `engine`.
;;
;;  - A non-daemon actor bumps the engine's inhabitant count by one.
;;  - The actor gets a root facet with no parent, plus a child "user root" facet
;;    that starts with `initial-assertions` as its outbound table.
;;  - `boot-proc` runs in a turn on the user root facet, followed by an
;;    inertness check.
;;  - Afterwards the actor is registered with the engine if it is still running;
;;    otherwise the actor is terminated straight away with reason #t.
;;
;; Returns the user root facet (not the actor).
(define (make-actor name engine daemon? boot-proc initial-assertions)
  (define ac
    (actor (generate-actor-id)
           name
           engine
           daemon?
           (make-dataflow-graph)
           'uninitialized ;; root facet is installed just below
           #f
           '()))
  (unless daemon?
    (adjust-inhabitant-count! engine +1))
  (set-actor-root! ac (make-facet ac #f))
  (log-syndicate/actor-info "~a booting" ac)
  (define user-root-facet (make-facet ac (actor-root ac) initial-assertions))
  (turn! user-root-facet (stop-if-inert-after boot-proc))
  (cond
    [(engine-running? engine) (engine-register! engine ac)]
    [else (actor-terminate! ac #t)])
  user-root-facet)
|
|
|
|
;; Registers thunk `hook` to run when `ac` terminates. Hooks are stored
;; newest-first; `actor-terminate!` reverses the list, so they run in
;; registration order.
(define (actor-add-exit-hook! ac hook)
  (define hooks (actor-exit-hooks ac))
  (set-actor-exit-hooks! ac (cons hook hooks)))
|
|
|
|
;; Removes every registration of `hook` (compared with `eq?`) from `ac`'s exit hooks.
(define (actor-remove-exit-hook! ac hook)
  (define remaining
    (for/list ([h (in-list (actor-exit-hooks ac))]
               #:unless (eq? h hook))
      h))
  (set-actor-exit-hooks! ac remaining))
|
|
|
|
;; Changes the daemon flag of `ac`. Does nothing when the flag already equals
;; `daemon?`; otherwise the engine's inhabitant count is adjusted to match:
;; becoming a daemon removes one inhabitant, ceasing to be one adds it back.
(define (actor-daemon! ac daemon?)
  (unless (eq? daemon? (actor-daemon? ac))
    (set-actor-daemon?! ac daemon?)
    (define delta (if daemon? -1 +1))
    (adjust-inhabitant-count! (actor-engine ac) delta)))
|
|
|
|
;; Terminates `ac` with `reason`: #t means a clean exit, anything else is treated
;; as the exception that killed the actor. Only the first call has any effect,
;; because the exit reason is recorded immediately.
;;
;; Steps:
;;  1. record the reason and log it (info for OK, error otherwise);
;;  2. run the exit hooks in registration order, synchronously;
;;  3. queue an engine task that tears down the root facet in a "zombie" turn
;;     (orderly only for a clean exit), deregisters the actor, and, for
;;     non-daemons, gives back the actor's inhabitant count.
(define (actor-terminate! ac reason)
  (unless (actor-exit-reason ac)
    (set-actor-exit-reason! ac reason)
    (define ok? (eq? reason #t))
    (cond
      [ok?
       (log-syndicate/actor-info "~a terminated OK" ac)]
      [else
       (log-syndicate/actor-error "~a terminated with exception:\n~a"
                                  ac
                                  (exn->string reason))])
    (for-each (lambda (hook) (hook))
              (reverse (actor-exit-hooks ac)))
    (queue-task! (actor-engine ac)
                 (lambda ()
                   (define root (actor-root ac))
                   ;; The actor already has an exit reason, so an ordinary turn
                   ;; would be skipped; the #t flag forces the turn to run.
                   (turn! root
                          (lambda () (facet-terminate! root ok?))
                          #t)
                   (engine-deregister! (actor-engine ac) ac)
                   (unless (actor-daemon? ac)
                     (adjust-inhabitant-count! (actor-engine ac) -1))))))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
;; Builds a new live facet of actor `ac` under `parent` (#f for a root facet).
;; `initial-assertions` becomes the facet's outbound table (a fresh empty hash by
;; default). The facet starts with no children, no end-of-turn or shutdown
;; actions, and zero inert-check preventers. When `parent` is given, the new
;; facet is recorded in the parent's children table.
(define (make-facet ac parent [initial-assertions (make-hash)])
  (define new-facet
    (facet (generate-actor-id)
           ac
           parent
           (make-hasheq)
           initial-assertions
           '()
           '()
           #t
           0))
  (when parent
    (hash-set! (facet-children parent) new-facet #t))
  new-facet)
|
|
|
|
;; Schedules thunk `action` to run at the end of the turn on facet `f`.
;; Actions are stored newest-first; `turn!` reverses the list before running them.
(define (facet-on-end-of-turn! f action)
  (define already-scheduled (facet-end-of-turn-actions f))
  (set-facet-end-of-turn-actions! f (cons action already-scheduled)))
|
|
|
|
;; Registers thunk `action` to run when facet `f` is stopped in an orderly way.
;; Actions are stored newest-first; `facet-terminate!` reverses the list, so they
;; run in registration order.
(define (facet-on-stop! f action)
  (define already-registered (facet-shutdown-actions f))
  (set-facet-shutdown-actions! f (cons action already-registered)))
|
|
|
|
;; A facet is inert when it has no child facets, no outbound assertions, and no
;; outstanding inert-check preventers. Logs the three figures at debug level.
(define (facet-inert? f)
  (define kids (facet-children f))
  (define outbound (facet-outbound f))
  (define preventers (facet-inert-check-preventers f))
  (log-syndicate/actor-debug " facet-inert? ~a: ~a kids, ~a outbound, ~a preventers"
                             f
                             (hash-count kids)
                             (hash-count outbound)
                             preventers)
  (and (hash-empty? kids)
       (hash-empty? outbound)
       (zero? preventers)))
|
|
|
|
;; Increments `f`'s preventer count, which keeps `facet-inert?` false, and
;; returns a thunk that undoes this. The thunk is one-shot: only its first call
;; decrements the count. When the count reaches zero, an inertness check for `f`
;; is enqueued on the turn that is current at the time the thunk is called.
(define (facet-prevent-inert-check! f)
  (define armed #t)
  (set-facet-inert-check-preventers! f (add1 (facet-inert-check-preventers f)))
  (define (disarm!)
    (when armed
      (set! armed #f)
      (define remaining (sub1 (facet-inert-check-preventers f)))
      (set-facet-inert-check-preventers! f remaining)
      (when (zero? remaining)
        (check-for-inertness (current-turn) f))))
  disarm!)
|
|
|
|
;; Stops facet `f` and, recursively, all of its descendants. Must run inside a
;; turn. Does nothing if `f` is already dead.
;;
;;  orderly? = #t : shutdown actions run, and once `f` is gone the parent is
;;                  re-examined -- an inert parent is stopped too, and stopping
;;                  a parentless (root) facet terminates the whole actor.
;;  orderly? = #f : shutdown actions are skipped and nothing propagates upward.
;;
;; In both cases every outbound assertion of `f` is retracted.
(define (facet-terminate! f orderly?)
  (when (facet-live? f)
    (log-syndicate/actor-debug " ~a stopping (~a)" f (if orderly? "orderly" "disorderly"))
    (set-facet-live?! f #f)

    (define parent (facet-parent f))
    (when parent (hash-remove! (facet-children parent) f))

    (with-active-facet f
      (lambda ()
        ;; Iterate over a snapshot of the children: each child's own
        ;; `facet-terminate!` removes it from this very table, and hash
        ;; iteration positions are only guaranteed valid while nothing is
        ;; added to or removed from the table.
        (for [(c (in-list (hash-keys (facet-children f))))]
          (facet-terminate! c orderly?))
        (when orderly?
          (for [(h (in-list (reverse (facet-shutdown-actions f))))] (h)))
        ;; Snapshot here too: `turn-retract!*` removes each handle from the
        ;; active facet's outbound table, which is the table being walked.
        (let ((turn (current-turn)))
          (for [(a (in-list (hash-values (facet-outbound f))))]
            (turn-retract!* turn a)))
        (when orderly?
          (queue-task!
           (actor-engine (facet-actor f))
           (lambda ()
             (if parent
                 (when (facet-inert? parent)
                   (turn! parent (lambda () (facet-terminate! parent #t))))
                 ;; No parent: `f` was the actor's root facet. The root is dead
                 ;; by now, so the turn has to be forced with the zombie flag.
                 (turn! (actor-root (facet-actor f))
                        (lambda () (actor-terminate! (facet-actor f) #t))
                        #t)))))))))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
;; A turn is committed (stale) once its queue table has been replaced by #f.
(define (turn-committed? t)
  (if (turn-queues t) #f #t))
|
|
|
|
;; Runs thunk `action` as a new turn on facet `f`.
;;
;; The turn is skipped unless `f` is live and its actor has not exited;
;; `zombie-turn?` = #t overrides this so that teardown code can still run.
;;
;; Within the turn:
;;  1. `action` runs with `current-turn` bound to the new turn;
;;  2. dataflow damage is repaired and `f`'s end-of-turn actions run, repeating
;;     until no end-of-turn actions are left;
;;  3. the actions enqueued during the turn are handed, one batch per target
;;     actor, to that actor's engine;
;;  4. the turn is marked committed.
;;
;; Any `exn?` raised during steps 1-4 terminates `f`'s actor with that exception.
;; In that case the remaining steps are skipped, so actions not yet dispatched
;; are never delivered.
(define (turn! f action [zombie-turn? #f])
  (define ac (facet-actor f))
  (log-syndicate/actor-debug "start turn ~v~a~a~a"
                             f
                             (if zombie-turn? ", zombie" "")
                             (let ((r (actor-exit-reason ac))) (if r (format ", exit-reason ~v" r) ""))
                             (if (facet-live? f) "" ", dead facet"))
  (when (or zombie-turn? (and (not (actor-exit-reason ac)) (facet-live? f)))
    (define this-turn (turn (generate-turn-id) f (make-hasheq)))

    ;; Exception handler: crash the actor from a turn on its root facet.
    (define (crash-actor! e)
      (turn! (actor-root ac) (lambda () (actor-terminate! ac e))))

    ;; Steps 1 and 2.
    (define (run-action-and-settle!)
      (action)
      (let settle ()
        (dataflow-repair-damage! (actor-dataflow ac) (lambda (repair) (repair)))
        (define deferred (reverse (facet-end-of-turn-actions f)))
        (when (pair? deferred)
          (set-facet-end-of-turn-actions! f '())
          (for-each (lambda (thunk) (thunk)) deferred)
          (settle))))

    ;; Runs one actor's batch. `batch` is a newest-first list of
    ;; (facet . thunk) pairs as built by `turn-enqueue!`. Consecutive entries
    ;; for the same facet share a single turn on that facet.
    (define (deliver-batch! batch)
      (define pending (reverse batch))
      (let next-group ()
        (when (pair? pending)
          (define ff (caar pending))
          (define first-action (cdar pending))
          (set! pending (cdr pending))
          (turn! ff (lambda ()
                      (first-action)
                      (let same-facet ()
                        (when (and (pair? pending) (eq? ff (caar pending)))
                          ((cdar pending))
                          (set! pending (cdr pending))
                          (same-facet)))))
          (next-group))))

    (with-handlers ([exn? crash-actor!])
      (parameterize ((current-turn this-turn))
        (run-action-and-settle!))
      (for ([(target-actor batch) (in-hash (turn-queues this-turn))])
        (queue-task! (actor-engine target-actor)
                     (lambda () (deliver-batch! batch))))
      (set-turn-queues! this-turn #f))
    (log-syndicate/actor-debug "end turn ~v\n" f)))
|
|
|
|
;; Runs thunk `action` inside a nested turn whose active facet is `f`. The nested
;; turn shares the queue table of the enclosing `current-turn`, so anything
;; enqueued by `action` is dispatched when the enclosing turn commits. The nested
;; turn is marked committed once `action` returns normally.
(define (with-active-facet f action)
  (define id (generate-turn-id))
  (define shared-queues (turn-queues (current-turn)))
  (define inner-turn (turn id f shared-queues))
  (parameterize ((current-turn inner-turn))
    (action))
  (set-turn-queues! inner-turn #f))
|
|
|
|
;; Queues thunk `action` to run later in a turn on facet `f`. Entries are grouped
;; by `f`'s actor and stored newest-first as (facet . thunk) pairs.
;; Raises an error if `turn` has already been committed.
(define (turn-enqueue! turn f action)
  (define qs (turn-queues turn))
  (unless qs
    (error 'turn-enqueue! "Attempt to reuse ~a in ~a" turn (turn-active-facet turn)))
  (define entry (cons f action))
  (hash-update! qs
                (facet-actor f)
                (lambda (entries) (cons entry entries))
                '()))
|
|
|
|
;; Wraps `entity` in an entity-ref relayed through `turn`'s active facet, with the
;; given `attenuation` (none by default).
(define (turn-ref turn entity [attenuation '()])
  (define owner (turn-active-facet turn))
  (entity-ref owner entity attenuation))
|
|
|
|
;; Creates a child of the current turn's active facet, runs `boot-proc` with the
;; child as active facet, schedules an inertness check for it, and returns the
;; child.
(define (turn-facet! boot-proc)
  (define parent (turn-active-facet (current-turn)))
  (define new-facet (make-facet (facet-actor parent) parent))
  (with-active-facet new-facet (stop-if-inert-after boot-proc))
  new-facet)
|
|
|
|
;; Enqueues an orderly stop of facet `f` (default: the current turn's active
;; facet) on the current turn. `f` must belong to the same actor as the active
;; facet, otherwise an error is raised.
;;
;; If `continuation` is given, it is called after `f` has been terminated, with
;; `f`'s parent as the active facet.
(define (turn-stop! [f (turn-active-facet (current-turn))] [continuation #f])
  (define current-actor (facet-actor (turn-active-facet (current-turn))))
  (unless (eq? (facet-actor f) current-actor)
    (error 'turn-stop! "Attempted to stop facet ~v from different actor ~v"
           f
           current-actor))
  (log-syndicate/actor-debug " ENQ stop-facet ~v" f)
  (define (stop-now!)
    (log-syndicate/actor-debug " DEQ stop-facet ~v" f)
    (facet-terminate! f #t)
    (when continuation
      (log-syndicate/actor-debug " stop-facet ~v continuation" f)
      (with-active-facet (facet-parent f) continuation)))
  (turn-enqueue! (current-turn) f stop-now!))
|
|
|
|
;; Enqueues the creation of a new actor on the spawning actor's engine.
;;
;;  boot-proc          : thunk run in the new actor's first turn
;;  initial-assertions : table whose keys are handles of assertions currently
;;                       held by the spawning facet; at spawn time those
;;                       assertions are moved from the spawner's outbound table
;;                       into the new actor's. A handle missing from the
;;                       spawner's table makes `hash-ref` raise.
;;  #:name, #:daemon?  : passed on to `make-actor`
;;  #:link             : optional entity; if given, `link-message` is asserted
;;                       to it from a turn on the new actor's user root facet,
;;                       and a matching outbound assertion is recorded on the
;;                       spawning facet
;;  #:link-message     : the value asserted over the link ('alive by default)
;;
;; If the spawning facet ends up with no outbound assertions, an inertness check
;; is scheduled for it.
(define (turn-spawn! turn boot-proc [initial-assertions (make-hash)]
                     #:name [name '?]
                     #:daemon? [daemon? #f]
                     #:link [link-entity #f]
                     #:link-message [link-message 'alive])
  (define f (turn-active-facet turn))
  (define o (facet-outbound f))
  (log-syndicate/actor-debug " ENQ spawn ~a" name)

  (define (spawn-now!)
    (log-syndicate/actor-debug " DEQ spawn ~a" name)
    ;; Hand the selected assertions over to the new actor.
    (define new-outbound (make-hash))
    (for ([handle (in-hash-keys initial-assertions)])
      (define inherited (hash-ref o handle))
      (hash-set! new-outbound handle inherited)
      (hash-remove! o handle))
    (define engine (actor-engine (facet-actor f)))
    (define new-actor-facet (make-actor name engine daemon? boot-proc new-outbound))
    (when link-entity
      (define handle (generate-handle))
      (define linked-peer (entity-ref new-actor-facet link-entity '()))
      ;; Recorded as already established: the assert is delivered directly
      ;; below rather than going through the usual enqueue path.
      (hash-set! o handle (outbound-assertion handle linked-peer #t))
      (log-syndicate/actor-debug " ENQ link at ~v assert ~v handle ~v"
                                 linked-peer link-message handle)
      (turn! new-actor-facet
             (lambda ()
               (log-syndicate/actor-debug " DELIVER link to ~v assert ~v handle ~v"
                                          linked-peer link-message handle)
               (deliver (entity-assert link-entity) link-message handle))))
    (when (hash-empty? o)
      (check-for-inertness (current-turn) f)))

  (turn-enqueue! turn f spawn-now!))
|
|
|
|
;; Enqueues, against the current actor's root facet, a shutdown of the whole
;; engine that the actor is running on.
(define (turn-stop-actor-system! turn)
  (define ac (facet-actor (turn-active-facet turn)))
  (log-syndicate/actor-debug " ENQ stop-actor-system ~v" ac)
  (define (shutdown-now!)
    (log-syndicate/actor-debug " DEQ stop-actor-system ~v" ac)
    (actor-system-shutdown! (actor-engine ac)))
  (turn-enqueue! turn (actor-root ac) shutdown-now!))
|
|
|
|
;; Enqueues, against the current actor's root facet, a clean (reason #t)
;; termination of the current actor.
(define (turn-stop-actor! turn)
  (define ac (facet-actor (turn-active-facet turn)))
  (log-syndicate/actor-debug " ENQ stop-actor ~v" ac)
  (define (stop-now!)
    (log-syndicate/actor-debug " DEQ stop-actor ~v" ac)
    (actor-terminate! ac #t))
  (turn-enqueue! turn (actor-root ac) stop-now!))
|
|
|
|
;; Enqueues, against the current actor's root facet, a termination of the current
;; actor with `exn` as its exit reason.
(define (turn-crash! turn exn)
  (define ac (facet-actor (turn-active-facet turn)))
  (log-syndicate/actor-debug " ENQ crash ~v" ac)
  (define (crash-now!)
    (log-syndicate/actor-debug " DEQ crash ~v" ac)
    (actor-terminate! ac exn))
  (turn-enqueue! turn (actor-root ac) crash-now!))
|
|
|
|
;; Creates a field called `name`, holding `initial-value`, in the dataflow graph
;; of the current actor.
(define (turn-field! turn name initial-value)
  (log-syndicate/actor-debug " field ~v created: ~v" name initial-value)
  (define graph (actor-dataflow (facet-actor (turn-active-facet turn))))
  (field graph name initial-value))
|
|
|
|
;; Runs thunk `action` once now, with the turn's active facet as active facet,
;; and with `current-dataflow-subject-id` bound to the wrapper procedure itself
;; (presumably so that the dataflow graph can call the wrapper again when a
;; field read by `action` changes -- confirm in dataflow.rkt).
;; The wrapper does nothing once the facet is no longer live.
(define (turn-dataflow! turn action)
  (define f (turn-active-facet turn))
  (define wrapped
    (lambda ()
      (if (facet-live? f)
          (with-active-facet f action)
          (void))))
  (parameterize ((current-dataflow-subject-id wrapped))
    (wrapped)))
|
|
|
|
;; Maintains, at `peer`, an assertion whose value is computed by thunk
;; `assertion-action`. Every time the dataflow block runs, the value is
;; recomputed; if it differs (by `preserve=?`) from the previous one, the old
;; assertion is replaced by the new one. A void value means "assert nothing".
(define (turn-assert/dataflow! turn peer assertion-action)
  (define handle #f)          ;; handle of the assertion currently in place, if any
  (define assertion (void))   ;; value most recently asserted
  (define (refresh!)
    (define new-assertion (assertion-action))
    (unless (preserve=? assertion new-assertion)
      (set! assertion new-assertion)
      (set! handle (turn-replace! (current-turn) peer handle new-assertion))))
  (turn-dataflow! turn refresh!))
|
|
|
|
;; Asserts `assertion` at `peer` under a freshly generated handle and returns the
;; handle. The handle is returned even if attenuation blocks the assertion.
(define (turn-assert! turn peer assertion)
  (let ((handle (generate-handle)))
    (turn-assert!* turn peer assertion handle)
    handle))
|
|
|
|
;; Asserts `assertion` at `peer` under the caller-supplied `handle`.
;;
;; The assertion first passes through the peer's attenuation rewrites. A void
;; result means it was blocked: nothing is recorded or sent. Otherwise the
;; outbound assertion is recorded on the active facet right away (not yet
;; established), and delivery of the rewritten value is enqueued on the turn;
;; the record is marked established when that delivery runs.
(define (turn-assert!* turn peer assertion handle)
  (define rewritten (run-rewrites (entity-ref-attenuation peer) assertion))
  (cond
    [(void? rewritten)
     (log-syndicate/actor-debug " blocked assert of ~v at ~v" assertion peer)
     (void)]
    [else
     (define a (outbound-assertion handle peer #f))
     (hash-set! (facet-outbound (turn-active-facet turn)) handle a)
     (log-syndicate/actor-debug " ENQ at ~v assert ~v handle ~v" peer rewritten handle)
     (define (deliver-assert!)
       (log-syndicate/actor-debug " DEQ at ~v assert ~v handle ~v" peer rewritten handle)
       (set-outbound-assertion-established?! a #t)
       (deliver (entity-assert (entity-ref-target peer)) rewritten handle))
     (turn-enqueue! turn (entity-ref-relay peer) deliver-assert!)]))
|
|
|
|
;; Retracts the assertion that the active facet holds under `handle`.
;; A #f handle, or one the facet does not know, is silently ignored.
(define (turn-retract! turn handle)
  (when handle
    (cond
      [(hash-ref (facet-outbound (turn-active-facet turn)) handle #f)
       => (lambda (a) (turn-retract!* turn a))])))
|
|
|
|
;; Swaps the assertion held under `old-handle` for `assertion`. The new value is
;; asserted before the old one is retracted. A void `assertion` means "just
;; retract". Returns the new handle, or #f if nothing was asserted.
(define (turn-replace! turn peer old-handle assertion)
  (begin0 (if (void? assertion)
              #f
              (turn-assert! turn peer assertion))
    (turn-retract! turn old-handle)))
|
|
|
|
;; Retracts outbound assertion record `a`.
;;
;; Delivery of the retraction is enqueued on the turn; when it runs, the peer is
;; notified only if the assertion had actually been established. The handle is
;; removed from the active facet's outbound table immediately, and if that
;; leaves the table empty an inertness check is scheduled for the facet.
(define (turn-retract!* turn a)
  (define peer (outbound-assertion-peer a))
  (define handle (outbound-assertion-handle a))
  (log-syndicate/actor-debug " ENQ at ~v retract handle ~v"
                             peer
                             handle)
  (define (deliver-retract!)
    (define established? (outbound-assertion-established? a))
    (log-syndicate/actor-debug " DEQ at ~v retract handle ~v (~a)"
                               peer
                               handle
                               (if established?
                                   "established"
                                   "not established"))
    (when established?
      (set-outbound-assertion-established?! a #f)
      (deliver (entity-retract (entity-ref-target peer)) handle)))
  (turn-enqueue! turn (entity-ref-relay peer) deliver-retract!)
  (define f (turn-active-facet turn))
  (define o (facet-outbound f))
  (hash-remove! o handle)
  (when (hash-empty? o)
    (check-for-inertness turn f)))
|
|
|
|
;; Synchronises with `peer`: wraps `k` as the message handler of a fresh entity
;; owned by the active facet, and passes a reference to that entity to
;; `turn-sync!*` as the reply target.
(define (turn-sync! turn peer k)
  (define reply-to (turn-ref turn (entity #:message k)))
  (turn-sync!* turn peer reply-to))
|
|
|
|
;; Enqueues a sync request for `peer-to-sync-with`, with `peer-k` as the reply
;; target. If the target entity has a sync handler, it is called with `peer-k`;
;; otherwise the default reply is used, which sends the message #t to `peer-k`.
(define (turn-sync!* turn peer-to-sync-with peer-k)
  (log-syndicate/actor-debug " ENQ sync ~v" peer-to-sync-with)
  (define (default-sync k)
    (turn-message! (current-turn) k #t))
  (turn-enqueue! turn
                 (entity-ref-relay peer-to-sync-with)
                 (lambda ()
                   (log-syndicate/actor-debug " DEQ sync ~v" peer-to-sync-with)
                   (define handler
                     (or (entity-sync (entity-ref-target peer-to-sync-with))
                         default-sync))
                   (deliver handler peer-k))))
|
|
|
|
;; Sends `assertion` as a one-off message to `peer`.
;;
;; The message first passes through the peer's attenuation rewrites. A void
;; result means it was blocked and nothing is sent; otherwise delivery of the
;; rewritten message is enqueued on the turn.
;;
;; The ENQ/DEQ debug lines report the rewritten value, i.e. what is actually
;; delivered, matching what `turn-assert!*` logs. The "blocked" line reports the
;; original value, since no rewritten value exists in that case.
(define (turn-message! turn peer assertion)
  (match (run-rewrites (entity-ref-attenuation peer) assertion)
    [(? void?)
     (log-syndicate/actor-debug " blocked message ~v to ~v" assertion peer)
     (void)]
    [rewritten
     (log-syndicate/actor-debug " ENQ message ~v to ~v" rewritten peer)
     (turn-enqueue! turn
                    (entity-ref-relay peer)
                    (lambda ()
                      (log-syndicate/actor-debug " DEQ message ~v to ~v" rewritten peer)
                      (deliver (entity-message (entity-ref-target peer)) rewritten)))]))
|
|
|
|
;; Runs `action` in a brand-new turn on the active facet of an already-committed
;; `turn`. Raises an error if `turn` is still in progress.
(define (turn-freshen turn action)
  (unless (turn-committed? turn)
    (error 'turn-freshen "Attempt to freshen a non-stale turn"))
  (turn! (turn-active-facet turn) action))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
;; Returns a thunk that runs `action` and then schedules an inertness check for
;; the facet that was active when the thunk started. The check goes onto
;; whichever turn is current after `action` has returned.
(define ((stop-if-inert-after action))
  (define f (turn-active-facet (current-turn)))
  (action)
  (check-for-inertness (current-turn) f))
|
|
|
|
;; Enqueues a deferred check on facet `f`. When the check runs, `f` is asked to
;; stop if its parent is no longer live or if `f` itself is inert.
(define (check-for-inertness turn f)
  (log-syndicate/actor-debug " ENQ checking ~a" f)
  (define (orphaned?)
    (define parent (facet-parent f))
    (and parent (not (facet-live? parent))))
  (turn-enqueue! turn
                 f
                 (lambda ()
                   (log-syndicate/actor-debug " DEQ checking ~a" f)
                   ;; This runs in a turn on `f`, so the argument-less
                   ;; `turn-stop!` targets `f`.
                   (when (or (orphaned?) (facet-inert? f))
                     (turn-stop!)))))
|
|
|
|
;; Applies `maybe-proc` to `args` if it is not #f; otherwise does nothing and
;; returns void. Used to call optional entity handlers.
(define (deliver maybe-proc . args)
  (if maybe-proc
      (apply maybe-proc args)
      (void)))
|