#lang racket/base
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones

(provide (except-out (struct-out entity) entity)
         (rename-out [make-entity entity])
         (struct-out entity-ref)
         attenuate-entity-ref
         parse-Ref
         Ref->preserves

         actor-system

         actor?
         actor-id
         actor-name
         actor-engine
         actor-daemon?
         actor-exit-reason
         actor-add-exit-hook!
         actor-remove-exit-hook!
         actor-daemon!

         facet?
         facet-id
         facet-actor
         facet-parent
         facet-live?
         facet-inert?
         facet-on-stop!
         facet-prevent-inert-check!

         turn?
         turn-id
         turn-active-facet
         turn!
         turn-freshen
         turn-ref
         turn-facet!
         turn-stop!
         turn-spawn!
         turn-stop-actor!
         turn-crash!
         turn-field!
         turn-dataflow!
         turn-assert/dataflow!
         turn-assert!
         turn-assert!*
         turn-retract!
         turn-replace!
         turn-retract!*
         turn-sync!
         turn-sync!*
         turn-message!)

(require (only-in preserves preserve=?))
(require racket/match)
(require (only-in racket/exn exn->string))
(require struct-defaults)

(require "rewrite.rkt")
(require "engine.rkt")
(require "dataflow.rkt")
(require "field.rkt")
(require "support/counter.rkt")

;; An entity bundles up to four optional handler procedures (`assert`,
;; `retract`, `message`, `sync`); a handler of #f means "ignore this event"
;; (see `deliver`). Exported under the name `entity` via `make-entity`.
(struct entity (id name assert retract message sync)
  #:methods gen:custom-write
  [(define (write-proc e port mode)
     ;; The format string must carry one directive per argument, otherwise
     ;; `fprintf` raises whenever an entity is printed or logged.
     (fprintf port "#<entity:~a:~a>" (entity-id e) (entity-name e)))])

;; Keyword constructor for `entity`: every field is optional; the id defaults
;; to a fresh value from `generate-entity-id`, handlers default to #f.
(define-struct-defaults make-entity entity
  (#:_id [entity-id (generate-entity-id)]
   #:name [entity-name '?]
   #:assert [entity-assert #f]
   #:retract [entity-retract #f]
   #:message [entity-message #f]
   #:sync [entity-sync #f]))

;; A reference to `target` (an entity). `relay` is the facet under which
;; events for the target are enqueued (see `turn-ref`, `turn-enqueue!`);
;; `attenuation` is a list of caveats passed to `run-rewrites`.
(struct entity-ref (relay target attenuation) #:transparent)

;; Returns `r` if it is an `entity-ref`, otherwise `eof`.
(define (parse-Ref r)
  (if (entity-ref? r) r eof))

;; Identity: references are embedded as-is.
(define (Ref->preserves r)
  r)

;; Record of an assertion made by a facet: its handle, the peer entity-ref it
;; was asserted at, and whether the assert has actually been delivered yet.
(struct outbound-assertion (handle peer [established? #:mutable]))

(struct actor (id
               name
               engine
               [daemon? #:mutable]
               dataflow
               [root #:mutable]
               [exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error
               [exit-hooks #:mutable])
  #:methods gen:custom-write
  [(define (write-proc a port mode)
     (fprintf port "#<actor:~a:~a>" (actor-id a) (actor-name a)))])

(struct facet (id
               actor
               parent
               children
               outbound
               [shutdown-actions #:mutable]
               [live? #:mutable]
               [inert-check-preventers #:mutable])
  #:methods gen:custom-write
  [(define (write-proc f port mode)
     (local-require (only-in racket/string string-join))
     ;; Third field is the "/"-joined list of facet ids from `f` up to, but
     ;; not including, the parentless (root) facet.
     (fprintf port "#<facet:~a:~a:~a>"
              (actor-id (facet-actor f))
              (actor-name (facet-actor f))
              (string-join (let loop ((f f))
                             (if (facet-parent f)
                                 (cons (number->string (facet-id f)) (loop (facet-parent f)))
                                 '()))
                           "/")))])

;; A turn: `queues` maps facet -> list of pending actions (most recent first),
;; and is set to #f once the turn has been committed.
(struct turn (id active-facet [queues #:mutable]))

;;---------------------------------------------------------------------------

(define generate-entity-id (make-counter))
(define generate-actor-id (make-counter))
(define generate-turn-id (make-counter))
(define generate-handle (make-counter))

(define-logger syndicate/actor)

;;--------------------------------------------------------------------------

;; Creates an engine, boots a daemon actor named `name` running `boot-proc`,
;; decrements the engine's inhabitant count by one, and blocks until the
;; engine's thread finishes.
;; NOTE(review): the meaning of the `1` passed to `make-engine` is defined in
;; engine.rkt — presumably the initial inhabitant count; confirm.
(define (actor-system boot-proc #:name [name 'actor-system])
  (define e (make-engine 1))
  (make-actor name e #t boot-proc (make-hash))
  (adjust-inhabitant-count! e -1)
  (thread-wait (engine-thread e)))

;; Creates an actor with a root facet holding `initial-assertions` as its
;; outbound table, then runs `boot-proc` in a turn on a fresh child facet of
;; the root. Non-daemon actors increment the engine's inhabitant count.
;; Returns the new actor.
(define (make-actor name engine daemon? boot-proc initial-assertions)
  (define ac (actor (generate-actor-id)
                    name
                    engine
                    daemon?
                    (make-dataflow-graph)
                    'uninitialized ;; root; replaced immediately below
                    #f
                    '()))
  (when (not daemon?)
    (adjust-inhabitant-count! engine +1))
  (set-actor-root! ac (make-facet ac #f initial-assertions))
  (log-syndicate/actor-info "~a booting" ac)
  (turn! (make-facet ac (actor-root ac))
         (stop-if-inert-after boot-proc))
  ac)

;; Registers `hook` (a procedure of one argument, the turn) to be called when
;; `ac` terminates.
(define (actor-add-exit-hook! ac hook)
  (set-actor-exit-hooks! ac (cons hook (actor-exit-hooks ac))))

;; Removes every registration of `hook` (compared with `eq?`) from `ac`.
(define (actor-remove-exit-hook! ac hook)
  (set-actor-exit-hooks! ac (filter (lambda (h) (not (eq? h hook))) (actor-exit-hooks ac))))

;; Changes the daemon flag of `ac`, adjusting the engine's inhabitant count to
;; match. Does nothing when the flag is already `eq?` to `daemon?`.
(define (actor-daemon! ac daemon?)
  (when (not (eq? daemon? (actor-daemon? ac)))
    (set-actor-daemon?! ac daemon?)
    (adjust-inhabitant-count! (actor-engine ac) (if daemon? -1 +1))))

;; Terminates `ac` with `reason` (#t for an orderly exit, otherwise an exn).
;; Idempotent: only the first call has any effect. Runs exit hooks in
;; registration order, then queues a zombie turn that tears down the root
;; facet (orderly only if `reason` is #t).
(define (actor-terminate! turn ac reason)
  (when (not (actor-exit-reason ac))
    (set-actor-exit-reason! ac reason)
    (if (eq? reason #t)
        (log-syndicate/actor-info "~a terminated OK" ac)
        (log-syndicate/actor-error "~a terminated with exception:\n~a"
                                   ac
                                   (exn->string reason)))
    (for [(h (in-list (reverse (actor-exit-hooks ac))))] (h turn))
    (queue-task! (actor-engine ac)
                 (lambda ()
                   (turn! (actor-root ac)
                          (lambda (turn)
                            (facet-terminate! turn (actor-root ac) (eq? reason #t)))
                          #t)))
    (when (not (actor-daemon? ac))
      (adjust-inhabitant-count! (actor-engine ac) -1))))

;;---------------------------------------------------------------------------

;; Creates a live facet of actor `ac` under `parent` (or #f for a root),
;; using `initial-assertions` as its outbound-assertion table, and registers
;; it among the parent's children.
(define (make-facet ac parent [initial-assertions (make-hash)])
  (define f (facet (generate-actor-id)
                   ac
                   parent
                   (make-hasheq)
                   initial-assertions
                   '()
                   #t
                   0))
  (when parent
    (hash-set! (facet-children parent) f #t))
  f)

;; Registers `action` (a procedure of one argument, the turn) to run when `f`
;; is terminated in an orderly fashion.
(define (facet-on-stop! f action)
  (set-facet-shutdown-actions! f (cons action (facet-shutdown-actions f))))

;; A facet is inert when it has no children, no outbound assertions, and no
;; outstanding inert-check preventers.
(define (facet-inert? f)
  (and (hash-empty? (facet-children f))
       (hash-empty? (facet-outbound f))
       (zero? (facet-inert-check-preventers f))))

;; Increments the inert-check-preventer count of `f` and returns a thunk that
;; decrements it again. The thunk is one-shot: repeated calls are no-ops.
(define (facet-prevent-inert-check! f)
  (define armed #t)
  (set-facet-inert-check-preventers! f (+ (facet-inert-check-preventers f) 1))
  (lambda ()
    (when armed
      (set! armed #f)
      (set-facet-inert-check-preventers! f (- (facet-inert-check-preventers f) 1)))))

;; Terminates facet `f` if it is still live: detaches it from its parent,
;; recursively terminates its children, runs its shutdown actions (orderly
;; termination only), and retracts its outbound assertions. After an orderly
;; termination a task is queued that stops the parent if it has become inert,
;; or terminates the actor if `f` had no parent.
(define (facet-terminate! turn f orderly?)
  (when (facet-live? f)
    (log-syndicate/actor-debug " ~a stopping (~a)" f (if orderly? "orderly" "disorderly"))
    (set-facet-live?! f #f)
    (define parent (facet-parent f))
    (when parent
      (hash-remove! (facet-children parent) f))
    (with-active-facet turn f
      (lambda (turn)
        ;; Iterate over a snapshot: each child removes itself from
        ;; `(facet-children f)` as it terminates, and mutating a hash table
        ;; while iterating over it directly is not reliable.
        (for [(c (in-list (hash-keys (facet-children f))))]
          (facet-terminate! turn c orderly?))
        (when orderly?
          (for [(h (in-list (reverse (facet-shutdown-actions f))))] (h turn)))
        ;; Snapshot again: `turn-retract!*` removes the handle from
        ;; `(facet-outbound f)`, the table being traversed.
        (for [(a (in-list (hash-values (facet-outbound f))))]
          (turn-retract!* turn a))
        (when orderly?
          (queue-task!
           (actor-engine (facet-actor f))
           (lambda ()
             (if parent
                 (when (facet-inert? parent)
                   (turn! parent (lambda (turn) (facet-terminate! turn parent #t))))
                 (turn! (actor-root (facet-actor f))
                        (lambda (turn) (actor-terminate! turn (facet-actor f) #t))
                        #t)))))))))

;;---------------------------------------------------------------------------

;; Runs `action` in a fresh turn whose active facet is `f`. The turn is
;; skipped unless `zombie-turn?` is true or both the actor is still running
;; and `f` is live. After `action`, dataflow damage is repaired and the
;; actions enqueued during the turn are handed to each target facet's engine,
;; one task per facet, preserving enqueue order. Any exception raised
;; terminates the actor with that exception as its exit reason.
(define (turn! f action [zombie-turn? #f])
  (define ac (facet-actor f))
  (log-syndicate/actor-debug "start turn ~v~a~a~a"
                             f
                             (if zombie-turn? ", zombie" "")
                             (let ((r (actor-exit-reason ac)))
                               (if r (format ", exit-reason ~v" r) ""))
                             (if (facet-live? f) "" ", dead facet"))
  (when (or zombie-turn? (and (not (actor-exit-reason ac)) (facet-live? f)))
    (let ((turn (turn (generate-turn-id) f (make-hasheq))))
      (with-handlers ([exn? (lambda (e)
                              (turn! (actor-root ac)
                                     (lambda (turn)
                                       (actor-terminate! turn ac e))))])
        (action turn)
        (dataflow-repair-damage! (actor-dataflow ac) (lambda (action) (action turn)))
        (for [((ff qq) (in-hash (turn-queues turn)))]
          (queue-task! (actor-engine (facet-actor ff))
                       (lambda ()
                         (turn! ff
                                (lambda (turn)
                                  ;; Queues are built with `cons`, so reverse
                                  ;; to run in enqueue order.
                                  (for [(a (in-list (reverse qq)))] (a turn)))))))
        ;; Mark the turn committed; `turn-enqueue!` rejects it from now on.
        (set-turn-queues! turn #f)))
    (log-syndicate/actor-debug "end turn ~v\n" f)))

;; Runs `action` with a nested turn whose active facet is `f` but which shares
;; the outer turn's queues; the nested turn is marked committed afterwards.
(define (with-active-facet outer-turn f action)
  (let ((inner-turn (turn (generate-turn-id) f (turn-queues outer-turn))))
    (action inner-turn)
    (set-turn-queues! inner-turn #f)))

;; Adds `action` to the list of actions queued for facet `f` in `turn`.
;; Raises an error if the turn has already been committed.
(define (turn-enqueue! turn f action)
  (define qs (turn-queues turn))
  (when (not qs)
    (error 'turn-enqueue! "Attempt to reuse a committed turn"))
  (hash-update! qs f (lambda (actions) (cons action actions)) '()))

;; Builds an `entity-ref` to `entity`, relayed via the turn's active facet.
(define (turn-ref turn entity [attenuation '()])
  (entity-ref (turn-active-facet turn) entity attenuation))

;; Creates a child facet of the active facet, runs `boot-proc` in it (stopping
;; it afterwards if it turns out inert), and returns the new facet.
(define (turn-facet! turn boot-proc)
  (let ((new-facet (make-facet (facet-actor (turn-active-facet turn))
                               (turn-active-facet turn))))
    (with-active-facet turn new-facet (stop-if-inert-after boot-proc))
    new-facet))

;; Enqueues an orderly stop of facet `f` (default: the active facet). If
;; `continuation` is given it is run, after the stop, with `f`'s parent as
;; the active facet.
(define (turn-stop! turn [f (turn-active-facet turn)] [continuation #f])
  (log-syndicate/actor-debug " ENQ stop-facet ~v" f)
  (turn-enqueue! turn
                 f
                 (lambda (turn)
                   (log-syndicate/actor-debug " DEQ stop-facet ~v" f)
                   (facet-terminate! turn f #t)
                   (when continuation
                     (log-syndicate/actor-debug " stop-facet ~v continuation" f)
                     (with-active-facet turn (facet-parent f) continuation)))))

;; Enqueues the creation of a new actor running `boot-proc`. The outbound
;; assertions of the active facet whose handles are keys of
;; `initial-assertions` are moved to the new actor's root facet.
(define (turn-spawn! turn boot-proc [initial-assertions (make-hash)]
                     #:name [name '?]
                     #:daemon? [daemon? #f])
  (define f (turn-active-facet turn))
  (define o (facet-outbound f))
  (log-syndicate/actor-debug " ENQ spawn ~a" name)
  (turn-enqueue! turn
                 f
                 (lambda (turn)
                   (log-syndicate/actor-debug " DEQ spawn ~a" name)
                   (define new-outbound (make-hash))
                   (for [(handle (in-hash-keys initial-assertions))]
                     (hash-set! new-outbound handle (hash-ref o handle))
                     (hash-remove! o handle))
                   (define engine (actor-engine (facet-actor f)))
                   (queue-task! engine
                                (lambda ()
                                  (make-actor name engine daemon? boot-proc new-outbound))))))

;; Enqueues an orderly termination of the active facet's actor.
(define (turn-stop-actor! turn)
  (define ac (facet-actor (turn-active-facet turn)))
  (log-syndicate/actor-debug " ENQ stop-actor ~v" ac)
  (turn-enqueue! turn
                 (actor-root ac)
                 (lambda (turn)
                   (log-syndicate/actor-debug " DEQ stop-actor ~v" ac)
                   (actor-terminate! turn ac #t))))

;; Enqueues termination of the active facet's actor with `exn` as its exit
;; reason.
(define (turn-crash! turn exn)
  (define ac (facet-actor (turn-active-facet turn)))
  (log-syndicate/actor-debug " ENQ crash ~v" ac)
  (turn-enqueue! turn
                 (actor-root ac)
                 (lambda (turn)
                   (log-syndicate/actor-debug " DEQ crash ~v" ac)
                   (actor-terminate! turn ac exn))))

;; Creates a field named `name` with `initial-value` in the dataflow graph of
;; the active facet's actor.
(define (turn-field! turn name initial-value)
  (log-syndicate/actor-debug " field ~v created: ~v" name initial-value)
  (field (actor-dataflow (facet-actor (turn-active-facet turn))) name initial-value))

;; Runs `action` now with itself (wrapped) installed as the current dataflow
;; subject. The wrapper skips `action` once the facet that was active at
;; registration time is no longer live.
(define (turn-dataflow! turn action)
  (define f (turn-active-facet turn))
  (define (wrapped turn)
    (when (facet-live? f)
      (action turn)))
  (parameterize ((current-dataflow-subject-id wrapped))
    (wrapped turn)))

;; Maintains an assertion at `peer` whose value is computed by
;; `assertion-action` under dataflow: whenever the computed value differs
;; (per `preserve=?`) from the previously computed one, the old assertion is
;; replaced by the new one. A void value means "assert nothing".
(define (turn-assert/dataflow! turn peer assertion-action)
  (define handle #f)
  (define assertion (void))
  (turn-dataflow! turn
                  (lambda (turn)
                    (define new-assertion (assertion-action turn))
                    (when (not (preserve=? assertion new-assertion))
                      ;; Remember the value just published so that the next
                      ;; re-run compares against it rather than the initial
                      ;; void.
                      (set! assertion new-assertion)
                      (set! handle (turn-replace! turn peer handle new-assertion))))))

;; Asserts `assertion` at `peer` under a freshly generated handle; returns the
;; handle.
(define (turn-assert! turn peer assertion)
  (define handle (generate-handle))
  (turn-assert!* turn peer assertion handle)
  handle)

;; Asserts `assertion` at `peer` under the given `handle`. The assertion is
;; first passed through the peer's attenuation; a void result means it was
;; blocked and nothing is recorded or sent. Otherwise it is recorded in the
;; active facet's outbound table and its delivery is enqueued.
(define (turn-assert!* turn peer assertion handle)
  (match (run-rewrites (entity-ref-attenuation peer) assertion)
    [(? void?)
     (log-syndicate/actor-debug " blocked assert of ~v at ~v" assertion peer)
     (void)]
    [rewritten
     (define a (outbound-assertion handle peer #f))
     (hash-set! (facet-outbound (turn-active-facet turn)) handle a)
     (log-syndicate/actor-debug " ENQ at ~v assert ~v handle ~v" peer rewritten handle)
     (turn-enqueue! turn
                    (entity-ref-relay peer)
                    (lambda (turn)
                      (log-syndicate/actor-debug " DEQ at ~v assert ~v handle ~v" peer rewritten handle)
                      (set-outbound-assertion-established?! a #t)
                      (deliver (entity-assert (entity-ref-target peer)) turn rewritten handle)))]))

;; Retracts the active facet's assertion with the given `handle`, if `handle`
;; is non-#f and such an assertion exists.
(define (turn-retract! turn handle)
  (when handle
    (define a (hash-ref (facet-outbound (turn-active-facet turn)) handle #f))
    (when a
      (turn-retract!* turn a))))

;; Asserts `assertion` at `peer` (unless it is void), then retracts
;; `old-handle`. Returns the new handle, or #f if nothing was asserted.
(define (turn-replace! turn peer old-handle assertion)
  (define new-handle (if (void? assertion)
                         #f
                         (turn-assert! turn peer assertion)))
  (turn-retract! turn old-handle)
  new-handle)

;; Removes outbound assertion `a` from the active facet's table and enqueues
;; its retraction; the retraction is only delivered if the corresponding
;; assert was actually delivered.
(define (turn-retract!* turn a)
  (hash-remove! (facet-outbound (turn-active-facet turn)) (outbound-assertion-handle a))
  (log-syndicate/actor-debug " ENQ at ~v retract handle ~v"
                             (outbound-assertion-peer a)
                             (outbound-assertion-handle a))
  (turn-enqueue! turn
                 (entity-ref-relay (outbound-assertion-peer a))
                 (lambda (turn)
                   (log-syndicate/actor-debug " DEQ at ~v retract handle ~v (~a)"
                                              (outbound-assertion-peer a)
                                              (outbound-assertion-handle a)
                                              (if (outbound-assertion-established? a)
                                                  "established"
                                                  "not established"))
                   (when (outbound-assertion-established? a)
                     (set-outbound-assertion-established?! a #f)
                     (deliver (entity-retract (entity-ref-target (outbound-assertion-peer a)))
                              turn
                              (outbound-assertion-handle a))))))

;; Synchronises with `peer`: `k` is installed as the message handler of a
;; fresh entity, which is then used as the reply target.
(define (turn-sync! turn peer k)
  (turn-sync!* turn peer (turn-ref turn (make-entity #:message k))))

;; Enqueues a sync request at `peer-to-sync-with`. If the target entity has no
;; sync handler, the default is to send the message #t to `peer-k`.
(define (turn-sync!* turn peer-to-sync-with peer-k)
  (log-syndicate/actor-debug " ENQ sync ~v" peer-to-sync-with)
  (turn-enqueue! turn
                 (entity-ref-relay peer-to-sync-with)
                 (lambda (turn)
                   (log-syndicate/actor-debug " DEQ sync ~v" peer-to-sync-with)
                   (deliver (or (entity-sync (entity-ref-target peer-to-sync-with))
                                (lambda (turn peer-k) (turn-message! turn peer-k #t)))
                            turn
                            peer-k))))

;; Sends `assertion` as a message to `peer`, after passing it through the
;; peer's attenuation; a void rewrite result means the message is blocked.
(define (turn-message! turn peer assertion)
  (match (run-rewrites (entity-ref-attenuation peer) assertion)
    [(? void?)
     (log-syndicate/actor-debug " blocked message ~v to ~v" assertion peer)
     (void)]
    [rewritten
     (log-syndicate/actor-debug " ENQ message ~v to ~v" assertion peer)
     (turn-enqueue! turn
                    (entity-ref-relay peer)
                    (lambda (turn)
                      (log-syndicate/actor-debug " DEQ message ~v to ~v" assertion peer)
                      (deliver (entity-message (entity-ref-target peer)) turn rewritten)))]))

;; Starts a new turn on the active facet of an already-committed `turn`.
;; Raises an error if `turn` has not been committed yet.
(define (turn-freshen turn action)
  (when (turn-queues turn)
    (error 'turn-freshen "Attempt to freshen a non-stale turn"))
  (turn! (turn-active-facet turn) action))

;;---------------------------------------------------------------------------

;; Wraps `action` so that, after it has run, a check is enqueued on the active
;; facet: the facet is stopped if it is inert or if its parent is no longer
;; live.
(define (stop-if-inert-after action)
  (lambda (turn)
    (action turn)
    (turn-enqueue! turn
                   (turn-active-facet turn)
                   (lambda (turn)
                     (define f (turn-active-facet turn))
                     (when (or (and (facet-parent f) (not (facet-live? (facet-parent f))))
                               (facet-inert? f))
                       (turn-stop! turn))))))

;; Applies `maybe-proc` to `args` unless it is #f.
(define (deliver maybe-proc . args)
  (when maybe-proc
    (apply maybe-proc args)))

;; TODO: prove to myself I've gotten the order correct. (Right-to-left, wasn't it?!?!)
;; Returns an entity-ref like `r` with `caveats` appended to its attenuation;
;; returns `r` itself when no caveats are given.
(define (attenuate-entity-ref r . caveats)
  (if (null? caveats)
      r
      (match r
        [(entity-ref relay target previous-attenuation)
         (entity-ref relay target (append previous-attenuation caveats))])))