Connect router to dataspace
This commit is contained in:
parent
b09aef3912
commit
eb2e4c9fcb
|
@ -53,6 +53,72 @@
|
|||
|
||||
)))
|
||||
|
||||
(define (leaf2 name node)
|
||||
(local-require imperative-syndicate/term)
|
||||
(spawn #:name (list 'leaf2 name)
|
||||
(assertion-struct to-broker (assertion))
|
||||
(assertion-struct from-broker (assertion))
|
||||
|
||||
;;----------------------------------------
|
||||
|
||||
(stop-when (message (terminate name)))
|
||||
|
||||
(field [present? #t])
|
||||
(assert #:when (present?) (to-broker (present name)))
|
||||
(on (message (change-presence name $new-presence))
|
||||
(present? new-presence))
|
||||
|
||||
;; TODO: Doing it this way, with the implementation in `leaf`
|
||||
;; above, causes missing "absent" messages because `leaf`
|
||||
;; processes don't respond to the specific `presence`
|
||||
;; interests generated by the way `during` is implemented,
|
||||
;; only to general ones.
|
||||
;;
|
||||
;; (during (from-broker (present $who))
|
||||
;; (on-start (printf "~a: ~a present\n" name who))
|
||||
;; (on-stop (printf "~a: ~a absent\n" name who)))
|
||||
|
||||
(on (asserted (from-broker (present $who))) (printf "~a: ~a present\n" name who))
|
||||
(on (retracted (from-broker (present $who))) (printf "~a: ~a absent\n" name who))
|
||||
|
||||
(on (message (from-broker (says $who $what)))
|
||||
(printf "~a: ~a says ~v\n" name who what))
|
||||
|
||||
;;----------------------------------------
|
||||
|
||||
(during (to-broker $what)
|
||||
;; This takes care of the self-signalling discussed above.
|
||||
(assert (from-broker what)))
|
||||
|
||||
(during (router-connection node name)
|
||||
(on (message (router-outbound name (Subscribe $subid $spec)))
|
||||
(react
|
||||
(define (update-fn)
|
||||
(values (observe (to-broker spec))
|
||||
(term->skeleton-interest
|
||||
(to-broker spec)
|
||||
(capture-facet-context
|
||||
(lambda (op . captures)
|
||||
(schedule-script!
|
||||
(current-actor)
|
||||
(lambda ()
|
||||
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
|
||||
(send! (router-inbound name (ctor subid captures))))))))))
|
||||
(add-endpoint! (current-facet) "router" #f update-fn)
|
||||
(stop-when (message (router-outbound name (Unsubscribe subid))))))
|
||||
|
||||
(during (observe ($ pat (from-broker $spec)))
|
||||
(define ep (gensym 'ep))
|
||||
(on-start (send! (router-inbound name (Subscribe ep spec))))
|
||||
(on-stop (send! (router-inbound name (Unsubscribe ep))))
|
||||
(on (message (router-outbound name (Add ep $captures)))
|
||||
(react (assert (instantiate-term->value pat captures))
|
||||
(stop-when (message (router-outbound name (Del ep captures))))))
|
||||
(on (message (router-outbound name (Msg ep $captures)))
|
||||
(send! (instantiate-term->value pat captures))))
|
||||
|
||||
)))
|
||||
|
||||
(define (relay node1 node2)
|
||||
(spawn #:name (list 'relay node1 node2)
|
||||
(define node1-connid (string->symbol (format "~a->~a" node1 node2)))
|
||||
|
@ -88,7 +154,7 @@
|
|||
action))
|
||||
|
||||
(pause 0 (begin (leaf 'c1 'n1)
|
||||
(leaf 'c2 'n1)
|
||||
(leaf2 'c2 'n1)
|
||||
(leaf 'c3 'n2)
|
||||
(leaf 'c4 'n2)))
|
||||
|
||||
|
|
Loading…
Reference in New Issue