Shared timers

This commit is contained in:
Emery Hemingway 2024-02-28 17:17:46 +00:00
parent 7bf9b3fe48
commit 75d8d6d3bf
3 changed files with 148 additions and 68 deletions

View File

@ -1,52 +1,135 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, monotimes, times, posix, times, epoll]
import std/[asyncdispatch, times, oserrors, posix, times, epoll]
import pkg/sys/[handles, ioqueue]
import preserves
import ../syndicate, ../protocols/timer
from ../protocols/dataspace import Observe
import ../[bags, syndicate], ../protocols/[timer, dataspace]
export timer
type Observe = dataspace.Observe
type
Observe = dataspace.Observe
Time = posix.Time
#[
proc timerfd_create(clock_id: ClockId, flags: cint): cint
{.importc: "timerfd_create", header: "<sys/timerfd.h>".}
proc timerfd_settime(ufd: cint, flags: cint,
utmr: var Itimerspec, otmr: var Itimerspec): cint
{.importc: "timerfd_settime", header: "<sys/timerfd.h>".}
proc eventfd(count: cuint, flags: cint): cint
{.importc: "eventfd", header: "<sys/eventfd.h>".}
]#
when defined(linux):
import std/[epoll, posix]
proc now: float64 = getTime().toUnixFloat()
{.pragma: timerfd, importc, header: "<sys/timerfd.h>".}
proc spawnTimers*(ds: Cap): Actor {.discardable.} =
## Spawn a timer actor.
bootActor("timers") do (root: Facet):
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
#[
during(ds, pat) do (seconds: float):
let period = seconds - now()
if period < 0.001 or true:
let h = publish(ds, LaterThan(seconds: seconds).toPreserves)
]#
proc timerfd_create(clock_id: ClockId, flags: cint): cint {.timerfd.}
proc timerfd_settime(ufd: cint, flags: cint,
utmr: var Itimerspec, otmr: var Itimerspec): cint {.timerfd.}
proc timerfd_gettime(ufd: cint, curr: var Itimerspec): cint {.timerfd.}
#[
else:
let fdi = timerfd_create(CLOCK_MONOTONIC, O_CLOEXEC or O_NONBLOCK)
var
TFD_NONBLOCK {.timerfd.}: cint
TFD_CLOEXEC {.timerfd.}: cint
TFD_TIMER_ABSTIME {.timerfd.}: cint
addCallback(sleepAsync(period * 1_000), turn) do (turn: Turn):
discard publish(turn, ds, LaterThan(seconds: seconds))
]#
func toFloat(ts: Timespec): float =
ts.tv_sec.float + ts.tv_nsec.float / 1_000_000_000
func toTimespec(f: float): Timespec =
result.tv_sec = Time(f)
result.tv_nsec = clong(uint64(f * 1_000_000_000) mod 1_000_000_000)
proc `<`(a, b: Timespec): bool =
a.tv_sec.clong <= b.tv_sec.clong and a.tv_nsec <= b.tv_nsec
proc `+`(a, b: Timespec): Timespec =
result.tv_sec = Time a.tv_sec.clong + b.tv_sec.clong
result.tv_nsec = a.tv_nsec + b.tv_nsec
proc clock_realtime: Timespec =
if clock_gettime(CLOCK_REALTIME, result) < 0:
raiseOSError(osLastError(), "clock_gettime")
proc nsec(epoch: float): clong =
clong(uint64(epoch * 1_000_000_000) mod 1_000_000_000'u64)
type TimerDriver = ref object
target: Cap
deadlines: Bag[float]
pending: float
fd: cint
waiting: bool
proc earliestFloat(driver: TimerDriver): float =
assert driver.deadlines.len > 0
result = high float
for deadline in driver.deadlines:
if deadline < result:
result = deadline
proc timerDuration(driver: TimerDriver): Timespec =
var its: Itimerspec
if timerfd_gettime(driver.fd, its) < 0:
raiseOSError(osLastError(), "timerfd_gettime")
its.it_value
proc settime(driver: TimerDriver; deadline: float) =
var
old: Itimerspec
its = Itimerspec(it_value: deadline.toTimeSpec)
if timerfd_settime(driver.fd, TFD_TIMER_ABSTIME, its, old) < 0:
raiseOSError(osLastError(), "failed to set timeout")
driver.pending = deadline
proc await(driver: TimerDriver) {.turnWork.} =
while driver.deadlines.len > 0:
assert driver.waiting
var
deadline = driver.earliestFloat()
now = clock_realtime()
roughNow = now.toFloat()
if deadline < roughNow:
discard publish(driver.target, LaterThan(seconds: deadline))
del(driver.deadlines, deadline)
else:
driver.settime(deadline)
wait(FD driver.fd, Read)
driver.waiting = false
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
let driver = TimerDriver(target: cap)
driver.fd = timerfd_create(
CLOCK_REALTIME, TFD_NONBLOCK or TFD_CLOEXEC)
if driver.fd < 0:
raiseOSError(osLastError(), "failed to acquire timer descriptor")
facet.onStop do ():
discard driver.fd.close()
driver
{.error: "create a new timerfd when the set timer is after a new deadline".}
proc spawnTimerActor*(ds: Cap): Actor {.discardable.} =
## Spawn a timer actor that responds to
## dataspace observations of timeouts on `ds`.
spawnActor(ds.relay, "timers") do (facet: Facet):
let driver = spawnTimerDriver(facet, ds)
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
during(ds, pat) do (deadline: float):
if deadline < clock_realtime().toFloat():
discard publish(driver.target, LaterThan(seconds: deadline))
else:
if change(driver.deadlines, deadline, +1) == cdAbsentToPresent:
if not driver.waiting:
driver.settime(deadline)
driver.waiting = true
facet.queueWork(whelp await(driver))
elif deadline < driver.pending:
driver.settime(deadline)
do:
discard change(driver.deadlines, deadline, -1, clamp = true)
#[
proc after*(ds: Cap; dur: Duration; cb: proc () {.closure.}) =
## Execute `act` after some duration of time.
let later = now() + dur.inMilliseconds.float64 * 1_000.0
onPublish(ds, grab LaterThan(seconds: later)):
## Execute `cb` after some duration of time.
var
later = clock_realtime().toFloat() +
dur.inMilliseconds.float / 1_000.0
pat = ?LaterThan(seconds: later)
onPublish(ds, pat):
cb()
]#
# TODO: periodic timer

View File

@ -46,3 +46,5 @@ proc `$`*(bag: Bag): string =
if result.len > 1: result.add ' '
result.add $x
result.add '}'
export tables.contains, tables.del, tables.len

View File

@ -1,46 +1,41 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/times
import std/[os, times]
import pkg/cps
import pkg/sys/ioqueue
import sam/syndicate
import sam/actors/timers
proc now: float64 = getTime().toUnixFloat()
let actor = bootActor("timer-test") do (facet: Facet):
let ds = facet.newDataspace()
let h = publish(ds, toRecord(Symbol"greet", "hello world!"))
runActor("timer-test") do (root: Facet):
echo "actor calls boot proc with root facte ", root
let ds = root.newDataspace()
echo "new dataspace ", ds
let h = publish(ds, "hello world!".toPreserves)
echo "published to handle ", h
onMessage(ds, grab()) do (v: Value):
stderr.writeLine "observed message ", v
message(ds, "hello world!".toPreserves)
echo "sent a message"
retract(ds, h)
echo "retracted handle ", h
# facet.sync(ds)
root.onStop:
facet.onStop do ():
echo "anonymous stop handler was invoked"
echo "stopping actor"
root.stopActor()
echo "actor stopped but still executing?"
let
oneSec = initDuration(seconds = 1)
halfSec = initDuration(milliseconds = 500)
timers = facet.newDataspace()
spawnTimerActor(timers)
#[
block:
# spawnTimers(ds)
onPublish(ds, grab(LaterThan(seconds: now()+1.0))) do:
stderr.writeLine "slept one second once"
onPublish(ds, grab(LaterThan(seconds: now()+1.0))) do:
stderr.writeLine "slept one second twice"
onPublish(ds, grab(LaterThan(seconds: now()+1.0))) do:
stderr.writeLine "slept one second thrice"
stopActor()
]#
timers.after(initDuration(seconds = 1)) do ():
echo "slept one second"
timers.after(initDuration(seconds = 2)) do ():
echo "slept 3 seconds"
timers.after(initDuration(seconds = 3)) do ():
echo "slept six seconds"
stopActor(facet)
timers.after(initDuration(seconds = 20)) do ():
echo "slept twenty seconds"
var progress = true
while not actor.stopped:
if not run(actor):
ioqueue.run()