Scoped broker connections
This commit is contained in:
parent
f486f93bd4
commit
c6cfa2fe87
|
@ -9,16 +9,23 @@
|
|||
(require (except-in "../main.rkt" dataspace assert))
|
||||
(require "../actor.rkt")
|
||||
(require "../trie.rkt")
|
||||
(require "../patch.rkt")
|
||||
(require "../demand-matcher.rkt")
|
||||
(require "../drivers/timer.rkt")
|
||||
(require "../drivers/websocket.rkt")
|
||||
(require json)
|
||||
(require "protocol.rkt")
|
||||
|
||||
(struct broker-scope (host path) #:prefab)
|
||||
(struct broker-data (scope assertion) #:prefab)
|
||||
|
||||
(define broker-data-parenthesis (struct-type->parenthesis struct:broker-data))
|
||||
(define broker-scope-parenthesis (struct-type->parenthesis struct:broker-scope))
|
||||
|
||||
;; Depends on timer driver and websocket driver running at the given metalevel.
|
||||
(define (spawn-broker-server port
|
||||
#:ssl-options [ssl-options #f])
|
||||
(define any-client (websocket-remote-client ?))
|
||||
(define any-client (websocket-remote-client ? ? ?))
|
||||
(define server-id (websocket-local-server port ssl-options))
|
||||
(spawn-demand-matcher (advertise (websocket-message (?! any-client) server-id ?))
|
||||
(observe (websocket-message (?! any-client) server-id ?))
|
||||
|
@ -26,7 +33,10 @@
|
|||
(lambda (c) (spawn-connection-handler c server-id))))
|
||||
|
||||
(define (spawn-connection-handler c server-id)
|
||||
(actor (define (arm-ping-timer!)
|
||||
(actor (define scope (broker-scope (websocket-remote-client-host c)
|
||||
(websocket-remote-client-path c)))
|
||||
|
||||
(define (arm-ping-timer!)
|
||||
(send! #:meta-level 1 (set-timer c (ping-interval) 'relative)))
|
||||
|
||||
(define (send-event e)
|
||||
|
@ -51,20 +61,37 @@
|
|||
(match (drop-json-action (string->jsexpr data))
|
||||
['ping (send-event 'pong)]
|
||||
['pong (void)]
|
||||
[(? patch? p) (patch! (log-packet c 'inbound 'patch (patch-without-at-meta p)))]
|
||||
[(message (at-meta _)) (void)]
|
||||
[(message body) (send! (log-packet c 'inbound 'message body))]))
|
||||
[(? patch? p) (patch! (log-packet c 'inbound 'patch (wrap-patch scope p)))]
|
||||
[(message body) (send! (log-packet c 'inbound 'message (broker-data scope body)))]))
|
||||
|
||||
(on-event
|
||||
[(? patch? p) (send-event (log-packet c 'outbound 'patch (clean-patch p)))]
|
||||
[(message (at-meta _)) #f]
|
||||
[(message body) (send-event (message (log-packet c 'outbound 'message body)))]))
|
||||
[(? patch? p) (send-event (log-packet c 'outbound 'patch (unwrap-patch scope p)))]
|
||||
[(message (broker-data (== scope) body))
|
||||
(send-event (message (log-packet c 'outbound 'message body)))]))
|
||||
(log-info "\nEnding broker connection from ~v" c)))
|
||||
|
||||
(define (log-packet c direction kind value)
|
||||
(log-info "\nBroker: ~v: ~a ~a\n~v" c direction kind value)
|
||||
value)
|
||||
|
||||
(define (unwrap-patch scope p)
|
||||
(patch-step* p (list broker-data-parenthesis
|
||||
broker-scope-parenthesis
|
||||
(broker-scope-host scope)
|
||||
(broker-scope-path scope))))
|
||||
|
||||
(define (wrap-patch scope p)
|
||||
(match-define (patch added removed) p)
|
||||
(patch (wrap-trie scope added) (wrap-trie scope removed)))
|
||||
|
||||
(define (wrap-trie scope t)
|
||||
(define observations (trie-step t observe-parenthesis))
|
||||
(trie-union (trie-prepend observe-parenthesis (wrap-trie* scope observations))
|
||||
(wrap-trie* scope t)))
|
||||
|
||||
(define (wrap-trie* scope t)
|
||||
(pattern->trie #t (broker-data scope (embedded-trie t))))
|
||||
|
||||
(define stuff-to-prune
|
||||
(trie-union-all #:combiner (lambda (v1 v2) (trie-success #t))
|
||||
(list (pattern->trie #t (at-meta ?))
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
patch-pruned-by
|
||||
patch-without-at-meta
|
||||
patch-step
|
||||
patch-step*
|
||||
only-meta-tset
|
||||
compute-aggregate-patch
|
||||
apply-patch
|
||||
|
@ -155,6 +156,9 @@
|
|||
(patch (trie-step added key)
|
||||
(trie-step removed key)))
|
||||
|
||||
(define (patch-step* p keys)
|
||||
(foldl (lambda (key p) (patch-step p key)) p keys))
|
||||
|
||||
(define only-meta-tset (datum-tset 'meta))
|
||||
|
||||
;; Entries labelled with `label` may already exist in `base`; the
|
||||
|
|
|
@ -660,7 +660,7 @@
|
|||
[((? branch?) (? open-parenthesis?))
|
||||
(rlookup-open m key)]
|
||||
[((? branch?) _)
|
||||
(rlookup-sigma m key)]
|
||||
(rlookup-sigma m (canonicalize key))]
|
||||
[(_ _) trie-empty]))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
|
Loading…
Reference in New Issue