Proper heartbeats

This commit is contained in:
Tony Garnock-Jones 2019-06-20 11:55:29 +01:00
parent c122253681
commit aef3490a20
8 changed files with 72 additions and 10 deletions

View File

@ -8,6 +8,8 @@
(require "turn.rkt")
(require imperative-syndicate/term)
(require/activate "heartbeat.rkt")
(define-logger syndicate/distributed)
(spawn #:name 'client-factory
@ -69,6 +71,13 @@
(extend-turn! turn (Clear (hash-ref subs spec)))
(set! subs (hash-remove subs spec)))
(define reset-heartbeat! (heartbeat (list 'client address scope)
w
(lambda () (stop-current-facet))))
(on (message (server-packet address _))
(reset-heartbeat!))
(on (message (server-packet address (Ping)))
(w (Pong)))

View File

@ -16,7 +16,8 @@
(reassert-on (tcp-connection id (tcp-address host port))
(retracted (tcp-accepted id))
(asserted (tcp-rejected id _))
(retracted (server-transport-connected address)))
(retracted (server-transport-connected address))
(retracted (server-connected address)))
(during (tcp-accepted id)
(on-start (issue-unbounded-credit! tcp-in id))

View File

@ -0,0 +1,40 @@
#lang imperative-syndicate
(provide heartbeat)
(require "wire-protocol.rkt")
(require/activate imperative-syndicate/drivers/timer)
(define-logger syndicate/distributed)
;; TODO: move heartbeats to transport level, and use separate transport-activity timeouts from
;; message-activity timeouts. Using message-activity only has problems when messages are large
;; and links are slow. Also, moving to transport level lets us use e.g. WebSocket's ping
;; mechanism rather than a message-level mechanism.
(define (heartbeat who send-message teardown)
(define period (ping-interval))
(define grace (* 3 period))
(log-syndicate/distributed-debug
"Peer ~v heartbeat period ~ams; must not experience silence longer than ~ams"
who period grace)
(field [next-ping-time 0]) ;; when we are to send the next ping
(field [last-received-traffic (current-inexact-milliseconds)]) ;; when we last heard from the peer
(define (schedule-next-ping!)
(next-ping-time (+ (current-inexact-milliseconds) period)))
(on (asserted (later-than (next-ping-time)))
(schedule-next-ping!)
(send-message (Ping)))
(on (asserted (later-than (+ (last-received-traffic) grace)))
(log-syndicate/distributed-info "Peer ~v heartbeat timeout after ~ams of inactivity"
who grace)
(teardown))
(lambda ()
(schedule-next-ping!)
(last-received-traffic (current-inexact-milliseconds))))

View File

@ -14,6 +14,7 @@
;; Internal connection protocol
(assertion-struct server-poa (connection-id)) ;; "Point of Attachment"
(assertion-struct server-poa-ready (connection-id))
(assertion-struct message-poa->server (connection-id body))
(assertion-struct message-server->poa (connection-id body))

View File

@ -4,6 +4,8 @@
(require "internal-protocol.rkt")
(require "turn.rkt")
(require/activate "heartbeat.rkt")
(spawn #:name 'server-factory
;; Previously, we just had server-envelope. Now, we have both
@ -24,6 +26,7 @@
(during/spawn (server-poa $id)
(define root-facet (current-facet))
(assert (server-poa-ready id))
(on-start
(match (let-event [(message (message-poa->server id $p))] p)
[(Connect scope) (react (connected id scope root-facet))]
@ -41,7 +44,12 @@
(reset-turn! turn)
(stop-facet root-facet))
(define reset-heartbeat! (heartbeat (list 'server id scope)
(lambda (m) (send! (message-server->poa id m)))
(lambda () (stop-facet root-facet))))
(on (message (message-poa->server id $p))
(reset-heartbeat!)
(match p
[(Turn items)
(for [(item (in-list items))]

View File

@ -13,6 +13,7 @@
(define (server-facet/tcp id)
(assert (tcp-accepted id))
(assert (server-poa id))
(stop-when (retracted (server-poa-ready id)))
(on-start (issue-unbounded-credit! tcp-in id))
(define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p)))))
(on (message (tcp-in id $bs))

View File

@ -10,19 +10,13 @@
(require imperative-syndicate/protocol/credit)
(require/activate imperative-syndicate/drivers/web)
(require/activate imperative-syndicate/drivers/timer)
(require/activate imperative-syndicate/distributed/server)
(define (server-facet/websocket id)
(assert (http-accepted id))
(assert (http-response-websocket id))
(assert (server-poa id))
(field [ping-time-deadline 0])
(on (asserted (later-than (ping-time-deadline)))
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
(send! (message-server->poa id (Ping))))
(stop-when (retracted (server-poa-ready id)))
(on (message (websocket-in id $body))
(define-values (packet remainder) (decode body))
(when (not (equal? remainder #""))

View File

@ -45,8 +45,16 @@
(preserves:encode v)))
(define (ping-interval)
(* 1000 (max (- (ws-idle-timeout) 10)
(* (ws-idle-timeout) 0.8))))
(* 1000 (min 60 ;; reasonable default?
;;
;; TODO: disable the net/rfc6455 ws-idle-timeout, when we can.
;;
;; The net/rfc6455 ws-idle-timeout has to be paid attention to here because it
;; can't be disabled, because the built-in webserver (which net/rfc6455
;; interoperates with) has a per-connection timer that also can't be disabled.
;;
(max (- (ws-idle-timeout) 10)
(* (ws-idle-timeout) 0.8)))))
(define (packet-accumulator handle-packet!)
(field [buffer #""])