From 75d8d6d3bf4cf8f2b2c099258aa2230160b4f831 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Wed, 28 Feb 2024 17:17:46 +0000 Subject: [PATCH] Shared timers --- src/sam/actors/timers.nim | 153 +++++++++++++++++++++++++++++--------- src/sam/bags.nim | 2 + tests/test_timers.nim | 61 +++++++-------- 3 files changed, 148 insertions(+), 68 deletions(-) diff --git a/src/sam/actors/timers.nim b/src/sam/actors/timers.nim index 7bbf28e..2a6b79f 100644 --- a/src/sam/actors/timers.nim +++ b/src/sam/actors/timers.nim @@ -1,52 +1,135 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, monotimes, times, posix, times, epoll] +import std/[asyncdispatch, times, oserrors, posix, times, epoll] +import pkg/sys/[handles, ioqueue] import preserves -import ../syndicate, ../protocols/timer -from ../protocols/dataspace import Observe +import ../[bags, syndicate], ../protocols/[timer, dataspace] export timer -type Observe = dataspace.Observe +type + Observe = dataspace.Observe + Time = posix.Time -#[ -proc timerfd_create(clock_id: ClockId, flags: cint): cint - {.importc: "timerfd_create", header: "".} -proc timerfd_settime(ufd: cint, flags: cint, - utmr: var Itimerspec, otmr: var Itimerspec): cint - {.importc: "timerfd_settime", header: "".} -proc eventfd(count: cuint, flags: cint): cint - {.importc: "eventfd", header: "".} -]# +when defined(linux): + import std/[epoll, posix] -proc now: float64 = getTime().toUnixFloat() + {.pragma: timerfd, importc, header: "".} -proc spawnTimers*(ds: Cap): Actor {.discardable.} = - ## Spawn a timer actor. - bootActor("timers") do (root: Facet): - let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) - #[ - during(ds, pat) do (seconds: float): - let period = seconds - now() - if period < 0.001 or true: - let h = publish(ds, LaterThan(seconds: seconds).toPreserves) - ]# + proc timerfd_create(clock_id: ClockId, flags: cint): cint {.timerfd.} + proc timerfd_settime(ufd: cint, flags: cint, + utmr: var Itimerspec, otmr: var Itimerspec): cint {.timerfd.} + proc timerfd_gettime(ufd: cint, curr: var Itimerspec): cint {.timerfd.} - #[ - else: - let fdi = timerfd_create(CLOCK_MONOTONIC, O_CLOEXEC or O_NONBLOCK) + var + TFD_NONBLOCK {.timerfd.}: cint + TFD_CLOEXEC {.timerfd.}: cint + TFD_TIMER_ABSTIME {.timerfd.}: cint - addCallback(sleepAsync(period * 1_000), turn) do (turn: Turn): - discard publish(turn, ds, LaterThan(seconds: seconds)) - ]# + func toFloat(ts: Timespec): float = + ts.tv_sec.float + ts.tv_nsec.float / 1_000_000_000 + + func toTimespec(f: float): Timespec = + result.tv_sec = Time(f) + result.tv_nsec = clong(uint64(f * 1_000_000_000) mod 1_000_000_000) + + proc `<`(a, b: Timespec): bool = + a.tv_sec.clong <= b.tv_sec.clong and a.tv_nsec <= b.tv_nsec + + proc `+`(a, b: Timespec): Timespec = + result.tv_sec = Time a.tv_sec.clong + b.tv_sec.clong + result.tv_nsec = a.tv_nsec + b.tv_nsec + + proc clock_realtime: Timespec = + if clock_gettime(CLOCK_REALTIME, result) < 0: + raiseOSError(osLastError(), "clock_gettime") + + proc nsec(epoch: float): clong = + clong(uint64(epoch * 1_000_000_000) mod 1_000_000_000'u64) + + type TimerDriver = ref object + target: Cap + deadlines: Bag[float] + pending: float + fd: cint + waiting: bool + + proc earliestFloat(driver: TimerDriver): float = + assert driver.deadlines.len > 0 + result = high float + for deadline in driver.deadlines: + if deadline < result: + result = deadline + + proc timerDuration(driver: TimerDriver): Timespec = + var its: Itimerspec + if timerfd_gettime(driver.fd, its) < 0: + raiseOSError(osLastError(), "timerfd_gettime") + its.it_value + + proc settime(driver: TimerDriver; deadline: float) = + var + old: Itimerspec + its = Itimerspec(it_value: deadline.toTimeSpec) + if timerfd_settime(driver.fd, TFD_TIMER_ABSTIME, its, old) < 0: + raiseOSError(osLastError(), "failed to set timeout") + driver.pending = deadline + + proc await(driver: TimerDriver) {.turnWork.} = + while driver.deadlines.len > 0: + assert driver.waiting + var + deadline = driver.earliestFloat() + now = clock_realtime() + roughNow = now.toFloat() + if deadline < roughNow: + discard publish(driver.target, LaterThan(seconds: deadline)) + del(driver.deadlines, deadline) + else: + driver.settime(deadline) + wait(FD driver.fd, Read) + driver.waiting = false + + proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver = + let driver = TimerDriver(target: cap) + driver.fd = timerfd_create( + CLOCK_REALTIME, TFD_NONBLOCK or TFD_CLOEXEC) + if driver.fd < 0: + raiseOSError(osLastError(), "failed to acquire timer descriptor") + facet.onStop do (): + discard driver.fd.close() + driver + + {.error: "create a new timerfd when the set timer is after a new deadline".} + + proc spawnTimerActor*(ds: Cap): Actor {.discardable.} = + ## Spawn a timer actor that responds to + ## dataspace observations of timeouts on `ds`. + spawnActor(ds.relay, "timers") do (facet: Facet): + let driver = spawnTimerDriver(facet, ds) + let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) + during(ds, pat) do (deadline: float): + if deadline < clock_realtime().toFloat(): + discard publish(driver.target, LaterThan(seconds: deadline)) + else: + if change(driver.deadlines, deadline, +1) == cdAbsentToPresent: + if not driver.waiting: + driver.settime(deadline) + driver.waiting = true + facet.queueWork(whelp await(driver)) + elif deadline < driver.pending: + driver.settime(deadline) + do: + discard change(driver.deadlines, deadline, -1, clamp = true) -#[ proc after*(ds: Cap; dur: Duration; cb: proc () {.closure.}) = - ## Execute `act` after some duration of time. - let later = now() + dur.inMilliseconds.float64 * 1_000.0 - onPublish(ds, grab LaterThan(seconds: later)): + ## Execute `cb` after some duration of time. + var + later = clock_realtime().toFloat() + + dur.inMilliseconds.float / 1_000.0 + pat = ?LaterThan(seconds: later) + onPublish(ds, pat): cb() -]# # TODO: periodic timer diff --git a/src/sam/bags.nim b/src/sam/bags.nim index 8586ae8..0b9ae42 100644 --- a/src/sam/bags.nim +++ b/src/sam/bags.nim @@ -46,3 +46,5 @@ proc `$`*(bag: Bag): string = if result.len > 1: result.add ' ' result.add $x result.add '}' + +export tables.contains, tables.del, tables.len diff --git a/tests/test_timers.nim b/tests/test_timers.nim index a2cbe5a..0fdd422 100644 --- a/tests/test_timers.nim +++ b/tests/test_timers.nim @@ -1,46 +1,41 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/times +import std/[os, times] import pkg/cps +import pkg/sys/ioqueue import sam/syndicate - import sam/actors/timers -proc now: float64 = getTime().toUnixFloat() +let actor = bootActor("timer-test") do (facet: Facet): + let ds = facet.newDataspace() + let h = publish(ds, toRecord(Symbol"greet", "hello world!")) -runActor("timer-test") do (root: Facet): - echo "actor calls boot proc with root facte ", root - - let ds = root.newDataspace() - echo "new dataspace ", ds - let h = publish(ds, "hello world!".toPreserves) - echo "published to handle ", h - - onMessage(ds, grab()) do (v: Value): - stderr.writeLine "observed message ", v - - message(ds, "hello world!".toPreserves) - echo "sent a message" retract(ds, h) - echo "retracted handle ", h - # facet.sync(ds) - root.onStop: + facet.onStop do (): echo "anonymous stop handler was invoked" - echo "stopping actor" - root.stopActor() - echo "actor stopped but still executing?" + let + oneSec = initDuration(seconds = 1) + halfSec = initDuration(milliseconds = 500) + timers = facet.newDataspace() + spawnTimerActor(timers) - #[ - block: - # spawnTimers(ds) - onPublish(ds, grab(LaterThan(seconds: now()+1.0))) do: - stderr.writeLine "slept one second once" - onPublish(ds, grab(LaterThan(seconds: now()+1.0))) do: - stderr.writeLine "slept one second twice" - onPublish(ds, grab(LaterThan(seconds: now()+1.0))) do: - stderr.writeLine "slept one second thrice" - stopActor() - ]# + timers.after(initDuration(seconds = 1)) do (): + echo "slept one second" + + timers.after(initDuration(seconds = 2)) do (): + echo "slept 3 seconds" + + timers.after(initDuration(seconds = 3)) do (): + echo "slept six seconds" + stopActor(facet) + + timers.after(initDuration(seconds = 20)) do (): + echo "slept twenty seconds" + +var progress = true +while not actor.stopped: + if not run(actor): + ioqueue.run()