syndicate-nim/src/syndicate/actors/timers.nim

116 lines
4.0 KiB
Nim

# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[sets, times]
import pkg/sys/[handles, ioqueue]
import preserves
import ../../syndicate, ../bags, ../protocols/[timer, dataspace]
export timer
type
Observe = dataspace.Observe
when defined(linux):
import std/[oserrors, posix]
type Time = posix.Time
{.pragma: timerfd, importc, header: "<sys/timerfd.h>".}
proc timerfd_create(clock_id: ClockId, flags: cint): cint {.timerfd.}
proc timerfd_settime(ufd: cint, flags: cint,
utmr: var Itimerspec, otmr: var Itimerspec): cint {.timerfd.}
proc timerfd_gettime(ufd: cint, curr: var Itimerspec): cint {.timerfd.}
var
TFD_NONBLOCK {.timerfd.}: cint
TFD_CLOEXEC {.timerfd.}: cint
TFD_TIMER_ABSTIME {.timerfd.}: cint
proc `<`(a, b: Timespec): bool =
a.tv_sec.clong < b.tv_sec.clong or
(a.tv_sec.clong == b.tv_sec.clong and a.tv_nsec < b.tv_nsec)
proc `+`(a, b: Timespec): Timespec =
result.tv_sec = Time a.tv_sec.clong + b.tv_sec.clong
result.tv_nsec = a.tv_nsec + b.tv_nsec
func toFloat(ts: Timespec): float =
ts.tv_sec.float + ts.tv_nsec.float / 1_000_000_000
func toTimespec(f: float): Timespec =
result.tv_sec = Time(f)
result.tv_nsec = clong(uint64(f * 1_000_000_000) mod 1_000_000_000)
proc clock_realtime: Timespec =
if clock_gettime(CLOCK_REALTIME, result) < 0:
raiseOSError(osLastError(), "clock_gettime")
type
TimerDriver = ref object
facet: Facet
## Owning facet of driver.
target: Cap
## Destination for LaterThan assertions.
deadlines: Bag[float]
## Deadlines that other actors are observing.
timers: HashSet[cint]
# TODO: use a single timer descriptor
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
let driver = TimerDriver(facet: facet, target: cap)
facet.onStop do (turn: var Turn):
for fd in driver.timers:
unregister(FD fd)
discard close(fd)
driver
proc earliestFloat(driver: TimerDriver): float =
assert driver.deadlines.len > 0
result = high float
for deadline in driver.deadlines:
if deadline < result:
result = deadline
proc await(driver: TimerDriver; deadline: float) {.asyncio.} =
## Run timer driver concurrently with actor.
let fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK or TFD_CLOEXEC)
if fd < 0:
raiseOSError(osLastError(), "failed to acquire timer descriptor")
var
old: Itimerspec
its = Itimerspec(it_value: deadline.toTimespec)
if timerfd_settime(fd, TFD_TIMER_ABSTIME, its, old) < 0:
raiseOSError(osLastError(), "failed to set timeout")
driver.timers.incl(fd)
while clock_realtime() < its.it_value:
# Check if the timer is expired which
# could happen before waiting.
wait(FD fd, Read)
if deadline in driver.deadlines:
# Check if the deadline is still observed.
proc turnWork(turn: var Turn) =
discard publish(turn, driver.target, LaterThan(seconds: deadline))
run(driver.facet, turnWork)
discard close(fd)
driver.timers.excl(fd)
proc spawnTimerActor*(ds: Cap; turn: var Turn): Actor {.discardable.} =
## Spawn a timer actor that responds to
## dataspace observations of timeouts on `ds`.
spawnActor("timers", turn) do (turn: var Turn):
let driver = spawnTimerDriver(turn.facet, ds)
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
during(turn, ds, pat) do (deadline: float):
if change(driver.deadlines, deadline, +1) == cdAbsentToPresent:
discard trampoline(whelp await(driver, deadline))
do:
discard change(driver.deadlines, deadline, -1, clamp = true)
# TODO: retract assertions that are unobserved.
proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) =
## Execute `act` after some duration of time.
var later = clock_realtime().toFloat() + dur.inMilliseconds.float / 1_000.0
onPublish(turn, ds, grab LaterThan(seconds: later)):
act(turn)