diff --git a/syndicate/actor.rkt b/syndicate/actor.rkt index 0279a93..80103b4 100644 --- a/syndicate/actor.rkt +++ b/syndicate/actor.rkt @@ -7,7 +7,10 @@ current-turn - actor-system + make-actor-system + make-actor-group + actor-system-wait + actor-system-shutdown! actor? actor-id @@ -55,6 +58,10 @@ turn-sync!* turn-message!) +(module+ internals + (provide make-actor + actor-terminate!)) + (require (only-in preserves preserve=?)) (require racket/match) (require (only-in racket/exn exn->string)) @@ -92,7 +99,7 @@ [exit-hooks #:mutable]) #:methods gen:custom-write [(define (write-proc a port mode) - (fprintf port "#" (actor-id a) (actor-name a)))]) + (fprintf port "#" (engine-id (actor-engine a)) (actor-id a) (actor-name a)))]) (struct facet (id actor @@ -158,12 +165,35 @@ ;;-------------------------------------------------------------------------- -(define (actor-system boot-proc #:name [name 'actor-system]) - (define e (make-engine 1)) - (make-actor name e #t boot-proc (make-hash)) - (adjust-inhabitant-count! e -1) +(define (make-actor-system boot-proc #:name [name 'actor-system]) + (define e (make-engine 1 name (lambda (restart) + (actor-system-shutdown! e) + (restart void)))) + (queue-task! e (lambda () + (make-actor name e #t boot-proc (make-hash)) + (adjust-inhabitant-count! e -1))) + (actor-system-wait e)) + +(define (make-actor-group outer-boot-proc inner-boot-proc #:name [name 'actor-group]) + (define f (turn-facet! (lambda () + (facet-prevent-inert-check! (turn-active-facet (current-turn))) + (outer-boot-proc)))) + (define e (make-engine 1 name (lambda (restart) + (turn! f (lambda () (turn-stop!))) + (actor-system-shutdown! e) + (restart void)))) + (queue-task! e (lambda () + (make-actor name e #t inner-boot-proc (make-hash)) + (adjust-inhabitant-count! e -1))) + e) + +(define (actor-system-wait e) (thread-wait (engine-thread e))) +(define (actor-system-shutdown! e) + (for [(ac (in-list (engine-shutdown! e)))] + (actor-terminate! ac #t))) + (define (make-actor name engine daemon? boot-proc initial-assertions) (define ac (actor (generate-actor-id) name @@ -179,6 +209,9 @@ (log-syndicate/actor-info "~a booting" ac) (define user-root-facet (make-facet ac (actor-root ac) initial-assertions)) (turn! user-root-facet (stop-if-inert-after boot-proc)) + (if (engine-running? engine) + (engine-register! engine ac) + (actor-terminate! ac #t)) user-root-facet) (define (actor-add-exit-hook! ac hook) @@ -207,6 +240,7 @@ (turn! (actor-root ac) (lambda () (facet-terminate! (actor-root ac) (eq? reason #t))) #t) + (engine-deregister! (actor-engine ac) ac) (when (not (actor-daemon? ac)) (adjust-inhabitant-count! (actor-engine ac) -1)))))) @@ -334,15 +368,15 @@ (define (turn-ref turn entity [attenuation '()]) (entity-ref (turn-active-facet turn) entity attenuation)) -(define (turn-facet! turn boot-proc) - (let ((new-facet (make-facet (facet-actor (turn-active-facet turn)) - (turn-active-facet turn)))) +(define (turn-facet! boot-proc) + (define turn (current-turn)) + (let ((new-facet (make-facet (facet-actor (turn-active-facet turn)) (turn-active-facet turn)))) (with-active-facet new-facet (stop-if-inert-after boot-proc)) new-facet)) -(define (turn-stop! turn [f (turn-active-facet turn)] [continuation #f]) +(define (turn-stop! [f (turn-active-facet (current-turn))] [continuation #f]) (log-syndicate/actor-debug " ENQ stop-facet ~v" f) - (turn-enqueue! turn + (turn-enqueue! (current-turn) f (lambda () (log-syndicate/actor-debug " DEQ stop-facet ~v" f) @@ -505,7 +539,7 @@ (lambda () (when (or (and (facet-parent f) (not (facet-live? (facet-parent f)))) (facet-inert? f)) - (turn-stop! (current-turn))))))) + (turn-stop!)))))) (define (deliver maybe-proc . args) (when maybe-proc diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index ecf56e5..d1db868 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -52,7 +52,7 @@ #:message (lambda (message) (log-syndicate/dataspace-debug "~v ! ~v" ds-e message) (send-assertion! this-turn skeleton message)))) - ds-e) + (ref ds-e)) (define-syntax actor-system/dataspace (syntax-rules () @@ -60,5 +60,5 @@ (actor-system #:name 'dataspace (facet-prevent-inert-check! this-facet) - (define ds (ref (dataspace))) + (define ds (dataspace)) expr ...)])) diff --git a/syndicate/distributed/tcp-server.rkt b/syndicate/distributed/tcp-server.rkt index 2df4e64..fa0592f 100644 --- a/syndicate/distributed/tcp-server.rkt +++ b/syndicate/distributed/tcp-server.rkt @@ -32,7 +32,7 @@ #:packet-writer (lambda (bs) (send-data conn bs)) #:setup-inputs (lambda (tr) - (accept-connection conn #:on-data (lambda (bs) (accept-bytes tr bs)))) + (accept-connection conn #:on-data (lambda (d _m) (accept-bytes tr d)))) #:initial-ref (object #:name (list conn 'gatekeeper) [(Resolve unvalidated-sturdyref observer) diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index 6bb9a79..57cd962 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -66,8 +66,9 @@ (tcp-connect host port))) (lambda () (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) - (on-stop (close-input-port i) - (close-output-port o)) + (actor-add-exit-hook! this-actor (lambda () + (close-input-port i) + (close-output-port o))) (define issue-credit (start-inbound-relay connection-custodian name (lambda () local-peer) i)) (define relay (outbound-relay name o)) (at local-peer @@ -81,9 +82,9 @@ (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) (spawn #:name name - (on-stop (close-input-port i) - (close-output-port o)) - + (actor-add-exit-hook! this-actor (lambda () + (close-input-port i) + (close-output-port o))) (define issue-credit #f) (define active-controller #f) (define relay (outbound-relay name o)) @@ -219,7 +220,13 @@ [#:asserted (Socket-credit amount mode) (on-credit amount mode)] [#:asserted (Socket-data data mode) (on-data data mode)] [#:asserted (Socket-eof) (on-eof)])))) - (when initial-credit (send-credit conn initial-credit initial-mode))) + (when initial-credit (send-credit conn initial-credit initial-mode)) + (lambda (#:on-data [new-on-data #f] + #:on-eof [new-on-eof #f] + #:on-credit [new-on-credit #f]) + (when new-on-data (set! on-data new-on-data)) + (when new-on-eof (set! on-eof new-on-eof)) + (when new-on-credit (set! on-credit new-on-credit)))) (define (establish-connection ds spec #:initial-credit [initial-credit (CreditAmount-unbounded)] diff --git a/syndicate/engine.rkt b/syndicate/engine.rkt index f7b526b..1b96570 100644 --- a/syndicate/engine.rkt +++ b/syndicate/engine.rkt @@ -3,11 +3,18 @@ ;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones (provide engine? + engine-id + engine-name + engine-running? + engine-custodian engine-thread engine-inhabitant-count make-engine adjust-inhabitant-count! queue-task! + engine-register! + engine-deregister! + engine-shutdown! *dead-engine*) (require racket/match) @@ -16,34 +23,41 @@ (define-logger syndicate/engine) -(struct engine (id thread [inhabitant-count #:mutable]) +(struct engine (id name custodian thread [running? #:mutable] actors [inhabitant-count #:mutable]) #:methods gen:custom-write [(define (write-proc e port mode) - (fprintf port "#" (engine-id e)))]) + (fprintf port "#" (engine-id e) (engine-name e)))]) (define generate-engine-id (make-counter)) -(define (make-engine initial-inhabitant-count) +(define (make-engine initial-inhabitant-count name termination-handler) + (define custodian (make-custodian)) (define e (engine (generate-engine-id) - (thread (lambda () - (thread-receive) ;; delay boot until we're ready - (log-syndicate/engine-debug "~a starting" e) - (with-handlers ([exn? (handle-unexpected-task-runner-termination e)]) - (let loop () - (log-syndicate/engine-debug - "~a task count: ~a" e (engine-inhabitant-count e)) - (if (positive? (engine-inhabitant-count e)) - ;; We have some non-daemon users so just block - (begin ((thread-receive)) - (loop)) - ;; No non-daemon users, so keep running until there's no more work - (match (thread-try-receive) - [#f ;; No work, no non-daemons, we're done. - (void)] - [thunk - (thunk) - (loop)]))) - (log-syndicate/engine-debug "~a stopping" e)))) + name + custodian + (parameterize ((current-custodian custodian)) + (thread (lambda () + (thread-receive) ;; delay boot until we're ready + (log-syndicate/engine-debug "~a starting" e) + (with-handlers ([exn? (handle-unexpected-task-runner-termination e)]) + (let loop ((termination-handler termination-handler)) + (log-syndicate/engine-debug + "~a task count: ~a" e (engine-inhabitant-count e)) + (if (positive? (engine-inhabitant-count e)) + ;; We have some non-daemon users so just block + (begin ((thread-receive)) + (loop termination-handler)) + ;; No non-daemon users, so keep running until there's no more work + (match (thread-try-receive) + [#f ;; No work, no non-daemons, we're done. + (termination-handler loop)] + [thunk + (thunk) + (loop termination-handler)]))) + (log-syndicate/engine-debug "~a stopping" e) + (custodian-shutdown-all custodian))))) + #t + (make-hash) initial-inhabitant-count)) (thread-send (engine-thread e) 'boot) e) @@ -52,6 +66,23 @@ (queue-task! e (lambda () (set-engine-inhabitant-count! e (+ (engine-inhabitant-count e) delta))))) +(define (engine-register! e ac) + (when (not (eq? (current-thread) (engine-thread e))) + (error 'engine-register! "Called from wrong thread")) + (hash-set! (engine-actors e) ac #t)) + +(define (engine-deregister! e ac) + (when (not (eq? (current-thread) (engine-thread e))) + (error 'engine-deregister! "Called from wrong thread")) + (hash-remove! (engine-actors e) ac)) + +(define (engine-shutdown! e) + (log-syndicate/engine-debug "~a shutdown" e) + (set-engine-running?! e #f) + (define actors (hash-keys (engine-actors e))) + (hash-clear! (engine-actors e)) + actors) + (define ((handle-unexpected-task-runner-termination e) exn) (log-syndicate/engine-error "~a terminated unexpectedly!\n~a" e (exn->string exn)) (exit 1)) @@ -62,4 +93,4 @@ (lambda () (log-syndicate/engine-warning "Attempt to enqueue task for dead engine ~v" e)))) -(define *dead-engine* (make-engine 0)) +(define *dead-engine* (make-engine 0 'dead-engine void)) diff --git a/syndicate/main.rkt b/syndicate/main.rkt index 7e114fd..840caf6 100644 --- a/syndicate/main.rkt +++ b/syndicate/main.rkt @@ -14,7 +14,7 @@ :pattern) -(require (except-in "actor.rkt" actor-system)) +(require "actor.rkt") (require "entity-ref.rkt") (require "syntax.rkt") (require preserves) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index 7a6d818..5e55c65 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -9,6 +9,7 @@ entity actor-system + actor-group object ref @@ -46,7 +47,6 @@ (require preserves-schema) (require "actor.rkt") -(require (prefix-in actor: "actor.rkt")) (require "entity-ref.rkt") (require "event-expander.rkt") @@ -78,7 +78,14 @@ (define-syntax (actor-system stx) (syntax-parse stx [(_ name: expr ...) - #'(actor:actor-system #:name name.N (lambda () expr ...))])) + #'(make-actor-system #:name name.N (lambda () expr ...))])) + +(define-syntax (actor-group stx) + (syntax-parse stx + [(_ name: [outer-facet-expr ...] group-boot-expr ...) + #'(make-actor-group #:name name.N + (lambda () outer-facet-expr ...) + (lambda () group-boot-expr ...))])) (define-syntax (object stx) (syntax-parse stx @@ -158,7 +165,7 @@ (entity-ref this-facet entity '())) (define-syntax-rule (react setup-expr ...) - (turn-facet! this-turn (lambda () setup-expr ...))) + (turn-facet! (lambda () setup-expr ...))) (define-syntax (let-event stx) (syntax-parse stx @@ -174,8 +181,8 @@ (define-syntax stop-facet (syntax-rules () - [(_ f) (turn-stop! this-turn f)] - [(_ f expr ...) (turn-stop! this-turn f (lambda () expr ...))])) + [(_ f) (turn-stop! f)] + [(_ f expr ...) (turn-stop! f (lambda () expr ...))])) (define-syntax-rule (stop-current-facet expr ...) (stop-facet this-facet expr ...)) @@ -277,20 +284,20 @@ (define-event-expander event:when (lambda (stx) - (syntax-case stx (message asserted retracted) - [(_ (message pat) expr ...) + (syntax-parse stx + [(_ ((~datum message) pat) expr ...) #`(assert (Observe (:pattern pat) (ref (entity #:message (lambda (bindings) (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) expr ...)))))] - [(_ (asserted pat) expr ...) + [(_ ((~datum asserted) pat) expr ...) #`(assert (Observe (:pattern pat) (ref (entity #:assert (lambda (bindings _handle) (match-define (list #,@(analyse-pattern-bindings #'pat)) bindings) expr ...)))))] - [(_ (retracted pat) expr ...) + [(_ ((~datum retracted) pat) expr ...) #`(assert (Observe (:pattern pat) (let ((assertion-map (make-hash))) (ref (entity #:assert @@ -343,6 +350,7 @@ ;;--------------------------------------------------------------------------- ;;; Local Variables: +;;; eval: (put 'actor-group 'racket-indent-function 1) ;;; eval: (put 'actor-system/dataspace 'racket-indent-function 1) ;;; eval: (put 'at 'racket-indent-function 1) ;;; eval: (put 'object 'racket-indent-function 0)