syndicate-rkt/syndicate/actor.rkt

542 lines
21 KiB
Racket

#lang racket/base
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide (struct-out <entity>)
entity
current-turn
make-actor-system
make-actor-group
actor-system-wait
actor-system-shutdown!
escape-pod
actor?
actor-id
actor-name
actor-engine
actor-daemon?
actor-exit-reason
actor-add-exit-hook!
actor-remove-exit-hook!
actor-daemon!
*dead-actor*
facet?
facet-id
facet-actor
facet-parent
facet-live?
facet-inert?
facet-on-end-of-turn!
facet-on-stop!
facet-prevent-inert-check!
*dead-facet*
turn?
turn-id
turn-active-facet
turn-committed?
turn!
turn-freshen
turn-ref
turn-facet!
turn-stop!
turn-spawn!
turn-stop-actor-system!
turn-stop-actor!
turn-crash!
turn-field!
turn-dataflow!
turn-assert/dataflow!
turn-assert!
turn-assert!*
turn-retract!
turn-replace!
turn-retract!*
turn-sync!
turn-sync!*
turn-message!)
(module+ internals
(provide make-actor
actor-terminate!
with-active-facet))
(require (only-in preserves preserve=?))
(require racket/match)
(require (only-in racket/exn exn->string))
(require "rewrite.rkt")
(require "engine.rkt")
(require "entity-ref.rkt")
(require "dataflow.rkt")
(require "field.rkt")
(require "support/counter.rkt")
(struct outbound-assertion (handle peer [established? #:mutable]))
(struct turn (id
[active-facet #:mutable]
[queues #:mutable])
#:methods gen:custom-write
[(define (write-proc t port mode)
(fprintf port "#<turn:~a~a>" (turn-id t) (if (turn-committed? t) ":committed" "")))])
;;---------------------------------------------------------------------------
(define current-turn (make-parameter #f))
(define generate-actor-id (make-counter))
(define generate-turn-id (make-counter))
(define generate-handle (make-counter))
(define-logger syndicate/actor)
(define *dead-actor*
(actor (generate-actor-id)
'*dead-actor*
*dead-engine*
#t
(make-dataflow-graph)
'uninitialized
#t
'()))
(define *dead-facet*
(facet (generate-actor-id)
*dead-actor*
#f
(make-hasheq)
(make-hash)
'()
'()
#f
0))
(set-actor-root! *dead-actor* *dead-facet*)
;;--------------------------------------------------------------------------
(define (make-actor-system boot-proc #:name [name 'actor-system])
(define e (make-engine 1 name (lambda (restart)
(actor-system-shutdown! e)
(restart void))))
(queue-task! e (lambda ()
(make-actor name e #t boot-proc (make-hash))
(adjust-inhabitant-count! e -1)))
(actor-system-wait e))
(define (make-actor-group boot-proc
#:name [name 'actor-group]
#:link? [link? #f])
(define owning-facet (turn-active-facet (current-turn)))
(define e (make-engine 1 name (lambda (restart)
(when link?
(turn! owning-facet
(lambda () (turn-stop! owning-facet))))
(actor-system-shutdown! e)
(restart void))))
(when link? (facet-on-stop! owning-facet (lambda () (actor-system-shutdown! e))))
(queue-task! e (lambda ()
(make-actor name e #t boot-proc (make-hash))
(adjust-inhabitant-count! e -1)))
e)
(define (actor-system-wait e)
(thread-wait (engine-thread e)))
(define (actor-system-shutdown! e)
(define actors (engine-shutdown! e))
(for [(ac (in-list actors))] (actor-terminate! ac #t)))
(define (escape-pod boot-proc #:name [name 'escape-pod])
(define e (actor-engine (facet-actor (turn-active-facet (current-turn)))))
(queue-task! e (lambda () (make-actor name e #t boot-proc (make-hash)))))
(define (make-actor name engine daemon? boot-proc initial-assertions)
(define ac (actor (generate-actor-id)
name
engine
daemon?
(make-dataflow-graph)
'uninitialized
#f
'()))
(when (not daemon?)
(adjust-inhabitant-count! engine +1))
(set-actor-root! ac (make-facet ac #f))
(log-syndicate/actor-info "~a booting" ac)
(define user-root-facet (make-facet ac (actor-root ac) initial-assertions))
(turn! user-root-facet (stop-if-inert-after boot-proc))
(if (engine-running? engine)
(engine-register! engine ac)
(actor-terminate! ac #t))
user-root-facet)
(define (actor-add-exit-hook! ac hook)
(set-actor-exit-hooks! ac (cons hook (actor-exit-hooks ac))))
(define (actor-remove-exit-hook! ac hook)
(set-actor-exit-hooks! ac (filter (lambda (h) (not (eq? h hook)))
(actor-exit-hooks ac))))
(define (actor-daemon! ac daemon?)
(when (not (eq? daemon? (actor-daemon? ac)))
(set-actor-daemon?! ac daemon?)
(adjust-inhabitant-count! (actor-engine ac) (if daemon? -1 +1))))
(define (actor-terminate! ac reason)
(when (not (actor-exit-reason ac))
(set-actor-exit-reason! ac reason)
(if (eq? reason #t)
(log-syndicate/actor-info "~a terminated OK" ac)
(log-syndicate/actor-error "~a terminated with exception:\n~a"
ac
(exn->string reason)))
(for [(h (in-list (reverse (actor-exit-hooks ac))))] (h))
(queue-task! (actor-engine ac)
(lambda ()
(turn! (actor-root ac)
(lambda () (facet-terminate! (actor-root ac) (eq? reason #t)))
#t)
(engine-deregister! (actor-engine ac) ac)
(when (not (actor-daemon? ac))
(adjust-inhabitant-count! (actor-engine ac) -1))))))
;;---------------------------------------------------------------------------
(define (make-facet ac parent [initial-assertions (make-hash)])
(define f (facet (generate-actor-id)
ac
parent
(make-hasheq)
initial-assertions
'()
'()
#t
0))
(when parent
(hash-set! (facet-children parent) f #t))
f)
(define (facet-on-end-of-turn! f action)
(set-facet-end-of-turn-actions! f (cons action (facet-end-of-turn-actions f))))
(define (facet-on-stop! f action)
(set-facet-shutdown-actions! f (cons action (facet-shutdown-actions f))))
(define (facet-inert? f)
(log-syndicate/actor-debug " facet-inert? ~a: ~a kids, ~a outbound, ~a preventers"
f
(hash-count (facet-children f))
(hash-count (facet-outbound f))
(facet-inert-check-preventers f))
(and (hash-empty? (facet-children f))
(hash-empty? (facet-outbound f))
(zero? (facet-inert-check-preventers f))))
(define (facet-prevent-inert-check! f)
(define armed #t)
(set-facet-inert-check-preventers! f (+ (facet-inert-check-preventers f) 1))
(lambda ()
(when armed
(set! armed #f)
(let ((n (- (facet-inert-check-preventers f) 1)))
(set-facet-inert-check-preventers! f n)
(when (zero? n)
(check-for-inertness (current-turn) f))))))
(define (facet-terminate! f orderly?)
(when (facet-live? f)
(log-syndicate/actor-debug " ~a stopping (~a)" f (if orderly? "orderly" "disorderly"))
(set-facet-live?! f #f)
(define parent (facet-parent f))
(when parent (hash-remove! (facet-children parent) f))
(with-active-facet f
(lambda ()
(for [(c (in-hash-keys (facet-children f)))] (facet-terminate! c orderly?))
(when orderly?
(let ((actions (reverse (facet-shutdown-actions f))))
(unless (null? actions)
(with-active-facet (or parent f)
(lambda ()
(for [(h (in-list actions))] (h)))))))
(let ((turn (current-turn)))
(for [(a (in-hash-values (facet-outbound f)))] (turn-retract!* turn a)))
(when orderly?
(queue-task!
(actor-engine (facet-actor f))
(lambda ()
(if parent
(when (facet-inert? parent)
(turn! parent (lambda () (facet-terminate! parent #t))))
(turn! (actor-root (facet-actor f))
(lambda () (actor-terminate! (facet-actor f) #t))
#t)))))))))
;;---------------------------------------------------------------------------
(define (turn-committed? t)
(not (turn-queues t)))
(define (turn! f action [zombie-turn? #f])
(define ac (facet-actor f))
(log-syndicate/actor-debug "start turn ~v~a~a~a"
f
(if zombie-turn? ", zombie" "")
(let ((r (actor-exit-reason ac))) (if r (format ", exit-reason ~v" r) ""))
(if (facet-live? f) "" ", dead facet"))
(when (or zombie-turn? (and (not (actor-exit-reason ac)) (facet-live? f)))
(let ((turn (turn (generate-turn-id) f (make-hasheq))))
(with-handlers ([exn? (lambda (e)
(turn! (actor-root ac) (lambda () (actor-terminate! ac e))))])
(parameterize ((current-turn turn))
(action)
(let loop ()
(dataflow-repair-damage! (actor-dataflow ac) (lambda (action) (action)))
(define pending (reverse (facet-end-of-turn-actions f)))
(when (pair? pending)
(set-facet-end-of-turn-actions! f '())
(for [(action (in-list pending))] (action))
(loop))))
(for [((aa qq) (in-hash (turn-queues turn)))]
(queue-task! (actor-engine aa)
(lambda ()
(define pending (reverse qq))
(let loop ()
(when (pair? pending)
(define ff (caar pending))
(define action (cdar pending))
(set! pending (cdr pending))
(turn! ff (lambda ()
(action)
(let inner ()
(when (and (pair? pending) (eq? ff (caar pending)))
((cdar pending))
(set! pending (cdr pending))
(inner)))))
(loop))))))
(set-turn-queues! turn #f)))
(log-syndicate/actor-debug "end turn ~v\n" f)))
(define (with-active-facet f action)
(let* ((t (current-turn))
(saved-f (turn-active-facet t)))
(set-turn-active-facet! t f)
(action)
(set-turn-active-facet! t saved-f)))
(define (turn-enqueue! turn f action)
(define qs (turn-queues turn))
(when (not qs)
(error 'turn-enqueue! "Attempt to reuse ~a in ~a" turn (turn-active-facet turn)))
(hash-update! qs (facet-actor f) (lambda (actions) (cons (cons f action) actions)) '()))
(define (turn-ref turn entity [attenuation '()])
(entity-ref (turn-active-facet turn) entity attenuation))
(define (turn-facet! boot-proc)
(define turn (current-turn))
(let ((new-facet (make-facet (facet-actor (turn-active-facet turn)) (turn-active-facet turn))))
(with-active-facet new-facet (stop-if-inert-after boot-proc))
new-facet))
(define (turn-stop! f [continuation #f])
(when (not (eq? (facet-actor f) (facet-actor (turn-active-facet (current-turn)))))
(error 'turn-stop! "Attempted to stop facet ~v from different actor ~v"
f
(facet-actor (turn-active-facet (current-turn)))))
(log-syndicate/actor-debug " ENQ stop-facet ~v" f)
(turn-enqueue! (current-turn)
f
(lambda ()
(log-syndicate/actor-debug " DEQ stop-facet ~v" f)
(facet-terminate! f #t)
(when continuation
(log-syndicate/actor-debug " stop-facet ~v continuation" f)
(with-active-facet (facet-parent f) continuation)))))
(define (turn-spawn! turn boot-proc [initial-assertions (make-hash)]
#:name [name '?]
#:daemon? [daemon? #f]
#:link [link-entity #f]
#:link-message [link-message 'alive])
(define f (turn-active-facet turn))
(define o (facet-outbound f))
(log-syndicate/actor-debug " ENQ spawn ~a" name)
(turn-enqueue! turn
f
(lambda ()
(log-syndicate/actor-debug " DEQ spawn ~a" name)
(define new-outbound (make-hash))
(for [(handle (in-hash-keys initial-assertions))]
(hash-set! new-outbound handle (hash-ref o handle))
(hash-remove! o handle))
(define engine (actor-engine (facet-actor f)))
(define new-actor-facet (make-actor name engine daemon? boot-proc new-outbound))
(when link-entity
(define handle (generate-handle))
(define linked-peer (entity-ref new-actor-facet link-entity '()))
(define a (outbound-assertion handle linked-peer #t))
(hash-set! o handle a)
(log-syndicate/actor-debug " ENQ link at ~v assert ~v handle ~v"
linked-peer link-message handle)
(turn! new-actor-facet
(lambda ()
(log-syndicate/actor-debug " DELIVER link to ~v assert ~v handle ~v"
linked-peer link-message handle)
(deliver (entity-assert link-entity) link-message handle))))
(when (hash-empty? o)
(check-for-inertness (current-turn) f)))))
(define (turn-stop-actor-system! turn)
(define ac (facet-actor (turn-active-facet turn)))
(log-syndicate/actor-debug " ENQ stop-actor-system ~v" ac)
(turn-enqueue! turn (actor-root ac) (lambda ()
(log-syndicate/actor-debug " DEQ stop-actor-system ~v" ac)
(actor-system-shutdown! (actor-engine ac)))))
(define (turn-stop-actor! turn)
(define ac (facet-actor (turn-active-facet turn)))
(log-syndicate/actor-debug " ENQ stop-actor ~v" ac)
(turn-enqueue! turn (actor-root ac) (lambda ()
(log-syndicate/actor-debug " DEQ stop-actor ~v" ac)
(actor-terminate! ac #t))))
(define (turn-crash! turn exn)
(define ac (facet-actor (turn-active-facet turn)))
(log-syndicate/actor-debug " ENQ crash ~v" ac)
(turn-enqueue! turn (actor-root ac) (lambda ()
(log-syndicate/actor-debug " DEQ crash ~v" ac)
(actor-terminate! ac exn))))
(define (turn-field! turn name initial-value)
(log-syndicate/actor-debug " field ~v created: ~v" name initial-value)
(field (actor-dataflow (facet-actor (turn-active-facet turn))) name initial-value))
(define (turn-dataflow! turn action)
(define f (turn-active-facet turn))
(define (wrapped) (when (facet-live? f) (with-active-facet f action)))
(parameterize ((current-dataflow-subject-id wrapped)) (wrapped)))
(define (turn-assert/dataflow! turn peer assertion-action)
(define handle #f)
(define assertion (void))
(turn-dataflow! turn
(lambda ()
(define new-assertion (assertion-action))
(when (not (preserve=? assertion new-assertion))
(set! assertion new-assertion)
(set! handle (turn-replace! (current-turn) peer handle new-assertion))))))
(define (turn-assert! turn peer assertion)
(define handle (generate-handle))
(turn-assert!* turn peer assertion handle)
handle)
(define (turn-assert!* turn peer assertion handle)
(match (run-rewrites (entity-ref-attenuation peer) assertion)
[(? void?)
(log-syndicate/actor-debug " blocked assert of ~v at ~v" assertion peer)
(void)]
[rewritten
(define a (outbound-assertion handle peer #f))
(hash-set! (facet-outbound (turn-active-facet turn)) handle a)
(log-syndicate/actor-debug " ENQ at ~v assert ~v handle ~v" peer rewritten handle)
(turn-enqueue! turn
(entity-ref-relay peer)
(lambda ()
(log-syndicate/actor-debug " DEQ at ~v assert ~v handle ~v" peer rewritten handle)
(set-outbound-assertion-established?! a #t)
(deliver (entity-assert (entity-ref-target peer)) rewritten handle)))]))
(define (turn-retract! turn handle)
(when handle
(define a (hash-ref (facet-outbound (turn-active-facet turn)) handle #f))
(when a (turn-retract!* turn a))))
(define (turn-replace! turn peer old-handle assertion)
(define new-handle (if (void? assertion) #f (turn-assert! turn peer assertion)))
(turn-retract! turn old-handle)
new-handle)
(define (turn-retract!* turn a)
(log-syndicate/actor-debug " ENQ at ~v retract handle ~v"
(outbound-assertion-peer a)
(outbound-assertion-handle a))
(turn-enqueue! turn
(entity-ref-relay (outbound-assertion-peer a))
(lambda ()
(log-syndicate/actor-debug " DEQ at ~v retract handle ~v (~a)"
(outbound-assertion-peer a)
(outbound-assertion-handle a)
(if (outbound-assertion-established? a)
"established"
"not established"))
(when (outbound-assertion-established? a)
(set-outbound-assertion-established?! a #f)
(deliver (entity-retract (entity-ref-target (outbound-assertion-peer a)))
(outbound-assertion-handle a)))))
(let* ((f (turn-active-facet turn))
(o (facet-outbound f)))
(hash-remove! o (outbound-assertion-handle a))
(when (hash-empty? o)
(check-for-inertness turn f))))
(define (turn-sync! turn peer k)
(turn-sync!* turn peer (turn-ref turn (entity #:message k))))
(define (turn-sync!* turn peer-to-sync-with peer-k)
(log-syndicate/actor-debug " ENQ sync ~v" peer-to-sync-with)
(turn-enqueue! turn
(entity-ref-relay peer-to-sync-with)
(lambda ()
(log-syndicate/actor-debug " DEQ sync ~v" peer-to-sync-with)
(deliver (or (entity-sync (entity-ref-target peer-to-sync-with))
(lambda (peer-k) (turn-message! (current-turn) peer-k #t)))
peer-k))))
(define (turn-message! turn peer assertion)
(match (run-rewrites (entity-ref-attenuation peer) assertion)
[(? void?)
(log-syndicate/actor-debug " blocked message ~v to ~v" assertion peer)
(void)]
[rewritten
(log-syndicate/actor-debug " ENQ message ~v to ~v" assertion peer)
(turn-enqueue! turn
(entity-ref-relay peer)
(lambda ()
(log-syndicate/actor-debug " DEQ message ~v to ~v" assertion peer)
(deliver (entity-message (entity-ref-target peer)) rewritten)))]))
(define (turn-freshen turn action)
(when (turn-queues turn) (error 'turn-freshen "Attempt to freshen a non-stale turn"))
(turn! (turn-active-facet turn) action))
;;---------------------------------------------------------------------------
(define (stop-if-inert-after action)
(lambda ()
(define f (turn-active-facet (current-turn)))
(action)
(check-for-inertness (current-turn) f)))
(define (check-for-inertness turn f)
(log-syndicate/actor-debug " ENQ checking ~a" f)
(turn-enqueue! turn
f
(lambda ()
(log-syndicate/actor-debug " DEQ checking ~a" f)
(when (or (and (facet-parent f) (not (facet-live? (facet-parent f))))
(facet-inert? f))
(turn-stop! f)))))
(define (deliver maybe-proc . args)
(when maybe-proc
(apply maybe-proc args)))