From 1c2bd113738b3aba8aa93b804180d0b672c1c76f Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 3 May 2019 00:10:15 +0100 Subject: [PATCH] Connect router to dataspace --- syndicate/broker/tf.rkt | 68 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/syndicate/broker/tf.rkt b/syndicate/broker/tf.rkt index 8753f1e..817f97d 100644 --- a/syndicate/broker/tf.rkt +++ b/syndicate/broker/tf.rkt @@ -53,6 +53,72 @@ ))) +(define (leaf2 name node) + (local-require imperative-syndicate/term) + (spawn #:name (list 'leaf2 name) + (assertion-struct to-broker (assertion)) + (assertion-struct from-broker (assertion)) + + ;;---------------------------------------- + + (stop-when (message (terminate name))) + + (field [present? #t]) + (assert #:when (present?) (to-broker (present name))) + (on (message (change-presence name $new-presence)) + (present? new-presence)) + + ;; TODO: Doing it this way, with the implementation in `leaf` + ;; above, causes missing "absent" messages because `leaf` + ;; processes don't respond to the specific `presence` + ;; interests generated by the way `during` is implemented, + ;; only to general ones. + ;; + ;; (during (from-broker (present $who)) + ;; (on-start (printf "~a: ~a present\n" name who)) + ;; (on-stop (printf "~a: ~a absent\n" name who))) + + (on (asserted (from-broker (present $who))) (printf "~a: ~a present\n" name who)) + (on (retracted (from-broker (present $who))) (printf "~a: ~a absent\n" name who)) + + (on (message (from-broker (says $who $what))) + (printf "~a: ~a says ~v\n" name who what)) + + ;;---------------------------------------- + + (during (to-broker $what) + ;; This takes care of the self-signalling discussed above. + (assert (from-broker what))) + + (during (router-connection node name) + (on (message (router-outbound name (Subscribe $subid $spec))) + (react + (define (update-fn) + (values (observe (to-broker spec)) + (term->skeleton-interest + (to-broker spec) + (capture-facet-context + (lambda (op . captures) + (schedule-script! + (current-actor) + (lambda () + (define ctor (match op ['+ Add] ['- Del] ['! Msg])) + (send! (router-inbound name (ctor subid captures)))))))))) + (add-endpoint! (current-facet) "router" #f update-fn) + (stop-when (message (router-outbound name (Unsubscribe subid)))))) + + (during (observe ($ pat (from-broker $spec))) + (define ep (gensym 'ep)) + (on-start (send! (router-inbound name (Subscribe ep spec)))) + (on-stop (send! (router-inbound name (Unsubscribe ep)))) + (on (message (router-outbound name (Add ep $captures))) + (react (assert (instantiate-term->value pat captures)) + (stop-when (message (router-outbound name (Del ep captures)))))) + (on (message (router-outbound name (Msg ep $captures))) + (send! (instantiate-term->value pat captures)))) + + ))) + (define (relay node1 node2) (spawn #:name (list 'relay node1 node2) (define node1-connid (string->symbol (format "~a->~a" node1 node2))) @@ -88,7 +154,7 @@ action)) (pause 0 (begin (leaf 'c1 'n1) - (leaf 'c2 'n1) + (leaf2 'c2 'n1) (leaf 'c3 'n2) (leaf 'c4 'n2)))