Must be able to disable heartbeats for testing
This commit is contained in:
parent
9004725341
commit
d865087b7d
|
@ -2,39 +2,49 @@
|
||||||
|
|
||||||
(provide heartbeat)
|
(provide heartbeat)
|
||||||
|
|
||||||
|
(module+ for-testing
|
||||||
|
(provide heartbeats-enabled?))
|
||||||
|
|
||||||
(require "wire-protocol.rkt")
|
(require "wire-protocol.rkt")
|
||||||
|
|
||||||
(require/activate imperative-syndicate/drivers/timer)
|
(require/activate imperative-syndicate/drivers/timer)
|
||||||
|
|
||||||
(define-logger syndicate/distributed)
|
(define-logger syndicate/distributed)
|
||||||
|
|
||||||
|
(define heartbeats-enabled? (make-parameter #t))
|
||||||
|
|
||||||
;; TODO: move heartbeats to transport level, and use separate transport-activity timeouts from
|
;; TODO: move heartbeats to transport level, and use separate transport-activity timeouts from
|
||||||
;; message-activity timeouts. Using message-activity only has problems when messages are large
|
;; message-activity timeouts. Using message-activity only has problems when messages are large
|
||||||
;; and links are slow. Also, moving to transport level lets us use e.g. WebSocket's ping
|
;; and links are slow. Also, moving to transport level lets us use e.g. WebSocket's ping
|
||||||
;; mechanism rather than a message-level mechanism.
|
;; mechanism rather than a message-level mechanism.
|
||||||
(define (heartbeat who send-message teardown)
|
(define (heartbeat who send-message teardown)
|
||||||
(define period (ping-interval))
|
(cond
|
||||||
(define grace (* 3 period))
|
[(heartbeats-enabled?)
|
||||||
|
(define period (ping-interval))
|
||||||
|
(define grace (* 3 period))
|
||||||
|
|
||||||
(log-syndicate/distributed-debug
|
(log-syndicate/distributed-debug
|
||||||
"Peer ~v heartbeat period ~ams; must not experience silence longer than ~ams"
|
"Peer ~v heartbeat period ~ams; must not experience silence longer than ~ams"
|
||||||
who period grace)
|
who period grace)
|
||||||
|
|
||||||
(field [next-ping-time 0]) ;; when we are to send the next ping
|
(field [next-ping-time 0]) ;; when we are to send the next ping
|
||||||
(field [last-received-traffic (current-inexact-milliseconds)]) ;; when we last heard from the peer
|
(field [last-received-traffic (current-inexact-milliseconds)]) ;; when we last heard from the peer
|
||||||
|
|
||||||
(define (schedule-next-ping!)
|
(define (schedule-next-ping!)
|
||||||
(next-ping-time (+ (current-inexact-milliseconds) period)))
|
(next-ping-time (+ (current-inexact-milliseconds) period)))
|
||||||
|
|
||||||
(on (asserted (later-than (next-ping-time)))
|
(on (asserted (later-than (next-ping-time)))
|
||||||
(schedule-next-ping!)
|
(schedule-next-ping!)
|
||||||
(send-message (Ping)))
|
(send-message (Ping)))
|
||||||
|
|
||||||
(on (asserted (later-than (+ (last-received-traffic) grace)))
|
(on (asserted (later-than (+ (last-received-traffic) grace)))
|
||||||
(log-syndicate/distributed-info "Peer ~v heartbeat timeout after ~ams of inactivity"
|
(log-syndicate/distributed-info "Peer ~v heartbeat timeout after ~ams of inactivity"
|
||||||
who grace)
|
who grace)
|
||||||
(teardown))
|
(teardown))
|
||||||
|
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(schedule-next-ping!)
|
(schedule-next-ping!)
|
||||||
(last-received-traffic (current-inexact-milliseconds))))
|
(last-received-traffic (current-inexact-milliseconds)))]
|
||||||
|
[else
|
||||||
|
(log-syndicate/distributed-debug "Peer ~v heartbeats disabled" who)
|
||||||
|
void]))
|
||||||
|
|
|
@ -5,6 +5,9 @@
|
||||||
(require imperative-syndicate/distributed)
|
(require imperative-syndicate/distributed)
|
||||||
(require imperative-syndicate/distributed/internal-protocol)
|
(require imperative-syndicate/distributed/internal-protocol)
|
||||||
|
|
||||||
|
(require (submod imperative-syndicate/distributed/heartbeat for-testing))
|
||||||
|
(heartbeats-enabled? #f)
|
||||||
|
|
||||||
(assertion-struct researcher (name topic))
|
(assertion-struct researcher (name topic))
|
||||||
|
|
||||||
(define test-address (server-loopback-connection "test"))
|
(define test-address (server-loopback-connection "test"))
|
||||||
|
|
|
@ -5,6 +5,9 @@
|
||||||
(require (only-in imperative-syndicate/lang activate))
|
(require (only-in imperative-syndicate/lang activate))
|
||||||
(require imperative-syndicate/distributed)
|
(require imperative-syndicate/distributed)
|
||||||
|
|
||||||
|
(require (submod imperative-syndicate/distributed/heartbeat for-testing))
|
||||||
|
(heartbeats-enabled? #f)
|
||||||
|
|
||||||
(assertion-struct presence (who))
|
(assertion-struct presence (who))
|
||||||
|
|
||||||
(define test-address (server-loopback-connection "test"))
|
(define test-address (server-loopback-connection "test"))
|
||||||
|
|
Loading…
Reference in New Issue