Initial broker implementation
This commit is contained in:
parent
f4f3b67cdf
commit
fc43d5c4a0
|
@ -0,0 +1,5 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(require "wire-protocol.rkt")
|
||||
(require "protocol.rkt")
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (all-from-out "protocol.rkt")
|
||||
(all-from-out "client.rkt")
|
||||
(all-from-out "server.rkt"))
|
||||
|
||||
(require "protocol.rkt")
|
||||
(require/activate "client.rkt")
|
||||
(require/activate "server.rkt")
|
||||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
(require/activate imperative-syndicate/drivers/web)
|
||||
|
||||
(define (main)
|
||||
(spawn #:name 'server-listener
|
||||
(define tcp-scope "broker")
|
||||
(during/spawn (tcp-connection $id (tcp-listener 8001))
|
||||
#:name `(server-connection ,tcp-scope ,id)
|
||||
(server-facet/tcp id tcp-scope))
|
||||
|
||||
(during/spawn (http-request $id 'get (http-resource (http-server _ 8000 #f)
|
||||
`(,$scope ())) _ _ _)
|
||||
#:name `(server-connection ,scope ,id)
|
||||
(server-facet/websocket id scope))))
|
||||
|
||||
(module+ main
|
||||
(let ((go (current-ground-dataspace)))
|
||||
(current-ground-dataspace
|
||||
(lambda (boot-proc)
|
||||
(go (lambda ()
|
||||
(boot-proc)
|
||||
(main)))))))
|
|
@ -0,0 +1,10 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (all-defined-out))
|
||||
|
||||
;; Client protocol
|
||||
(assertion-struct to-broker (url assertion))
|
||||
(assertion-struct from-broker (url assertion))
|
||||
(assertion-struct broker-connection (url))
|
||||
(assertion-struct broker-connected (url))
|
||||
(message-struct force-broker-disconnect (url))
|
|
@ -0,0 +1,100 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide server-facet/tcp
|
||||
server-facet/websocket)
|
||||
|
||||
(require "wire-protocol.rkt")
|
||||
(require "protocol.rkt")
|
||||
(require imperative-syndicate/term)
|
||||
(require racket/set)
|
||||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
(require/activate imperative-syndicate/drivers/web)
|
||||
|
||||
;; Internal connection protocol
|
||||
(assertion-struct server-connection (connection-id scope))
|
||||
(assertion-struct server-inbound (connection-id body))
|
||||
(assertion-struct server-outbound (connection-id body))
|
||||
|
||||
;; Internal isolation
|
||||
(assertion-struct envelope (scope body))
|
||||
|
||||
(define-logger syndicate/broker)
|
||||
|
||||
(spawn #:name 'server-connection-factory
|
||||
(during/spawn (server-connection $id $scope)
|
||||
(define endpoints (set))
|
||||
|
||||
(on (message (server-inbound id (Assert $ep $a)))
|
||||
(when (not (set-member? endpoints ep))
|
||||
(set! endpoints (set-add endpoints ep))
|
||||
(react
|
||||
(on-stop (set! endpoints (set-remove endpoints ep)))
|
||||
|
||||
(field [assertion a])
|
||||
|
||||
(define (recompute-endpoint)
|
||||
(define a (assertion))
|
||||
(if (observe? a)
|
||||
(let* ((pattern (observe-specification a))
|
||||
(spec (envelope scope pattern)))
|
||||
(values (observe spec)
|
||||
(term->skeleton-interest
|
||||
spec
|
||||
(capture-facet-context
|
||||
(lambda (op . captured-values)
|
||||
(schedule-script!
|
||||
(current-actor)
|
||||
(lambda ()
|
||||
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
|
||||
(send! (server-outbound id (ctor ep captured-values))))))))))
|
||||
(values (envelope scope a) #f)))
|
||||
(add-endpoint! (current-facet) "server" #t recompute-endpoint)
|
||||
|
||||
(on (message (server-inbound id (Assert ep $new-a)))
|
||||
(assertion new-a))
|
||||
|
||||
(stop-when (message (server-inbound id (Clear ep)))))))
|
||||
|
||||
(on (message (server-inbound id (Message $body)))
|
||||
(send! (envelope scope body)))))
|
||||
|
||||
(define (server-facet/tcp id scope)
|
||||
(assert (tcp-accepted id))
|
||||
(assert (server-connection id scope))
|
||||
|
||||
(field [buffer #""])
|
||||
(begin/dataflow
|
||||
(define-values (packet remainder) (decode (buffer)))
|
||||
(when packet
|
||||
(buffer remainder)
|
||||
(send! (server-inbound id packet))))
|
||||
|
||||
(on (message (tcp-in id $bs))
|
||||
(buffer (bytes-append (buffer) bs)))
|
||||
(on (message (server-outbound id $p))
|
||||
(send! (tcp-out id (encode p)))))
|
||||
|
||||
(define (server-facet/websocket id scope)
|
||||
(assert (http-accepted id))
|
||||
(assert (http-response-websocket id))
|
||||
(assert (server-connection id scope))
|
||||
|
||||
(on (message (websocket-in id $body))
|
||||
(define-values (packet remainder) (decode body))
|
||||
(when (not (equal? remainder #""))
|
||||
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
|
||||
(send! (server-inbound id packet)))
|
||||
(on (message (server-outbound id $p))
|
||||
(send! (websocket-out id (encode p)))))
|
||||
|
||||
(when (log-level? syndicate/broker-logger 'debug)
|
||||
(spawn #:name 'server-debug
|
||||
(on (asserted (server-connection $id $scope))
|
||||
(log-syndicate/broker-debug "C+ ~v ~v" id scope))
|
||||
(on (retracted (server-connection $id $scope))
|
||||
(log-syndicate/broker-debug "C- ~v ~v" id scope))
|
||||
(on (message (server-inbound $id $p))
|
||||
(log-syndicate/broker-debug "CIN ~v ~v" id p))
|
||||
(on (message (server-outbound $id $p))
|
||||
(log-syndicate/broker-debug "COUT ~v ~v" id p))))
|
|
@ -0,0 +1,27 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (all-defined-out))
|
||||
|
||||
(require (prefix-in preserves: preserves))
|
||||
(require bitsyntax)
|
||||
|
||||
;; Client --> Broker
|
||||
(message-struct Assert (endpoint-name assertion))
|
||||
(message-struct Clear (endpoint-name))
|
||||
(message-struct Message (body))
|
||||
|
||||
;; Broker --> Client
|
||||
(message-struct Add (endpoint-name captures))
|
||||
(message-struct Del (endpoint-name captures))
|
||||
(message-struct Msg (endpoint-name captures))
|
||||
|
||||
(define (decode bs)
|
||||
(parameterize ((preserves:short-form-labels '#(discard capture observe)))
|
||||
(bit-string-case bs
|
||||
#:on-short (lambda (fail) (values #f bs))
|
||||
([ (v :: (preserves:wire-value)) (rest :: binary) ] (values v (bit-string->bytes rest)))
|
||||
(else (error 'decode "Invalid wire message")))))
|
||||
|
||||
(define (encode v)
|
||||
(parameterize ((preserves:short-form-labels '#(discard capture observe)))
|
||||
(preserves:encode v)))
|
|
@ -77,19 +77,15 @@
|
|||
(when (observe? assertion)
|
||||
(define pattern (observe-specification assertion))
|
||||
(define x (mcds-outbound pattern))
|
||||
(define i (skeleton-interest
|
||||
(term->skeleton x)
|
||||
(term->skeleton-proj x)
|
||||
(term->key x)
|
||||
(term->capture-proj x)
|
||||
(define i (term->skeleton-interest
|
||||
x
|
||||
(lambda (op . captured-values)
|
||||
(when (eq? op '+)
|
||||
(define term (instantiate-term->value pattern captured-values))
|
||||
(schedule-script!
|
||||
(current-actor)
|
||||
(lambda ()
|
||||
(assert! (mcds-relevant term peer))))))
|
||||
#f))
|
||||
(assert! (mcds-relevant term peer))))))))
|
||||
(add-endpoint! (current-facet)
|
||||
"udp-dataspace (mcds-inbound (observe ...))"
|
||||
#t
|
||||
|
|
|
@ -77,11 +77,9 @@
|
|||
(define outer-capture-proj (term->capture-proj x))
|
||||
(define inner-capture-proj (map (lambda (p) (cons 0 p)) outer-capture-proj))
|
||||
;; ^ inner-capture-proj accounts for the extra (inbound ...) layer around assertions
|
||||
(define i (skeleton-interest
|
||||
(term->skeleton x)
|
||||
(term->skeleton-proj x)
|
||||
(term->key x)
|
||||
outer-capture-proj
|
||||
(define i (term->skeleton-interest
|
||||
x
|
||||
#:capture-projection outer-capture-proj
|
||||
(lambda (op . captured-values)
|
||||
(define term (inbound (instantiate-term->value x captured-values)))
|
||||
(define assertion (visibility-restriction inner-capture-proj term))
|
||||
|
@ -95,6 +93,7 @@
|
|||
['- (apply-patch! inner-ds inner-actor (bag assertion -1))]
|
||||
['! (send-assertion! (dataspace-routing-table inner-ds) assertion)])
|
||||
(schedule-inner!))
|
||||
#:cleanup
|
||||
(lambda (cache)
|
||||
(apply-patch!
|
||||
inner-ds
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
#lang racket/base
|
||||
;; Like pattern.rkt, but for dynamic use rather than compile-time use.
|
||||
|
||||
(provide term->skeleton
|
||||
(provide term->skeleton-interest
|
||||
|
||||
term->skeleton
|
||||
term->skeleton-proj
|
||||
term->key
|
||||
term->capture-proj
|
||||
|
@ -11,6 +13,18 @@
|
|||
(require racket/match)
|
||||
(require syndicate/support/struct)
|
||||
(require "pattern.rkt")
|
||||
(require "skeleton.rkt")
|
||||
|
||||
(define (term->skeleton-interest x
|
||||
#:capture-projection [capture-proj (term->capture-proj x)]
|
||||
handler
|
||||
#:cleanup [cleanup #f])
|
||||
(skeleton-interest (term->skeleton x)
|
||||
(term->skeleton-proj x)
|
||||
(term->key x)
|
||||
capture-proj
|
||||
handler
|
||||
cleanup))
|
||||
|
||||
(define (term->skeleton t)
|
||||
(let walk ((t t))
|
||||
|
|
Loading…
Reference in New Issue