drivers/timers: use modified timer protocol
Use a protocol where the timer sends a message on expiry.
This commit is contained in:
parent
cdc59b7641
commit
c21fdb5003
|
@ -7,7 +7,7 @@
|
|||
"bom-ref": "pkg:nim/syndicate",
|
||||
"name": "syndicate",
|
||||
"description": "Syndicated actors for conversational concurrency",
|
||||
"version": "20240623",
|
||||
"version": "20240625",
|
||||
"authors": [
|
||||
{
|
||||
"name": "Emery Hemingway"
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import
|
||||
std/[tables, times],
|
||||
std/[options, tables, times],
|
||||
pkg/preserves,
|
||||
../../syndicate, ../protocols/timer
|
||||
|
||||
|
@ -111,19 +111,65 @@ else:
|
|||
discard close(fd)
|
||||
driver.timers.excl(fd)
|
||||
|
||||
proc runTimer(driver: TimerDriver; peer: Cap; label: Value; start, deadline: float) {.asyncio.} =
|
||||
## Run timer driver concurrently with actor.
|
||||
assert start < deadline
|
||||
let fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK or TFD_CLOEXEC)
|
||||
if fd < 0:
|
||||
raiseOSError(osLastError(), "failed to acquire timer descriptor")
|
||||
var
|
||||
old: Itimerspec
|
||||
its = Itimerspec(it_value: deadline.toTimespec)
|
||||
if timerfd_settime(fd, TFD_TIMER_ABSTIME, its, old) < 0:
|
||||
raiseOSError(osLastError(), "failed to set timeout")
|
||||
driver.timers.incl(fd)
|
||||
var now = wallFloat()
|
||||
while now < deadline:
|
||||
# Check if the timer is expired which
|
||||
# could happen before waiting.
|
||||
wait(FD fd, Read)
|
||||
now = wallFloat()
|
||||
let facet = driver.deadlines.getOrDefault(deadline)
|
||||
if not facet.isNil:
|
||||
# Check if the deadline is still observed.
|
||||
proc turnWork(turn: Turn) =
|
||||
message(turn, peer, TimerExpired(label: label, seconds: now - start))
|
||||
run(facet, turnWork)
|
||||
discard close(fd)
|
||||
driver.timers.excl(fd)
|
||||
driver.deadlines.del deadline
|
||||
|
||||
proc spawnTimerDriver*(turn: Turn; ds: Cap): Actor {.discardable.} =
|
||||
## Spawn a timer actor that responds to
|
||||
## dataspace observations of timeouts on `ds`.
|
||||
linkActor(turn, "timers") do (turn: Turn):
|
||||
let driver = spawnTimerDriver(turn.facet, ds)
|
||||
let pat = observePattern(!LaterThan, {@[0.toPreserves]: grabLit()})
|
||||
during(turn, ds, pat) do (deadline: float):
|
||||
|
||||
let laterThanPat = observePattern(!LaterThan, {@[0.toPreserves]: grabLit()})
|
||||
during(turn, ds, laterThanPat) do (deadline: float):
|
||||
driver.deadlines[deadline] = turn.facet
|
||||
discard trampoline:
|
||||
whelp await(driver, deadline)
|
||||
do:
|
||||
driver.deadlines.del deadline
|
||||
|
||||
during(turn, ds, SetTimer.grabType) do (req: SetTimer):
|
||||
var
|
||||
now = wallFloat()
|
||||
deadline = req.seconds
|
||||
let peer = req.peer.unembed Cap
|
||||
if peer.isSome:
|
||||
if req.kind == TimerKind.relative:
|
||||
deadline = deadline + now
|
||||
if deadline <= now:
|
||||
message(turn, peer.get, TimerExpired(label: req.label))
|
||||
else:
|
||||
driver.deadlines[deadline] = turn.facet
|
||||
discard trampoline:
|
||||
whelp driver.runTimer(peer.get, req.label, now, deadline)
|
||||
do:
|
||||
driver.deadlines.del deadline
|
||||
|
||||
proc after*(turn: Turn; ds: Cap; dur: Duration; act: TurnAction) =
|
||||
## Execute `act` after some duration of time.
|
||||
var later = wallFloat() + dur.inMilliseconds.float / 1_000.0
|
||||
|
|
|
@ -11,6 +11,7 @@ type
|
|||
`label`*: Value
|
||||
`seconds`*: float
|
||||
`kind`*: TimerKind
|
||||
`peer`* {.preservesEmbedded.}: Value
|
||||
|
||||
`TimerKind`* {.preservesOr, pure.} = enum
|
||||
`relative`, `absolute`, `clear`
|
||||
|
|
Loading…
Reference in New Issue