#lang racket/base ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021-2022 Tony Garnock-Jones (provide engine? engine-id engine-name engine-running? engine-custodian engine-thread engine-inhabitant-count make-engine adjust-inhabitant-count! queue-task! engine-register! engine-deregister! engine-shutdown! *dead-engine*) (require racket/match) (require (only-in racket/exn exn->string)) (require "support/counter.rkt") (define-logger syndicate/engine) (struct engine (id name custodian thread [running? #:mutable] actors [inhabitant-count #:mutable]) #:methods gen:custom-write [(define (write-proc e port mode) (fprintf port "#" (engine-id e) (engine-name e)))]) (define generate-engine-id (make-counter)) (define (make-engine initial-inhabitant-count name termination-handler) (define custodian (make-custodian)) (define e (engine (generate-engine-id) name custodian (parameterize ((current-custodian custodian)) (thread (lambda () (thread-receive) ;; delay boot until we're ready (log-syndicate/engine-debug "~a starting" e) (with-handlers ([exn? (handle-unexpected-task-runner-termination e)]) (let loop ((termination-handler termination-handler)) (log-syndicate/engine-debug "~a task count: ~a" e (engine-inhabitant-count e)) (if (positive? (engine-inhabitant-count e)) ;; We have some non-daemon users so just block (begin ((thread-receive)) (loop termination-handler)) ;; No non-daemon users, so keep running until there's no more work (match (thread-try-receive) [#f ;; No work, no non-daemons, we're done. (log-syndicate/engine-debug "Invoking termination handler ~v" termination-handler) (termination-handler loop)] [thunk (log-syndicate/engine-debug "Zero inhabitant count but pending work") (thunk) (loop termination-handler)]))) (log-syndicate/engine-debug "~a stopping" e) (custodian-shutdown-all custodian))))) #t (make-hash) initial-inhabitant-count)) (thread-send (engine-thread e) 'boot) e) (define (adjust-inhabitant-count! e delta) (queue-task! e (lambda () (set-engine-inhabitant-count! e (+ (engine-inhabitant-count e) delta))))) (define (engine-register! e ac) (when (not (eq? (current-thread) (engine-thread e))) (error 'engine-register! "Called from wrong thread")) (hash-set! (engine-actors e) ac #t)) (define (engine-deregister! e ac) (when (not (eq? (current-thread) (engine-thread e))) (error 'engine-deregister! "Called from wrong thread")) (hash-remove! (engine-actors e) ac)) (define (engine-shutdown! e) (log-syndicate/engine-debug "~a shutdown" e) (set-engine-running?! e #f) (define actors (hash-keys (engine-actors e))) (hash-clear! (engine-actors e)) actors) (define ((handle-unexpected-task-runner-termination e) exn) (log-syndicate/engine-error "~a terminated unexpectedly!\n~a" e (exn->string exn)) (exit 1)) (define (queue-task! e thunk) (thread-send (engine-thread e) thunk (lambda () (log-syndicate/engine-debug "Attempt to enqueue task for dead engine ~v" e)))) (define *dead-engine* (make-engine 0 'dead-engine void))