Connect router to dataspace

This commit is contained in:
Tony Garnock-Jones 2019-05-03 00:10:15 +01:00
parent 79e8d54a51
commit 1c2bd11373
1 changed files with 67 additions and 1 deletions

View File

@ -53,6 +53,72 @@
)))
(define (leaf2 name node)
(local-require imperative-syndicate/term)
(spawn #:name (list 'leaf2 name)
(assertion-struct to-broker (assertion))
(assertion-struct from-broker (assertion))
;;----------------------------------------
(stop-when (message (terminate name)))
(field [present? #t])
(assert #:when (present?) (to-broker (present name)))
(on (message (change-presence name $new-presence))
(present? new-presence))
;; TODO: Doing it this way, with the implementation in `leaf`
;; above, causes missing "absent" messages because `leaf`
;; processes don't respond to the specific `presence`
;; interests generated by the way `during` is implemented,
;; only to general ones.
;;
;; (during (from-broker (present $who))
;; (on-start (printf "~a: ~a present\n" name who))
;; (on-stop (printf "~a: ~a absent\n" name who)))
(on (asserted (from-broker (present $who))) (printf "~a: ~a present\n" name who))
(on (retracted (from-broker (present $who))) (printf "~a: ~a absent\n" name who))
(on (message (from-broker (says $who $what)))
(printf "~a: ~a says ~v\n" name who what))
;;----------------------------------------
(during (to-broker $what)
;; This takes care of the self-signalling discussed above.
(assert (from-broker what)))
(during (router-connection node name)
(on (message (router-outbound name (Subscribe $subid $spec)))
(react
(define (update-fn)
(values (observe (to-broker spec))
(term->skeleton-interest
(to-broker spec)
(capture-facet-context
(lambda (op . captures)
(schedule-script!
(current-actor)
(lambda ()
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
(send! (router-inbound name (ctor subid captures))))))))))
(add-endpoint! (current-facet) "router" #f update-fn)
(stop-when (message (router-outbound name (Unsubscribe subid))))))
(during (observe ($ pat (from-broker $spec)))
(define ep (gensym 'ep))
(on-start (send! (router-inbound name (Subscribe ep spec))))
(on-stop (send! (router-inbound name (Unsubscribe ep))))
(on (message (router-outbound name (Add ep $captures)))
(react (assert (instantiate-term->value pat captures))
(stop-when (message (router-outbound name (Del ep captures))))))
(on (message (router-outbound name (Msg ep $captures)))
(send! (instantiate-term->value pat captures))))
)))
(define (relay node1 node2)
(spawn #:name (list 'relay node1 node2)
(define node1-connid (string->symbol (format "~a->~a" node1 node2)))
@ -88,7 +154,7 @@
action))
(pause 0 (begin (leaf 'c1 'n1)
(leaf 'c2 'n1)
(leaf2 'c2 'n1)
(leaf 'c3 'n2)
(leaf 'c4 'n2)))