Initial stab at implementation (largely cribbed from js-marketplace)

This commit is contained in:
Tony Garnock-Jones 2013-10-26 18:48:59 +01:00
parent fbdad15ab6
commit 5cc26681f5
1 changed files with 211 additions and 0 deletions

211
core.rkt Normal file
View File

@ -0,0 +1,211 @@
#lang racket/base
(require racket/match)
(require racket/list)
(require "pattern.rkt")
(require "functional-queue.rkt")
(require (only-in web-server/private/util exn->string))
(provide (struct-out route)
(struct-out routing-update)
(struct-out message)
(struct-out quit)
(struct-out process)
(struct-out transition)
sub
pub
spawn
send
feedback
spawn-world
transition-bind
sequence-transitions)
(struct route (subscription? pattern meta-level level) #:prefab)
;; Events
(struct routing-update (routes) #:prefab)
(struct message (body meta-level feedback?) #:prefab)
;; Actions (in addition to Events)
;; (spawn is just process)
(struct quit () #:prefab)
;; Actors and Configurations
(struct process (routes behavior state) #:transparent)
(struct world (next-pid event-queue process-table downward-routes process-actions) #:transparent)
;; Behavior : maybe event * state -> transition
(struct transition (state actions) #:transparent)
(define (drop-route r)
(match-define (route s? p ml l) r)
(and (positive? ml) (route s? p (- ml 1) l)))
(define (lift-route r)
(match-define (route s? p ml l) r)
(route s? p (+ ml 1) l))
(define (sub p #:meta-level [ml 0] #:level [l 0]) (route #t p ml l))
(define (pub p #:meta-level [ml 0] #:level [l 0]) (route #t p ml l))
(define (spawn behavior state [initial-routes '()]) (process initial-routes behavior state))
(define (send body #:meta-level [ml 0]) (message body ml #f))
(define (feedback body #:meta-level [ml 0]) (message body ml #t))
(define (drop-routes rs) (filter-map drop-route rs))
(define (lift-routes rs) (map lift-routes rs))
(define (intersect-routes rs1 rs2)
(let loop1 ((rs1 rs1)
(acc '()))
(match rs1
['() (reverse acc)]
[(cons r1 rs1)
(let loop2 ((rs2 rs2)
(acc acc))
(match rs2
['() (loop1 rs1 acc)]
[(cons r2 rs2)
(if (and (equal? (route-subscription? r1) (not (route-subscription? r2)))
(= (route-meta-level r1) (route-meta-level r2))
(< (route-level r1) (route-level r2)))
(intersect (route-pattern r1) (route-pattern r2)
(lambda (rr) (loop2 rs2 (cons rr acc)))
(lambda () (loop2 rs2 acc)))
(loop2 rs2 acc))]))])))
(define (filter-event e rs)
(match e
[(routing-update e-rs)
(routing-update (intersect-routes e-rs rs))]
[(message body meta-level feedback?)
(let loop ((rs rs))
(match rs
['() #f]
[(cons r rs)
(and (= meta-level (route-meta-level r))
(equal? feedback? (not (route-subscription? r)))
(intersect body (route-pattern r)
(lambda (dummy) e)
(lambda () (loop rs))))]))]))
(define (spawn-world boot-actions)
(spawn world-handle-event
(enqueue-actions (world 0
(make-queue)
(hash)
'()
(make-queue))
-1
boot-actions)))
(define (enqueue-actions w pid actions)
(struct-copy world w
[process-actions (queue-append-list (world-process-actions w)
(map (lambda (a) (cons pid a)) (flatten actions)))]))
(define (quiescent? w)
(and (queue-empty? (world-event-queue w))
(queue-empty? (world-process-actions w))))
(define (deliver e pid p)
(with-handlers ([(lambda (exn) #t)
(lambda (exn)
(log-error "Process ~a died with exception:\n~a" pid (exn->string exn))
(transition (process-state p) (list (quit))))])
((process-behavior p) e (process-state p))))
(define (apply-transition pid t w)
(match-define (transition new-state new-actions) t)
(let* ((w (transform-process pid w (lambda (p) (struct-copy process p [state new-state])))))
(enqueue-actions w pid new-actions)))
(define (step-children w)
(let-values (((w step-taken?)
(for/fold ([w w] [step-taken? #f]) (((pid p) (in-hash (world-process-table w))))
(define t (deliver #f pid p))
(if t
(values (apply-transition pid t w) #t)
(values w step-taken?)))))
(and step-taken? (transition w '()))))
(define (transition-bind k t0)
(match-define (transition state0 actions0) t0)
(match-define (transition state1 actions1) (k state0))
(transition state1 (cons actions0 actions1)))
(define (sequence-transitions t0 . steps)
(foldl transition-bind t0 steps))
(define (perform-actions w)
(for/fold ([t (transition (struct-copy world w [process-actions (make-queue)]) '())])
((entry (in-list (queue->list (world-process-actions w)))))
(match-define (cons pid a) entry)
(transition-bind (perform-action pid a) t)))
(define (dispatch-events w)
(transition (for/fold ([w (struct-copy world w [event-queue (make-queue)])])
((e (in-list (queue->list (world-event-queue w)))))
(dispatch-event e w))
'()))
(define (transform-process pid w f)
(define p (hash-ref (world-process-table w) pid))
(if p
(struct-copy world w [process-table (hash-set (world-process-table w) pid (f p))])
w))
(define (enqueue-event e w)
(struct-copy world w [event-queue (enqueue (world-event-queue w) e)]))
(define ((perform-action pid a) w)
(match a
[(? process? new-p)
(let* ((new-pid (world-next-pid w))
(w (struct-copy world w [next-pid (+ new-pid 1)]))
(w (struct-copy world w [process-table
(hash-set (world-process-table w) new-pid new-p)])))
(issue-routing-update w))]
[(quit)
(let* ((w (struct-copy world w [process-table (hash-remove (world-process-table w) pid)])))
(issue-routing-update w))]
[(routing-update routes)
(let* ((w (transform-process pid w (lambda (p) (struct-copy process p [routes routes])))))
(issue-routing-update w))]
[(message body meta-level feedback?)
(if (zero? meta-level)
(transition (enqueue-event a w) '())
(transition w (message body (- meta-level 1) feedback?)))]))
(define (aggregate-routes base w)
(apply append base (for/list ((p (in-hash-values (world-process-table w)))) (process-routes p))))
(define (issue-local-routing-update w)
(enqueue-event (routing-update (aggregate-routes (world-downward-routes w) w)) w))
(define (issue-routing-update w)
(transition (issue-local-routing-update w)
(routing-update (drop-routes (aggregate-routes '() w)))))
(define (dispatch-event e w)
(for/fold ([w w]) (((pid p) (in-hash (world-process-table w))))
(define e1 (filter-event e (process-routes p)))
(if e1
(apply-transition pid (deliver e1 pid p) w)
w)))
(define (world-handle-event e w)
(if (or e (not (quiescent? w)))
(sequence-transitions (transition (inject-event e w) '())
dispatch-events
perform-actions
(lambda (w) (or (step-children w) (transition w '()))))
(step-children w)))
(define (inject-event e w)
(match e
[(routing-update routes)
(issue-local-routing-update (struct-copy world w [downward-routes (lift-routes routes)]))]
[(message body meta-level feedback?)
(enqueue-event (message body (+ meta-level 1) feedback?) w)]))