Fix race condition (!) in timer driver
This commit is contained in:
parent
14f1cbd4fa
commit
66e2e8b1a7
|
@ -10,6 +10,7 @@
|
||||||
(require syndicate/protocol/advertise)
|
(require syndicate/protocol/advertise)
|
||||||
|
|
||||||
(struct pending-timer (deadline label) #:transparent)
|
(struct pending-timer (deadline label) #:transparent)
|
||||||
|
(struct timer-interrupt (counter now sealed-labels) #:transparent)
|
||||||
|
|
||||||
(provide (struct-out set-timer)
|
(provide (struct-out set-timer)
|
||||||
(struct-out timer-expired)
|
(struct-out timer-expired)
|
||||||
|
@ -18,14 +19,19 @@
|
||||||
(struct set-timer (label msecs kind) #:prefab)
|
(struct set-timer (label msecs kind) #:prefab)
|
||||||
(struct timer-expired (label msecs) #:prefab)
|
(struct timer-expired (label msecs) #:prefab)
|
||||||
|
|
||||||
(define expiry-projection (inbound (?! (timer-expired ? ?))))
|
(define timer-interrupt-projection (inbound (?! (timer-interrupt ? ? ?))))
|
||||||
|
|
||||||
|
(struct timer-state (active-count interrupt-counter) #:transparent)
|
||||||
|
|
||||||
(define (spawn-timer-driver)
|
(define (spawn-timer-driver)
|
||||||
(define control-ch (make-channel))
|
(define control-ch (make-channel))
|
||||||
(thread (lambda () (timer-driver-thread-main control-ch)))
|
(thread (lambda () (timer-driver-thread-main control-ch)))
|
||||||
(define (timer-driver e count)
|
(define (timer-driver e state)
|
||||||
|
(match-define (timer-state old-active-count old-interrupt-counter) state)
|
||||||
(match e
|
(match e
|
||||||
[(patch added _removed)
|
[(patch added _removed)
|
||||||
|
;; 21 Jan 2016.
|
||||||
|
;;
|
||||||
;; Previously, this driver caused the background thread to call
|
;; Previously, this driver caused the background thread to call
|
||||||
;; send-ground-message to indicate that a timer had expired.
|
;; send-ground-message to indicate that a timer had expired.
|
||||||
;; However, this can lead to a race! In cases where a timer
|
;; However, this can lead to a race! In cases where a timer
|
||||||
|
@ -52,50 +58,87 @@
|
||||||
;; similar to hardware interrupts, where the driver has to
|
;; similar to hardware interrupts, where the driver has to
|
||||||
;; "clear the interrupt" in order to let the system continue
|
;; "clear the interrupt" in order to let the system continue
|
||||||
;; properly.
|
;; properly.
|
||||||
(define-values (new-count actions-rev interrupt-clearing-patch)
|
;;
|
||||||
(for/fold [(count count)
|
;; 3 Oct 2017.
|
||||||
(actions-rev '())
|
;;
|
||||||
|
;; An additional kind of race can happen, even with ground
|
||||||
|
;; patches! If enough external ground-events spam the queue,
|
||||||
|
;; AND some other party is setting timers fast enough, AND some
|
||||||
|
;; listener for timer-expired at a particular label is
|
||||||
|
;; persistent, THEN this actor will observe a "stale"
|
||||||
|
;; interrupt, leading to duplicate timer-expired messages. This
|
||||||
|
;; happens because our clearing of the interrupt assertions is
|
||||||
|
;; delayed for long enough for the set-timer message to come in
|
||||||
|
;; and reestablish interest in interrupt assertions, before the
|
||||||
|
;; old ones have had a chance to be retracted, so they appear
|
||||||
|
;; again.
|
||||||
|
;;
|
||||||
|
;; The fix relies on (a) no patch coalescing and (b) no
|
||||||
|
;; reordering of events. Each interrupt is given a number which
|
||||||
|
;; monotonically increases. We remember the most recent counter
|
||||||
|
;; we have processed, and ignore interrupts numbered at or
|
||||||
|
;; below it.
|
||||||
|
;;
|
||||||
|
;; This could be expressed using range matching if our tries
|
||||||
|
;; supported that: we'd express interest in ALL counter values,
|
||||||
|
;; retracting interest in them one at a time LOCALLY at the
|
||||||
|
;; same time we sent our (potentially heavily delayed) ground
|
||||||
|
;; patch on its way. We'd be relying on compression of adjacent
|
||||||
|
;; sequence numbers to keep the tries trim.
|
||||||
|
;;
|
||||||
|
;; Absent that, our hacky depend-on-no-coalescing-or-reordering
|
||||||
|
;; solution will do.
|
||||||
|
;;
|
||||||
|
(define-values (active-count interrupt-counter actions interrupt-clearing-patch)
|
||||||
|
(for/fold [(active-count old-active-count)
|
||||||
|
(interrupt-counter old-interrupt-counter)
|
||||||
|
(actions '())
|
||||||
(interrupt-clearing-patch patch-empty)]
|
(interrupt-clearing-patch patch-empty)]
|
||||||
[(expiry (trie-project/set/single added expiry-projection))]
|
[(interrupt (trie-project/set/single added timer-interrupt-projection))]
|
||||||
(values (- count 1)
|
(match-define (timer-interrupt candidate-interrupt-counter now sealed-labels) interrupt)
|
||||||
(cons (message expiry) actions-rev)
|
(define labels (seal-contents sealed-labels))
|
||||||
(patch-seq interrupt-clearing-patch
|
(if (> candidate-interrupt-counter old-interrupt-counter)
|
||||||
(retract expiry)))))
|
(values (- active-count (length labels))
|
||||||
|
(max candidate-interrupt-counter interrupt-counter)
|
||||||
|
(cons actions (for/list [(label labels)]
|
||||||
|
(message (timer-expired label now))))
|
||||||
|
(patch-seq interrupt-clearing-patch (retract interrupt)))
|
||||||
|
(values active-count interrupt-counter actions interrupt-clearing-patch))))
|
||||||
(send-ground-patch interrupt-clearing-patch)
|
(send-ground-patch interrupt-clearing-patch)
|
||||||
(transition new-count
|
(transition (timer-state active-count interrupt-counter)
|
||||||
(cons (reverse actions-rev)
|
(cons actions (when (zero? active-count)
|
||||||
(when (zero? new-count) (unsub (inbound (timer-expired ? ?))))))]
|
(unsub (inbound (timer-interrupt ? ? ?))))))]
|
||||||
[(message (and instruction (set-timer _ _ _)))
|
[(message (and instruction (set-timer _ _ _)))
|
||||||
(channel-put control-ch instruction)
|
(channel-put control-ch instruction)
|
||||||
(transition (+ count 1)
|
(transition (timer-state (+ old-active-count 1) old-interrupt-counter)
|
||||||
(when (= count 0) (sub (inbound (timer-expired ? ?)))))]
|
(when (= old-active-count 0) (sub (inbound (timer-interrupt ? ? ?)))))]
|
||||||
[_ #f]))
|
[_ #f]))
|
||||||
(actor #:name 'drivers/timer
|
(actor #:name 'drivers/timer
|
||||||
timer-driver
|
timer-driver
|
||||||
0 ;; initial count
|
(timer-state 0 -1)
|
||||||
(patch-seq (sub (set-timer ? ? ?))
|
(patch-seq (sub (set-timer ? ? ?))
|
||||||
(pub (timer-expired ? ?)))))
|
(pub (timer-expired ? ?)))))
|
||||||
|
|
||||||
(define (timer-driver-thread-main control-ch)
|
(define (timer-driver-thread-main control-ch)
|
||||||
(define heap (make-timer-heap))
|
(define heap (make-timer-heap))
|
||||||
(let loop ()
|
(let loop ((interrupt-counter 0))
|
||||||
(sync (match (next-timer heap)
|
(sync (match (next-timer heap)
|
||||||
[#f never-evt]
|
[#f never-evt]
|
||||||
[t (handle-evt (timer-evt (pending-timer-deadline t))
|
[t (handle-evt (alarm-evt (pending-timer-deadline t))
|
||||||
(lambda (now)
|
(lambda (_dummy)
|
||||||
|
(define now (current-inexact-milliseconds))
|
||||||
|
(define labels (fire-timers! heap now))
|
||||||
(send-ground-patch
|
(send-ground-patch
|
||||||
(for/fold [(interrupt-asserting-patch patch-empty)]
|
(assert (timer-interrupt interrupt-counter now (seal labels))))
|
||||||
[(expiry (fire-timers! heap now))]
|
(loop (+ interrupt-counter 1))))])
|
||||||
(patch-seq interrupt-asserting-patch (assert expiry))))
|
|
||||||
(loop)))])
|
|
||||||
(handle-evt control-ch
|
(handle-evt control-ch
|
||||||
(match-lambda
|
(match-lambda
|
||||||
[(set-timer label msecs 'relative)
|
[(set-timer label msecs 'relative)
|
||||||
(install-timer! heap label (+ (current-inexact-milliseconds) msecs))
|
(install-timer! heap label (+ (current-inexact-milliseconds) msecs))
|
||||||
(loop)]
|
(loop interrupt-counter)]
|
||||||
[(set-timer label msecs 'absolute)
|
[(set-timer label msecs 'absolute)
|
||||||
(install-timer! heap label msecs)
|
(install-timer! heap label msecs)
|
||||||
(loop)]
|
(loop interrupt-counter)]
|
||||||
['quit (void)])))))
|
['quit (void)])))))
|
||||||
|
|
||||||
(define (make-timer-heap)
|
(define (make-timer-heap)
|
||||||
|
@ -111,21 +154,13 @@
|
||||||
(let ((m (heap-min heap)))
|
(let ((m (heap-min heap)))
|
||||||
(if (<= (pending-timer-deadline m) now)
|
(if (<= (pending-timer-deadline m) now)
|
||||||
(begin (heap-remove-min! heap)
|
(begin (heap-remove-min! heap)
|
||||||
(cons (timer-expired (pending-timer-label m) now)
|
(cons (pending-timer-label m)
|
||||||
(fire-timers! heap now)))
|
(fire-timers! heap now)))
|
||||||
'()))))
|
'()))))
|
||||||
|
|
||||||
(define (install-timer! heap label deadline)
|
(define (install-timer! heap label deadline)
|
||||||
(heap-add! heap (pending-timer deadline label)))
|
(heap-add! heap (pending-timer deadline label)))
|
||||||
|
|
||||||
;; Racket's alarm-evt is almost the right design for timeouts: its
|
|
||||||
;; synchronisation value should be the (or some) value of the clock
|
|
||||||
;; after the asked-for time. That way it serves as timeout and
|
|
||||||
;; clock-reader in one.
|
|
||||||
(define (timer-evt msecs)
|
|
||||||
(handle-evt (alarm-evt msecs)
|
|
||||||
(lambda (_) (current-inexact-milliseconds))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
(spawn-timer-driver)
|
(spawn-timer-driver)
|
||||||
|
|
Loading…
Reference in New Issue