syndicate-rkt/syndicate/broker/tf.rkt

189 lines
7.7 KiB
Racket

#lang imperative-syndicate
(require imperative-syndicate/pattern)
(require/activate imperative-syndicate/drivers/timer)
(require/activate "federation.rkt")
;; Protocol records shared by the demo actors in this file.
;;
;; `present`: assertion naming a participant `who` that is currently present.
(assertion-struct present (who))
;; `says`: message in which participant `who` utters `what`.
(message-struct says (who what))
;; `change-presence`: control message; the leaf named `who` stores `what`
;; into its `present?` field.
(message-struct change-presence (who what))
;; `terminate`: control message; the actor watching for `who` stops when it
;; sees this (leaves use their name, relays use `(list 'relay node1 node2)`).
(message-struct terminate (who))
;; Traffic monitor: writes one log line for every `router-outbound` and
;; `router-inbound` message it observes, tagged with the connection name
;; carried in the message.
(spawn #:name 'monitor
  ;; Outbound direction.
  (on (message (router-outbound $conn $packet))
    (log-info "-> ~a : ~v" conn packet))
  ;; Inbound direction.
  (on (message (router-inbound $conn $packet))
    (log-info " ~a ->: ~v" conn packet)))
;; Shorthand for the pattern term `(capture (discard))`. It is used below in
;; the argument positions of subscription specs, e.g. `(present C)` and
;; `(says C C)`.
;; NOTE(review): presumably this means "match anything here and capture it";
;; confirm against imperative-syndicate/pattern.
(define C (capture (discard)))
;; (leaf name node)
;;
;; Spawns an actor named `(list 'leaf name)` that speaks the router packet
;; protocol by hand over the connection `(router-connection node name)`:
;; it sends `router-inbound` packets and reacts to `router-outbound` ones.
;; The actor stops when it receives `(terminate name)`.
(define (leaf name node)
  (spawn #:name (list 'leaf name)
    (stop-when (message (terminate name)))

    ;; All protocol activity is scoped to the lifetime of the connection.
    (during (router-connection node name)

      ;; On connect: subscribe, under freshly generated ids, to every
      ;; `present` assertion and every `says` message.
      (on-start
        (send! (router-inbound name (Subscribe (gensym (format "~a-P-" name)) (present C))))
        (send! (router-inbound name (Subscribe (gensym (format "~a-S-" name)) (says C C)))))

      ;; A `says` subscription was announced to us: wait two time units of
      ;; `sleep`, then publish a greeting under that subscription id.
      (on (message (router-outbound name (Subscribe $sub-id (says C C))))
        (sleep 2)
        ;; We will not get this message echoed back: routers expect the
        ;; sender to have performed local delivery itself. This hints at why
        ;; multicast lets you choose whether to see your own transmissions:
        ;; in ~relay-node mode you don't want to see them; in ~leaf mode you
        ;; do!
        (send! (router-inbound name (Msg sub-id (list name "Hello world!")))))

      ;; A `present` subscription was announced to us: for as long as it
      ;; lasts, keep the subscriber informed about our own presence.
      (on (message (router-outbound name (Subscribe $sub-id (present C))))
        (react
          (field [present? #t])
          ;; The matching Unsubscribe ends this sub-facet.
          (stop-when (message (router-outbound name (Unsubscribe sub-id))))
          ;; Re-runs whenever `present?` changes: sends Add when present,
          ;; Del when absent. As with the greeting above, we will not see
          ;; our own Add/Del come back, for the same reason.
          (begin/dataflow
            (send! (router-inbound name ((if (present?) Add Del) sub-id (list name)))))
          ;; External control of our presence flag.
          (on (message (change-presence name $state))
            (present? state)))))))
;; Wrappers used by `leaf2` to talk to a router node through ordinary
;; dataspace assertions instead of raw packets.
;;
;; `to-broker`: `assertion` is something local code wants published via `node`.
(assertion-struct to-broker (node assertion))
;; `from-broker`: `assertion` as observed via `node`. `leaf2` also matches
;; this struct in message position (see its `says` handler).
(assertion-struct from-broker (node assertion))
;; (leaf2 name node)
;;
;; Spawns an actor named `(list 'leaf2 name)` made of two halves:
;;
;;  * an "application" half that only uses `to-broker` / `from-broker`
;;    assertions and messages, and
;;  * an "adapter" half that translates between those and the router packet
;;    protocol on the connection `(router-connection node name)`.
;;
;; The actor stops when it receives `(terminate name)`.
(define (leaf2 name node)
  (local-require imperative-syndicate/term)
  (spawn #:name (list 'leaf2 name)

    ;;---------------------------------------- application half

    (stop-when (message (terminate name)))

    ;; Our own presence, published through the broker while the flag is set
    ;; and toggled by `change-presence` messages addressed to us.
    (field [present? #t])
    (assert #:when (present?) (to-broker node (present name)))
    (on (message (change-presence name $state))
      (present? state))

    ;; TODO: Tracking presence with a `during` here, combined with the
    ;; implementation of `leaf` above, loses "absent" messages: `leaf`
    ;; processes only answer general `present` interests, not the specific
    ;; ones that the implementation of `during` generates.
    ;;
    ;; NB: The two formulations are not semantically equivalent! The
    ;; commented-out version immediately below follows the start and stop
    ;; events of the *facet*, so it reports "absent" on clean termination.
    ;; The version actually used, with separate asserted/retracted handlers,
    ;; does *not* report "absent" on clean termination, because the
    ;; assertion is still there even as the actor terminates!
    ;;
    ;; (during (from-broker node (present $who))
    ;;   (on-start (log-info "~a: ~a present" name who))
    ;;   (on-stop (log-info "~a: ~a absent" name who)))
    (on (asserted (from-broker node (present $peer)))
      (log-info "~a: ~a present" name peer))
    (on (retracted (from-broker node (present $peer)))
      (log-info "~a: ~a absent" name peer))

    ;; Somebody has expressed interest in `present` assertions.
    (on (asserted (from-broker node (observe (present _))))
      (log-info "~a: someone cares about presence!" name))

    ;; Chat traffic arriving via the broker.
    (on (message (from-broker node (says $peer $utterance)))
      (log-info "~a: ~a says ~v" name peer utterance))

    ;;---------------------------------------- adapter half

    ;; Loop our own outgoing assertions straight back as incoming ones.
    ;; This takes care of the self-signalling discussed in `leaf`.
    (during (to-broker node $claim)
      (assert (from-broker node claim)))

    (during (router-connection node name)

      ;; Remote -> local: a Subscribe packet becomes an observer endpoint on
      ;; local `to-broker` assertions matching the requested spec; whatever
      ;; that endpoint reports is sent back as Add / Del / Msg packets
      ;; carrying the remote subscription id.
      (on (message (router-outbound name (Subscribe $remote-sub $interest)))
        (react
          (let ([forward (lambda (make-packet)
                           (lambda (vals)
                             (send! (router-inbound name (make-packet remote-sub vals)))))])
            (add-observer-endpoint! (lambda () (to-broker node interest))
                                    #:on-add (forward Add)
                                    #:on-remove (forward Del)
                                    #:on-message (forward Msg)))
          ;; Let local code see that this interest exists.
          (assert (from-broker node (observe interest)))
          ;; The matching Unsubscribe tears the endpoint down again.
          (stop-when (message (router-outbound name (Unsubscribe remote-sub))))))

      ;; Local -> remote: while some local party observes a `from-broker`
      ;; pattern, hold a subscription for its spec under a fresh id.
      (during (observe ($ shape (from-broker node $interest)))
        (define local-sub (gensym 'ep))
        (on-start (send! (router-inbound name (Subscribe local-sub interest))))
        (on-stop (send! (router-inbound name (Unsubscribe local-sub))))
        (assert (from-broker node (observe interest))) ;; more self-signalling
        ;; Each Add becomes a local assertion built from the observed pattern
        ;; and the captured values; it lasts until the Del with the same
        ;; captures arrives.
        (on (message (router-outbound name (Add local-sub $vals)))
          (react
            (assert (instantiate-term->value shape vals))
            (stop-when (message (router-outbound name (Del local-sub vals))))))
        ;; Each Msg becomes a local message built the same way.
        (on (message (router-outbound name (Msg local-sub $vals)))
          (send! (instantiate-term->value shape vals)))))))
;; (relay node1 node2)
;;
;; Spawns an actor named `(list 'relay node1 node2)` that joins two router
;; nodes: every packet emitted on node1's side of the link is re-injected on
;; node2's side, and vice versa. The actor stops when it receives
;; `(terminate (list 'relay node1 node2))`.
(define (relay node1 node2)
  (spawn #:name (list 'relay node1 node2)
    ;; Connection id at `local` for its link to `remote`: the symbol
    ;; spelled "local(remote)".
    (define (link-id local remote)
      (string->symbol (format "~a(~a)" local remote)))
    (define node1-side (link-id node1 node2))
    (define node2-side (link-id node2 node1))

    ;; Packets collected from each side, newest first.
    (field [pending1 '()]
           [pending2 '()])

    (stop-when (message (terminate (list 'relay node1 node2))))

    ;; Collect outbound packets from each side whenever that side's
    ;; connection exists, regardless of the state of the other side.
    (during (router-connection node1 node1-side)
      (on (message (router-outbound node1-side $packet))
        (pending1 (cons packet (pending1)))))
    (during (router-connection node2 node2-side)
      (on (message (router-outbound node2-side $packet))
        (pending2 (cons packet (pending2)))))

    ;; Only while BOTH connections exist: drain each queue, in arrival
    ;; order, into the opposite side, then reset it.
    (during (router-connection node1 node1-side)
      (during (router-connection node2 node2-side)
        (begin/dataflow
          (unless (null? (pending1))
            (for-each (lambda (packet)
                        (send! (router-inbound node2-side packet)))
                      (reverse (pending1)))
            (pending1 '())))
        (begin/dataflow
          (unless (null? (pending2))
            (for-each (lambda (packet)
                        (send! (router-inbound node1-side packet)))
                      (reverse (pending2)))
            (pending2 '())))))))
;; Scripted scenario that drives the demo: builds a topology of leaves and
;; relays step by step, toggles c1's presence, bounces the n1<->n2 relay, and
;; finally terminates the leaves on n1 and n2 one at a time.
;;
;; `(pause n action)` sleeps for `n` (units are those of `sleep`; presumably
;; seconds -- TODO confirm), logs the *quoted* text of `action` on a fresh
;; line, and then evaluates `action`. Because the form is logged verbatim,
;; editing an action expression also changes the log output.
(spawn* (define-syntax-rule (pause n action)
(begin (sleep n)
(log-info "\n********** ~v" 'action)
action))
;; Initial population: c1 (leaf) and c2 (leaf2) on n1; c3 and c4 (leaf) on n2.
(pause 0 (begin
(leaf 'c1 'n1)
(leaf2 'c2 'n1)
(leaf 'c3 'n2)
(leaf 'c4 'n2)
))
;; Link n1 <-> n2, add c5 (leaf2) on n3, then link n2 <-> n3.
(pause 0.5 (relay 'n1 'n2))
(pause 0.25 (leaf2 'c5 'n3))
(pause 0.25 (relay 'n2 'n3))
;; Marker only: the action is a quoted symbol, so nothing happens here
;; besides the log line.
(pause 0.5 'delivery-of-the-says-messages) ;; the newline is important here
;; Toggle c1's presence off and back on.
(pause 1 (send! (change-presence 'c1 #f)))
(pause 0.2 (send! (change-presence 'c1 #t)))
;; Tear down the n1 <-> n2 relay, then re-create it.
(pause 0.4 (send! (terminate (list 'relay 'n1 'n2))))
(pause 0.4 (relay 'n1 'n2))
;; Shut down the leaves on n1 and n2; c5 and the relays are left running.
(pause 0.2 (send! (terminate 'c1)))
(pause 0.2 (send! (terminate 'c3)))
(pause 0.2 (send! (terminate 'c2)))
(pause 0.2 (send! (terminate 'c4)))
)