Daemon actors and go.rkt shutdown

This commit is contained in:
Tony Garnock-Jones 2021-05-28 10:33:02 +02:00
parent 3528391f12
commit caec6fc820
3 changed files with 67 additions and 18 deletions

View File

@ -11,8 +11,10 @@
actor?
actor-id
actor-daemon?
actor-exit-reason
actor-add-exit-hook!
actor-daemon!
facet?
facet-id
@ -66,6 +68,7 @@
(struct actor (id
engine
[daemon? #:mutable]
[root #:mutable]
[exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error
[exit-hooks #:mutable])
@ -108,17 +111,19 @@
(define (actor-system boot-proc)
(define e (make-engine 1))
(make-actor e boot-proc (make-hash))
(make-actor e #t boot-proc (make-hash))
(adjust-inhabitant-count! e -1)
(thread-wait (engine-thread e)))
(define (make-actor engine boot-proc initial-assertions)
(define (make-actor engine daemon? boot-proc initial-assertions)
(define ac (actor (generate-actor-id)
engine
daemon?
'uninitialized
#f
'()))
(adjust-inhabitant-count! engine +1)
(when (not daemon?)
(adjust-inhabitant-count! engine +1))
(set-actor-root! ac (make-facet ac #f initial-assertions))
(turn! (make-facet ac (actor-root ac))
(stop-if-inert-after boot-proc))
@ -128,6 +133,11 @@
(define (actor-add-exit-hook! ac hook)
(set-actor-exit-hooks! ac (cons hook (actor-exit-hooks ac))))
(define (actor-daemon! ac daemon?)
(when (not (eq? daemon? (actor-daemon? ac)))
(set-actor-daemon?! ac daemon?)
(adjust-inhabitant-count! (actor-engine ac) (if daemon? -1 +1))))
(define (actor-terminate! turn ac reason)
(when (not (actor-exit-reason ac))
(set-actor-exit-reason! ac reason)
@ -139,9 +149,11 @@
(for [(h (in-list (reverse (actor-exit-hooks ac))))] (h turn))
(queue-task! (actor-engine ac)
(lambda () (turn! (actor-root ac)
(lambda (turn) (facet-terminate! turn (actor-root ac) #f))
(lambda (turn)
(facet-terminate! turn (actor-root ac) (eq? reason #t)))
#t)))
(adjust-inhabitant-count! (actor-engine ac) -1)))
(when (not (actor-daemon? ac))
(adjust-inhabitant-count! (actor-engine ac) -1))))
;;---------------------------------------------------------------------------
@ -176,7 +188,7 @@
(define (facet-terminate! turn f orderly?)
(when (facet-live? f)
(log-syndicate/actor-info "~a stopping (~a)" f (if orderly? "orderly" "disorderly"))
(log-syndicate/actor-debug "~a stopping (~a)" f (if orderly? "orderly" "disorderly"))
(set-facet-live?! f #f)
(define parent (facet-parent f))
@ -244,7 +256,7 @@
(facet-terminate! turn f #t)
(continuation turn))))
(define (turn-spawn! turn boot-proc [initial-assertions (make-hash)])
(define (turn-spawn! turn boot-proc [initial-assertions (make-hash)] #:daemon? [daemon? #f])
(define f (turn-active-facet turn))
(define o (facet-outbound f))
(turn-enqueue! turn
@ -255,7 +267,7 @@
(hash-set! new-outbound handle (hash-ref o handle))
(hash-remove! o handle))
(define engine (actor-engine (facet-actor f)))
(queue-task! engine (lambda () (make-actor engine boot-proc new-outbound))))))
(queue-task! engine (lambda () (make-actor engine daemon? boot-proc new-outbound))))))
(define (turn-stop-actor! turn)
(define ac (facet-actor (turn-active-facet turn)))

View File

@ -7,6 +7,7 @@
adjust-inhabitant-count!
queue-task!)
(require racket/match)
(require (only-in racket/exn exn->string))
(require "support/counter.rkt")
@ -28,9 +29,17 @@
(let loop ()
(log-syndicate/task-debug
"~a task count: ~a" e (engine-inhabitant-count e))
(when (positive? (engine-inhabitant-count e))
((thread-receive))
(loop)))
(if (positive? (engine-inhabitant-count e))
;; We have some non-daemon users so just block
(begin ((thread-receive))
(loop))
;; No non-daemon users, so keep running until there's no more work
(match (thread-try-receive)
[#f ;; No work, no non-daemons, we're done.
(void)]
[thunk
(thunk)
(loop)])))
(log-syndicate/task-info "~a stopping" e))))
initial-inhabitant-count))
(thread-send (engine-thread e) 'boot)

View File

@ -8,19 +8,32 @@
(require "schemas/gen/box-protocol.rkt")
(require "schemas/gen/dataspace.rkt")
(define ((box ds) turn)
(define ((box ds LIMIT REPORT_EVERY) turn)
(define value-handle #f)
(define (set-value turn value)
(set! value-handle (turn-replace! turn ds value-handle (BoxState->preserves (BoxState value)))))
(set-value turn 0)
(define start-time (current-inexact-milliseconds))
(define prev-value 0)
(turn-assert! turn ds
(Observe->preserves
(Observe 'SetBox
(turn-ref turn
(entity #:message
(lambda (turn new-value)
(log-info "Box got ~a" new-value)
(set-value turn (SetBox-value new-value)))))))))
(lambda (turn new-value-rec)
(define new-value (SetBox-value new-value-rec))
(when (zero? (remainder new-value REPORT_EVERY))
(define end-time (current-inexact-milliseconds))
(define delta (/ (- end-time start-time) 1000.0))
(define count (- new-value prev-value))
(set! prev-value new-value)
(set! start-time end-time)
(log-info "Box got ~a (~a Hz)"
new-value
(/ count delta)))
(when (= new-value LIMIT)
(turn-stop-actor! turn))
(set-value turn new-value))))))))
(define ((client ds) turn)
(turn-assert! turn ds
@ -29,12 +42,26 @@
(turn-ref turn
(entity #:assert
(lambda (turn current-value _handle)
(log-info "Client got ~a" current-value)
;; (log-info "Client got ~a" current-value)
(turn-message! turn ds
(SetBox->preserves
(SetBox
(+ (BoxState-value current-value)
1)))))))))))
1))))))))))
(let ((count 0))
(turn-assert! turn ds
(Observe->preserves
(Observe 'BoxState
(turn-ref turn
(entity #:assert
(lambda (turn current-value _handle)
(set! count (+ count 1)))
#:retract
(lambda (turn _handle)
(set! count (- count 1))
(when (zero? count)
(log-info "Client detected box termination")
(turn-stop-actor! turn))))))))))
(define (dataspace)
(define handles (make-hash))
@ -77,7 +104,8 @@
(actor-system
(lambda (turn)
(actor-daemon! (facet-actor (turn-active-facet turn)) #t)
(define disarm (facet-prevent-inert-check! (turn-active-facet turn)))
(define ds (turn-ref turn (dataspace)))
(turn-spawn! turn (box ds))
(turn-spawn! turn (box ds 500000 100000))
(turn-spawn! turn (client ds))))