syndicate-rkt/syndicate/actor.rkt

347 lines
12 KiB
Racket

#lang racket/base
;; Public interface of this module.
;;
;; The raw positional `entity` constructor is withheld; the name `entity` is
;; instead bound (for importers) to the keyword-argument constructor
;; `make-entity`. The remaining struct bindings for `entity` are exported.
(provide (except-out (struct-out entity) entity)
(rename-out [make-entity entity])
;; References to entities, plus their parse/unparse helpers.
(struct-out ref)
parse-Ref
Ref->preserves
;; Actors.
actor-system
actor?
actor-id
actor-exit-reason
actor-add-exit-hook!
;; Facets.
facet?
facet-id
facet-actor
facet-parent
facet-live?
facet-inert?
facet-on-stop!
facet-prevent-inert-check!
;; Turns and the operations that may be performed within a turn.
turn?
turn-id
turn-active-facet
turn!
turn-freshen
turn-ref
turn-facet!
turn-stop!
turn-spawn!
turn-stop-actor!
turn-crash!
turn-assert!
turn-assert!*
turn-retract!
turn-replace!
turn-retract!*
turn-sync!
turn-sync!*
turn-message!)
(require racket/match)
(require (only-in racket/exn exn->string))
(require struct-defaults)
(require "rewrite.rkt")
(require "engine.rkt")
(require "support/counter.rkt")
;; An entity bundles up to four event handlers. Each slot holds either a
;; procedure or #f (meaning "no handler"; `deliver` skips #f slots).
;; As invoked elsewhere in this file, the handlers receive:
;;   assert  : turn, (rewritten) assertion, handle
;;   retract : turn, handle
;;   message : turn, (rewritten) message body
;;   sync    : turn, peer ref to notify
(struct entity (assert retract message sync))
;; `make-entity` is a keyword-argument constructor for `entity` generated by
;; the `struct-defaults` library; every handler slot defaults to #f, so
;; callers name only the handlers they need, e.g. (make-entity #:message k).
(define-struct-defaults make-entity entity
(#:assert [entity-assert #f]
#:retract [entity-retract #f]
#:message [entity-message #f]
#:sync [entity-sync #f]))
;; A ref designates an entity as the destination of assertions and messages.
;;   relay       : the facet on whose queue deliveries to `target` are enqueued
;;   target      : the entity whose handlers are invoked on delivery
;;   attenuation : handed to `run-rewrites` together with each outgoing value;
;;                 a void result from `run-rewrites` suppresses the delivery
(struct ref (relay target attenuation) #:transparent)
;; Accepts `r` only if it is already a `ref`, returning it unchanged;
;; any other value yields `eof` as the "could not parse" marker.
(define (parse-Ref r)
  (cond
    [(ref? r) r]
    [else eof]))
;; Inverse of `parse-Ref`: a ref is its own serialised form, so this is
;; simply the identity function.
(define Ref->preserves
  (lambda (r) r))
;; Bookkeeping record for one assertion made by a facet.
;;   handle       : the handle identifying the assertion
;;   peer         : the `ref` the assertion was made to
;;   established? : set to #t when the assert has actually been delivered to
;;                  the peer and back to #f when its retraction is delivered;
;;                  a retraction is only delivered while this is #t
(struct outbound-assertion (handle peer [established? #:mutable]))
;; An actor: a unit of execution owned by an engine.
;;   id          : identifier drawn from `generate-actor-id`
;;   engine      : the engine that schedules this actor's tasks
;;   root        : the actor's root facet ('uninitialized until set up)
;;   exit-reason : see inline note below
;;   exit-hooks  : list of procedures (most recently added first), each
;;                 called with a turn when the actor terminates
;; Printed as "#<actor:ID>".
(struct actor (id
               engine
               [root #:mutable]
               [exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error
               [exit-hooks #:mutable])
  #:methods gen:custom-write
  [(define (write-proc self out mode)
     (fprintf out "#<actor:~a>" (actor-id self)))])
;; A facet: a node in an actor's tree of conversational contexts.
;;   id                     : identifier for this facet
;;   actor                  : the owning actor
;;   parent                 : enclosing facet, or #f for a root facet
;;   children               : mutable hasheq used as a set of child facets
;;   outbound               : mutable hash from handle to outbound-assertion
;;   shutdown-actions       : list of stop callbacks, most recently added first
;;   live?                  : #t until the facet is terminated
;;   inert-check-preventers : count of outstanding inertness-check preventers
;;
;; Printed as "#<facet:ACTOR-ID:PATH>", where PATH lists facet ids from this
;; facet outward, joined by "/"; a facet with no parent contributes no id.
(struct facet (id
               actor
               parent
               children
               outbound
               [shutdown-actions #:mutable]
               [live? #:mutable]
               [inert-check-preventers #:mutable])
  #:methods gen:custom-write
  [(define (write-proc self out mode)
     (local-require (only-in racket/string string-join))
     ;; Walk towards the root, accumulating ids in reverse, then flip so the
     ;; innermost facet comes first.
     (define path
       (let walk ([cursor self] [ids-rev '()])
         (if (facet-parent cursor)
             (walk (facet-parent cursor)
                   (cons (number->string (facet-id cursor)) ids-rev))
             (reverse ids-rev))))
     (fprintf out "#<facet:~a:~a>"
              (actor-id (facet-actor self))
              (string-join path "/")))])
;; A turn: one atomic step of activity on behalf of a facet.
;;   id           : identifier drawn from `generate-turn-id`
;;   active-facet : the facet in whose context the turn's action runs
;;   queues       : hasheq mapping a facet to the list of actions enqueued for
;;                  it during this turn (most recently enqueued first); set to
;;                  #f once the turn has been committed, after which
;;                  `turn-enqueue!` refuses to use it
(struct turn (id
active-facet
[queues #:mutable]))
;;---------------------------------------------------------------------------
;; Identifier generators built by `make-counter` (support/counter.rkt);
;; presumably each call yields a fresh number -- confirm against that module.
;; Note that `make-facet` also draws facet ids from `generate-actor-id`.
(define generate-actor-id (make-counter))
(define generate-turn-id (make-counter))
(define generate-handle (make-counter))
;; Defines the `syndicate/actor` logger and the `log-syndicate/actor-<level>`
;; forms used throughout this file.
(define-logger syndicate/actor)
;;--------------------------------------------------------------------------
;; Runs a complete actor system: creates an engine, boots one actor with
;; `boot-proc` and no initial assertions, then blocks until the engine's
;; thread finishes.
;;
;; The engine is created with the argument 1 and its inhabitant count is
;; lowered by one once the first actor exists (`make-actor` raises it by one
;; itself) -- presumably so the engine cannot see a zero count before that
;; actor is registered; confirm against engine.rkt.
(define (actor-system boot-proc)
  (let ([engine (make-engine 1)])
    (make-actor engine boot-proc (make-hash))
    (adjust-inhabitant-count! engine -1)
    (thread-wait (engine-thread engine))))
;; Creates and boots a new actor on `engine`.
;;
;; The actor gets a root facet whose outbound table is `initial-assertions`,
;; plus a child facet of that root in which `boot-proc` is run as the actor's
;; first turn. The boot facet stops itself afterwards if it turns out to be
;; inert (see `stop-if-inert-after`). Returns the new actor.
(define (make-actor engine boot-proc initial-assertions)
  (let ([new-actor (actor (generate-actor-id) engine 'uninitialized #f '())])
    ;; Register with the engine before any of the actor's code can run.
    (adjust-inhabitant-count! engine +1)
    (let ([root (make-facet new-actor #f initial-assertions)])
      (set-actor-root! new-actor root)
      (let ([boot-facet (make-facet new-actor root)])
        (turn! boot-facet (stop-if-inert-after boot-proc))))
    (log-syndicate/actor-info "~a created" new-actor)
    new-actor))
;; Registers `hook`, a procedure of one argument (a turn), to be called when
;; `ac` terminates. Hooks are stored newest-first; `actor-terminate!` reverses
;; the list so they run in registration order.
(define (actor-add-exit-hook! ac hook)
  (let ([existing (actor-exit-hooks ac)])
    (set-actor-exit-hooks! ac (cons hook existing))))
;; Terminates actor `ac` with the given `reason` (#t for a clean exit,
;; anything else is reported as an exception). Does nothing if the actor
;; already has an exit reason.
;;
;; In order: records the reason, logs it, runs the exit hooks in registration
;; order with `turn`, schedules a disorderly teardown of the root facet, and
;; finally removes the actor from the engine's inhabitant count.
(define (actor-terminate! turn ac reason)
  (unless (actor-exit-reason ac)
    (set-actor-exit-reason! ac reason)
    (cond
      [(eq? reason #t)
       (log-syndicate/actor-info "~a terminated OK" ac)]
      [else
       (log-syndicate/actor-error "~a terminated with exception:\n~a"
                                  ac
                                  (exn->string reason))])
    (for-each (lambda (hook) (hook turn))
              (reverse (actor-exit-hooks ac)))
    (let ([engine (actor-engine ac)])
      ;; The exit reason is already set, so an ordinary turn would be
      ;; refused; the teardown therefore runs as a zombie turn (#t).
      (queue-task! engine
                   (lambda ()
                     (turn! (actor-root ac)
                            (lambda (turn) (facet-terminate! turn (actor-root ac) #f))
                            #t)))
      (adjust-inhabitant-count! engine -1))))
;;---------------------------------------------------------------------------
;; Builds a live facet belonging to actor `ac` beneath `parent` (#f for a
;; root facet) and, when there is a parent, records the new facet in the
;; parent's set of children. `initial-assertions` becomes the facet's
;; outbound table (a fresh empty hash by default). Returns the new facet.
;;
;; The facet starts with no children, no shutdown actions, and zero
;; inert-check preventers. Its id is taken from `generate-actor-id`.
(define (make-facet ac parent [initial-assertions (make-hash)])
  (let ([new-facet (facet (generate-actor-id)
                          ac
                          parent
                          (make-hasheq)
                          initial-assertions
                          '()
                          #t
                          0)])
    (when parent
      (hash-set! (facet-children parent) new-facet #t))
    new-facet))
;; Registers `action`, a procedure of one argument (a turn), to be run when
;; facet `f` is stopped in an orderly fashion. Actions are kept newest-first;
;; `facet-terminate!` reverses the list before running them.
(define (facet-on-stop! f action)
  (let ([registered (facet-shutdown-actions f)])
    (set-facet-shutdown-actions! f (cons action registered))))
;; A facet is inert when it has no child facets, no outbound assertions, and
;; nothing is currently preventing the inertness check. Returns a boolean.
(define (facet-inert? f)
  (cond
    [(not (hash-empty? (facet-children f))) #f]
    [(not (hash-empty? (facet-outbound f))) #f]
    [else (zero? (facet-inert-check-preventers f))]))
;; Marks facet `f` as "not inert" by bumping its preventer count, and returns
;; a thunk that undoes exactly that bump. Calling the thunk more than once is
;; harmless: only the first call decrements the count.
(define (facet-prevent-inert-check! f)
  (define (adjust! delta)
    (set-facet-inert-check-preventers! f (+ (facet-inert-check-preventers f) delta)))
  (define released? #f)
  (adjust! 1)
  (lambda ()
    (unless released?
      (set! released? #t)
      (adjust! -1))))
;; Stops facet `f` (and, recursively, its children) within `turn`.
;; Does nothing if `f` is no longer live.
;;
;;   turn     : the turn in which the termination takes place
;;   f        : the facet to stop
;;   orderly? : when true, the facet's shutdown actions run and a follow-up
;;              task is scheduled that either stops the parent (if it has
;;              become inert) or, for a parentless facet, terminates the actor
;;              cleanly; when false only the teardown itself happens
;;
;; Teardown order, performed in a nested turn whose active facet is `f`:
;; children first, then shutdown actions (oldest first), then retraction of
;; every outbound assertion.
(define (facet-terminate! turn f orderly?)
  (when (facet-live? f)
    (log-syndicate/actor-info "~a stopping (~a)" f (if orderly? "orderly" "disorderly"))
    (set-facet-live?! f #f)
    (define parent (facet-parent f))
    (when parent (hash-remove! (facet-children parent) f))
    (turn-call-with-facet turn f
      (lambda (turn)
        ;; Iterate over a snapshot: each child's own termination removes it
        ;; from (facet-children f), and mutating a hash table while
        ;; traversing it with in-hash-keys is not guaranteed to visit every
        ;; remaining key.
        (for [(c (in-list (hash-keys (facet-children f))))]
          (facet-terminate! turn c orderly?))
        (when orderly?
          (for [(h (in-list (reverse (facet-shutdown-actions f))))] (h turn)))
        ;; Snapshot again: turn-retract!* removes each assertion from
        ;; (facet-outbound f), which is the table being traversed.
        (for [(a (in-list (hash-values (facet-outbound f))))]
          (turn-retract!* turn a))
        (when orderly?
          (queue-task!
           (actor-engine (facet-actor f))
           (lambda ()
             (if parent
                 ;; Stopping f may have left its parent with nothing to do.
                 (when (facet-inert? parent)
                   (turn! parent
                          (lambda (turn)
                            (facet-terminate! turn parent #t))))
                 ;; f was a root facet: the whole actor is finished. The
                 ;; root is no longer live, so this must be a zombie turn.
                 (turn! (actor-root (facet-actor f))
                        (lambda (turn)
                          (actor-terminate! turn (facet-actor f) #t))
                        #t)))))))))
;;---------------------------------------------------------------------------
;; Runs `action` (a procedure of one argument, the turn) as a fresh turn whose
;; active facet is `f`, then commits the turn.
;;
;; The turn is silently skipped unless `zombie-turn?` is true or both the
;; owning actor has no exit reason yet and `f` is still live.
;;
;; Committing means: for every facet that had actions enqueued during the
;; turn, one engine task is queued that runs those actions, each in its own
;; turn on that facet, in the order they were enqueued. Finally the turn's
;; queues are set to #f so that the turn cannot be reused.
;;
;; If `action` (or the commit step) raises an `exn?`, nothing further is
;; dispatched from this turn; instead a new (non-zombie) turn on the actor's
;; root facet terminates the actor with that exception as its exit reason.
(define (turn! f action [zombie-turn? #f])
(when (or zombie-turn? (and (not (actor-exit-reason (facet-actor f))) (facet-live? f)))
;; The right-hand `turn` is the struct constructor; the left-hand one is the
;; new local binding for the turn instance.
(let ((turn (turn (generate-turn-id) f (make-hasheq))))
(with-handlers ([exn? (lambda (e)
(turn! (actor-root (facet-actor f))
(lambda (turn)
(actor-terminate! turn (facet-actor f) e))))])
(action turn)
;; ff is a facet, qq its list of enqueued actions (newest first, hence the
;; reverse below).
(for [((ff qq) (in-hash (turn-queues turn)))]
(queue-task! (actor-engine (facet-actor ff))
(lambda () (for [(a (in-list (reverse qq)))] (turn! ff a)))))
;; Mark the turn as committed.
(set-turn-queues! turn #f)))))
;; Runs `action` immediately in a nested turn whose active facet is `f` but
;; which shares `outer-turn`'s action queues, so anything enqueued inside is
;; committed together with the outer turn. The nested turn is invalidated
;; (queues set to #f) as soon as `action` returns.
(define (turn-call-with-facet outer-turn f action)
  (define nested (turn (generate-turn-id) f (turn-queues outer-turn)))
  (action nested)
  (set-turn-queues! nested #f))
;; Adds `action` to the list of actions that `turn` will dispatch to facet `f`
;; when it commits. The newest action goes at the front of the list.
;; Raises an error if `turn` has already been committed (its queues are #f).
(define (turn-enqueue! turn f action)
  (let ([pending (turn-queues turn)])
    (unless pending
      (error 'turn-enqueue! "Attempt to reuse a committed turn"))
    (hash-update! pending
                  f
                  (lambda (so-far) (cons action so-far))
                  '())))
;; Creates a `ref` to `entity` whose relay is the turn's active facet.
;;
;;   turn        : the current turn
;;   entity      : the entity the ref designates
;;   attenuation : optional; stored in the ref and later passed to
;;                 `run-rewrites` for every value sent via the ref.
;;                 Defaults to '().
;;                 NOTE(review): '() is assumed to mean "no rewrites" to
;;                 `run-rewrites` -- confirm against rewrite.rkt.
;;
;; The `ref` struct has three fields; the previous two-argument construction
;; raised an arity error on every call.
(define (turn-ref turn entity [attenuation '()])
  (ref (turn-active-facet turn) entity attenuation))
;; Creates a child facet of the turn's active facet and immediately runs
;; `boot-proc` inside it (as a nested turn). If the new facet is inert once
;; booted it is scheduled to stop. Returns the new facet.
(define (turn-facet! turn boot-proc)
  (define parent (turn-active-facet turn))
  (define child (make-facet (facet-actor parent) parent))
  (turn-call-with-facet turn child (stop-if-inert-after boot-proc))
  child)
;; Schedules an orderly stop of facet `f` (default: the turn's active facet).
;; The stop is enqueued on `f`'s parent, so it runs in the parent's context
;; after the current turn commits; `continuation` (default: `void`) is then
;; called with that later turn.
(define (turn-stop! turn [f (turn-active-facet turn)] [continuation void])
  (define (stop-then-continue later-turn)
    (facet-terminate! later-turn f #t)
    (continuation later-turn))
  (turn-enqueue! turn (facet-parent f) stop-then-continue))
;; Schedules the creation of a new actor that will be booted with `boot-proc`.
;;
;; `initial-assertions` is a hash whose keys are handles of assertions
;; currently held by the spawning facet; only the keys are used. When the
;; enqueued action runs, each such assertion is moved out of the spawning
;; facet's outbound table into a fresh table that becomes the new actor's
;; root-facet outbound table. The actor itself is created in a separate
;; engine task.
(define (turn-spawn! turn boot-proc [initial-assertions (make-hash)])
  (let* ([spawner (turn-active-facet turn)]
         [spawner-outbound (facet-outbound spawner)])
    (define (transfer-and-spawn _later-turn)
      (define handed-over (make-hash))
      (for ([handle (in-hash-keys initial-assertions)])
        (hash-set! handed-over handle (hash-ref spawner-outbound handle))
        (hash-remove! spawner-outbound handle))
      (let ([engine (actor-engine (facet-actor spawner))])
        (queue-task! engine
                     (lambda () (make-actor engine boot-proc handed-over)))))
    (turn-enqueue! turn spawner transfer-and-spawn)))
;; Schedules a clean termination (exit reason #t) of the actor that owns the
;; turn's active facet. The termination runs on the actor's root facet once
;; the current turn has committed.
(define (turn-stop-actor! turn)
  (let ([owner (facet-actor (turn-active-facet turn))])
    (turn-enqueue! turn
                   (actor-root owner)
                   (lambda (later-turn)
                     (actor-terminate! later-turn owner #t)))))
;; Schedules termination of the actor that owns the turn's active facet, with
;; `exn` recorded as the exit reason. The termination runs on the actor's
;; root facet once the current turn has committed.
(define (turn-crash! turn exn)
  (let ([owner (facet-actor (turn-active-facet turn))])
    (turn-enqueue! turn
                   (actor-root owner)
                   (lambda (later-turn)
                     (actor-terminate! later-turn owner exn)))))
;; Asserts `assertion` at `peer` under a freshly generated handle and returns
;; that handle (which can later be passed to `turn-retract!`).
(define (turn-assert! turn peer assertion)
  (let ([fresh-handle (generate-handle)])
    (turn-assert!* turn peer assertion fresh-handle)
    fresh-handle))
;; Asserts `assertion` at `peer` using the caller-supplied `handle`.
;;
;; The assertion is first passed through the peer's attenuation; if that
;; yields void, nothing happens. Otherwise the assertion is recorded in the
;; active facet's outbound table straight away, and its delivery to the
;; peer's `assert` handler is enqueued on the peer's relay facet. The record
;; is marked established only when that delivery actually runs.
(define (turn-assert!* turn peer assertion handle)
  (define rewritten (run-rewrites (ref-attenuation peer) assertion))
  (unless (void? rewritten)
    (let ([record (outbound-assertion handle peer #f)])
      (hash-set! (facet-outbound (turn-active-facet turn)) handle record)
      (turn-enqueue! turn
                     (ref-relay peer)
                     (lambda (turn)
                       (set-outbound-assertion-established?! record #t)
                       (deliver (entity-assert (ref-target peer)) turn rewritten handle))))))
;; Retracts the assertion identified by `handle`, if the turn's active facet
;; currently holds one. A #f handle, or a handle unknown to the active facet,
;; is ignored.
(define (turn-retract! turn handle)
  (when handle
    (let ([found (hash-ref (facet-outbound (turn-active-facet turn)) handle #f)])
      (when found
        (turn-retract!* turn found)))))
;; Replaces the assertion identified by `old-handle` with `assertion`.
;; The new assertion is made before the old one is retracted. Passing void as
;; `assertion` means "no replacement": the old assertion is just retracted.
;; Returns the new handle, or #f when there is no replacement.
(define (turn-replace! turn peer old-handle assertion)
  (begin0
    (cond
      [(void? assertion) #f]
      [else (turn-assert! turn peer assertion)])
    (turn-retract! turn old-handle)))
;; Retracts outbound-assertion record `a`.
;;
;; The record is dropped from the active facet's outbound table immediately.
;; Delivery to the peer's `retract` handler is enqueued on the peer's relay
;; facet, and only happens if the assertion had been established by then;
;; the record is marked unestablished just before that delivery.
(define (turn-retract!* turn a)
  (define handle (outbound-assertion-handle a))
  (define peer (outbound-assertion-peer a))
  (hash-remove! (facet-outbound (turn-active-facet turn)) handle)
  (turn-enqueue! turn
                 (ref-relay peer)
                 (lambda (turn)
                   (when (outbound-assertion-established? a)
                     (set-outbound-assertion-established?! a #f)
                     (deliver (entity-retract (ref-target peer)) turn handle)))))
;; Synchronises with `peer`: wraps `k` as the `message` handler of a
;; throwaway entity, makes a ref to it in the current facet, and asks `peer`
;; to notify that ref. `k` is thus called as a message handler, i.e. with a
;; turn and the message body.
(define (turn-sync! turn peer k)
  (let ([reply-ref (turn-ref turn (make-entity #:message k))])
    (turn-sync!* turn peer reply-ref)))
;; Enqueues a sync request on `peer-to-sync-with`'s relay facet. When it runs,
;; the target entity's `sync` handler is called with the turn and `peer-k`.
;; An entity without a `sync` handler gets the default behaviour: the message
;; #t is sent straight to `peer-k`.
(define (turn-sync!* turn peer-to-sync-with peer-k)
  (define (default-sync-handler turn peer-k)
    (turn-message! turn peer-k #t))
  (turn-enqueue! turn
                 (ref-relay peer-to-sync-with)
                 (lambda (turn)
                   (let ([handler (or (entity-sync (ref-target peer-to-sync-with))
                                      default-sync-handler)])
                     (deliver handler turn peer-k)))))
;; Sends a message to `peer`. The body (the parameter named `assertion`) is
;; first passed through the peer's attenuation; a void result drops the
;; message. Otherwise delivery to the peer's `message` handler is enqueued on
;; the peer's relay facet.
(define (turn-message! turn peer assertion)
  (define rewritten (run-rewrites (ref-attenuation peer) assertion))
  (unless (void? rewritten)
    (turn-enqueue! turn
                   (ref-relay peer)
                   (lambda (turn)
                     (deliver (entity-message (ref-target peer)) turn rewritten)))))
;; Given a stale (already committed) `turn`, runs `action` in a brand-new
;; turn on the same active facet. It is an error to call this with a turn
;; that has not been committed yet.
(define (turn-freshen turn action)
  (if (turn-queues turn)
      (error 'turn-freshen "Attempt to freshen a non-stale turn")
      (turn! (turn-active-facet turn) action)))
;;---------------------------------------------------------------------------
;; Wraps `action` so that, after it has run, a follow-up check is enqueued on
;; the turn's active facet. When that check runs, the facet is stopped if it
;; is inert or if it has a parent that is no longer live.
(define (stop-if-inert-after action)
  ;; True when `f` has a parent and that parent has already been stopped.
  (define (orphaned? f)
    (let ([parent (facet-parent f)])
      (and parent (not (facet-live? parent)))))
  (define (check-inert later-turn)
    (let ([f (turn-active-facet later-turn)])
      (when (or (orphaned? f) (facet-inert? f))
        (turn-stop! later-turn))))
  (lambda (turn)
    (action turn)
    (turn-enqueue! turn (turn-active-facet turn) check-inert)))
;; Applies `maybe-proc` to `args` when it is a procedure; when it is #f (an
;; absent handler) nothing is called and the result is void.
(define (deliver maybe-proc . args)
  (if maybe-proc
      (apply maybe-proc args)
      (void)))