From aef3490a207bc8af5304b040750f2287b6b1cc58 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 20 Jun 2019 11:55:29 +0100 Subject: [PATCH] Proper heartbeats --- syndicate/distributed/client.rkt | 9 +++++ syndicate/distributed/client/tcp.rkt | 3 +- syndicate/distributed/heartbeat.rkt | 40 +++++++++++++++++++++ syndicate/distributed/internal-protocol.rkt | 1 + syndicate/distributed/server.rkt | 8 +++++ syndicate/distributed/server/tcp.rkt | 1 + syndicate/distributed/server/websocket.rkt | 8 +---- syndicate/distributed/wire-protocol.rkt | 12 +++++-- 8 files changed, 72 insertions(+), 10 deletions(-) create mode 100644 syndicate/distributed/heartbeat.rkt diff --git a/syndicate/distributed/client.rkt b/syndicate/distributed/client.rkt index 11c1b6f..9486f0c 100644 --- a/syndicate/distributed/client.rkt +++ b/syndicate/distributed/client.rkt @@ -8,6 +8,8 @@ (require "turn.rkt") (require imperative-syndicate/term) +(require/activate "heartbeat.rkt") + (define-logger syndicate/distributed) (spawn #:name 'client-factory @@ -69,6 +71,13 @@ (extend-turn! turn (Clear (hash-ref subs spec))) (set! subs (hash-remove subs spec))) + (define reset-heartbeat! (heartbeat (list 'client address scope) + w + (lambda () (stop-current-facet)))) + + (on (message (server-packet address _)) + (reset-heartbeat!)) + (on (message (server-packet address (Ping))) (w (Pong))) diff --git a/syndicate/distributed/client/tcp.rkt b/syndicate/distributed/client/tcp.rkt index 3b1bc3f..083e745 100644 --- a/syndicate/distributed/client/tcp.rkt +++ b/syndicate/distributed/client/tcp.rkt @@ -16,7 +16,8 @@ (reassert-on (tcp-connection id (tcp-address host port)) (retracted (tcp-accepted id)) (asserted (tcp-rejected id _)) - (retracted (server-transport-connected address))) + (retracted (server-transport-connected address)) + (retracted (server-connected address))) (during (tcp-accepted id) (on-start (issue-unbounded-credit! tcp-in id)) diff --git a/syndicate/distributed/heartbeat.rkt b/syndicate/distributed/heartbeat.rkt new file mode 100644 index 0000000..86059c4 --- /dev/null +++ b/syndicate/distributed/heartbeat.rkt @@ -0,0 +1,40 @@ +#lang imperative-syndicate + +(provide heartbeat) + +(require "wire-protocol.rkt") + +(require/activate imperative-syndicate/drivers/timer) + +(define-logger syndicate/distributed) + +;; TODO: move heartbeats to transport level, and use separate transport-activity timeouts from +;; message-activity timeouts. Using message-activity only has problems when messages are large +;; and links are slow. Also, moving to transport level lets us use e.g. WebSocket's ping +;; mechanism rather than a message-level mechanism. +(define (heartbeat who send-message teardown) + (define period (ping-interval)) + (define grace (* 3 period)) + + (log-syndicate/distributed-debug + "Peer ~v heartbeat period ~ams; must not experience silence longer than ~ams" + who period grace) + + (field [next-ping-time 0]) ;; when we are to send the next ping + (field [last-received-traffic (current-inexact-milliseconds)]) ;; when we last heard from the peer + + (define (schedule-next-ping!) + (next-ping-time (+ (current-inexact-milliseconds) period))) + + (on (asserted (later-than (next-ping-time))) + (schedule-next-ping!) + (send-message (Ping))) + + (on (asserted (later-than (+ (last-received-traffic) grace))) + (log-syndicate/distributed-info "Peer ~v heartbeat timeout after ~ams of inactivity" + who grace) + (teardown)) + + (lambda () + (schedule-next-ping!) + (last-received-traffic (current-inexact-milliseconds)))) diff --git a/syndicate/distributed/internal-protocol.rkt b/syndicate/distributed/internal-protocol.rkt index 07b519d..f1b299d 100644 --- a/syndicate/distributed/internal-protocol.rkt +++ b/syndicate/distributed/internal-protocol.rkt @@ -14,6 +14,7 @@ ;; Internal connection protocol (assertion-struct server-poa (connection-id)) ;; "Point of Attachment" +(assertion-struct server-poa-ready (connection-id)) (assertion-struct message-poa->server (connection-id body)) (assertion-struct message-server->poa (connection-id body)) diff --git a/syndicate/distributed/server.rkt b/syndicate/distributed/server.rkt index ff4455c..3483afe 100644 --- a/syndicate/distributed/server.rkt +++ b/syndicate/distributed/server.rkt @@ -4,6 +4,8 @@ (require "internal-protocol.rkt") (require "turn.rkt") +(require/activate "heartbeat.rkt") + (spawn #:name 'server-factory ;; Previously, we just had server-envelope. Now, we have both @@ -24,6 +26,7 @@ (during/spawn (server-poa $id) (define root-facet (current-facet)) + (assert (server-poa-ready id)) (on-start (match (let-event [(message (message-poa->server id $p))] p) [(Connect scope) (react (connected id scope root-facet))] @@ -41,7 +44,12 @@ (reset-turn! turn) (stop-facet root-facet)) + (define reset-heartbeat! (heartbeat (list 'server id scope) + (lambda (m) (send! (message-server->poa id m))) + (lambda () (stop-facet root-facet)))) + (on (message (message-poa->server id $p)) + (reset-heartbeat!) (match p [(Turn items) (for [(item (in-list items))] diff --git a/syndicate/distributed/server/tcp.rkt b/syndicate/distributed/server/tcp.rkt index 260d19d..08497a7 100644 --- a/syndicate/distributed/server/tcp.rkt +++ b/syndicate/distributed/server/tcp.rkt @@ -13,6 +13,7 @@ (define (server-facet/tcp id) (assert (tcp-accepted id)) (assert (server-poa id)) + (stop-when (retracted (server-poa-ready id))) (on-start (issue-unbounded-credit! tcp-in id)) (define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p))))) (on (message (tcp-in id $bs)) diff --git a/syndicate/distributed/server/websocket.rkt b/syndicate/distributed/server/websocket.rkt index 313ab6a..a63ee70 100644 --- a/syndicate/distributed/server/websocket.rkt +++ b/syndicate/distributed/server/websocket.rkt @@ -10,19 +10,13 @@ (require imperative-syndicate/protocol/credit) (require/activate imperative-syndicate/drivers/web) -(require/activate imperative-syndicate/drivers/timer) (require/activate imperative-syndicate/distributed/server) (define (server-facet/websocket id) (assert (http-accepted id)) (assert (http-response-websocket id)) (assert (server-poa id)) - - (field [ping-time-deadline 0]) - (on (asserted (later-than (ping-time-deadline))) - (ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval))) - (send! (message-server->poa id (Ping)))) - + (stop-when (retracted (server-poa-ready id))) (on (message (websocket-in id $body)) (define-values (packet remainder) (decode body)) (when (not (equal? remainder #"")) diff --git a/syndicate/distributed/wire-protocol.rkt b/syndicate/distributed/wire-protocol.rkt index c22606e..bae7657 100644 --- a/syndicate/distributed/wire-protocol.rkt +++ b/syndicate/distributed/wire-protocol.rkt @@ -45,8 +45,16 @@ (preserves:encode v))) (define (ping-interval) - (* 1000 (max (- (ws-idle-timeout) 10) - (* (ws-idle-timeout) 0.8)))) + (* 1000 (min 60 ;; reasonable default? + ;; + ;; TODO: disable the net/rfc6455 ws-idle-timeout, when we can. + ;; + ;; The net/rfc6455 ws-idle-timeout has to be paid attention to here because it + ;; can't be disabled, because the built-in webserver (which net/rfc6455 + ;; interoperates with) has a per-connection timer that also can't be disabled. + ;; + (max (- (ws-idle-timeout) 10) + (* (ws-idle-timeout) 0.8))))) (define (packet-accumulator handle-packet!) (field [buffer #""])