diff --git a/src/syndicate/actors/timers.nim b/src/syndicate/actors/timers.nim index 4227899..d31541e 100644 --- a/src/syndicate/actors/timers.nim +++ b/src/syndicate/actors/timers.nim @@ -1,35 +1,115 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, monotimes, times] +import std/[sets, times] +import pkg/sys/[handles, ioqueue] import preserves -import syndicate - -import ../protocols/timer -from syndicate/protocols/dataspace import Observe +import ../../syndicate, ../bags, ../protocols/[timer, dataspace] export timer -type Observe = dataspace.Observe +type + Observe = dataspace.Observe -proc now: float64 = getTime().toUnixFloat() +when defined(linux): + import std/[oserrors, posix] + type Time = posix.Time -proc spawnTimers*(turn: var Turn; ds: Cap): Actor {.discardable.} = - ## Spawn a timer actor. - spawn("timer", turn) do (turn: var Turn): + {.pragma: timerfd, importc, header: "".} - during(turn, ds, inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})) do (seconds: float): - let period = seconds - now() - if period < 0.001: - discard publish(turn, ds, LaterThan(seconds: seconds)) - else: - addCallback(sleepAsync(period * 1_000), turn) do (turn: var Turn): - discard publish(turn, ds, LaterThan(seconds: seconds)) + proc timerfd_create(clock_id: ClockId, flags: cint): cint {.timerfd.} + proc timerfd_settime(ufd: cint, flags: cint, + utmr: var Itimerspec, otmr: var Itimerspec): cint {.timerfd.} + proc timerfd_gettime(ufd: cint, curr: var Itimerspec): cint {.timerfd.} + + var + TFD_NONBLOCK {.timerfd.}: cint + TFD_CLOEXEC {.timerfd.}: cint + TFD_TIMER_ABSTIME {.timerfd.}: cint + + proc `<`(a, b: Timespec): bool = + a.tv_sec.clong < b.tv_sec.clong or + (a.tv_sec.clong == b.tv_sec.clong and a.tv_nsec < b.tv_nsec) + + proc `+`(a, b: Timespec): Timespec = + result.tv_sec = Time a.tv_sec.clong + b.tv_sec.clong + result.tv_nsec = a.tv_nsec + b.tv_nsec + + func toFloat(ts: Timespec): float = + ts.tv_sec.float + ts.tv_nsec.float / 1_000_000_000 + + func toTimespec(f: float): Timespec = + result.tv_sec = Time(f) + result.tv_nsec = clong(uint64(f * 1_000_000_000) mod 1_000_000_000) + + proc clock_realtime: Timespec = + if clock_gettime(CLOCK_REALTIME, result) < 0: + raiseOSError(osLastError(), "clock_gettime") + + type + TimerDriver = ref object + facet: Facet + ## Owning facet of driver. + target: Cap + ## Destination for LaterThan assertions. + deadlines: Bag[float] + ## Deadlines that other actors are observing. + timers: HashSet[cint] + # TODO: use a single timer descriptor + + proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver = + let driver = TimerDriver(facet: facet, target: cap) + facet.onStop do (turn: var Turn): + for fd in driver.timers: + unregister(FD fd) + discard close(fd) + driver + + proc earliestFloat(driver: TimerDriver): float = + assert driver.deadlines.len > 0 + result = high float + for deadline in driver.deadlines: + if deadline < result: + result = deadline + + proc await(driver: TimerDriver; deadline: float) {.asyncio.} = + ## Run timer driver concurrently with actor. + let fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK or TFD_CLOEXEC) + if fd < 0: + raiseOSError(osLastError(), "failed to acquire timer descriptor") + var + old: Itimerspec + its = Itimerspec(it_value: deadline.toTimespec) + if timerfd_settime(fd, TFD_TIMER_ABSTIME, its, old) < 0: + raiseOSError(osLastError(), "failed to set timeout") + driver.timers.incl(fd) + while clock_realtime() < its.it_value: + # Check if the timer is expired which + # could happen before waiting. + wait(FD fd, Read) + if deadline in driver.deadlines: + # Check if the deadline is still observed. + proc turnWork(turn: var Turn) = + discard publish(turn, driver.target, LaterThan(seconds: deadline)) + run(driver.facet, turnWork) + discard close(fd) + driver.timers.excl(fd) + + proc spawnTimerActor*(ds: Cap; turn: var Turn): Actor {.discardable.} = + ## Spawn a timer actor that responds to + ## dataspace observations of timeouts on `ds`. + spawnActor("timers", turn) do (turn: var Turn): + let driver = spawnTimerDriver(turn.facet, ds) + let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) + during(turn, ds, pat) do (deadline: float): + if change(driver.deadlines, deadline, +1) == cdAbsentToPresent: + discard trampoline(whelp await(driver, deadline)) + do: + discard change(driver.deadlines, deadline, -1, clamp = true) + # TODO: retract assertions that are unobserved. proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) = ## Execute `act` after some duration of time. - let later = now() + dur.inMilliseconds.float64 * 1_000.0 + var later = clock_realtime().toFloat() + dur.inMilliseconds.float / 1_000.0 onPublish(turn, ds, grab LaterThan(seconds: later)): act(turn) - -# TODO: periodic timer diff --git a/tests/test_timers.nim b/tests/test_timers.nim index 5de3fdb..8c2165c 100644 --- a/tests/test_timers.nim +++ b/tests/test_timers.nim @@ -2,16 +2,25 @@ # SPDX-License-Identifier: Unlicense import std/times +import pkg/sys/ioqueue import syndicate, syndicate/actors/timers -proc now: float64 = getTime().toUnixFloat() +let actor = bootActor("timer-test") do (turn: var Turn): + let timers = newDataspace(turn) + spawnTimerActor(timers, turn) -runActor("test_timers") do (ds: Cap; turn: var Turn): - onPublish(turn, ds, grab(LaterThan(seconds: now()+1.0))) do: - stderr.writeLine "slept one second once" - onPublish(turn, ds, grab(LaterThan(seconds: now()+1.0))) do: - stderr.writeLine "slept one second twice" - onPublish(turn, ds, grab(LaterThan(seconds: now()+1.0))) do: - stderr.writeLine "slept one second thrice" - quit() - spawnTimers(turn, ds) + onPublish(turn, timers, ?LaterThan(seconds: 1356100000)): + echo "now in 13th bʼakʼtun" + + after(turn, timers, initDuration(seconds = 3)) do (turn: var Turn): + echo "third timer expired" + stopActor(turn) + + after(turn, timers, initDuration(seconds = 1)) do (turn: var Turn): + echo "first timer expired" + + after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn): + echo "second timer expired" + +echo "single run of ioqueue" +ioqueue.run()