From 9af8adee793eb5919d017616aff64274cd42270c Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 27 Jun 2014 16:26:46 -0400 Subject: [PATCH] actorise the relay --- minimart/relay.rkt | 89 +++++++++++++++++++--------------------------- 1 file changed, 37 insertions(+), 52 deletions(-) diff --git a/minimart/relay.rkt b/minimart/relay.rkt index bc63358..093161a 100644 --- a/minimart/relay.rkt +++ b/minimart/relay.rkt @@ -42,63 +42,48 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Connections -(struct connection-state (client-id tunnelled-gestalt) #:transparent) - (define (ping-interval) (* 1000 (max (- (ws-idle-timeout) 10) (* (ws-idle-timeout) 0.8)))) (define (spawn-connection-handler c server-id) - (define (send-event e s) - (send (websocket-message server-id - (connection-state-client-id s) - (jsexpr->string (lift-json-event e))) - #:meta-level 1)) - (define ((handle-connection-routing-change g) s) - (if (gestalt-empty? g) - (transition s (quit)) ;; websocket connection closed - #f)) - (define ((handle-tunnelled-routing-change g) s) - (transition s (send-event (routing-update g) s))) - (define ((handle-tunnellable-message m) s) - (if (gestalt-accepts? (connection-state-tunnelled-gestalt s) m) - (transition s (send-event m s)) - (transition s '()))) - (define relay-connections - (gestalt-union (sub (timer-expired c ?) #:meta-level 1) - (sub (websocket-message c server-id ?) #:meta-level 1) - (sub (websocket-message c server-id ?) #:meta-level 1 #:level 1) - (pub (websocket-message server-id c ?) #:meta-level 1))) - (define (connection-handler e s) - (match e - [(routing-update g) - (sequence-transitions - (transition s '()) - (handle-connection-routing-change (gestalt-filter g relay-connections)) - (handle-tunnelled-routing-change - (gestalt-filter g (connection-state-tunnelled-gestalt s))))] - [(? message? m) - (sequence-transitions - (match m - [(message (websocket-message from to data) 1 #f) - (match (drop-json-action (string->jsexpr data)) + (actor #:name relay + #:state [tunnelled-gestalt (gestalt-empty)] + + (send #:meta-level 1 (set-timer c (ping-interval) 'relative)) + (subscribe (timer-expired c ?) + #:meta-level 1 + (send #:meta-level 1 (set-timer c (ping-interval) 'relative)) + (send-event 'ping)) + + (observe-advertisers (websocket-message c server-id ?) + #:meta-level 1 + #:presence peer-connected? + (when (not peer-connected?) (quit))) + + (advertise (websocket-message server-id c ?) #:meta-level 1) + (subscribe (websocket-message c server-id ($ data)) + #:meta-level 1 + #:run-transition (handle-incoming (drop-json-action (string->jsexpr data)))) + + (define (handle-incoming data) + (match data [(routing-update g-unfiltered) (define g (gestalt-transform g-unfiltered - (lambda (ml l p) (if (zero? ml) p '(#f . #f))))) - (transition (struct-copy connection-state s [tunnelled-gestalt g]) - (routing-update (gestalt-union g relay-connections)))] + (lambda (ml l p) (if (zero? ml) p '(#f . #f))))) + (begin-transition + #:update [tunnelled-gestalt g] + #:update-routes)] [(? message? m) - (transition s (if (zero? (message-meta-level m)) m '()))] + (begin-transition + (when (zero? (message-meta-level m)) m))] ['ping - (transition s (send-event 'pong s))] + (begin-transition (send-event 'pong))] ['pong - (transition s '())])] - [(message (timer-expired _ _) 1 #f) - (transition s (list (send (set-timer c (ping-interval) 'relative) #:meta-level 1) - (send-event 'ping s)))] - [_ - (transition s '())]) - (handle-tunnellable-message m))] - [#f #f])) - (list (send (set-timer c (ping-interval) 'relative) #:meta-level 1) - (spawn connection-handler - (connection-state c (gestalt-empty)) - relay-connections))) + (begin-transition)])) + + (observe-gestalt tunnelled-gestalt + [event ;; routing-update or message, prefiltered by tunnelled-gestalt + (send-event event)]) + + (define (send-event e) + (send #:meta-level 1 + (websocket-message server-id c (jsexpr->string (lift-json-event e)))))))