From 22ae65d05e0186da587d632da513b4421da3eeb7 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 14 Jun 2014 20:52:38 -0400 Subject: [PATCH] General-purpose broker-style relay from js-marketplace --- minimart/relay.rkt | 104 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 minimart/relay.rkt diff --git a/minimart/relay.rkt b/minimart/relay.rkt new file mode 100644 index 0000000..bc63358 --- /dev/null +++ b/minimart/relay.rkt @@ -0,0 +1,104 @@ +#lang racket/base +;; Generic relay for WebSockets/TCP/etc-based participation in a network. + +(provide spawn-websocket-relay) + +(require racket/set) +(require racket/match) +(require net/rfc6455) +(require "main.rkt") +(require "demand-matcher.rkt") +(require "drivers/timer.rkt") +(require "drivers/websocket.rkt") +(require json) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Main: start WebSocket server + +;; Depends on timer driver and websocket driver running at metalevel 1. +(define (spawn-websocket-relay port [ssl-options #f]) + (define server-id (websocket-local-server port ssl-options)) + (spawn-demand-matcher (websocket-message (?! (websocket-remote-client ?)) server-id ?) + #:meta-level 1 + (lambda (c) (spawn-connection-handler c server-id)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Wire protocol representation of events and actions + +(define (drop-json-action j) + (match j + ["ping" 'ping] + ["pong" 'pong] + [`("routes" ,gj) (routing-update (jsexpr->gestalt gj (lambda (v) (set 'peer))))] + [`("message" ,body ,meta-level ,feedback?) (message body meta-level feedback?)])) + +(define (lift-json-event j) + (match j + ['ping "ping"] + ['pong "pong"] + [(routing-update g) `("routes" ,(gestalt->jsexpr g (lambda (v) #t)))] + [(message body meta-level feedback?) `("message" ,body ,meta-level ,feedback?)])) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Connections + +(struct connection-state (client-id tunnelled-gestalt) #:transparent) + +(define (ping-interval) (* 1000 (max (- (ws-idle-timeout) 10) (* (ws-idle-timeout) 0.8)))) + +(define (spawn-connection-handler c server-id) + (define (send-event e s) + (send (websocket-message server-id + (connection-state-client-id s) + (jsexpr->string (lift-json-event e))) + #:meta-level 1)) + (define ((handle-connection-routing-change g) s) + (if (gestalt-empty? g) + (transition s (quit)) ;; websocket connection closed + #f)) + (define ((handle-tunnelled-routing-change g) s) + (transition s (send-event (routing-update g) s))) + (define ((handle-tunnellable-message m) s) + (if (gestalt-accepts? (connection-state-tunnelled-gestalt s) m) + (transition s (send-event m s)) + (transition s '()))) + (define relay-connections + (gestalt-union (sub (timer-expired c ?) #:meta-level 1) + (sub (websocket-message c server-id ?) #:meta-level 1) + (sub (websocket-message c server-id ?) #:meta-level 1 #:level 1) + (pub (websocket-message server-id c ?) #:meta-level 1))) + (define (connection-handler e s) + (match e + [(routing-update g) + (sequence-transitions + (transition s '()) + (handle-connection-routing-change (gestalt-filter g relay-connections)) + (handle-tunnelled-routing-change + (gestalt-filter g (connection-state-tunnelled-gestalt s))))] + [(? message? m) + (sequence-transitions + (match m + [(message (websocket-message from to data) 1 #f) + (match (drop-json-action (string->jsexpr data)) + [(routing-update g-unfiltered) + (define g (gestalt-transform g-unfiltered + (lambda (ml l p) (if (zero? ml) p '(#f . #f))))) + (transition (struct-copy connection-state s [tunnelled-gestalt g]) + (routing-update (gestalt-union g relay-connections)))] + [(? message? m) + (transition s (if (zero? (message-meta-level m)) m '()))] + ['ping + (transition s (send-event 'pong s))] + ['pong + (transition s '())])] + [(message (timer-expired _ _) 1 #f) + (transition s (list (send (set-timer c (ping-interval) 'relative) #:meta-level 1) + (send-event 'ping s)))] + [_ + (transition s '())]) + (handle-tunnellable-message m))] + [#f #f])) + (list (send (set-timer c (ping-interval) 'relative) #:meta-level 1) + (spawn connection-handler + (connection-state c (gestalt-empty)) + relay-connections)))