From c21fdb5003417c99b8bb599df03fd7914cba7466 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Tue, 25 Jun 2024 17:38:36 +0300 Subject: [PATCH] drivers/timers: use modified timer protocol Use a protocol where the timer sends a message on expiry. --- sbom.json | 2 +- src/syndicate/drivers/timers.nim | 52 +++++++++++++++++++++++++++++-- src/syndicate/protocols/timer.nim | 1 + 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/sbom.json b/sbom.json index 9c19a43..9bf701d 100644 --- a/sbom.json +++ b/sbom.json @@ -7,7 +7,7 @@ "bom-ref": "pkg:nim/syndicate", "name": "syndicate", "description": "Syndicated actors for conversational concurrency", - "version": "20240623", + "version": "20240625", "authors": [ { "name": "Emery Hemingway" diff --git a/src/syndicate/drivers/timers.nim b/src/syndicate/drivers/timers.nim index 72e39b7..cafd595 100644 --- a/src/syndicate/drivers/timers.nim +++ b/src/syndicate/drivers/timers.nim @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Unlicense import - std/[tables, times], + std/[options, tables, times], pkg/preserves, ../../syndicate, ../protocols/timer @@ -111,19 +111,65 @@ else: discard close(fd) driver.timers.excl(fd) + proc runTimer(driver: TimerDriver; peer: Cap; label: Value; start, deadline: float) {.asyncio.} = + ## Run timer driver concurrently with actor. + assert start < deadline + let fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK or TFD_CLOEXEC) + if fd < 0: + raiseOSError(osLastError(), "failed to acquire timer descriptor") + var + old: Itimerspec + its = Itimerspec(it_value: deadline.toTimespec) + if timerfd_settime(fd, TFD_TIMER_ABSTIME, its, old) < 0: + raiseOSError(osLastError(), "failed to set timeout") + driver.timers.incl(fd) + var now = wallFloat() + while now < deadline: + # Check if the timer is expired which + # could happen before waiting. + wait(FD fd, Read) + now = wallFloat() + let facet = driver.deadlines.getOrDefault(deadline) + if not facet.isNil: + # Check if the deadline is still observed. + proc turnWork(turn: Turn) = + message(turn, peer, TimerExpired(label: label, seconds: now - start)) + run(facet, turnWork) + discard close(fd) + driver.timers.excl(fd) + driver.deadlines.del deadline + proc spawnTimerDriver*(turn: Turn; ds: Cap): Actor {.discardable.} = ## Spawn a timer actor that responds to ## dataspace observations of timeouts on `ds`. linkActor(turn, "timers") do (turn: Turn): let driver = spawnTimerDriver(turn.facet, ds) - let pat = observePattern(!LaterThan, {@[0.toPreserves]: grabLit()}) - during(turn, ds, pat) do (deadline: float): + + let laterThanPat = observePattern(!LaterThan, {@[0.toPreserves]: grabLit()}) + during(turn, ds, laterThanPat) do (deadline: float): driver.deadlines[deadline] = turn.facet discard trampoline: whelp await(driver, deadline) do: driver.deadlines.del deadline + during(turn, ds, SetTimer.grabType) do (req: SetTimer): + var + now = wallFloat() + deadline = req.seconds + let peer = req.peer.unembed Cap + if peer.isSome: + if req.kind == TimerKind.relative: + deadline = deadline + now + if deadline <= now: + message(turn, peer.get, TimerExpired(label: req.label)) + else: + driver.deadlines[deadline] = turn.facet + discard trampoline: + whelp driver.runTimer(peer.get, req.label, now, deadline) + do: + driver.deadlines.del deadline + proc after*(turn: Turn; ds: Cap; dur: Duration; act: TurnAction) = ## Execute `act` after some duration of time. var later = wallFloat() + dur.inMilliseconds.float / 1_000.0 diff --git a/src/syndicate/protocols/timer.nim b/src/syndicate/protocols/timer.nim index 5f2b9a0..d211dab 100644 --- a/src/syndicate/protocols/timer.nim +++ b/src/syndicate/protocols/timer.nim @@ -11,6 +11,7 @@ type `label`*: Value `seconds`*: float `kind`*: TimerKind + `peer`* {.preservesEmbedded.}: Value `TimerKind`* {.preservesOr, pure.} = enum `relative`, `absolute`, `clear`