actor-group; bug fixes

This commit is contained in:
Tony Garnock-Jones 2021-06-13 07:55:50 +02:00
parent 4dc40da056
commit 4a0e6e0519
7 changed files with 134 additions and 54 deletions

View File

@ -7,7 +7,10 @@
current-turn
actor-system
make-actor-system
make-actor-group
actor-system-wait
actor-system-shutdown!
actor?
actor-id
@ -55,6 +58,10 @@
turn-sync!*
turn-message!)
(module+ internals
(provide make-actor
actor-terminate!))
(require (only-in preserves preserve=?))
(require racket/match)
(require (only-in racket/exn exn->string))
@ -92,7 +99,7 @@
[exit-hooks #:mutable])
#:methods gen:custom-write
[(define (write-proc a port mode)
(fprintf port "#<actor:~a:~a>" (actor-id a) (actor-name a)))])
(fprintf port "#<actor:~a/~a:~a>" (engine-id (actor-engine a)) (actor-id a) (actor-name a)))])
(struct facet (id
actor
@ -158,12 +165,35 @@
;;--------------------------------------------------------------------------
(define (actor-system boot-proc #:name [name 'actor-system])
(define e (make-engine 1))
(make-actor name e #t boot-proc (make-hash))
(adjust-inhabitant-count! e -1)
(define (make-actor-system boot-proc #:name [name 'actor-system])
(define e (make-engine 1 name (lambda (restart)
(actor-system-shutdown! e)
(restart void))))
(queue-task! e (lambda ()
(make-actor name e #t boot-proc (make-hash))
(adjust-inhabitant-count! e -1)))
(actor-system-wait e))
(define (make-actor-group outer-boot-proc inner-boot-proc #:name [name 'actor-group])
(define f (turn-facet! (lambda ()
(facet-prevent-inert-check! (turn-active-facet (current-turn)))
(outer-boot-proc))))
(define e (make-engine 1 name (lambda (restart)
(turn! f (lambda () (turn-stop!)))
(actor-system-shutdown! e)
(restart void))))
(queue-task! e (lambda ()
(make-actor name e #t inner-boot-proc (make-hash))
(adjust-inhabitant-count! e -1)))
e)
(define (actor-system-wait e)
(thread-wait (engine-thread e)))
(define (actor-system-shutdown! e)
(for [(ac (in-list (engine-shutdown! e)))]
(actor-terminate! ac #t)))
(define (make-actor name engine daemon? boot-proc initial-assertions)
(define ac (actor (generate-actor-id)
name
@ -179,6 +209,9 @@
(log-syndicate/actor-info "~a booting" ac)
(define user-root-facet (make-facet ac (actor-root ac) initial-assertions))
(turn! user-root-facet (stop-if-inert-after boot-proc))
(if (engine-running? engine)
(engine-register! engine ac)
(actor-terminate! ac #t))
user-root-facet)
(define (actor-add-exit-hook! ac hook)
@ -207,6 +240,7 @@
(turn! (actor-root ac)
(lambda () (facet-terminate! (actor-root ac) (eq? reason #t)))
#t)
(engine-deregister! (actor-engine ac) ac)
(when (not (actor-daemon? ac))
(adjust-inhabitant-count! (actor-engine ac) -1))))))
@ -334,15 +368,15 @@
(define (turn-ref turn entity [attenuation '()])
(entity-ref (turn-active-facet turn) entity attenuation))
(define (turn-facet! turn boot-proc)
(let ((new-facet (make-facet (facet-actor (turn-active-facet turn))
(turn-active-facet turn))))
(define (turn-facet! boot-proc)
(define turn (current-turn))
(let ((new-facet (make-facet (facet-actor (turn-active-facet turn)) (turn-active-facet turn))))
(with-active-facet new-facet (stop-if-inert-after boot-proc))
new-facet))
(define (turn-stop! turn [f (turn-active-facet turn)] [continuation #f])
(define (turn-stop! [f (turn-active-facet (current-turn))] [continuation #f])
(log-syndicate/actor-debug " ENQ stop-facet ~v" f)
(turn-enqueue! turn
(turn-enqueue! (current-turn)
f
(lambda ()
(log-syndicate/actor-debug " DEQ stop-facet ~v" f)
@ -505,7 +539,7 @@
(lambda ()
(when (or (and (facet-parent f) (not (facet-live? (facet-parent f))))
(facet-inert? f))
(turn-stop! (current-turn)))))))
(turn-stop!))))))
(define (deliver maybe-proc . args)
(when maybe-proc

View File

@ -52,7 +52,7 @@
#:message (lambda (message)
(log-syndicate/dataspace-debug "~v ! ~v" ds-e message)
(send-assertion! this-turn skeleton message))))
ds-e)
(ref ds-e))
(define-syntax actor-system/dataspace
(syntax-rules ()
@ -60,5 +60,5 @@
(actor-system
#:name 'dataspace
(facet-prevent-inert-check! this-facet)
(define ds (ref (dataspace)))
(define ds (dataspace))
expr ...)]))

View File

@ -32,7 +32,7 @@
#:packet-writer (lambda (bs) (send-data conn bs))
#:setup-inputs
(lambda (tr)
(accept-connection conn #:on-data (lambda (bs) (accept-bytes tr bs))))
(accept-connection conn #:on-data (lambda (d _m) (accept-bytes tr d))))
#:initial-ref
(object #:name (list conn 'gatekeeper)
[(Resolve unvalidated-sturdyref observer)

View File

@ -66,8 +66,9 @@
(tcp-connect host port)))
(lambda ()
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
(on-stop (close-input-port i)
(close-output-port o))
(actor-add-exit-hook! this-actor (lambda ()
(close-input-port i)
(close-output-port o)))
(define issue-credit (start-inbound-relay connection-custodian name (lambda () local-peer) i))
(define relay (outbound-relay name o))
(at local-peer
@ -81,9 +82,9 @@
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
(spawn
#:name name
(on-stop (close-input-port i)
(close-output-port o))
(actor-add-exit-hook! this-actor (lambda ()
(close-input-port i)
(close-output-port o)))
(define issue-credit #f)
(define active-controller #f)
(define relay (outbound-relay name o))
@ -219,7 +220,13 @@
[#:asserted (Socket-credit amount mode) (on-credit amount mode)]
[#:asserted (Socket-data data mode) (on-data data mode)]
[#:asserted (Socket-eof) (on-eof)]))))
(when initial-credit (send-credit conn initial-credit initial-mode)))
(when initial-credit (send-credit conn initial-credit initial-mode))
(lambda (#:on-data [new-on-data #f]
#:on-eof [new-on-eof #f]
#:on-credit [new-on-credit #f])
(when new-on-data (set! on-data new-on-data))
(when new-on-eof (set! on-eof new-on-eof))
(when new-on-credit (set! on-credit new-on-credit))))
(define (establish-connection ds spec
#:initial-credit [initial-credit (CreditAmount-unbounded)]

View File

@ -3,11 +3,18 @@
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide engine?
engine-id
engine-name
engine-running?
engine-custodian
engine-thread
engine-inhabitant-count
make-engine
adjust-inhabitant-count!
queue-task!
engine-register!
engine-deregister!
engine-shutdown!
*dead-engine*)
(require racket/match)
@ -16,34 +23,41 @@
(define-logger syndicate/engine)
(struct engine (id thread [inhabitant-count #:mutable])
(struct engine (id name custodian thread [running? #:mutable] actors [inhabitant-count #:mutable])
#:methods gen:custom-write
[(define (write-proc e port mode)
(fprintf port "#<engine:~a>" (engine-id e)))])
(fprintf port "#<engine:~a:~a>" (engine-id e) (engine-name e)))])
(define generate-engine-id (make-counter))
(define (make-engine initial-inhabitant-count)
(define (make-engine initial-inhabitant-count name termination-handler)
(define custodian (make-custodian))
(define e (engine (generate-engine-id)
(thread (lambda ()
(thread-receive) ;; delay boot until we're ready
(log-syndicate/engine-debug "~a starting" e)
(with-handlers ([exn? (handle-unexpected-task-runner-termination e)])
(let loop ()
(log-syndicate/engine-debug
"~a task count: ~a" e (engine-inhabitant-count e))
(if (positive? (engine-inhabitant-count e))
;; We have some non-daemon users so just block
(begin ((thread-receive))
(loop))
;; No non-daemon users, so keep running until there's no more work
(match (thread-try-receive)
[#f ;; No work, no non-daemons, we're done.
(void)]
[thunk
(thunk)
(loop)])))
(log-syndicate/engine-debug "~a stopping" e))))
name
custodian
(parameterize ((current-custodian custodian))
(thread (lambda ()
(thread-receive) ;; delay boot until we're ready
(log-syndicate/engine-debug "~a starting" e)
(with-handlers ([exn? (handle-unexpected-task-runner-termination e)])
(let loop ((termination-handler termination-handler))
(log-syndicate/engine-debug
"~a task count: ~a" e (engine-inhabitant-count e))
(if (positive? (engine-inhabitant-count e))
;; We have some non-daemon users so just block
(begin ((thread-receive))
(loop termination-handler))
;; No non-daemon users, so keep running until there's no more work
(match (thread-try-receive)
[#f ;; No work, no non-daemons, we're done.
(termination-handler loop)]
[thunk
(thunk)
(loop termination-handler)])))
(log-syndicate/engine-debug "~a stopping" e)
(custodian-shutdown-all custodian)))))
#t
(make-hash)
initial-inhabitant-count))
(thread-send (engine-thread e) 'boot)
e)
@ -52,6 +66,23 @@
(queue-task! e (lambda ()
(set-engine-inhabitant-count! e (+ (engine-inhabitant-count e) delta)))))
(define (engine-register! e ac)
(when (not (eq? (current-thread) (engine-thread e)))
(error 'engine-register! "Called from wrong thread"))
(hash-set! (engine-actors e) ac #t))
(define (engine-deregister! e ac)
(when (not (eq? (current-thread) (engine-thread e)))
(error 'engine-deregister! "Called from wrong thread"))
(hash-remove! (engine-actors e) ac))
(define (engine-shutdown! e)
(log-syndicate/engine-debug "~a shutdown" e)
(set-engine-running?! e #f)
(define actors (hash-keys (engine-actors e)))
(hash-clear! (engine-actors e))
actors)
(define ((handle-unexpected-task-runner-termination e) exn)
(log-syndicate/engine-error "~a terminated unexpectedly!\n~a" e (exn->string exn))
(exit 1))
@ -62,4 +93,4 @@
(lambda ()
(log-syndicate/engine-warning "Attempt to enqueue task for dead engine ~v" e))))
(define *dead-engine* (make-engine 0))
(define *dead-engine* (make-engine 0 'dead-engine void))

View File

@ -14,7 +14,7 @@
:pattern)
(require (except-in "actor.rkt" actor-system))
(require "actor.rkt")
(require "entity-ref.rkt")
(require "syntax.rkt")
(require preserves)

View File

@ -9,6 +9,7 @@
entity
actor-system
actor-group
object
ref
@ -46,7 +47,6 @@
(require preserves-schema)
(require "actor.rkt")
(require (prefix-in actor: "actor.rkt"))
(require "entity-ref.rkt")
(require "event-expander.rkt")
@ -78,7 +78,14 @@
(define-syntax (actor-system stx)
(syntax-parse stx
[(_ name:<name> expr ...)
#'(actor:actor-system #:name name.N (lambda () expr ...))]))
#'(make-actor-system #:name name.N (lambda () expr ...))]))
(define-syntax (actor-group stx)
(syntax-parse stx
[(_ name:<name> [outer-facet-expr ...] group-boot-expr ...)
#'(make-actor-group #:name name.N
(lambda () outer-facet-expr ...)
(lambda () group-boot-expr ...))]))
(define-syntax (object stx)
(syntax-parse stx
@ -158,7 +165,7 @@
(entity-ref this-facet entity '()))
(define-syntax-rule (react setup-expr ...)
(turn-facet! this-turn (lambda () setup-expr ...)))
(turn-facet! (lambda () setup-expr ...)))
(define-syntax (let-event stx)
(syntax-parse stx
@ -174,8 +181,8 @@
(define-syntax stop-facet
(syntax-rules ()
[(_ f) (turn-stop! this-turn f)]
[(_ f expr ...) (turn-stop! this-turn f (lambda () expr ...))]))
[(_ f) (turn-stop! f)]
[(_ f expr ...) (turn-stop! f (lambda () expr ...))]))
(define-syntax-rule (stop-current-facet expr ...)
(stop-facet this-facet expr ...))
@ -277,20 +284,20 @@
(define-event-expander event:when
(lambda (stx)
(syntax-case stx (message asserted retracted)
[(_ (message pat) expr ...)
(syntax-parse stx
[(_ ((~datum message) pat) expr ...)
#`(assert (Observe (:pattern pat)
(ref (entity #:message
(lambda (bindings)
(match-define (list #,@(analyse-pattern-bindings #'pat)) bindings)
expr ...)))))]
[(_ (asserted pat) expr ...)
[(_ ((~datum asserted) pat) expr ...)
#`(assert (Observe (:pattern pat)
(ref (entity #:assert
(lambda (bindings _handle)
(match-define (list #,@(analyse-pattern-bindings #'pat)) bindings)
expr ...)))))]
[(_ (retracted pat) expr ...)
[(_ ((~datum retracted) pat) expr ...)
#`(assert (Observe (:pattern pat)
(let ((assertion-map (make-hash)))
(ref (entity #:assert
@ -343,6 +350,7 @@
;;---------------------------------------------------------------------------
;;; Local Variables:
;;; eval: (put 'actor-group 'racket-indent-function 1)
;;; eval: (put 'actor-system/dataspace 'racket-indent-function 1)
;;; eval: (put 'at 'racket-indent-function 1)
;;; eval: (put 'object 'racket-indent-function 0)