Support broadcast messages
This commit is contained in:
parent
173a0edb54
commit
707245cfe2
|
@ -0,0 +1,24 @@
|
||||||
|
#lang syndicate
|
||||||
|
;; Demonstrate sending a message to multiple receivers.
|
||||||
|
|
||||||
|
(require syndicate/actor)
|
||||||
|
|
||||||
|
(struct envelope (destination message) #:prefab)
|
||||||
|
|
||||||
|
(actor (forever (on (message (envelope 'alice $message))
|
||||||
|
(log-info "Alice received ~v" message))))
|
||||||
|
|
||||||
|
(actor (forever (on (message (envelope 'bob $message))
|
||||||
|
(log-info "Bob received ~v" message))))
|
||||||
|
|
||||||
|
(actor
|
||||||
|
(log-info "Waiting for Alice and Bob.")
|
||||||
|
(until (asserted (observe (envelope 'alice _))))
|
||||||
|
(until (asserted (observe (envelope 'bob _))))
|
||||||
|
|
||||||
|
(log-info "Sending a few messages...")
|
||||||
|
(send! (envelope 'alice "For Alice's eyes only"))
|
||||||
|
(send! (envelope 'bob "Dear Bob, how are you? Kind regards, etc."))
|
||||||
|
(send! (envelope ? "Important announcement!"))
|
||||||
|
|
||||||
|
(log-info "Sent all the messages."))
|
|
@ -115,9 +115,13 @@
|
||||||
#:combiner (lambda (v1 v2 acc) (tset-union v2 acc))))
|
#:combiner (lambda (v1 v2 acc) (tset-union v2 acc))))
|
||||||
|
|
||||||
(define (mux-route-message m body)
|
(define (mux-route-message m body)
|
||||||
(if (trie-lookup (mux-routing-table m) body #f) ;; some other stream has declared body
|
(if (trie-lookup (mux-routing-table m) body #f #:wildcard-union (lambda (a b) (or a b)))
|
||||||
|
;; some other stream has declared body
|
||||||
'()
|
'()
|
||||||
(tset->list (trie-lookup (mux-routing-table m) (observe body) datum-tset-empty))))
|
(tset->list (trie-lookup (mux-routing-table m)
|
||||||
|
(observe body)
|
||||||
|
datum-tset-empty
|
||||||
|
#:wildcard-union tset-union))))
|
||||||
|
|
||||||
(define (mux-interests-of m label)
|
(define (mux-interests-of m label)
|
||||||
(hash-ref (mux-interest-table m) label trie-empty))
|
(hash-ref (mux-interest-table m) label trie-empty))
|
||||||
|
|
|
@ -24,34 +24,36 @@
|
||||||
;; Match a single value against a projection, returning a list of
|
;; Match a single value against a projection, returning a list of
|
||||||
;; captured values.
|
;; captured values.
|
||||||
(define (match-value/captures v p)
|
(define (match-value/captures v p)
|
||||||
(define captures-rev
|
(define (walk v p captures-rev)
|
||||||
(let walk ((v v) (p p) (captures-rev '()))
|
(match* (v p)
|
||||||
(match* (v p)
|
[(_ (capture sub))
|
||||||
[(_ (capture sub))
|
(match (walk v sub '())
|
||||||
(match (walk v sub '())
|
[#f #f]
|
||||||
[#f #f]
|
['() (cons v captures-rev)]
|
||||||
['() (cons v captures-rev)]
|
[_ (error 'match-value/captures "Bindings in capture sub-patterns not supported")])]
|
||||||
[_ (error 'match-value/captures "Bindings in capture sub-patterns not supported")])]
|
[(_ (predicate-match pred? sub)) #:when (pred? v)
|
||||||
[(_ (predicate-match pred? sub)) #:when (pred? v)
|
(walk v sub captures-rev)]
|
||||||
(walk v sub captures-rev)]
|
[((== ?) _)
|
||||||
[(_ (== ?))
|
captures-rev]
|
||||||
captures-rev]
|
[(_ (== ?))
|
||||||
[((cons v1 v2) (cons p1 p2))
|
captures-rev]
|
||||||
(match (walk v1 p1 captures-rev)
|
[((cons v1 v2) (cons p1 p2))
|
||||||
[#f #f]
|
(match (walk v1 p1 captures-rev)
|
||||||
[c (walk v2 p2 c)])]
|
[#f #f]
|
||||||
[((? vector? v) (? vector? p)) #:when (= (vector-length v) (vector-length p))
|
[c (walk v2 p2 c)])]
|
||||||
(for/fold [(c captures-rev)] [(vv (in-vector v)) (pp (in-vector p))]
|
[((? vector? v) (? vector? p)) #:when (= (vector-length v) (vector-length p))
|
||||||
(walk vv pp c))]
|
(for/fold [(c captures-rev)] [(vv (in-vector v)) (pp (in-vector p))]
|
||||||
[(_ _) #:when (or (treap? v) (treap? p))
|
(walk vv pp c))]
|
||||||
(error 'match-value/captures "Cannot match on treaps at present")]
|
[(_ _) #:when (or (treap? v) (treap? p))
|
||||||
[((? non-object-struct?) (? non-object-struct?))
|
(error 'match-value/captures "Cannot match on treaps at present")]
|
||||||
#:when (eq? (struct->struct-type v) (struct->struct-type p))
|
[((? non-object-struct?) (? non-object-struct?))
|
||||||
(walk (struct->vector v) (struct->vector p) captures-rev)]
|
#:when (eq? (struct->struct-type v) (struct->struct-type p))
|
||||||
[(_ _) #:when (equal? v p)
|
(walk (struct->vector v) (struct->vector p) captures-rev)]
|
||||||
captures-rev]
|
[(_ _) #:when (equal? v p)
|
||||||
[(_ _)
|
captures-rev]
|
||||||
#f])))
|
[(_ _)
|
||||||
|
#f]))
|
||||||
|
(define captures-rev (walk v p '()))
|
||||||
(and captures-rev (reverse captures-rev)))
|
(and captures-rev (reverse captures-rev)))
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
Loading…
Reference in New Issue