syndicate-rkt/syndicate/broker/tf.rkt

182 lines
7.4 KiB
Racket

#lang imperative-syndicate
(require imperative-syndicate/pattern)
(require/activate imperative-syndicate/drivers/timer)
(require/activate "federation2.rkt")
;; Application-level protocol shared by the demo clients in this file.
;;
;; (present who)              -- assertion: the client named `who` is present.
;; (says who what)            -- message: `who` utters the value `what`.
;; (change-presence who what) -- message: control signal; the client named
;;                               `who` stores `what` in its `present?` field.
;; (terminate who)            -- message: control signal; the client named
;;                               `who` stops when it sees this.
(assertion-struct present (who))
(message-struct says (who what))
(message-struct change-presence (who what))
(message-struct terminate (who))
;; Traffic monitor: logs every frame crossing a router connection, in either
;; direction, tagged with the connection it travelled on.
(spawn #:name 'monitor
       ;; Frames emitted by a router towards a connection.
       (on (message (router-outbound $conn $frame))
           (log-info "-> ~a : ~v" conn frame))
       ;; Frames submitted by a connection to its router.
       (on (message (router-inbound $conn $frame))
           (log-info " ~a ->: ~v" conn frame)))
;; Shorthand for a capture wrapped around a discard. It is used below as the
;; placeholder in subscription specs such as `(present C)` and `(says C C)`.
(define C (capture (discard)))
;; Spawns a hand-rolled client called `name`, attached to router `node`.
;;
;; The client speaks the router protocol directly: while
;; `(router-connection node name)` is asserted it submits frames as
;; `router-inbound` messages and reacts to `router-outbound` ones. The whole
;; actor stops when a `(terminate name)` message is seen.
(define (leaf name node)
  (spawn #:name (list 'leaf name)
         (stop-when (message (terminate name)))
         (during (router-connection node name)
           ;; On connect, subscribe to presence and to utterances, each under
           ;; a freshly generated subscription id.
           (on-start
            (let ([presence-sub (Subscribe (gensym (format "~a-P-" name)) (present C))])
              (send! (router-inbound name presence-sub)))
            (let ([speech-sub (Subscribe (gensym (format "~a-S-" name)) (says C C))])
              (send! (router-inbound name speech-sub))))

           ;; The router relays a subscription to `says`: wait a little, then
           ;; deliver a greeting to that subscription.
           (on (message (router-outbound name (Subscribe $subid (says C C))))
               (sleep 2)
               ;; We won't see our own one of these, because routers expect us
               ;; to have done local delivery ourselves. OHHH I am starting to
               ;; get some insight into what is underneath the way multicast
               ;; lets you choose whether or not to see your own
               ;; transmissions! If you're in ~relay-node mode, you won't want
               ;; to see them; if you're in ~leaf mode, you will!
               (let ([greeting (list name "Hello world!")])
                 (send! (router-inbound name (Msg subid greeting)))))

           ;; The router relays a subscription to `present`: until it is
           ;; unsubscribed, keep it informed of our presence.
           (on (message (router-outbound name (Subscribe $subid (present C))))
               (react
                (field [present? #t])
                (stop-when (message (router-outbound name (Unsubscribe subid))))
                ;; Re-runs whenever `present?` changes, emitting Add when we
                ;; are present and Del when we are not.
                (begin/dataflow
                  ;; We won't see our own one of these either! For the same
                  ;; reasons as explained above.
                  (let ([announce (if (present?) Add Del)])
                    (send! (router-inbound name (announce subid (list name))))))
                (on (message (change-presence name $flag))
                    (present? flag)))))))
;; Wrappers used by `leaf2` to talk to a broker node through ordinary
;; dataspace assertions instead of raw router frames.
;;
;; (to-broker node assertion)   -- something this side wants published at `node`.
;; (from-broker node assertion) -- something visible at `node`, surfaced locally.
(assertion-struct to-broker (node assertion))
(assertion-struct from-broker (node assertion))
;; Spawns a client called `name` attached to router `node`.
;;
;; Unlike `leaf`, the application logic (first section, between the dashed
;; rulers) never touches router frames: it only asserts `to-broker` and
;; reacts to `from-broker`. The second section is an adapter that translates
;; between those wrappers and `router-inbound` / `router-outbound` frames for
;; as long as `(router-connection node name)` is asserted.
(define (leaf2 name node)
;; NOTE(review): presumably the source of `term->skeleton-interest` and
;; `instantiate-term->value` used below -- confirm.
(local-require imperative-syndicate/term)
(spawn #:name (list 'leaf2 name)
;;----------------------------------------
;; Application logic.
;; Stop the whole actor on `(terminate name)`.
(stop-when (message (terminate name)))
;; Publish our presence at the broker while `present?` is true; the field is
;; driven by `change-presence` messages addressed to us.
(field [present? #t])
(assert #:when (present?) (to-broker node (present name)))
(on (message (change-presence name $new-presence))
(present? new-presence))
;; TODO: Doing it this way, with the implementation in `leaf`
;; above, causes missing "absent" messages because `leaf`
;; processes don't respond to the specific `presence`
;; interests generated by the way `during` is implemented,
;; only to general ones.
;;
;; (during (from-broker node (present $who))
;; (on-start (log-info "~a: ~a present" name who))
;; (on-stop (log-info "~a: ~a absent" name who)))
;; Log presence changes, interest in presence, and utterances seen via the broker.
(on (asserted (from-broker node (present $who))) (log-info "~a: ~a present" name who))
(on (retracted (from-broker node (present $who))) (log-info "~a: ~a absent" name who))
(on (asserted (from-broker node (observe (present _))))
(log-info "~a: someone cares about presence!" name))
(on (message (from-broker node (says $who $what)))
(log-info "~a: ~a says ~v" name who what))
;;----------------------------------------
;; Adapter between the to-broker/from-broker wrappers and router frames.
;; Anything we publish to the broker is echoed straight back as coming from it.
(during (to-broker node $what)
;; This takes care of the self-signalling discussed above.
(assert (from-broker node what)))
(during (router-connection node name)
;; Router -> local: the router relays a subscription `subid` for `spec`.
;; Until the matching Unsubscribe arrives, watch local `to-broker`
;; assertions matching `spec` and report each event to the router.
(on (message (router-outbound name (Subscribe $subid $spec)))
(react
;; Endpoint update function: returns the interest assertion plus a skeleton
;; interest whose callback receives an op ('+, '- or '!) and the captures.
(define (update-fn)
(values (observe (to-broker node spec))
(term->skeleton-interest
(to-broker node spec)
(capture-facet-context
(lambda (op . captures)
;; The frame is sent from a script scheduled on the current actor
;; rather than directly from the callback.
(schedule-script!
(current-actor)
(lambda ()
;; Map the op onto the router frame constructor.
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
(send! (router-inbound name (ctor subid captures))))))))))
(add-endpoint! (current-facet) "router" #f update-fn)
;; Surface the remote interest locally as coming from the broker.
(assert (from-broker node (observe spec)))
(stop-when (message (router-outbound name (Unsubscribe subid))))))
;; Local -> router: for each local interest in `(from-broker node spec)`,
;; hold a router subscription under a fresh id `ep`. `pat` captures the whole
;; observed pattern so that results can be instantiated back into it.
(during (observe ($ pat (from-broker node $spec)))
(define ep (gensym 'ep))
(on-start (send! (router-inbound name (Subscribe ep spec))))
(on-stop (send! (router-inbound name (Unsubscribe ep))))
(assert (from-broker node (observe spec))) ;; more self-signalling
;; An Add becomes a local assertion that lasts until the Del carrying the
;; same captures arrives.
(on (message (router-outbound name (Add ep $captures)))
(react (assert (instantiate-term->value pat captures))
(stop-when (message (router-outbound name (Del ep captures))))))
;; A Msg becomes a local message.
(on (message (router-outbound name (Msg ep $captures)))
(send! (instantiate-term->value pat captures))))
)))
;; Spawns a bidirectional link between routers `node1` and `node2`.
;;
;; The relay appears at each router as a connection named after the link
;; direction ("node1->node2" at node1, "node2->node1" at node2). Frames a
;; router emits on its side are queued, and forwarded to the other router as
;; inbound frames once both connections are up.
(define (relay node1 node2)
  (spawn #:name (list 'relay node1 node2)
         (define node1-connid (string->symbol (format "~a->~a" node1 node2)))
         (define node2-connid (string->symbol (format "~a->~a" node2 node1)))

         ;; Frames awaiting forwarding, newest first.
         (field [pending1 '()]
                [pending2 '()])

         ;; Collect whatever each router emits on its own side of the link.
         (during (router-connection node1 node1-connid)
           (on (message (router-outbound node1-connid $frame))
               (pending1 (cons frame (pending1)))))
         (during (router-connection node2 node2-connid)
           (on (message (router-outbound node2-connid $frame))
               (pending2 (cons frame (pending2)))))

         ;; Only while BOTH sides are connected: drain each queue, oldest
         ;; frame first, into the opposite router, then empty it.
         (during (router-connection node1 node1-connid)
           (during (router-connection node2 node2-connid)
             (begin/dataflow
               (unless (null? (pending1))
                 (for-each (lambda (frame)
                             (send! (router-inbound node2-connid frame)))
                           (reverse (pending1)))
                 (pending1 '())))
             (begin/dataflow
               (unless (null? (pending2))
                 (for-each (lambda (frame)
                             (send! (router-inbound node1-connid frame)))
                           (reverse (pending2)))
                 (pending2 '())))))))
;; Driver script for the demo scenario: builds the topology step by step,
;; then toggles one client's presence and terminates clients one at a time.
;;
;; `(pause n action)` sleeps for `n`, logs the *quoted source form* of
;; `action` after a newline and a row of asterisks, then evaluates `action`.
;; Because the form itself is logged, editing any action expression below
;; also changes the log output.
;; NOTE(review): `n` is presumably in seconds -- confirm against the `sleep`
;; in scope here.
(spawn* (define-syntax-rule (pause n action)
(begin (sleep n)
(log-info "\n********** ~v" 'action)
action))
;; Initial clients: c1 (leaf) and c2 (leaf2) on n1; c3 and c4 (leaf) on n2.
(pause 0 (begin
(leaf 'c1 'n1)
(leaf2 'c2 'n1)
(leaf 'c3 'n2)
(leaf 'c4 'n2)
))
;; Link n1 and n2, add c5 (leaf2) on a third node n3, then link n2 and n3.
(pause 0.5 (relay 'n1 'n2))
(pause 0.25 (leaf2 'c5 'n3))
(pause 0.25 (relay 'n2 'n3))
;; This action is just a quoted symbol, so the step only waits and logs a marker.
(pause 0.5 'delivery-of-the-says-messages) ;; the newline is important here
;; Toggle c1's presence off and on again.
(pause 1 (send! (change-presence 'c1 #f)))
(pause 0.2 (send! (change-presence 'c1 #t)))
;; Terminate c1, c3, c2 and c4 in that order; c5 is not terminated by this script.
(pause 0.2 (send! (terminate 'c1)))
(pause 0.2 (send! (terminate 'c3)))
(pause 0.2 (send! (terminate 'c2)))
(pause 0.2 (send! (terminate 'c4)))
)