diff --git a/syndicate/actor.rkt b/syndicate/actor.rkt index 804b789..d9020ab 100644 --- a/syndicate/actor.rkt +++ b/syndicate/actor.rkt @@ -1,14 +1,54 @@ #lang racket/base -(provide parse-Ref - Ref->preserves) +(provide (except-out (struct-out entity) entity) + (rename-out [make-entity entity]) + + (struct-out ref) + parse-Ref + Ref->preserves + + actor-system + + actor? + actor-id + actor-exit-reason + actor-add-exit-hook! + + facet? + facet-id + facet-actor + facet-parent + facet-live? + facet-inert? + facet-on-stop! + facet-prevent-inert-check! + + turn? + turn-id + turn-active-facet + turn! + turn-freshen + turn-ref + turn-facet! + turn-stop! + turn-spawn! + turn-stop-actor! + turn-crash! + turn-assert! + turn-assert!* + turn-retract! + turn-replace! + turn-retract!* + turn-sync! + turn-sync!* + turn-message!) (require racket/match) (require (only-in racket/exn exn->string)) (require struct-defaults) (require "rewrite.rkt") -(require "task.rkt") +(require "engine.rkt") (require "support/counter.rkt") (struct entity (assert retract message sync)) @@ -25,10 +65,13 @@ (struct outbound-assertion (handle peer [established? #:mutable])) (struct actor (id + engine [root #:mutable] [exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error [exit-hooks #:mutable]) - #:transparent) + #:methods gen:custom-write + [(define (write-proc a port mode) + (fprintf port "#" (actor-id a)))]) (struct facet (id actor @@ -38,7 +81,16 @@ [shutdown-actions #:mutable] [live? #:mutable] [inert-check-preventers #:mutable]) - #:transparent) + #:methods gen:custom-write + [(define (write-proc f port mode) + (local-require (only-in racket/string string-join)) + (fprintf port "#" + (actor-id (facet-actor f)) + (string-join (let loop ((f f)) + (if (facet-parent f) + (cons (number->string (facet-id f)) (loop (facet-parent f))) + '())) + "/")))]) (struct turn (id active-facet @@ -54,15 +106,23 @@ ;;-------------------------------------------------------------------------- -(define (make-actor boot-proc [initial-assertions (make-hash)]) +(define (actor-system boot-proc) + (define e (make-engine 1)) + (make-actor e boot-proc (make-hash)) + (adjust-inhabitant-count! e -1) + (thread-wait (engine-thread e))) + +(define (make-actor engine boot-proc initial-assertions) (define ac (actor (generate-actor-id) + engine 'uninitialized #f '())) + (adjust-inhabitant-count! engine +1) (set-actor-root! ac (make-facet ac #f initial-assertions)) (turn! (make-facet ac (actor-root ac)) (stop-if-inert-after boot-proc)) - (log-syndicate/actor-debug "Created actor ~a" (actor-id ac)) + (log-syndicate/actor-info "~a created" ac) ac) (define (actor-add-exit-hook! ac hook) @@ -72,14 +132,16 @@ (when (not (actor-exit-reason ac)) (set-actor-exit-reason! ac reason) (if (eq? reason #t) - (log-syndicate/actor-debug "Actor ~a terminated OK" (actor-id ac)) - (log-syndicate/actor-error "Actor ~a terminated with exception:\n~a" - (actor-id ac) + (log-syndicate/actor-info "~a terminated OK" ac) + (log-syndicate/actor-error "~a terminated with exception:\n~a" + ac (exn->string reason))) (for [(h (in-list (reverse (actor-exit-hooks ac))))] (h turn)) - (queue-task! (lambda () (turn! (actor-root ac) + (queue-task! (actor-engine ac) + (lambda () (turn! (actor-root ac) (lambda (turn) (facet-terminate! turn (actor-root ac) #f)) - #t))))) + #t))) + (adjust-inhabitant-count! (actor-engine ac) -1))) ;;--------------------------------------------------------------------------- @@ -106,14 +168,15 @@ (define (facet-prevent-inert-check! f) (define armed #t) - (set-facet-inert-check-preventers! (+ (facet-inert-check-preventers f) 1)) + (set-facet-inert-check-preventers! f (+ (facet-inert-check-preventers f) 1)) (lambda () (when armed (set! armed #f) - (set-facet-inert-check-preventers! (- (facet-inert-check-preventers f) 1))))) + (set-facet-inert-check-preventers! f (- (facet-inert-check-preventers f) 1))))) (define (facet-terminate! turn f orderly?) (when (facet-live? f) + (log-syndicate/actor-info "~a stopping (~a)" f (if orderly? "orderly" "disorderly")) (set-facet-live?! f #f) (define parent (facet-parent f)) @@ -129,6 +192,7 @@ (turn-retract!* turn a)) (when orderly? (queue-task! + (actor-engine (facet-actor f)) (lambda () (if parent (when (facet-inert? parent) @@ -137,7 +201,8 @@ (facet-terminate! turn parent #t)))) (turn! (actor-root (facet-actor f)) (lambda (turn) - (actor-terminate! turn (facet-actor f) #t))))))))))) + (actor-terminate! turn (facet-actor f) #t)) + #t))))))))) ;;--------------------------------------------------------------------------- @@ -150,7 +215,8 @@ (actor-terminate! turn (facet-actor f) e))))]) (action turn) (for [((ff qq) (in-hash (turn-queues turn)))] - (queue-task! (lambda () (for [(a (in-list (reverse qq)))] (turn! ff a))))) + (queue-task! (actor-engine (facet-actor ff)) + (lambda () (for [(a (in-list (reverse qq)))] (turn! ff a))))) (set-turn-queues! turn #f))))) (define (turn-call-with-facet outer-turn f action) @@ -188,7 +254,8 @@ (for [(handle (in-hash-keys initial-assertions))] (hash-set! new-outbound handle (hash-ref o handle)) (hash-remove! o handle)) - (queue-task! (lambda () (make-actor boot-proc new-outbound)))))) + (define engine (actor-engine (facet-actor f))) + (queue-task! engine (lambda () (make-actor engine boot-proc new-outbound)))))) (define (turn-stop-actor! turn) (define ac (facet-actor (turn-active-facet turn))) diff --git a/syndicate/engine.rkt b/syndicate/engine.rkt new file mode 100644 index 0000000..87de36f --- /dev/null +++ b/syndicate/engine.rkt @@ -0,0 +1,48 @@ +#lang racket/base + +(provide engine? + engine-thread + engine-inhabitant-count + make-engine + adjust-inhabitant-count! + queue-task!) + +(require (only-in racket/exn exn->string)) +(require "support/counter.rkt") + +(define-logger syndicate/task) + +(struct engine (id thread [inhabitant-count #:mutable]) + #:methods gen:custom-write + [(define (write-proc e port mode) + (fprintf port "#" (engine-id e)))]) + +(define generate-engine-id (make-counter)) + +(define (make-engine initial-inhabitant-count) + (define e (engine (generate-engine-id) + (thread (lambda () + (thread-receive) ;; delay boot until we're ready + (log-syndicate/task-info "~a starting" e) + (with-handlers ([exn? (handle-unexpected-task-runner-termination e)]) + (let loop () + (log-syndicate/task-debug + "~a task count: ~a" e (engine-inhabitant-count e)) + (when (positive? (engine-inhabitant-count e)) + ((thread-receive)) + (loop))) + (log-syndicate/task-info "~a stopping" e)))) + initial-inhabitant-count)) + (thread-send (engine-thread e) 'boot) + e) + +(define (adjust-inhabitant-count! e delta) + (queue-task! e (lambda () + (set-engine-inhabitant-count! e (+ (engine-inhabitant-count e) delta))))) + +(define ((handle-unexpected-task-runner-termination e) exn) + (log-syndicate/task-error "~a terminated unexpectedly!\n~a" e (exn->string exn)) + (exit 1)) + +(define (queue-task! e thunk) + (thread-send (engine-thread e) thunk)) diff --git a/syndicate/go.rkt b/syndicate/go.rkt new file mode 100644 index 0000000..9213438 --- /dev/null +++ b/syndicate/go.rkt @@ -0,0 +1,8 @@ +#lang racket/base + +(require "main.rkt") + +(actor-system + (lambda (turn) + (turn-spawn! turn (lambda (turn) + (printf "Hi!\n"))))) diff --git a/syndicate/main.rkt b/syndicate/main.rkt index b50146d..3f05dab 100644 --- a/syndicate/main.rkt +++ b/syndicate/main.rkt @@ -1,5 +1,9 @@ #lang racket/base +(provide (all-from-out "actor.rkt")) + +(require "actor.rkt") + ;; (provide (all-from-out "dataspace.rkt") ;; (all-from-out "assertions.rkt") ;; (all-from-out "syntax.rkt") diff --git a/syndicate/task.rkt b/syndicate/task.rkt deleted file mode 100644 index a6e0d08..0000000 --- a/syndicate/task.rkt +++ /dev/null @@ -1,21 +0,0 @@ -#lang racket/base - -(provide queue-task!) - -(require (only-in racket/exn exn->string)) - -(define-logger syndicate/task) - -(define task-runner - (thread (lambda () - (with-handlers ([exn? handle-unexpected-task-runner-termination]) - (let loop () - ((thread-receive)) - (loop)))))) - -(define (handle-unexpected-task-runner-termination e) - (log-syndicate/task-error "Task runner terminated unexpectedly!\n~a" (exn->string e)) - (exit 1)) - -(define (queue-task! thunk) - (thread-send task-runner thunk))