syndicate-rkt/syndicate/broker/tf.rkt

109 lines
4.1 KiB
Racket
Raw Normal View History

2019-05-02 14:59:25 +00:00
#lang imperative-syndicate
(require imperative-syndicate/pattern)
(require/activate imperative-syndicate/drivers/timer)
(require/activate "federation2.rkt")
;; Record types used by this test scenario.
;; `present` is declared as an assertion struct; in this file it only appears
;; as the subscription spec `(present C)` sent to the router.
(assertion-struct present (who))
;; `says` is declared as a message struct; in this file it only appears as the
;; subscription spec `(says C C)`.
(message-struct says (who what))
;; Sent by the driver script at the bottom of the file; a leaf named `who`
;; stores `what` into its presence field when it receives this.
(message-struct change-presence (who what))
;; Sent by the driver script; the leaf named `who` stops when it receives this.
(message-struct terminate (who))
;; Tracing actor: prints every router-outbound and router-inbound message it
;; observes, together with the connection name the message is tagged with.
(spawn #:name 'monitor
       ;; Traffic tagged router-outbound.
       (on (message (router-outbound $conn $packet))
           (printf "-> ~a : ~v\n" conn packet))
       ;; Traffic tagged router-inbound.
       (on (message (router-inbound $conn $packet))
           (printf " ~a ->: ~v\n" conn packet)))
;; Shorthand for the pattern value `(capture (discard))`, used below as the
;; field pattern inside the `(present C)` and `(says C C)` subscription specs.
;; NOTE(review): presumably "capture any value" in the router's pattern
;; language -- confirm against imperative-syndicate/pattern.
(define C (capture (discard)))
;; leaf : name node -> spawns an actor named (list 'leaf name)
;;
;; Simulates a client called `name` attached to router node `node`.
;; The actor stops when a `(terminate name)` message arrives. While a
;; `(router-connection node name)` assertion is present it:
;;   * on start, sends the router two Subscribe packets with fresh ids,
;;     one for `(present C)` and one for `(says C C)`;
;;   * answers each `(says C C)` subscription relayed to it by sleeping and
;;     then sending a single "Hello world!" Msg on that subscription;
;;   * answers each `(present C)` subscription relayed to it by maintaining an
;;     Add/Del for its own name, driven by `change-presence` messages, until the
;;     matching Unsubscribe arrives.
(define (leaf name node)
  (spawn #:name (list 'leaf name)
         (stop-when (message (terminate name)))
         (during (router-connection node name)
           (on-start
            ;; Ids are generated in the same order as they are sent:
            ;; presence first, then speech.
            (let* ((presence-sub (gensym (format "~a-P-" name)))
                   (speech-sub (gensym (format "~a-S-" name))))
              (send! (router-inbound name (Subscribe presence-sub (present C))))
              (send! (router-inbound name (Subscribe speech-sub (says C C))))))

           (on (message (router-outbound name (Subscribe $sub-id (says C C))))
               (sleep 2)
               ;; We never see our own subscription here, because routers
               ;; expect us to have done local delivery ourselves. This is the
               ;; same choice multicast offers about seeing your own
               ;; transmissions: in ~relay-node mode you would not want to see
               ;; them; in ~leaf mode you would.
               (send! (router-inbound name (Msg sub-id (list name "Hello world!")))))

           (on (message (router-outbound name (Subscribe $sub-id (present C))))
               ;; One nested facet per incoming presence subscription.
               (react
                (field [here? #t])
                (stop-when (message (router-outbound name (Unsubscribe sub-id))))
                (begin/dataflow
                  ;; We will not see our own Add/Del either, for the same
                  ;; local-delivery reason as explained above.
                  ;; Reads `here?`, so this re-runs whenever the field changes:
                  ;; Add when present, Del when not.
                  (send! (router-inbound name
                                         (if (here?)
                                             (Add sub-id (list name))
                                             (Del sub-id (list name))))))
                (on (message (change-presence name $now-here))
                    (here? now-here)))))))
;; relay : node1 node2 -> spawns an actor named (list 'relay node1 node2)
;;
;; Bridges two router nodes. The relay uses connection id `node1->node2` on
;; node1 and `node2->node1` on node2. Whatever a node emits (router-outbound)
;; on its connection id is queued; while BOTH router-connection assertions are
;; present, each queue is drained, oldest packet first, as router-inbound on
;; the opposite connection id.
(define (relay node1 node2)
  (spawn #:name (list 'relay node1 node2)
         ;; Builds the symbol `from->to` used as a connection id.
         (define (link-id from to)
           (string->symbol (format "~a->~a" from to)))
         (define near-link (link-id node1 node2))
         (define far-link (link-id node2 node1))

         ;; Backlogs are kept newest-first (packets are consed on the front).
         (field [near-backlog '()]
                [far-backlog '()])

         ;; Queue everything node1 emits towards us.
         (during (router-connection node1 near-link)
           (on (message (router-outbound near-link $packet))
               (near-backlog (cons packet (near-backlog)))))

         ;; Queue everything node2 emits towards us.
         (during (router-connection node2 far-link)
           (on (message (router-outbound far-link $packet))
               (far-backlog (cons packet (far-backlog)))))

         ;; Forwarding only happens while both connections exist.
         (during (router-connection node1 near-link)
           (during (router-connection node2 far-link)
             ;; node1 -> node2
             (begin/dataflow
               (let ((queued (near-backlog)))
                 (unless (null? queued)
                   ;; Reverse to restore arrival order before sending.
                   (for ([packet (in-list (reverse queued))])
                     (send! (router-inbound far-link packet)))
                   (near-backlog '()))))
             ;; node2 -> node1
             (begin/dataflow
               (let ((queued (far-backlog)))
                 (unless (null? queued)
                   (for ([packet (in-list (reverse queued))])
                     (send! (router-inbound near-link packet)))
                   (far-backlog '()))))))))
;; Driver script: builds the test topology step by step and then exercises it.
;;
;; `(pause n action)` sleeps for `n` (presumably seconds -- confirm against the
;; timer driver), prints a blank line and a banner showing the quoted `action`
;; expression, and then evaluates `action`. Because the banner prints the
;; action's source form, editing an action expression also changes the output.
(spawn* (define-syntax-rule (pause n action)
(begin (sleep n)
(newline)
(printf "********** ~v\n" 'action)
action))
;; Four leaves: c1 and c2 on node n1, c3 and c4 on node n2.
(pause 0 (begin (leaf 'c1 'n1)
(leaf 'c2 'n1)
(leaf 'c3 'n2)
(leaf 'c4 'n2)))
;; Link n1 and n2, then add a leaf on a third node and link n2 to n3.
(pause 0.5 (relay 'n1 'n2))
(pause 0.25 (leaf 'c5 'n3))
(pause 0.25 (relay 'n2 'n3))
;; This step's action is just a quoted symbol: it only waits and prints a banner.
(pause 0.5 'delivery-of-the-says-messages) ;; the newline is important here
;; Toggle c1's presence off and back on.
(pause 1 (send! (change-presence 'c1 #f)))
(pause 0.2 (send! (change-presence 'c1 #t)))
;; Shut the leaves down one at a time. Note that c5 is never terminated here.
(pause 0.2 (send! (terminate 'c1)))
(pause 0.2 (send! (terminate 'c3)))
(pause 0.2 (send! (terminate 'c2)))
(pause 0.2 (send! (terminate 'c4)))
)