From caec6fc820895957595cebbd63ec9f0addc4c941 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 28 May 2021 10:33:02 +0200 Subject: [PATCH] Daemon actors and go.rkt shutdown --- syndicate/actor.rkt | 28 ++++++++++++++++++++-------- syndicate/engine.rkt | 15 ++++++++++++--- syndicate/go.rkt | 42 +++++++++++++++++++++++++++++++++++------- 3 files changed, 67 insertions(+), 18 deletions(-) diff --git a/syndicate/actor.rkt b/syndicate/actor.rkt index 10fdb12..f1636b4 100644 --- a/syndicate/actor.rkt +++ b/syndicate/actor.rkt @@ -11,8 +11,10 @@ actor? actor-id + actor-daemon? actor-exit-reason actor-add-exit-hook! + actor-daemon! facet? facet-id @@ -66,6 +68,7 @@ (struct actor (id engine + [daemon? #:mutable] [root #:mutable] [exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error [exit-hooks #:mutable]) @@ -108,17 +111,19 @@ (define (actor-system boot-proc) (define e (make-engine 1)) - (make-actor e boot-proc (make-hash)) + (make-actor e #t boot-proc (make-hash)) (adjust-inhabitant-count! e -1) (thread-wait (engine-thread e))) -(define (make-actor engine boot-proc initial-assertions) +(define (make-actor engine daemon? boot-proc initial-assertions) (define ac (actor (generate-actor-id) engine + daemon? 'uninitialized #f '())) - (adjust-inhabitant-count! engine +1) + (when (not daemon?) + (adjust-inhabitant-count! engine +1)) (set-actor-root! ac (make-facet ac #f initial-assertions)) (turn! (make-facet ac (actor-root ac)) (stop-if-inert-after boot-proc)) @@ -128,6 +133,11 @@ (define (actor-add-exit-hook! ac hook) (set-actor-exit-hooks! ac (cons hook (actor-exit-hooks ac)))) +(define (actor-daemon! ac daemon?) + (when (not (eq? daemon? (actor-daemon? ac))) + (set-actor-daemon?! ac daemon?) + (adjust-inhabitant-count! (actor-engine ac) (if daemon? -1 +1)))) + (define (actor-terminate! turn ac reason) (when (not (actor-exit-reason ac)) (set-actor-exit-reason! ac reason) @@ -139,9 +149,11 @@ (for [(h (in-list (reverse (actor-exit-hooks ac))))] (h turn)) (queue-task! (actor-engine ac) (lambda () (turn! (actor-root ac) - (lambda (turn) (facet-terminate! turn (actor-root ac) #f)) + (lambda (turn) + (facet-terminate! turn (actor-root ac) (eq? reason #t))) #t))) - (adjust-inhabitant-count! (actor-engine ac) -1))) + (when (not (actor-daemon? ac)) + (adjust-inhabitant-count! (actor-engine ac) -1)))) ;;--------------------------------------------------------------------------- @@ -176,7 +188,7 @@ (define (facet-terminate! turn f orderly?) (when (facet-live? f) - (log-syndicate/actor-info "~a stopping (~a)" f (if orderly? "orderly" "disorderly")) + (log-syndicate/actor-debug "~a stopping (~a)" f (if orderly? "orderly" "disorderly")) (set-facet-live?! f #f) (define parent (facet-parent f)) @@ -244,7 +256,7 @@ (facet-terminate! turn f #t) (continuation turn)))) -(define (turn-spawn! turn boot-proc [initial-assertions (make-hash)]) +(define (turn-spawn! turn boot-proc [initial-assertions (make-hash)] #:daemon? [daemon? #f]) (define f (turn-active-facet turn)) (define o (facet-outbound f)) (turn-enqueue! turn @@ -255,7 +267,7 @@ (hash-set! new-outbound handle (hash-ref o handle)) (hash-remove! o handle)) (define engine (actor-engine (facet-actor f))) - (queue-task! engine (lambda () (make-actor engine boot-proc new-outbound)))))) + (queue-task! engine (lambda () (make-actor engine daemon? boot-proc new-outbound)))))) (define (turn-stop-actor! turn) (define ac (facet-actor (turn-active-facet turn))) diff --git a/syndicate/engine.rkt b/syndicate/engine.rkt index 87de36f..515bb38 100644 --- a/syndicate/engine.rkt +++ b/syndicate/engine.rkt @@ -7,6 +7,7 @@ adjust-inhabitant-count! queue-task!) +(require racket/match) (require (only-in racket/exn exn->string)) (require "support/counter.rkt") @@ -28,9 +29,17 @@ (let loop () (log-syndicate/task-debug "~a task count: ~a" e (engine-inhabitant-count e)) - (when (positive? (engine-inhabitant-count e)) - ((thread-receive)) - (loop))) + (if (positive? (engine-inhabitant-count e)) + ;; We have some non-daemon users so just block + (begin ((thread-receive)) + (loop)) + ;; No non-daemon users, so keep running until there's no more work + (match (thread-try-receive) + [#f ;; No work, no non-daemons, we're done. + (void)] + [thunk + (thunk) + (loop)]))) (log-syndicate/task-info "~a stopping" e)))) initial-inhabitant-count)) (thread-send (engine-thread e) 'boot) diff --git a/syndicate/go.rkt b/syndicate/go.rkt index 2705b55..e2ade15 100644 --- a/syndicate/go.rkt +++ b/syndicate/go.rkt @@ -8,19 +8,32 @@ (require "schemas/gen/box-protocol.rkt") (require "schemas/gen/dataspace.rkt") -(define ((box ds) turn) +(define ((box ds LIMIT REPORT_EVERY) turn) (define value-handle #f) (define (set-value turn value) (set! value-handle (turn-replace! turn ds value-handle (BoxState->preserves (BoxState value))))) (set-value turn 0) + (define start-time (current-inexact-milliseconds)) + (define prev-value 0) (turn-assert! turn ds (Observe->preserves (Observe 'SetBox (turn-ref turn (entity #:message - (lambda (turn new-value) - (log-info "Box got ~a" new-value) - (set-value turn (SetBox-value new-value))))))))) + (lambda (turn new-value-rec) + (define new-value (SetBox-value new-value-rec)) + (when (zero? (remainder new-value REPORT_EVERY)) + (define end-time (current-inexact-milliseconds)) + (define delta (/ (- end-time start-time) 1000.0)) + (define count (- new-value prev-value)) + (set! prev-value new-value) + (set! start-time end-time) + (log-info "Box got ~a (~a Hz)" + new-value + (/ count delta))) + (when (= new-value LIMIT) + (turn-stop-actor! turn)) + (set-value turn new-value)))))))) (define ((client ds) turn) (turn-assert! turn ds @@ -29,12 +42,26 @@ (turn-ref turn (entity #:assert (lambda (turn current-value _handle) - (log-info "Client got ~a" current-value) + ;; (log-info "Client got ~a" current-value) (turn-message! turn ds (SetBox->preserves (SetBox (+ (BoxState-value current-value) - 1))))))))))) + 1)))))))))) + (let ((count 0)) + (turn-assert! turn ds + (Observe->preserves + (Observe 'BoxState + (turn-ref turn + (entity #:assert + (lambda (turn current-value _handle) + (set! count (+ count 1))) + #:retract + (lambda (turn _handle) + (set! count (- count 1)) + (when (zero? count) + (log-info "Client detected box termination") + (turn-stop-actor! turn)))))))))) (define (dataspace) (define handles (make-hash)) @@ -77,7 +104,8 @@ (actor-system (lambda (turn) + (actor-daemon! (facet-actor (turn-active-facet turn)) #t) (define disarm (facet-prevent-inert-check! (turn-active-facet turn))) (define ds (turn-ref turn (dataspace))) - (turn-spawn! turn (box ds)) + (turn-spawn! turn (box ds 500000 100000)) (turn-spawn! turn (client ds))))