Use newly-refactored mux in endpoint implementation
This commit is contained in:
parent
4db7c17dc8
commit
a9600b0de8
|
@ -25,9 +25,7 @@
|
||||||
;; An EID is a Nat.
|
;; An EID is a Nat.
|
||||||
|
|
||||||
;; Endpoint-group private states
|
;; Endpoint-group private states
|
||||||
(struct endpoint-group (next-eid ;; EID
|
(struct endpoint-group (mux ;; Mux
|
||||||
routing-table ;; (Matcherof (Setof EID))
|
|
||||||
interests ;; (HashTable EID Matcher)
|
|
||||||
endpoints ;; (HashTable EID Endpoint)
|
endpoints ;; (HashTable EID Endpoint)
|
||||||
state ;; Any
|
state ;; Any
|
||||||
)
|
)
|
||||||
|
@ -47,9 +45,7 @@
|
||||||
(struct as-endpoint (eid action) #:prefab)
|
(struct as-endpoint (eid action) #:prefab)
|
||||||
|
|
||||||
(define (make-endpoint-group initial-state)
|
(define (make-endpoint-group initial-state)
|
||||||
(endpoint-group 0
|
(endpoint-group (mux)
|
||||||
(matcher-empty)
|
|
||||||
(hash)
|
|
||||||
(hash)
|
(hash)
|
||||||
initial-state))
|
initial-state))
|
||||||
|
|
||||||
|
@ -76,16 +72,15 @@
|
||||||
(define (inert-endpoint e state) #f)
|
(define (inert-endpoint e state) #f)
|
||||||
|
|
||||||
(define (endpoint-group-handle-event e g)
|
(define (endpoint-group-handle-event e g)
|
||||||
(match-define (endpoint-group _ routing-table interests endpoints state) g)
|
(match-define (endpoint-group m endpoints state) g)
|
||||||
(define affected-eids
|
(define affected-eids
|
||||||
(match e
|
(match e
|
||||||
[#f (hash-keys endpoints)]
|
[#f (hash-keys endpoints)]
|
||||||
[(? patch?) (compute-affected-pids routing-table e)]
|
[(? patch?) (compute-affected-pids (mux-routing-table m) e)]
|
||||||
[(message body)
|
[(message body) (mux-route-message m body)]))
|
||||||
(tset->list (matcher-match-value routing-table (observe body) (datum-tset)))]))
|
|
||||||
(sequence-handlers g (for/list [(eid (sort affected-eids <))]
|
(sequence-handlers g (for/list [(eid (sort affected-eids <))]
|
||||||
(list (if (patch? e)
|
(list (if (patch? e)
|
||||||
(view-patch e (hash-ref interests eid matcher-empty))
|
(view-patch e (mux-interests-of m eid))
|
||||||
e)
|
e)
|
||||||
eid
|
eid
|
||||||
(hash-ref endpoints eid (lambda () inert-endpoint))))))
|
(hash-ref endpoints eid (lambda () inert-endpoint))))))
|
||||||
|
@ -120,21 +115,11 @@
|
||||||
(cons actions cumulative-patch)))
|
(cons actions cumulative-patch)))
|
||||||
|
|
||||||
(define (interpret-endpoint-patch cumulative-patch actions g eid p0)
|
(define (interpret-endpoint-patch cumulative-patch actions g eid p0)
|
||||||
(define old-interests (hash-ref (endpoint-group-interests g) eid matcher-empty))
|
(define-values (new-mux _eid p p-aggregate)
|
||||||
(define old-routing-table (endpoint-group-routing-table g))
|
(mux-update-stream (endpoint-group-mux g) eid p0))
|
||||||
(define p (limit-patch (label-patch p0 (datum-tset eid)) old-interests))
|
|
||||||
(define p-aggregate (compute-aggregate-patch p eid old-routing-table))
|
|
||||||
(define new-interests (apply-patch old-interests p))
|
|
||||||
(define new-routing-table (apply-patch old-routing-table p))
|
|
||||||
(values (patch-seq cumulative-patch p-aggregate)
|
(values (patch-seq cumulative-patch p-aggregate)
|
||||||
actions
|
actions
|
||||||
(struct-copy endpoint-group g
|
(struct-copy endpoint-group g [mux new-mux])))
|
||||||
[routing-table new-routing-table]
|
|
||||||
[interests (if (matcher-empty? new-interests)
|
|
||||||
(hash-remove (endpoint-group-interests g) eid)
|
|
||||||
(hash-set (endpoint-group-interests g)
|
|
||||||
eid
|
|
||||||
new-interests))])))
|
|
||||||
|
|
||||||
(define (interpret-endpoint-action cumulative-patch actions g eid endpoint-action)
|
(define (interpret-endpoint-action cumulative-patch actions g eid endpoint-action)
|
||||||
(match endpoint-action
|
(match endpoint-action
|
||||||
|
@ -146,12 +131,13 @@
|
||||||
[(? patch? p0)
|
[(? patch? p0)
|
||||||
(interpret-endpoint-patch cumulative-patch actions g eid p0)]
|
(interpret-endpoint-patch cumulative-patch actions g eid p0)]
|
||||||
[(add-endpoint function)
|
[(add-endpoint function)
|
||||||
(define new-eid (endpoint-group-next-eid g))
|
(define-values (new-mux new-eid _p _p-aggregate)
|
||||||
|
(mux-add-stream (endpoint-group-mux g) empty-patch))
|
||||||
(define-values (new-ep initial-transition) (function new-eid (endpoint-group-state g)))
|
(define-values (new-ep initial-transition) (function new-eid (endpoint-group-state g)))
|
||||||
(interpret-endpoint-actions cumulative-patch
|
(interpret-endpoint-actions cumulative-patch
|
||||||
actions
|
actions
|
||||||
(struct-copy endpoint-group g
|
(struct-copy endpoint-group g
|
||||||
[next-eid (+ new-eid 1)]
|
[mux new-mux]
|
||||||
[endpoints
|
[endpoints
|
||||||
(hash-set (endpoint-group-endpoints g)
|
(hash-set (endpoint-group-endpoints g)
|
||||||
new-eid
|
new-eid
|
||||||
|
@ -183,11 +169,12 @@
|
||||||
endpoint-action)))
|
endpoint-action)))
|
||||||
|
|
||||||
(define (pretty-print-endpoint-group g [p (current-output-port)])
|
(define (pretty-print-endpoint-group g [p (current-output-port)])
|
||||||
(match-define (endpoint-group _ routing-table interests endpoints state) g)
|
(match-define (endpoint-group m endpoints state) g)
|
||||||
(fprintf p "ENDPOINT GROUP:\n")
|
(fprintf p "ENDPOINT GROUP:\n")
|
||||||
(fprintf p " ---- STATE:\n")
|
(fprintf p " ---- STATE:\n")
|
||||||
(display (indented-port-output 6 (lambda (p) (prospect-pretty-print state p))) p)
|
(display (indented-port-output 6 (lambda (p) (prospect-pretty-print state p))) p)
|
||||||
(newline p)
|
(newline p)
|
||||||
(fprintf p " - ~a endpoints\n" (hash-count endpoints))
|
(fprintf p " - ~a endpoints\n" (hash-count endpoints))
|
||||||
|
(fprintf p " - next eid: ~a\n" (mux-next-pid mux))
|
||||||
(fprintf p " - routing table:\n")
|
(fprintf p " - routing table:\n")
|
||||||
(pretty-print-matcher routing-table p))
|
(pretty-print-matcher (mux-routing-table mux) p))
|
||||||
|
|
Loading…
Reference in New Issue