Supervision and reloading

This commit is contained in:
Tony Garnock-Jones 2019-01-28 01:14:01 +00:00
parent 636ca9acf7
commit 23580010bb
6 changed files with 247 additions and 5 deletions

View File

@ -62,6 +62,7 @@
(require "skeleton.rkt")
(require "pattern.rkt")
(require "bag.rkt")
(require "reflection.rkt")
;; An `ActorID` uniquely identifies an actor in a `Dataspace`.
;; A `FID` is a Facet ID, uniquely identifying a facet in a `Dataspace`.
@ -275,7 +276,7 @@
;; Instead of emitting patches to orderly
;; tear down assertions from endpoints, we
;; rely on the recorded `cleanup-changes`.
(terminate-actor! a #f))]) ;; TODO: tracing
(terminate-actor! a #f e))]) ;; TODO: tracing
(call-with-syndicate-prompt
(lambda ()
body ...))
@ -458,7 +459,7 @@
(vector-set! scripts i (make-queue)))))
;; Abruptly terminates an entire actor, without running stop-scripts etc.
(define (terminate-actor! the-actor emit-patches?)
(define (terminate-actor! the-actor emit-patches? maybe-exn)
(when emit-patches?
(push-script! the-actor (lambda () (for [(a (in-bag (actor-adhoc-assertions the-actor)))]
(retract! the-actor a)))))
@ -468,7 +469,11 @@
(set-facet-live?! f #f)
(for [(child (in-set (facet-children f)))] (abort-facet! child))
(retract-facet-assertions-and-subscriptions! f emit-patches?))))
(push-script! the-actor (lambda () (enqueue-action! the-actor (quit)))))
(push-script! the-actor (lambda ()
(let ((name (actor-name the-actor)))
(when name
(enqueue-send! the-actor (terminated name maybe-exn))))
(enqueue-action! the-actor (quit)))))
;; Cleanly terminates a facet and its children, running stop-scripts etc.
(define (terminate-facet! f)
@ -496,7 +501,7 @@
(lambda ()
(if parent
(when (facet-inert? parent) (terminate-facet! parent))
(terminate-actor! ac #t))))))
(terminate-actor! ac #t #f))))))
(define (stop-facet! f stop-script)
(define ac (facet-actor f))

9
syndicate/reflection.rkt Normal file
View File

@ -0,0 +1,9 @@
#lang racket/base
;; Reflective protocols
(provide (struct-out terminated))
;; (terminated Any (Option Any))
;; The `actor-name` is the name of the terminated actor.
;; The `reason` is either `#f` or a termination reason, usually an `exn?`.
(struct terminated (actor-name reason) #:transparent)

101
syndicate/reload.rkt Normal file
View File

@ -0,0 +1,101 @@
#lang imperative-syndicate
;; Crude steps toward reloadable Syndicate modules
(provide (except-out (struct-out reload-pending) reload-pending)
(rename-out [reload-pending <reload-pending>])
(rename-out [make-reload-pending reload-pending])
stop-when-reloaded
spawn-reloader
spawn-reloader*
reloader-mixin
reloader-mixin*)
(define-logger syndicate/reload)
(require file/sha1)
(require (for-syntax racket/base))
(require racket/rerequire)
(require/activate imperative-syndicate/supervise)
(require/activate imperative-syndicate/drivers/filesystem)
(assertion-struct reloader (pathstr))
(assertion-struct reload-pending (filename))
(define-syntax (make-reload-pending stx)
(syntax-case stx ()
[(SELF)
(quasisyntax/loc stx
(reload-pending '#,(path->string (syntax-source #'SELF))))]))
(define-syntax (stop-when-reloaded stx)
(syntax-case stx ()
[(_ body ...)
(quasisyntax/loc stx
(stop-when (asserted (reload-pending '#,(path->string (syntax-source stx))))
body ...))]))
(define-syntax-rule (spawn-reloader module-path)
(spawn-reloader* 'module-path))
(define (spawn-reloader* module-path)
(match (module-path->path-string module-path)
[#f #f]
[pathstr
(supervise #:name (reloader pathstr)
(reloader-mixin** module-path pathstr))
#t]))
(define-syntax-rule (reloader-mixin module-path)
(reloader-mixin* 'module-path))
(define (reloader-mixin* module-path)
(define pathstr (module-path->path-string module-path))
(when (not pathstr)
(error 'reloader-mixin "Cannot deduce source path from module-path ~v" module-path))
(reloader-mixin** module-path pathstr))
(define (module-path->path-string module-path)
(define mpi (module-path-index-join module-path #f))
(define rpath (module-path-index-resolve mpi))
(define path (let ((p (resolved-module-path-name rpath)))
(if (pair? p) (car p) p)))
(if (path? path)
(path->string path)
(begin (log-syndicate/reload-error "Could not process module-path ~v" module-path)
#f)))
(define (file->sha1 p)
(call-with-input-file p sha1))
(define (reloader-mixin** module-path pathstr)
(field [reloading? #f])
(define (reload!)
(when (not (reloading?))
(reloading? #t)
(react (field [obstacles-cleared? #f])
(define/query-value obstacles-exist? #f (observe (reload-pending pathstr)) #t
#:on-add (log-syndicate/reload-info "waiting to reload ~v" pathstr)
#:on-remove (obstacles-cleared? #t))
(assert #:when (obstacles-exist?) (reload-pending pathstr))
(on-start (flush!)
(obstacles-cleared? (not (obstacles-exist?))))
(stop-when-true (obstacles-cleared?)
(flush!) ;; Wait one turn for effects of newly-cleared obstacles
(log-syndicate/reload-info "(re)loading ~v" pathstr)
(dynamic-rerequire module-path)
(spawn* #:name module-path
((dynamic-require `(submod ,module-path syndicate-main)
'activate!)))
(reloading? #f)))))
(field [previous-version 'unknown])
(define/query-value latest-version 'unknown (file-content pathstr file->sha1 $p) p)
(begin/dataflow
(when (and (not (eq? (latest-version) 'unknown))
(not (equal? (latest-version) (previous-version))))
(if (latest-version)
(reload!)
(log-syndicate/reload-warning "Module ~v does not exist" pathstr))
(previous-version (latest-version)))))

74
syndicate/supervise.rkt Normal file
View File

@ -0,0 +1,74 @@
#lang imperative-syndicate
;; Extremely simple single-actor supervision
;; Vastly simplified compared to the available options in OTP
(provide (struct-out supervisor)
supervise)
(require racket/exn)
(require (for-syntax syntax/parse))
(require "syntax-classes.rkt")
(require "reflection.rkt")
(require/activate imperative-syndicate/drivers/timer)
(define-logger syndicate/supervise)
(assertion-struct supervisor (id name))
(define-syntax (supervise stx)
(syntax-parse stx
[(_ (~or (~optional (~seq #:name name-expr) #:defaults ([name-expr #'#f])
#:name "#:name")
(~optional (~seq #:linkage [linkage-expr ...]) #:defaults ([(linkage-expr 1) '()])
#:name "#:linkage"))
...
O ...)
(syntax/loc stx
(supervise* name-expr
(lambda () linkage-expr ... (void))
(lambda () O ...)))]))
(define (supervise* name0 linkage-thunk root-facet-thunk)
(define id (gensym 'supervisor))
(define name (or name0 (gensym 'supervisee)))
(spawn #:name (supervisor id name)
#:linkage [(linkage-thunk)] ;; may contain e.g. linkage instructions from during/spawn
(assert (supervisor id name))
(define root-supervisor-facet (current-facet))
(define intensity 1)
(define period 5000) ;; milliseconds
(define sleep-time 10) ;; seconds
(field [restarts '()])
(define (add-restart!)
(define now (current-inexact-milliseconds))
(define oldest-to-keep (- now period))
(restarts (filter (lambda (r) (>= r oldest-to-keep)) (cons now (restarts))))
(when (> (length (restarts)) intensity)
(log-syndicate/supervise-error
"Supervised process ~s reached max restart intensity. Sleeping for ~a seconds"
name
sleep-time)
(sleep sleep-time)))
(define (start-supervisee!)
(spawn #:name name
(stop-when (retracted (supervisor id name)))
(root-facet-thunk)))
(on (message (terminated name $reason))
(when reason
(log-syndicate/supervise-error "Supervised process ~s died" name)
;; (log-syndicate/supervise-error
;; "Supervised process ~s died with exception:\n~a"
;; name
;; (if (exn? reason) (exn->string reason) (format "~v" reason)))
(add-restart!)
(start-supervisee!)))
(on-start (start-supervisee!))))

View File

@ -2,6 +2,7 @@
;; Test drivers for Syndicate implementation.
(provide collected-events
collected-exns
collected-output
collected-lines
final-dataspace
@ -23,7 +24,7 @@
run-syndicate-test!
log-test-result!
(all-from-out racket)
(except-out (all-from-out racket) sleep)
(all-from-out "main.rkt"))
(module reader syntax/module-reader imperative-syndicate/test-implementation)

View File

@ -0,0 +1,52 @@
#lang imperative-syndicate/test-implementation
;; Simple tests of supervision functionality.
(require imperative-syndicate/supervise)
(test-case
[(supervise #:name 'ward
(on-start (printf "Starting ward\n"))
(on-stop (printf "Stopping ward\n"))
(on (message 'crash)
(printf "Crashing\n")
(error 'ward "Eep!"))
(stop-when (message 'quit)
(printf "Bye!\n")))
(define (monitor-interest-in thing)
(spawn #:name (list 'monitor-interest-in thing)
(during (observe thing)
(on-start (printf "Interest in ~v appeared\n" thing))
(on-stop (printf "Interest in ~v disappeared\n" thing)))))
(monitor-interest-in 'crash)
(monitor-interest-in 'quit)
(spawn* #:name 'main
(until (asserted (observe 'crash)))
(send! 'crash)
(flush!)
(flush!)
(flush!)
;; ^ give it time to actually terminate
;; v then wait for the next instance to appear
(until (asserted (observe 'quit)))
(send! 'quit))]
(it "should cause ward to produce an exception"
(actor-died? 'ward "Eep!"))
(it "should cause exactly one crash in total"
(= (length (collected-exns)) 1))
(expected-output (list "Starting ward")
(set "Interest in 'crash appeared"
"Interest in 'quit appeared")
(list "Crashing")
(set "Interest in 'quit disappeared"
"Interest in 'crash disappeared")
(list "Starting ward")
(set "Interest in 'crash appeared"
"Interest in 'quit appeared")
(list "Stopping ward"
"Bye!")
(set "Interest in 'quit disappeared"
"Interest in 'crash disappeared")))