Abstract broker over spatial separation syntax used
This commit is contained in:
parent
97bb848611
commit
ae6c5a409f
|
@ -14,6 +14,7 @@
|
|||
;; (require (except-in "../main.rkt" dataspace assert))
|
||||
;; (require "../actor.rkt")
|
||||
(require syndicate/trie)
|
||||
(require syndicate/pattern)
|
||||
(require syndicate/patch)
|
||||
(require syndicate/demand-matcher)
|
||||
(require syndicate/protocol/advertise)
|
||||
|
@ -42,7 +43,15 @@
|
|||
(when (web-request-header-websocket-upgrade? req)
|
||||
(spawn-broker-server-connection id req)))))
|
||||
|
||||
(define (spawn-broker-server-connection req-id http-req)
|
||||
(define (http-req->scope http-req)
|
||||
(define http-resource (web-request-header-resource http-req))
|
||||
(define http-vh (web-resource-virtual-host http-resource))
|
||||
(define scope (broker-scope (web-virtual-host-name http-vh)
|
||||
(web-virtual-host-port http-vh)
|
||||
(resource-path->string (web-resource-path http-resource))))
|
||||
(lambda (v) (broker-data scope v)))
|
||||
|
||||
(define (spawn-broker-server-connection req-id http-req #:scope [scope (http-req->scope http-req)])
|
||||
(actor #:name (list 'broker:connection req-id)
|
||||
|
||||
(on-start (log-syndicate-broker-info "Starting broker connection ~v" req-id))
|
||||
|
@ -56,12 +65,6 @@
|
|||
(define (send-event e)
|
||||
(websocket-message-send! req-id (jsexpr->string (lift-json-event e))))
|
||||
|
||||
(define http-resource (web-request-header-resource http-req))
|
||||
(define http-vh (web-resource-virtual-host http-resource))
|
||||
(define scope (broker-scope (web-virtual-host-name http-vh)
|
||||
(web-virtual-host-port http-vh)
|
||||
(resource-path->string (web-resource-path http-resource))))
|
||||
|
||||
(field [ping-time-deadline 0])
|
||||
(on (asserted (later-than (ping-time-deadline)))
|
||||
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
|
||||
|
@ -72,12 +75,14 @@
|
|||
['ping (send-event 'pong)]
|
||||
['pong (void)]
|
||||
[(? patch? p) (patch! (log-packet req-id 'inbound 'patch (wrap-patch scope p)))]
|
||||
[(message body) (send! (log-packet req-id 'inbound 'message (broker-data scope body)))]))
|
||||
[(message body) (send! (log-packet req-id 'inbound 'message (scope body)))]))
|
||||
|
||||
(on-event
|
||||
[(? patch? p) (send-event (log-packet req-id 'outbound 'patch (unwrap-patch scope p)))]
|
||||
[(message (broker-data (== scope) body))
|
||||
(send-event (message (log-packet req-id 'outbound 'message body)))])))
|
||||
[(message scoped-body)
|
||||
(match (match-value/captures scoped-body (scope (?!)))
|
||||
[(list body) (send-event (message (log-packet req-id 'outbound 'message body)))]
|
||||
[_ (void)])])))
|
||||
|
||||
(define (log-packet c direction kind value)
|
||||
(log-syndicate-broker-debug "Broker: ~v: ~a ~a\n~v" c direction kind value)
|
||||
|
@ -87,27 +92,22 @@
|
|||
(match-define (patch added removed) p)
|
||||
(patch (unwrap-trie scope added) (unwrap-trie scope removed)))
|
||||
|
||||
(define (unwrap-trie scope t)
|
||||
(if (trie-empty? t)
|
||||
t
|
||||
(let ((observations (trie-step t observe-parenthesis)))
|
||||
(trie-union (trie-prepend observe-parenthesis (unwrap-trie scope observations))
|
||||
(trie-step* t (list broker-data-parenthesis
|
||||
broker-scope-parenthesis
|
||||
(broker-scope-host scope)
|
||||
(broker-scope-port scope)
|
||||
(broker-scope-path scope)))))))
|
||||
|
||||
(define (wrap-patch scope p)
|
||||
(match-define (patch added removed) p)
|
||||
(patch (wrap-trie scope added) (wrap-trie scope removed)))
|
||||
|
||||
(define (wrap-trie scope t)
|
||||
(define (lift-beneath-observation f t)
|
||||
(if (trie-empty? t)
|
||||
t
|
||||
(let ((observations (trie-step t observe-parenthesis)))
|
||||
(trie-union (trie-prepend observe-parenthesis (wrap-trie scope observations))
|
||||
(pattern->trie #t (broker-data scope (embedded-trie t)))))))
|
||||
(trie-union (trie-prepend observe-parenthesis (lift-beneath-observation f observations))
|
||||
(f t)))))
|
||||
|
||||
(define (unwrap-trie scope t)
|
||||
(lift-beneath-observation (lambda (t) (trie-project t (scope (?!)))) t))
|
||||
|
||||
(define (wrap-trie scope t)
|
||||
(lift-beneath-observation (lambda (t) (pattern->trie #t (scope (embedded-trie t)))) t))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
|
|
Loading…
Reference in New Issue