2016-07-12 19:05:56 +00:00
|
|
|
#lang syndicate/actor
|
2016-05-09 00:42:46 +00:00
|
|
|
;; Generic relay for WebSockets/TCP/etc-based participation in a network.
|
|
|
|
|
2016-05-15 10:59:28 +00:00
|
|
|
(provide spawn-broker-server
|
|
|
|
(struct-out broker-scope)
|
|
|
|
(struct-out broker-data))
|
2016-05-09 00:42:46 +00:00
|
|
|
|
|
|
|
(require racket/set)
|
|
|
|
(require racket/match)
|
|
|
|
(require net/rfc6455)
|
|
|
|
(require (except-in "../main.rkt" dataspace assert))
|
|
|
|
(require "../actor.rkt")
|
2016-05-10 04:25:50 +00:00
|
|
|
(require "../trie.rkt")
|
2016-05-12 16:15:41 +00:00
|
|
|
(require "../patch.rkt")
|
2016-05-09 00:42:46 +00:00
|
|
|
(require "../demand-matcher.rkt")
|
|
|
|
(require "../drivers/timer.rkt")
|
|
|
|
(require "../drivers/websocket.rkt")
|
|
|
|
(require json)
|
|
|
|
(require "protocol.rkt")
|
|
|
|
|
2016-05-16 18:18:44 +00:00
|
|
|
(define-logger syndicate-broker)
|
|
|
|
|
2016-05-15 10:56:29 +00:00
|
|
|
(struct broker-scope (host port path) #:prefab)
|
2016-05-12 16:15:41 +00:00
|
|
|
(struct broker-data (scope assertion) #:prefab)
|
|
|
|
|
|
|
|
(define broker-data-parenthesis (struct-type->parenthesis struct:broker-data))
|
|
|
|
(define broker-scope-parenthesis (struct-type->parenthesis struct:broker-scope))
|
|
|
|
|
2016-05-09 00:42:46 +00:00
|
|
|
;; Depends on timer driver and websocket driver running at the given metalevel.
|
|
|
|
(define (spawn-broker-server port
|
|
|
|
#:ssl-options [ssl-options #f])
|
2016-05-15 10:56:29 +00:00
|
|
|
(define any-client any-websocket-remote-client)
|
2016-05-09 00:42:46 +00:00
|
|
|
(define server-id (websocket-local-server port ssl-options))
|
|
|
|
(spawn-demand-matcher (advertise (websocket-message (?! any-client) server-id ?))
|
|
|
|
(observe (websocket-message (?! any-client) server-id ?))
|
|
|
|
#:meta-level 1
|
|
|
|
(lambda (c) (spawn-connection-handler c server-id))))
|
|
|
|
|
|
|
|
(define (spawn-connection-handler c server-id)
|
2016-05-15 10:56:29 +00:00
|
|
|
(actor (define scope (broker-scope (websocket-remote-client-request-host c)
|
|
|
|
(websocket-remote-client-request-port c)
|
|
|
|
(websocket-remote-client-request-path c)))
|
2016-05-12 16:15:41 +00:00
|
|
|
|
|
|
|
(define (arm-ping-timer!)
|
2016-05-09 00:42:46 +00:00
|
|
|
(send! #:meta-level 1 (set-timer c (ping-interval) 'relative)))
|
|
|
|
|
|
|
|
(define (send-event e)
|
|
|
|
(send! #:meta-level 1
|
|
|
|
(websocket-message server-id c (jsexpr->string (lift-json-event e)))))
|
|
|
|
|
|
|
|
(arm-ping-timer!)
|
|
|
|
|
2016-05-16 18:18:44 +00:00
|
|
|
(log-syndicate-broker-info "Starting broker connection from ~v" c)
|
2016-05-10 04:25:50 +00:00
|
|
|
(until (retracted (advertise (websocket-message c server-id _)) #:meta-level 1)
|
2016-05-09 00:42:46 +00:00
|
|
|
(assert (advertise (websocket-message server-id c _)) #:meta-level 1)
|
|
|
|
|
2016-05-12 14:57:34 +00:00
|
|
|
(on (asserted (websocket-peer-details server-id c _ _ $remote-addr $remote-port)
|
|
|
|
#:meta-level 1)
|
2016-05-16 18:18:44 +00:00
|
|
|
(log-syndicate-broker-info "Connection ~v is from ~a:~a" c remote-addr remote-port))
|
2016-05-12 14:57:34 +00:00
|
|
|
|
2016-05-09 00:42:46 +00:00
|
|
|
(on (message (timer-expired c _) #:meta-level 1)
|
|
|
|
(arm-ping-timer!)
|
|
|
|
(send-event 'ping))
|
|
|
|
|
|
|
|
(on (message (websocket-message c server-id $data) #:meta-level 1)
|
|
|
|
(match (drop-json-action (string->jsexpr data))
|
2016-05-10 04:25:50 +00:00
|
|
|
['ping (send-event 'pong)]
|
|
|
|
['pong (void)]
|
2016-05-12 16:15:41 +00:00
|
|
|
[(? patch? p) (patch! (log-packet c 'inbound 'patch (wrap-patch scope p)))]
|
|
|
|
[(message body) (send! (log-packet c 'inbound 'message (broker-data scope body)))]))
|
2016-05-09 00:42:46 +00:00
|
|
|
|
|
|
|
(on-event
|
2016-05-12 16:15:41 +00:00
|
|
|
[(? patch? p) (send-event (log-packet c 'outbound 'patch (unwrap-patch scope p)))]
|
|
|
|
[(message (broker-data (== scope) body))
|
|
|
|
(send-event (message (log-packet c 'outbound 'message body)))]))
|
2016-05-16 18:18:44 +00:00
|
|
|
(log-syndicate-broker-info "Ending broker connection from ~v" c)))
|
2016-05-10 05:24:47 +00:00
|
|
|
|
|
|
|
(define (log-packet c direction kind value)
|
2016-05-16 18:18:44 +00:00
|
|
|
(log-syndicate-broker-debug "Broker: ~v: ~a ~a\n~v" c direction kind value)
|
2016-05-10 05:24:47 +00:00
|
|
|
value)
|
2016-05-09 00:42:46 +00:00
|
|
|
|
2016-05-12 16:15:41 +00:00
|
|
|
(define (unwrap-patch scope p)
|
2016-05-15 10:10:51 +00:00
|
|
|
(match-define (patch added removed) p)
|
|
|
|
(patch (unwrap-trie scope added) (unwrap-trie scope removed)))
|
|
|
|
|
|
|
|
(define (unwrap-trie scope t)
|
|
|
|
(if (trie-empty? t)
|
|
|
|
t
|
|
|
|
(let ((observations (trie-step t observe-parenthesis)))
|
|
|
|
(trie-union (trie-prepend observe-parenthesis (unwrap-trie scope observations))
|
|
|
|
(trie-step* t (list broker-data-parenthesis
|
|
|
|
broker-scope-parenthesis
|
|
|
|
(broker-scope-host scope)
|
2016-05-15 10:56:29 +00:00
|
|
|
(broker-scope-port scope)
|
2016-05-15 10:10:51 +00:00
|
|
|
(broker-scope-path scope)))))))
|
2016-05-12 16:15:41 +00:00
|
|
|
|
|
|
|
(define (wrap-patch scope p)
|
|
|
|
(match-define (patch added removed) p)
|
|
|
|
(patch (wrap-trie scope added) (wrap-trie scope removed)))
|
|
|
|
|
|
|
|
(define (wrap-trie scope t)
|
2016-05-15 10:10:51 +00:00
|
|
|
(if (trie-empty? t)
|
|
|
|
t
|
|
|
|
(let ((observations (trie-step t observe-parenthesis)))
|
|
|
|
(trie-union (trie-prepend observe-parenthesis (wrap-trie scope observations))
|
|
|
|
(wrap-trie* scope t)))))
|
2016-05-12 16:15:41 +00:00
|
|
|
|
|
|
|
(define (wrap-trie* scope t)
|
|
|
|
(pattern->trie #t (broker-data scope (embedded-trie t))))
|
|
|
|
|
2016-05-09 00:42:46 +00:00
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
|
2016-07-12 19:05:56 +00:00
|
|
|
(require/activate syndicate/drivers/timer)
|
|
|
|
(require/activate syndicate/drivers/websocket)
|
|
|
|
|
|
|
|
(let ((ssl-options
|
|
|
|
(match (current-command-line-arguments)
|
|
|
|
[(vector c p) (websocket-ssl-options c p)]
|
|
|
|
[_ #f])))
|
|
|
|
(dataspace (schedule-action! (spawn-broker-server 8000))
|
|
|
|
(when ssl-options
|
|
|
|
(schedule-action! (spawn-broker-server 8443 #:ssl-options ssl-options)))
|
|
|
|
(forever)))
|