WIP
This commit is contained in:
parent
de6cd3884a
commit
d88eb390b8
|
@ -6,6 +6,8 @@
|
|||
(require "functional-queue.rkt")
|
||||
(require (only-in web-server/private/util exn->string))
|
||||
|
||||
(require rackunit) ;; TODO: split out
|
||||
|
||||
(provide (struct-out route)
|
||||
(struct-out routing-update)
|
||||
(struct-out message)
|
||||
|
@ -19,9 +21,6 @@
|
|||
spawn
|
||||
send
|
||||
feedback
|
||||
co-route
|
||||
route-accepts?
|
||||
intersect-routes
|
||||
spawn-world
|
||||
deliver-event
|
||||
transition-bind
|
||||
|
@ -32,10 +31,25 @@
|
|||
(define pid-stack (make-parameter '()))
|
||||
(define log-events-and-actions? (make-parameter #f))
|
||||
|
||||
(struct route (subscription? pattern meta-level level) #:prefab)
|
||||
;; A Gestalt is a (gestalt (Listof (Vectorof (Pairof Matcher
|
||||
;; Matcher)))), representing the total interests of a process or group
|
||||
;; of processes. The outer list has a present entry for each active
|
||||
;; metalevel, starting with metalevel 0 in the car. The vectors each
|
||||
;; have an entry for each active observer level at their metalevel.
|
||||
;; The innermost pairs have cars holding matchers representing active
|
||||
;; subscriptions, and cdrs representing active advertisements.
|
||||
;;
|
||||
;; "... a few standardised subsystems, identical from citizen to
|
||||
;; citizen. Two of these were channels for incoming data — one for
|
||||
;; gestalt, and one for linear, the two primary modalities of all
|
||||
;; Konishi citizens, distant descendants of vision and hearing."
|
||||
;; -- Greg Egan, "Diaspora"
|
||||
;; http://gregegan.customer.netspace.net.au/DIASPORA/01/Orphanogenesis.html
|
||||
;;
|
||||
(struct gestalt (metalevels) #:prefab)
|
||||
|
||||
;; Events
|
||||
(struct routing-update (routes) #:prefab)
|
||||
(struct routing-update (gestalt) #:prefab)
|
||||
(struct message (body meta-level feedback?) #:prefab)
|
||||
|
||||
;; Actions (in addition to Events)
|
||||
|
@ -43,72 +57,75 @@
|
|||
(struct quit () #:prefab)
|
||||
|
||||
;; Actors and Configurations
|
||||
(struct process (routes behavior state) #:transparent)
|
||||
(struct world (next-pid event-queue process-table downward-routes process-actions) #:transparent)
|
||||
(struct process (gestalt behavior state) #:transparent)
|
||||
(struct world (next-pid event-queue process-table downward-gestalt process-actions) #:transparent)
|
||||
|
||||
;; Behavior : maybe event * state -> transition
|
||||
(struct transition (state actions) #:transparent)
|
||||
|
||||
;; Process table maps to these; idea is to avoid redundant signalling
|
||||
;; of routing-updates where possible
|
||||
(struct trigger-guard (process downward-routes) #:transparent)
|
||||
;; TODO: Reintroduce "trigger-guard" from the naive implementation
|
||||
;; perhaps. "Process table maps to these; idea is to avoid redundant
|
||||
;; signalling of routing-updates where possible"
|
||||
|
||||
(define (drop-route r)
|
||||
(match-define (route s? p ml l) r)
|
||||
(and (positive? ml) (route s? p (- ml 1) l)))
|
||||
(define (drop-gestalt g)
|
||||
(match-define (gestalt metalevels) g)
|
||||
(gestalt (if (null? metalevels) '() (cdr metalevels))))
|
||||
|
||||
(define (lift-route r)
|
||||
(match-define (route s? p ml l) r)
|
||||
(route s? p (+ ml 1) l))
|
||||
(define (lift-gestalt g)
|
||||
(gestalt (cons '#() (gestalt-metalevels g))))
|
||||
|
||||
(define (sub p #:meta-level [ml 0] #:level [l 0]) (route #t p ml l))
|
||||
(define (pub p #:meta-level [ml 0] #:level [l 0]) (route #f p ml l))
|
||||
(define (simple-gestalt subs advs level metalevel)
|
||||
(define leaf (cons subs advs))
|
||||
(define vec (make-vector (+ level 1) (cons #f #f)))
|
||||
(vector-set! vec level leaf)
|
||||
(let loop ((n metalevel) (acc (list vec)))
|
||||
(if (zero? n)
|
||||
(gestalt acc)
|
||||
(loop (- n 1) (cons '#() acc)))))
|
||||
|
||||
(define (spawn behavior state [initial-routes '()]) (process initial-routes behavior state))
|
||||
(define (gestalt-empty) (gestalt '()))
|
||||
|
||||
(define (gestalt-union g1 g2)
|
||||
(define (zu sa1 sa2)
|
||||
(cons (matcher-union (car sa1) (car sa2))
|
||||
(matcher-union (cdr sa1) (cdr sa2))))
|
||||
(define (yu ls1 ls2)
|
||||
(define vl1 (vector-length ls1))
|
||||
(define vl2 (vector-length ls2))
|
||||
(define one-bigger? (> vl1 vl2))
|
||||
(define maxlen (max vl1 vl2))
|
||||
(define minlen (min vl1 vl2))
|
||||
(define result (make-vector maxlen #f))
|
||||
(for ((i (in-range 0 minlen)))
|
||||
(vector-set! result i (zu (vector-ref ls1 i) (vector-ref ls2 i))))
|
||||
(for ((i (in-range minlen maxlen)))
|
||||
(vector-set! result i (vector-ref (if one-bigger? vl1 vl2) i)))
|
||||
result)
|
||||
(define (xu mls1 mls2)
|
||||
(match* (mls1 mls2)
|
||||
[('() mls) mls]
|
||||
[(mls '()) mls]
|
||||
[((cons m1 mls1) (cons m2 mls2)) (cons (yu m1 m2) (xu mls1 mls2))]))
|
||||
(gestalt (xu (gestalt-metalevels g1)
|
||||
(gestalt-metalevels g2))))
|
||||
|
||||
(check-equal? (simple-gestalt 'a 'b 0 0)
|
||||
(gestalt (list (vector (cons 'a 'b)))))
|
||||
(check-equal? (simple-gestalt 'a 'b 2 2)
|
||||
(gestalt (list '#() '#() (vector (cons #f #f)
|
||||
(cons #f #f)
|
||||
(cons 'a 'b)))))
|
||||
|
||||
(define (sub p #:meta-level [ml 0] #:level [l 0]) (simple-gestalt (pattern->matcher #t p) #f l ml))
|
||||
(define (pub p #:meta-level [ml 0] #:level [l 0]) (simple-gestalt #f (pattern->matcher #t p) l ml))
|
||||
|
||||
(define (spawn behavior state [gestalt (gestalt-empty)]) (process gestalt behavior state))
|
||||
(define (send body #:meta-level [ml 0]) (message body ml #f))
|
||||
(define (feedback body #:meta-level [ml 0]) (message body ml #t))
|
||||
|
||||
(define (drop-routes rs) (filter-map drop-route rs))
|
||||
(define (lift-routes rs) (map lift-route rs))
|
||||
|
||||
(define (co-route r #:level [level-override #f])
|
||||
(match-define (route sub? pat ml l) r)
|
||||
(route (not sub?) pat ml (or level-override l)))
|
||||
|
||||
(define (route-accepts? r m)
|
||||
(and (= (message-meta-level m) (route-meta-level r))
|
||||
(equal? (message-feedback? m) (not (route-subscription? r)))
|
||||
(intersect? (message-body m) (route-pattern r))))
|
||||
|
||||
(define (intersect-routes rs1 rs2)
|
||||
(let loop1 ((rs1 rs1)
|
||||
(acc '()))
|
||||
(match rs1
|
||||
['() (reverse acc)]
|
||||
[(cons r1 rs1)
|
||||
(let loop2 ((rs2 rs2)
|
||||
(acc acc))
|
||||
(match rs2
|
||||
['() (loop1 rs1 acc)]
|
||||
[(cons r2 rs2)
|
||||
(if (and (equal? (route-subscription? r1) (not (route-subscription? r2)))
|
||||
(= (route-meta-level r1) (route-meta-level r2))
|
||||
(< (route-level r1) (route-level r2)))
|
||||
(intersect (route-pattern r1) (route-pattern r2)
|
||||
(lambda (rr) (loop2 rs2 (cons (struct-copy route r1 [pattern rr]) acc)))
|
||||
(lambda () (loop2 rs2 acc)))
|
||||
(loop2 rs2 acc))]))])))
|
||||
|
||||
(define (filter-event e rs)
|
||||
(match e
|
||||
[(routing-update e-rs)
|
||||
(routing-update (intersect-routes e-rs rs))]
|
||||
[(? message? m)
|
||||
(if (ormap (lambda (r) (route-accepts? r m)) rs) e #f)]))
|
||||
|
||||
(define (spawn-world . boot-actions)
|
||||
(spawn world-handle-event
|
||||
(enqueue-actions (world 0 (make-queue) (hash) '() (make-queue))
|
||||
(enqueue-actions (world 0 (make-queue) (hash) (gestalt-empty) (make-queue))
|
||||
-1
|
||||
boot-actions)))
|
||||
|
||||
|
@ -163,19 +180,9 @@
|
|||
(if (world? new-state)
|
||||
"#<world>"
|
||||
new-state)))
|
||||
(struct-copy process p [state new-state]))
|
||||
values)))
|
||||
(struct-copy process p [state new-state])))))
|
||||
(enqueue-actions w pid new-actions))]))
|
||||
|
||||
(define (step-children w)
|
||||
(let-values (((w step-taken?)
|
||||
(for/fold ([w w] [step-taken? #f]) (((pid g) (in-hash (world-process-table w))))
|
||||
(match-define (trigger-guard p _) g)
|
||||
(define t (deliver-event #f pid p))
|
||||
(values (apply-transition pid t w)
|
||||
(or step-taken? (transition? t))))))
|
||||
(and step-taken? (transition w '()))))
|
||||
|
||||
(define (transition-bind k t0)
|
||||
(match-define (transition state0 actions0) t0)
|
||||
(match-define (transition state1 actions1) (k state0))
|
||||
|
@ -196,56 +203,37 @@
|
|||
(dispatch-event e w))
|
||||
'()))
|
||||
|
||||
(define (transform-process pid w fp frs)
|
||||
(match (hash-ref (world-process-table w) pid)
|
||||
(define (transform-process pid w fp)
|
||||
(define pt (world-process-actions w))
|
||||
(match (hash-ref pt pid)
|
||||
[#f w]
|
||||
[(trigger-guard p downward-rs)
|
||||
(struct-copy world w
|
||||
[process-table (hash-set (world-process-table w)
|
||||
pid
|
||||
(trigger-guard (fp p) (frs downward-rs)))])]))
|
||||
[p (struct-copy world w [process-table (hash-set pt pid (fp p))])]))
|
||||
|
||||
(define (enqueue-event e w)
|
||||
(struct-copy world w [event-queue (enqueue (world-event-queue w) e)]))
|
||||
|
||||
(define (upward-routes-change-ignorable? pid w rs)
|
||||
(match (hash-ref (world-process-table w) pid)
|
||||
[#f #t]
|
||||
[(trigger-guard p _) (equal? (process-routes p) rs)]))
|
||||
|
||||
(define ((perform-action pid a) w)
|
||||
(match a
|
||||
[(? process? new-p)
|
||||
(let* ((new-pid (world-next-pid w))
|
||||
(w (struct-copy world w [next-pid (+ new-pid 1)]))
|
||||
(w (struct-copy world w [process-table
|
||||
(hash-set (world-process-table w)
|
||||
new-pid
|
||||
(trigger-guard new-p '()))])))
|
||||
(hash-set (world-process-table w) new-pid new-p)])))
|
||||
(log-info "Spawned process ~a ~v ~v" new-pid (process-behavior new-p) (process-state new-p))
|
||||
(issue-routing-update w))]
|
||||
[(quit)
|
||||
(when (hash-has-key? (world-process-table w) pid) (log-info "Process ~a terminating" pid))
|
||||
(let* ((w (struct-copy world w [process-table (hash-remove (world-process-table w) pid)])))
|
||||
(issue-routing-update w))]
|
||||
[(routing-update routes)
|
||||
(if (upward-routes-change-ignorable? pid w routes)
|
||||
(transition w '())
|
||||
(let* ((w (transform-process pid w
|
||||
(lambda (p) (struct-copy process p [routes routes]))
|
||||
values)))
|
||||
(issue-routing-update w)))]
|
||||
[(routing-update gestalt)
|
||||
(let* ((w (transform-process pid w
|
||||
(lambda (p) (struct-copy process p [gestalt gestalt])))))
|
||||
(issue-routing-update w))]
|
||||
[(message body meta-level feedback?)
|
||||
(if (zero? meta-level)
|
||||
(transition (enqueue-event a w) '())
|
||||
(transition w (message body (- meta-level 1) feedback?)))]))
|
||||
|
||||
(define (aggregate-routes base w)
|
||||
(apply append
|
||||
base
|
||||
(for/list ((g (in-hash-values (world-process-table w))))
|
||||
(process-routes (trigger-guard-process g)))))
|
||||
|
||||
(define (issue-local-routing-update w)
|
||||
(enqueue-event (routing-update (aggregate-routes (world-downward-routes w) w)) w))
|
||||
|
||||
|
@ -254,18 +242,23 @@
|
|||
(routing-update (drop-routes (aggregate-routes '() w)))))
|
||||
|
||||
(define (dispatch-event e w)
|
||||
(for/fold ([w w]) (((pid g) (in-hash (world-process-table w))))
|
||||
(match-define (trigger-guard p old-downward-rs) g)
|
||||
(define e1 (filter-event e (process-routes p)))
|
||||
(match e1
|
||||
[#f w]
|
||||
[(routing-update new-downward-rs)
|
||||
(if (equal? old-downward-rs new-downward-rs)
|
||||
w
|
||||
(transform-process pid (apply-transition pid (deliver-event e1 pid p) w)
|
||||
values
|
||||
(lambda (old-rs) new-downward-rs)))]
|
||||
[_ (apply-transition pid (deliver-event e1 pid p) w)])))
|
||||
...)
|
||||
|
||||
;; TODO: need explicit indication from a transitioning child as to
|
||||
;; whether it is inert or not. If not, it should be explicitly
|
||||
;; scheduled for the next round. The current system of just asking
|
||||
;; everyone doesn't scale.
|
||||
;;
|
||||
;; This is the "schedule" rule of the calculus.
|
||||
;;
|
||||
(define (step-children w)
|
||||
(let-values (((w step-taken?)
|
||||
(for/fold ([w w] [step-taken? #f]) (((pid g) (in-hash (world-process-table w))))
|
||||
(match-define (trigger-guard p _) g)
|
||||
(define t (deliver-event #f pid p))
|
||||
(values (apply-transition pid t w)
|
||||
(or step-taken? (transition? t))))))
|
||||
(and step-taken? (transition w '()))))
|
||||
|
||||
(define (world-handle-event e w)
|
||||
(if (or e (not (quiescent? w)))
|
||||
|
|
Loading…
Reference in New Issue