2015-03-16 14:38:32 +00:00
|
|
|
#lang racket/base
|
|
|
|
;; General multiplexer.
|
|
|
|
|
|
|
|
(provide meta-label?
|
|
|
|
(except-out (struct-out mux) mux)
|
|
|
|
(rename-out [mux <mux>] [make-mux mux])
|
|
|
|
mux-add-stream
|
|
|
|
mux-remove-stream
|
|
|
|
mux-update-stream
|
|
|
|
mux-route-message
|
2015-05-11 22:25:38 +00:00
|
|
|
mux-interests-of
|
2016-05-08 23:46:44 +00:00
|
|
|
;; mux-focus-event
|
2015-12-04 16:21:13 +00:00
|
|
|
compute-patches
|
2015-12-11 02:21:24 +00:00
|
|
|
compute-affected-pids
|
|
|
|
pretty-print-mux)
|
2015-03-16 14:38:32 +00:00
|
|
|
|
|
|
|
(require racket/set)
|
|
|
|
(require racket/match)
|
2016-03-12 16:54:31 +00:00
|
|
|
(require "trie.rkt")
|
2015-03-16 14:38:32 +00:00
|
|
|
(require "patch.rkt")
|
2015-06-20 00:29:16 +00:00
|
|
|
(require "tset.rkt")
|
2015-12-11 02:21:24 +00:00
|
|
|
(require "pretty.rkt")
|
2015-03-16 14:38:32 +00:00
|
|
|
|
|
|
|
;; A PID is a Nat.
|
|
|
|
;; A Label is a PID or 'meta.
|
|
|
|
;; Multiplexer private states
|
|
|
|
;; Private state of a multiplexer. Instances are immutable values; the
;; functions below return updated copies made with struct-copy.
;;
;; Printing through gen:syndicate-pretty-printable is delegated to
;; pretty-print-mux.
(struct mux (next-pid        ;; PID: the label mux-add-stream allocates next
             routing-table   ;; Trie built by applying labelled patches; starts as
                             ;; trie-empty (presumably maps patterns to label sets)
             interest-table  ;; (HashTable Label Trie): per-label interests; labels
                             ;; whose interests become empty are removed
             )
  #:transparent
  #:methods gen:syndicate-pretty-printable
  [(define (syndicate-pretty-print self [port (current-output-port)])
     (pretty-print-mux self port))])
|
2015-03-16 14:38:32 +00:00
|
|
|
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
|
|
|
|
;; meta-label? : Any -> Boolean
;; True exactly when x is the symbol 'meta, the one Label that is not a PID.
(define (meta-label? x)
  (eq? 'meta x))
|
|
|
|
|
|
|
|
;; make-mux : -> mux
;; Builds a fresh multiplexer: PID allocation starts at 0, the routing
;; table is empty, and no label has any recorded interests.
(define (make-mux)
  (let ([first-pid 0]
        [no-routes trie-empty]
        [no-interests (hash)])
    (mux first-pid no-routes no-interests)))
|
2015-03-16 14:38:32 +00:00
|
|
|
|
|
|
|
;; mux-add-stream : mux Patch -> (values mux Label Patch Patch)
;; Allocates the next PID of m as the label of a new stream, advances the
;; PID counter, and applies initial-patch on behalf of that label.
;; The results are exactly those of mux-update-stream.
(define (mux-add-stream m initial-patch)
  (let* ([pid (mux-next-pid m)]
         [bumped (struct-copy mux m [next-pid (add1 pid)])])
    (mux-update-stream bumped pid initial-patch)))
|
|
|
|
|
|
|
|
;; mux-remove-stream : mux Label -> (values mux Label Patch Patch)
;; Retracts everything label has asserted by submitting a patch that adds
;; nothing and removes the wildcard pattern `?`. mux-update-stream limits
;; that patch against the label's recorded interests before applying it.
;; The results are exactly those of mux-update-stream.
(define (mux-remove-stream m label)
  ;; The '<mux-remove-stream> value is only what pattern->trie stores here;
  ;; presumably a placeholder, since mux-update-stream relabels the patch.
  (define retract-everything
    (patch trie-empty (pattern->trie '<mux-remove-stream> ?)))
  (mux-update-stream m label retract-everything))
|
2015-03-16 14:38:32 +00:00
|
|
|
|
|
|
|
;; mux-update-stream : mux Label Patch -> (values mux Label Patch Patch)
;; Applies delta-orig on behalf of label and returns four values:
;;   1. the updated mux (new routing table and interest table),
;;   2. label, unchanged,
;;   3. delta: delta-orig labelled with label and limited against the
;;      label's previous interests,
;;   4. delta-aggregate: the result of compute-aggregate-patch for delta
;;      against the routing table as it was before the update.
(define (mux-update-stream m label delta-orig)
  (let* ([routes-before (mux-routing-table m)]
         [interests-before (mux-interests-of m label)]
         [labelled (label-patch delta-orig (datum-tset label))]
         [delta (limit-patch labelled interests-before)]
         [interests-after (apply-patch interests-before delta)]
         ;; CONDITION at this point: delta has been labelled and limited to
         ;; be minimal with respect to existing interests of its label.
         [delta-aggregate (compute-aggregate-patch delta label routes-before)]
         [routes-after (apply-patch routes-before delta)]
         [table-before (mux-interest-table m)]
         ;; Drop the label's entry entirely once it has no interests left.
         [table-after (if (trie-empty? interests-after)
                          (hash-remove table-before label)
                          (hash-set table-before label interests-after))])
    (values (struct-copy mux m
                         [routing-table routes-after]
                         [interest-table table-after])
            label
            delta
            delta-aggregate)))
|
|
|
|
|
|
|
|
;; compute-patches : mux mux Label Patch Patch
;;                   -> (values (Listof (Pairof Label Patch)) Patch)
;; Given the mux before (old-m) and after (new-m) an update by label,
;; together with the delta and delta-aggregate that update produced,
;; returns two values:
;;   1. one (label . patch) pair for each label reported by
;;      compute-affected-pids plus label itself, with 'meta excluded,
;;   2. the patch computed for 'meta.
(define (compute-patches old-m new-m label delta delta-aggregate)
  (define routes-before (mux-routing-table old-m))
  (define routes-after (mux-routing-table new-m))

  ;; Patch for the label that issued delta: the union of
  ;;  - delta's additions intersected with the new routing table and its
  ;;    removals intersected with the old one, and
  ;;  - delta-aggregate's additions/removals intersected with the label's
  ;;    new/old interests respectively.
  (define (feedback-for-label)
    (define from-delta
      (patch (biased-intersection routes-after (patch-added delta))
             (biased-intersection routes-before (patch-removed delta))))
    (define from-aggregate
      (patch (biased-intersection (patch-added delta-aggregate)
                                  (mux-interests-of new-m label))
             (biased-intersection (patch-removed delta-aggregate)
                                  (mux-interests-of old-m label))))
    (patch-union from-delta from-aggregate))

  ;; Every other label sees delta-aggregate viewed through its old interests.
  (define (entry-for pid)
    (if (equal? pid label)
        (cons label (feedback-for-label))
        (cons pid (view-patch delta-aggregate (mux-interests-of old-m pid)))))

  ;; 'meta is reported separately as the second value, so it is removed here.
  (define recipients
    (tset->list
     (tset-remove (tset-add (compute-affected-pids routes-before delta) label)
                  'meta)))

  (values (map entry-for recipients)
          (cdr (entry-for 'meta))))
|
2015-03-16 14:38:32 +00:00
|
|
|
|
|
|
|
;; compute-affected-pids : Trie Patch -> tset
;; Matches everything delta touches (the union of its added and removed
;; parts) against the portion of routing-table reached by stepping over
;; observe-parenthesis, and accumulates the values found on the
;; routing-table side into a tset, starting from datum-tset-empty.
(define (compute-affected-pids routing-table delta)
  (let* ([touched (trie-union (patch-added delta) (patch-removed delta))]
         [observers (trie-step routing-table observe-parenthesis)])
    (trie-match-trie touched
                     observers
                     #:seed datum-tset-empty
                     ;; Only the routing-table side's value is kept.
                     #:combiner (lambda (_touched-value observer-labels acc)
                                  (tset-union observer-labels acc)))))
|
2015-03-16 14:38:32 +00:00
|
|
|
|
2015-12-04 16:21:13 +00:00
|
|
|
;; mux-route-message : mux Any -> (values (Listof Label) Boolean)
;; Decides who should receive a message with the given body. Returns the
;; non-'meta labels to deliver to, and whether 'meta should receive it too.
;; If body itself is found in the routing table, nobody receives it.
(define (mux-route-message m body)
  (define routes (mux-routing-table m))
  (define declared?
    (trie-lookup routes body #f #:wildcard-union (lambda (a b) (or a b))))
  (cond
    [declared?
     ;; some other stream has declared body
     (values '() #f)]
    [else
     ;; Recipients are the labels found under (observe body).
     (define recipients
       (trie-lookup routes
                    (observe body)
                    datum-tset-empty
                    #:wildcard-union tset-union))
     (values (tset->list (tset-remove recipients 'meta))
             (tset-member? recipients 'meta))]))
|
2015-03-16 14:38:32 +00:00
|
|
|
|
|
|
|
;; mux-interests-of : mux Label -> Trie
;; Looks up the interests recorded for label in m's interest table,
;; yielding trie-empty when the label has no entry.
(define (mux-interests-of m label)
  (let ([per-label (mux-interest-table m)])
    (hash-ref per-label label trie-empty)))
|
2015-12-11 02:21:24 +00:00
|
|
|
|
2016-05-08 23:46:44 +00:00
|
|
|
;; There's a problem with mux-focus-event in most circumstances: often
|
|
|
|
;; you will want to focus incoming events with respect to some
|
|
|
|
;; locally-stored memory of interests. But that local memory may be
|
|
|
|
;; *ahead* of the incoming event stream! There's the round-trip
|
|
|
|
;; latency between the actor and the dataspaces where patch actions
|
|
|
|
;; are applied. This could lead to unwanted discarding of retractions,
|
|
|
|
;; and even of assertions in cases of quick pulses of interest.
|
|
|
|
;;
|
|
|
|
;; ;; Mux Label Event -> (Option Event)
|
|
|
|
;; (define (mux-focus-event m label e)
|
|
|
|
;; (define interests (mux-interests-of m label))
|
|
|
|
;; (match e
|
|
|
|
;; [(patch added removed)
|
|
|
|
;; (define p (patch (biased-intersection added interests)
|
|
|
|
;; (biased-intersection removed interests)))
|
|
|
|
;; (and (patch-non-empty? p) p)]
|
|
|
|
;; [(message body)
|
|
|
|
;; (and (trie-lookup interests (observe body) #f) e)]))
|
|
|
|
|
2015-12-11 02:21:24 +00:00
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
|
|
|
|
;; pretty-print-mux : mux [OutputPort] -> Void
;; Writes a human-readable summary of m to p (the current output port by
;; default): the number of labels with recorded interests, the next PID
;; to be allocated, and the routing table rendered by pretty-print-trie
;; via indented-port-output with an argument of 3.
(define (pretty-print-mux m [p (current-output-port)])
  (match-define (mux pid-counter routes interests-by-label) m)
  ;; Local shorthand: formatted output always goes to p.
  (define (emit fmt . args)
    (apply fprintf p fmt args))
  (emit "MUX:\n")
  (emit " - ~a labelled entities with claims\n" (hash-count interests-by-label))
  (emit " - next label: ~a\n" pid-counter)
  (emit " - routing-table:\n")
  (define routes-text
    (indented-port-output 3 (lambda (out) (pretty-print-trie routes out))))
  (display routes-text p)
  (newline p))
|