diff --git a/prospect/drivers/timer.rkt b/prospect/drivers/timer.rkt index dcd8d07..8918984 100644 --- a/prospect/drivers/timer.rkt +++ b/prospect/drivers/timer.rkt @@ -19,15 +19,53 @@ (struct set-timer (label msecs kind) #:prefab) (struct timer-expired (label msecs) #:prefab) +(define expiry-projection (compile-projection (at-meta (?! (timer-expired ? ?))))) + (define (spawn-timer-driver) (define control-ch (make-channel)) (thread (lambda () (timer-driver-thread-main control-ch))) (define (timer-driver e count) (match e - [(message (at-meta (and expiry (timer-expired _ _)))) - (transition (- count 1) - (list (message expiry) - (when (= count 1) (unsub (timer-expired ? ?) #:meta-level 1))))] + [(patch added _removed) + ;; Previously, this driver caused the background thread to call + ;; send-ground-message to indicate that a timer had expired. + ;; However, this can lead to a race! In cases where a timer + ;; expires very soon, the channel-put of the set-timer + ;; instruction leads shortly thereafter to a + ;; send-ground-message which then races the establishment of + ;; the metalevel-1 subscription to the timer-expired events + ;; that are coming from the background thread. + ;; + ;; The race cannot occur in the sequential implementation + ;; because the network makes sure to enqueue the transition + ;; actions resulting from the set-timer message delivery ahead + ;; of any enqueueing of the timer-expired ground message, so + ;; that by the time the ground message is processed, the + ;; relevant subscription always exists. + ;; + ;; In a looser implementation, however, this level of + ;; synchronised activity may not exist, and the ground message + ;; may overtake the subscription establishment. + ;; + ;; Therefore, I've changed the driver to instead use ground + ;; /assertions/ to signal expired timers. Upon processing of + ;; such an assertion, the driver cleans it up. This is very + ;; similar to hardware interrupts, where the driver has to + ;; "clear the interrupt" in order to let the system continue + ;; properly. + (define-values (new-count actions-rev interrupt-clearing-patch) + (for/fold [(count count) + (actions-rev '()) + (interrupt-clearing-patch empty-patch)] + [(expiry (matcher-project/set/single added expiry-projection))] + (values (- count 1) + (cons (message expiry) actions-rev) + (patch-seq interrupt-clearing-patch + (retract expiry))))) + (send-ground-patch interrupt-clearing-patch) + (transition new-count + (cons (reverse actions-rev) + (when (zero? new-count) (unsub (timer-expired ? ?) #:meta-level 1))))] [(message (and instruction (set-timer _ _ _))) (channel-put control-ch instruction) (transition (+ count 1) @@ -45,7 +83,10 @@ [#f never-evt] [t (handle-evt (timer-evt (pending-timer-deadline t)) (lambda (now) - (for-each send-ground-message (fire-timers! heap now)) + (send-ground-patch + (for/fold [(interrupt-asserting-patch empty-patch)] + [(expiry (fire-timers! heap now))] + (patch-seq interrupt-asserting-patch (assert expiry)))) (loop)))]) (handle-evt control-ch (match-lambda diff --git a/prospect/ground.rkt b/prospect/ground.rkt index 839bb5f..023020c 100644 --- a/prospect/ground.rkt +++ b/prospect/ground.rkt @@ -12,6 +12,7 @@ (provide (struct-out external-event) send-ground-message + send-ground-patch run-ground) ;;--------------------------------------------------------------------------- @@ -27,6 +28,13 @@ (define (send-ground-message body) (async-channel-put (current-ground-event-async-channel) (message body))) +;; Patch -> Void +;; Injects a patch into the ground-VM metalevel. It will appear to be +;; asserted by the environment in general. The obligation is on the caller +;; to ensure that patches do not interfere between drivers. +(define (send-ground-patch p) + (async-channel-put (current-ground-event-async-channel) p)) + ;;--------------------------------------------------------------------------- ;; Communication via RacketEvents