315 lines
10 KiB
Racket
315 lines
10 KiB
Racket
#lang racket/base
|
|
|
|
(require racket/match)
|
|
(require racket/list)
|
|
(require "pattern.rkt")
|
|
(require "functional-queue.rkt")
|
|
(require (only-in web-server/private/util exn->string))
|
|
|
|
(provide (struct-out route)
|
|
(struct-out routing-update)
|
|
(struct-out message)
|
|
(struct-out quit)
|
|
(struct-out process)
|
|
(struct-out transition)
|
|
? ;; imported from pattern.rkt
|
|
wildcard?
|
|
sub
|
|
pub
|
|
gestalt-ref
|
|
gestalt-empty
|
|
gestalt-empty?
|
|
gestalt-union
|
|
gestalt-filter
|
|
pretty-print-matcher
|
|
pretty-print-gestalt
|
|
spawn
|
|
send
|
|
feedback
|
|
co-route
|
|
route-accepts?
|
|
intersect-routes
|
|
spawn-world
|
|
deliver-event
|
|
transition-bind
|
|
sequence-transitions
|
|
routing-implementation
|
|
|
|
log-events-and-actions?)
|
|
|
|
;; Parameter holding the pids of the processes currently being delivered to,
;; innermost first: deliver-event conses the target pid onto it and the
;; logging code reverses it before printing.
(define pid-stack (make-parameter '()))
|
|
;; Parameter (default #f): when true, deliver-event and apply-transition emit
;; log-info lines describing delivered events and resulting actions.
(define log-events-and-actions? (make-parameter #f))
|
|
|
|
;; A route. subscription? is #t for routes built by `sub` and #f for routes
;; built by `pub`; pattern is what gets handed to intersect / intersect?;
;; meta-level and level are numbers (compared with = and < in this file).
(struct route (subscription? pattern meta-level level) #:prefab)
|
|
|
|
;; Events
|
|
;; Carries a collection of routes. When performed as an action the routes
;; field is flattened first, so it may be a nested list.
(struct routing-update (routes) #:prefab)
|
|
;; Carries a body, a numeric meta-level, and a feedback? flag
;; (#f when built by `send`, #t when built by `feedback`).
(struct message (body meta-level feedback?) #:prefab)
|
|
|
|
;; Actions (in addition to Events)
|
|
;; (spawn is just process)
|
|
;; Field-less action; performing it removes the issuing pid from the world's
;; process table and triggers a routing update.
(struct quit () #:prefab)
|
|
|
|
;; Actors and Configurations
|
|
;; A process: its routes, a behavior procedure invoked as
;; (behavior event state), and its current state.
(struct process (routes behavior state) #:transparent)
|
|
;; A world (used as the state of a process whose behavior is world-handle-event):
;;   next-pid        - pid given to the next spawned process (starts at 0)
;;   event-queue     - queue of events awaiting dispatch to the children
;;   process-table   - immutable hash from pid to trigger-guard
;;   downward-routes - routes received from outside, lifted by one meta-level
;;   process-actions - queue of (cons pid action) pairs awaiting perform-action
(struct world (next-pid event-queue process-table downward-routes process-actions) #:transparent)
|
|
|
|
;; Behavior : (maybe event) * state -> (maybe transition)
;; (#f as the event means "no event"; a #f result means "no transition")
|
|
;; Result of running a behavior: the new state plus the actions to perform.
;; actions may be a single action or an arbitrarily nested list; it is
;; flattened when the actions are enqueued.
(struct transition (state actions) #:transparent)
|
|
|
|
;; The process table maps each pid to one of these; the idea is to avoid
;; redundant signalling of routing-updates where possible.
|
|
;; Process-table entry: a process together with the routes most recently
;; delivered to it in a routing-update (dispatch-event compares against these
;; to skip deliveries that would repeat the previous one).
(struct trigger-guard (process downward-routes) #:transparent)
|
|
|
|
;; Lower a route by one meta-level.
;; Returns the adjusted route, or #f when the route's meta-level is not
;; positive (such a route cannot be lowered any further).
(define (drop-route r)
  (match-define (route subscription? pat meta-level level) r)
  (if (positive? meta-level)
      (route subscription? pat (sub1 meta-level) level)
      #f))
|
|
|
|
;; Raise a route by one meta-level, leaving every other field unchanged.
(define (lift-route r)
  (match-define (route subscription? pat meta-level level) r)
  (route subscription? pat (add1 meta-level) level))
|
|
|
|
;; Build a subscription route (subscription? = #t) for pattern p.
;; #:meta-level and #:level both default to 0.
(define (sub p #:meta-level [meta-level 0] #:level [level 0])
  (route #t p meta-level level))
|
|
;; Build an advertisement route (subscription? = #f) for pattern p.
;; #:meta-level and #:level both default to 0.
(define (pub p #:meta-level [meta-level 0] #:level [level 0])
  (route #f p meta-level level))
|
|
|
|
;; Collect the patterns of the routes in g (flattened first) whose meta-level
;; equals metalevel, whose level equals level, and whose kind matches
;; get-advertisements? (#t selects non-subscription routes, #f selects
;; subscriptions; the comparison uses eq?, so pass a real boolean).
;; Patterns that are themselves #f are dropped from the result.
(define (gestalt-ref g metalevel level get-advertisements?)
  (define (selected-pattern r)
    (match-define (route is-sub? pat ml l) r)
    (and (= ml metalevel)
         (= l level)
         (eq? get-advertisements? (not is-sub?))
         pat))
  (filter values (map selected-pattern (flatten g))))
|
|
|
|
;; Combine any number of gestalts (possibly nested lists of routes) into one
;; flat list, preserving left-to-right order.
(define gestalt-union
  (lambda gs
    (flatten gs)))
|
|
;; Flatten both gestalts and return intersect-routes of the two route lists.
(define (gestalt-filter g1 g2)
  (let* ([routes1 (flatten g1)]
         [routes2 (flatten g2)])
    (intersect-routes routes1 routes2)))
|
|
;; The empty gestalt: an empty list of routes.
(define (gestalt-empty)
  (list))
|
|
;; #t exactly when g is the empty list. A non-empty nested list whose leaves
;; are all empty is NOT considered empty, since g is not flattened here.
(define (gestalt-empty? g)
  (eq? g '()))
|
|
|
|
(require racket/pretty)
|
|
;; Pretty-print x to the current output port. The #:indent keyword is accepted
;; but has no effect.
(define (pretty-print-matcher x #:indent [_unused-indent 0])
  (pretty-print x))
|
|
;; Pretty-print a gestalt to the current output port.
(define (pretty-print-gestalt x)
  (pretty-print x))
|
|
|
|
;; Build a spawn action, which is simply a process struct.
;; initial-routes is optional and defaults to the empty list.
(define (spawn behavior state [initial-routes '()])
  (process initial-routes
           behavior
           state))
|
|
;; Build an ordinary (non-feedback) message; #:meta-level defaults to 0.
(define (send body #:meta-level [meta-level 0])
  (message body meta-level #f))
|
|
;; Build a feedback message (feedback? = #t); #:meta-level defaults to 0.
(define (feedback body #:meta-level [meta-level 0])
  (message body meta-level #t))
|
|
|
|
;; Lower every route in rs by one meta-level, discarding the routes for which
;; drop-route yields #f.
(define (drop-routes rs)
  (filter values (map drop-route rs)))
|
|
;; Raise every route in rs by one meta-level, preserving order.
(define (lift-routes rs)
  (for/list ([r (in-list rs)])
    (lift-route r)))
|
|
|
|
;; Produce the opposite-kind route for r: subscription? is negated, pattern
;; and meta-level are kept. The level is replaced by #:level when that is
;; supplied as a non-#f value; otherwise r's own level is kept.
(define (co-route r #:level [level-override #f])
  (match-define (route subscription? pat meta-level own-level) r)
  (route (not subscription?)
         pat
         meta-level
         (if level-override level-override own-level)))
|
|
|
|
;; Decide whether route r accepts message m. All of the following must hold:
;;   - the meta-levels are equal;
;;   - m's feedback? flag equals the negation of r's subscription? flag;
;;   - intersect? succeeds on the message body and the route pattern.
;; The result is #f or whatever intersect? returns.
(define (route-accepts? r m)
  (cond
    [(not (= (message-meta-level m) (route-meta-level r))) #f]
    [(not (equal? (message-feedback? m) (not (route-subscription? r)))) #f]
    [else (intersect? (message-body m) (route-pattern r))]))
|
|
|
|
;; Pairwise intersection of two route lists.
;; Each r1 in rs1 is compared against each r2 in rs2. A pair is considered
;; only when the two routes have opposite subscription? flags, equal
;; meta-levels, and r1's level is strictly below r2's. For such a pair the
;; patterns are handed to `intersect`; whatever it passes to its success
;; continuation becomes the pattern of a copy of r1 that is added to the
;; result. Results appear in visiting order (rs1-major, rs2-minor).
;; NOTE(review): assumes `intersect` invokes exactly one of its two
;; continuations and returns that continuation's result - confirm in pattern.rkt.
(define (intersect-routes rs1 rs2)
  (define (candidate-pair? a b)
    (and (equal? (route-subscription? a) (not (route-subscription? b)))
         (= (route-meta-level a) (route-meta-level b))
         (< (route-level a) (route-level b))))
  (let outer ([remaining1 rs1]
              [results '()])
    (match remaining1
      ['() (reverse results)]
      [(cons r1 rest1)
       (let inner ([remaining2 rs2]
                   [results results])
         (match remaining2
           ['() (outer rest1 results)]
           [(cons r2 rest2)
            (cond
              [(candidate-pair? r1 r2)
               (intersect (route-pattern r1) (route-pattern r2)
                          (lambda (common)
                            (inner rest2
                                   (cons (struct-copy route r1 [pattern common])
                                         results)))
                          (lambda () (inner rest2 results)))]
              [else (inner rest2 results)])]))])))
|
|
|
|
;; Restrict event e to what a holder of routes rs should see.
;;   - routing-update: returns a new routing-update whose routes are the
;;     intersect-routes of the event's routes with rs.
;;   - message: returns e itself when at least one route in rs accepts it,
;;     otherwise #f.
(define (filter-event e rs)
  (match e
    [(routing-update offered)
     (routing-update (intersect-routes offered rs))]
    [(? message?)
     (and (ormap (lambda (r) (route-accepts? r e)) rs)
          e)]))
|
|
|
|
;; Build a spawn action for a nested world. The world starts with next-pid 0,
;; empty queues, an empty process table and no downward routes; boot-actions
;; are enqueued under the pseudo-pid -1.
(define (spawn-world . boot-actions)
  (define empty-world
    (world 0 (make-queue) (hash) '() (make-queue)))
  (spawn world-handle-event
         (enqueue-actions empty-world -1 boot-actions)))
|
|
|
|
;; #t when x is a routing-update or a message.
(define (event? x)
  (if (routing-update? x)
      #t
      (message? x)))
|
|
;; #t when x is an event, a process (i.e. a spawn action), or a quit.
(define (action? x)
  (cond
    [(event? x) #t]
    [(process? x) #t]
    [else (quit? x)]))
|
|
|
|
;; Append actions, attributed to pid, to the world's pending action queue.
;; actions may be a single value or a nested list; it is flattened, and any
;; element that is not an action? is silently discarded. Each kept action is
;; stored as (cons pid action).
(define (enqueue-actions w pid actions)
  (define pending (world-process-actions w))
  (define tagged
    (for/list ([a (in-list (flatten actions))]
               #:when (action? a))
      (cons pid a)))
  (struct-copy world w
               [process-actions (queue-append-list pending tagged)]))
|
|
|
|
;; A world is quiescent when both its event queue and its pending action
;; queue are empty.
(define (quiescent? w)
  (if (queue-empty? (world-event-queue w))
      (queue-empty? (world-process-actions w))
      #f))
|
|
|
|
;; Run process p's behavior on event e (which may be #f) and return its result.
;;   - pid is pushed onto pid-stack for the duration of the call.
;;   - When log-events-and-actions? is on and e is not #f, the delivery is
;;     logged; a world state is printed as "#<world>" instead of in full.
;;   - The behavior runs under the continuation mark 'minimart-process = pid.
;;   - Result: #f if the behavior returns #f, the transition if it returns
;;     one. Any other return value, or any raised value at all (the handler
;;     predicate accepts everything), is logged as an error and replaced by a
;;     transition that keeps the old state and makes the process quit.
(define (deliver-event e pid p)
  (define behavior (process-behavior p))
  (define state (process-state p))
  ;; Transition substituted whenever the process misbehaves.
  (define (fatal-transition)
    (transition state (list (quit))))
  (parameterize ([pid-stack (cons pid (pid-stack))])
    (when (and (log-events-and-actions?) e)
      (log-info "~a: ~v --> ~v ~v"
                (reverse (pid-stack))
                e
                behavior
                (if (world? state) "#<world>" state)))
    (with-handlers ([(lambda (exn) #t)
                     (lambda (exn)
                       (log-error "Process ~a died with exception:\n~a" pid (exn->string exn))
                       (fatal-transition))])
      (let ([result (with-continuation-mark 'minimart-process
                      pid ;; TODO: debug-name, other user annotation
                      (behavior e state))])
        (cond
          [(not result) #f]
          [(transition? result) result]
          [else
           (log-error "Process ~a returned non-#f, non-transition: ~v" pid result)
           (fatal-transition)])))))
|
|
|
|
;; Apply transition t, produced by process pid, to world w.
;;   - #f leaves the world unchanged.
;;   - Otherwise the process's state is replaced via transform-process and the
;;     transition's actions are enqueued under pid. When
;;     log-events-and-actions? is on and the flattened action list is
;;     non-empty, the actions are logged while the state is being replaced.
(define (apply-transition pid t w)
  (match t
    [#f w]
    [(transition new-state new-actions)
     (define (install-state p)
       (when (and (log-events-and-actions?)
                  (not (null? (flatten new-actions))))
         (log-info "~a: ~v <-- ~v ~v"
                   (reverse (cons pid (pid-stack)))
                   new-actions
                   (process-behavior p)
                   (if (world? new-state) "#<world>" new-state)))
       (struct-copy process p [state new-state]))
     (define updated (transform-process pid w install-state values))
     (enqueue-actions updated pid new-actions)]))
|
|
|
|
;; Poll every process in the world's process table (as it stood on entry)
;; with a #f event and apply whatever transitions result.
;; Returns a transition carrying the updated world and no actions if at least
;; one child produced a transition; returns #f when none did.
(define (step-children w)
  (define-values (final-world progressed?)
    (for/fold ([current w]
               [stepped? #f])
              ([(pid guard) (in-hash (world-process-table w))])
      (define t (deliver-event #f pid (trigger-guard-process guard)))
      (values (apply-transition pid t current)
              (or stepped? (transition? t)))))
  (and progressed?
       (transition final-world '())))
|
|
|
|
;; Chain a state-consuming step onto a transition: k is applied to t0's state
;; and must return a transition. The result has k's resulting state and the
;; actions of t0 followed by k's actions (as a cons tree, flattened later).
(define (transition-bind k t0)
  (match-define (transition state-before earlier-actions) t0)
  (define next (k state-before))
  (match-define (transition state-after later-actions) next)
  (transition state-after
              (cons earlier-actions later-actions)))
|
|
|
|
;; Thread transition t0 through each step in order using transition-bind;
;; with no steps, t0 is returned unchanged.
(define (sequence-transitions t0 . steps)
  (for/fold ([t t0])
            ([step (in-list steps)])
    (transition-bind step t)))
|
|
|
|
;; Drain the world's pending action queue: starting from w with an emptied
;; queue, perform each queued (pid . action) entry in order, accumulating the
;; actions each one emits. Returns the resulting transition.
(define (perform-actions w)
  (define start
    (transition (struct-copy world w [process-actions (make-queue)]) '()))
  (define pending (queue->list (world-process-actions w)))
  (foldl (lambda (entry t)
           (transition-bind (perform-action (car entry) (cdr entry)) t))
         start
         pending))
|
|
|
|
;; Drain the world's event queue: starting from w with an emptied queue,
;; dispatch each queued event in order. Returns a transition carrying the
;; resulting world and no actions.
(define (dispatch-events w)
  (define drained (struct-copy world w [event-queue (make-queue)]))
  (define events (queue->list (world-event-queue w)))
  (transition (foldl dispatch-event drained events)
              '()))
|
|
|
|
;; Update the process-table entry for pid in world w.
;;   fp  : process -> process, applied to the stored process
;;   frs : routes  -> routes,  applied to the stored downward routes
;; Returns the updated world. If pid is not in the process table (for example
;; because the process has already quit), w is returned unchanged.
(define (transform-process pid w fp frs)
  ;; The #f failure result is what makes the [#f w] clause reachable; without
  ;; it hash-ref raises for an unknown pid.
  (match (hash-ref (world-process-table w) pid #f)
    [#f w]
    [(trigger-guard p downward-rs)
     (struct-copy world w
                  [process-table (hash-set (world-process-table w)
                                           pid
                                           (trigger-guard (fp p) (frs downward-rs)))])]))
|
|
|
|
;; Return w with event e appended to its event queue.
(define (enqueue-event e w)
  (define extended (enqueue (world-event-queue w) e))
  (struct-copy world w [event-queue extended]))
|
|
|
|
;; Decide whether a routing-update action from pid carrying routes rs can be
;; skipped. It can when pid is no longer in the process table (e.g. the
;; process quit earlier in the same batch of actions), or when rs is equal?
;; to the routes the process already has.
(define (upward-routes-change-ignorable? pid w rs)
  ;; The #f failure result is what makes the [#f #t] clause reachable; without
  ;; it hash-ref raises for an unknown pid.
  (match (hash-ref (world-process-table w) pid #f)
    [#f #t]
    [(trigger-guard p _) (equal? (process-routes p) rs)]))
|
|
|
|
;; Curried: ((perform-action pid a) w) performs action a, issued by process
;; pid, against world w and returns a transition whose state is the new world
;; and whose actions are to be passed up to the enclosing level.
;;   - process (spawn): allocates the next pid, stores the process (with its
;;     routes flattened) in the table with empty downward routes, logs the
;;     spawn, and issues a routing update.
;;   - quit: logs termination if pid is still present, removes it from the
;;     table, and issues a routing update.
;;   - routing-update: flattens the routes; does nothing if the change is
;;     ignorable, otherwise replaces the process's routes and issues a routing
;;     update.
;;   - message: at meta-level 0 it is queued as an event inside this world;
;;     otherwise it is emitted upward with its meta-level decremented.
(define ((perform-action pid a) w)
  (match a
    [(? process? spawned)
     (define new-pid (world-next-pid w))
     (define normalized
       (struct-copy process spawned [routes (flatten (process-routes spawned))]))
     (define table
       (hash-set (world-process-table w)
                 new-pid
                 (trigger-guard normalized '())))
     (define next-w
       (struct-copy world w
                    [next-pid (+ new-pid 1)]
                    [process-table table]))
     (log-info "Spawned process ~a ~v ~v" new-pid (process-behavior normalized) (process-state normalized))
     (issue-routing-update next-w)]
    [(quit)
     (when (hash-has-key? (world-process-table w) pid)
       (log-info "Process ~a terminating" pid))
     (issue-routing-update
      (struct-copy world w
                   [process-table (hash-remove (world-process-table w) pid)]))]
    [(routing-update routes0)
     (define routes (flatten routes0))
     (cond
       [(upward-routes-change-ignorable? pid w routes)
        (transition w '())]
       [else
        (issue-routing-update
         (transform-process pid w
                            (lambda (p) (struct-copy process p [routes routes]))
                            values))])]
    [(message body meta-level feedback?)
     (if (zero? meta-level)
         (transition (enqueue-event a w) '())
         (transition w (message body (- meta-level 1) feedback?)))]))
|
|
|
|
;; Concatenate base with the routes of every process currently in the world's
;; process table. base comes first; the per-process order follows the hash
;; table's iteration order.
(define (aggregate-routes base w)
  (define per-process-routes
    (for/list ([guard (in-hash-values (world-process-table w))])
      (process-routes (trigger-guard-process guard))))
  (apply append base per-process-routes))
|
|
|
|
;; Queue, inside world w, a routing-update event containing the world's
;; downward routes followed by the routes of all its processes.
(define (issue-local-routing-update w)
  (define all-routes
    (aggregate-routes (world-downward-routes w) w))
  (enqueue-event (routing-update all-routes) w))
|
|
|
|
;; Announce a change in the world's routes in both directions: a local
;; routing-update event is queued inside the world, and the returned
;; transition carries an upward routing-update made of the processes' routes
;; lowered by one meta-level (routes that cannot be lowered are omitted).
(define (issue-routing-update w)
  (define locally-updated (issue-local-routing-update w))
  (define upward
    (routing-update (drop-routes (aggregate-routes '() w))))
  (transition locally-updated upward))
|
|
|
|
;; Deliver event e to every process in the world's process table (as it stood
;; on entry), threading the world through each delivery.
;; For each process the event is first restricted with filter-event:
;;   - #f: the process is skipped.
;;   - routing-update: skipped when the filtered routes are equal? to those
;;     recorded in the trigger-guard; otherwise delivered, and the
;;     trigger-guard's downward routes are replaced by the filtered routes.
;;   - anything else (a message): delivered.
(define (dispatch-event e w)
  (define (deliver-to pid p filtered current)
    (apply-transition pid (deliver-event filtered pid p) current))
  (for/fold ([current w])
            ([(pid guard) (in-hash (world-process-table w))])
    (match-define (trigger-guard p previous-rs) guard)
    (define filtered (filter-event e (process-routes p)))
    (cond
      [(not filtered) current]
      [(routing-update? filtered)
       (define latest-rs (routing-update-routes filtered))
       (if (equal? previous-rs latest-rs)
           current
           (transform-process pid
                              (deliver-to pid p filtered current)
                              values
                              (lambda (_old-rs) latest-rs)))]
      [else (deliver-to pid p filtered current)])))
|
|
|
|
;; Behavior of a world process.
;; With no incoming event and a quiescent world, only the children are polled
;; (the result may be #f). Otherwise the event is injected, queued events are
;; dispatched, pending actions are performed, and the children are polled; if
;; polling yields nothing the world is returned as-is.
(define (world-handle-event e w)
  (define (step-or-stay current)
    (or (step-children current)
        (transition current '())))
  (cond
    [(and (not e) (quiescent? w))
     (step-children w)]
    [else
     (sequence-transitions (transition (inject-event e w) '())
                           dispatch-events
                           perform-actions
                           step-or-stay)]))
|
|
|
|
;; Bring an event from the enclosing level into world w.
;;   - #f: w is returned unchanged.
;;   - routing-update: the routes are lifted by one meta-level and stored as
;;     the world's downward routes, then a local routing update is queued.
;;   - message: re-queued as an event with its meta-level incremented by one.
(define (inject-event e w)
  (match e
    [#f w]
    [(routing-update routes)
     (define lifted (lift-routes routes))
     (issue-local-routing-update
      (struct-copy world w [downward-routes lifted]))]
    [(message body meta-level feedback?)
     (define raised (message body (add1 meta-level) feedback?))
     (enqueue-event raised w)]))
|
|
|
|
;; Exported tag whose value is the symbol 'naive.
;; NOTE(review): presumably lets clients tell which routing implementation
;; they were linked against - confirm against callers.
(define routing-implementation 'naive)
|