syndicate-py/syndicate/actor.py

604 lines
19 KiB
Python

import asyncio
import inspect
import logging
import sys
import traceback
import threading
from preserves import Embedded, preserve
from .idgen import IdGenerator
from .metapy import staticproperty
from .dataflow import Graph, Field
log = logging.getLogger(__name__)
_next_actor_number = IdGenerator()
_next_handle = IdGenerator()
_next_facet_id = IdGenerator()
_active = threading.local()
_active.turn = None
# decorator
def run_system(**kwargs):
return lambda boot_proc: System().run(boot_proc, **kwargs)
class System:
def __init__(self, loop = None):
self.loop = loop or asyncio.get_event_loop()
self.inhabitant_count = 0
self.exit_signal = asyncio.Queue()
def run(self, boot_proc, debug = None, name = None, configure_logging = True):
if configure_logging:
logging.basicConfig(level = logging.DEBUG if debug else logging.INFO)
if debug:
self.loop.set_debug(True)
self.queue_task(lambda: Actor(boot_proc, system = self, name = name))
# From Python 3.12, we may be able to use:
# asyncio.run(self._run, debug=debug, loop_factory=lambda: self.loop)
# but until then:
with asyncio.Runner(debug=debug, loop_factory=lambda: self.loop) as r:
return r.run(self._run())
async def _run(self):
try:
await self.exit_signal.get()
except asyncio.CancelledError:
pass
finally:
log.debug('System._run main loop exit')
def adjust_engine_inhabitant_count(self, delta):
self.inhabitant_count = self.inhabitant_count + delta
if self.inhabitant_count == 0:
log.debug('Inhabitant count reached zero')
self.exit_signal.put_nowait(())
def queue_task(self, thunk):
async def task():
try:
await ensure_awaitable(thunk())
except asyncio.CancelledError:
pass
return self.loop.create_task(task())
def queue_task_threadsafe(self, thunk):
async def task():
try:
await ensure_awaitable(thunk())
except asyncio.CancelledError:
pass
return self.loop.call_soon_threadsafe(lambda: asyncio.run_coroutine_threadsafe(task(), self.loop))
async def ensure_awaitable(value):
if inspect.isawaitable(value):
return await value
else:
return value
def remove_noerror(collection, item):
try:
collection.remove(item)
except ValueError:
pass
class Actor:
def __init__(self, boot_proc, system, name = None, initial_assertions = {}, daemon = False):
self.name = name or 'a' + str(next(_next_actor_number))
self._system = system
self._daemon = daemon
if not daemon:
system.adjust_engine_inhabitant_count(1)
self.root = Facet(self, None)
self.outbound = initial_assertions or {}
self.exit_reason = None # None -> running, True -> terminated OK, exn -> error
self.exit_hooks = []
self._log = None
self._dataflow_graph = None
Turn.run(Facet(self, self.root, set(self.outbound.keys())), stop_if_inert_after(boot_proc))
def __repr__(self):
return '<Actor:%s>' % (self.name,)
@property
def daemon(self):
return self._daemon
@daemon.setter
def daemon(self, value):
if self._daemon != value:
self._daemon = value
self._system.adjust_engine_inhabitant_count(-1 if value else 1)
@property
def alive(self):
return self.exit_reason is None
@property
def log(self):
if self._log is None:
self._log = logging.getLogger('syndicate.Actor.%s' % (self.name,))
return self._log
@property
def dataflow_graph(self):
if self._dataflow_graph is None:
self._dataflow_graph = Graph()
return self._dataflow_graph
def at_exit(self, hook):
self.exit_hooks.append(hook)
def cancel_at_exit(self, hook):
remove_noerror(self.exit_hooks, hook)
def _repair_dataflow_graph(self):
if self._dataflow_graph is not None:
self._dataflow_graph.repair_damage(lambda a: a())
def _terminate(self, exit_reason):
if self.exit_reason is not None: return
self.log.debug('Terminating %r with exit_reason %r', self, exit_reason)
self.exit_reason = exit_reason
if exit_reason != True:
self.log.error('crashed: %s' % (exit_reason,))
for h in self.exit_hooks:
h()
self.root._terminate(exit_reason == True)
if not self._daemon:
self._system.adjust_engine_inhabitant_count(-1)
def _pop_outbound(self, handle, clear_from_source_facet):
e = self.outbound.pop(handle)
if e and clear_from_source_facet:
try:
e.source_facet.handles.remove(handle)
except KeyError:
pass
return e
class Facet:
@staticproperty
def active():
return _active.turn._facet
def __init__(self, actor, parent, initial_handles=None):
self.id = next(_next_facet_id)
self.actor = actor
self.parent = parent
if parent:
parent.children.add(self)
self.children = set()
self.handles = initial_handles or set()
self.shutdown_actions = []
self.linked_tasks = []
self.alive = True
self.inert_check_preventers = 0
self._log = None
@property
def log(self):
if self._log is None:
if self.parent is None:
p = self.actor.log
else:
p = self.parent.log
self._log = p.getChild(str(self.id))
return self._log
def _repr_labels(self):
pieces = []
f = self
while f.parent is not None:
pieces.append(str(f.id))
f = f.parent
pieces.append(self.actor.name)
pieces.reverse()
return ':'.join(pieces)
def __repr__(self):
return '<Facet:%s>' % (self._repr_labels(),)
def on_stop(self, a):
self.shutdown_actions.append(a)
def cancel_on_stop(self, a):
remove_noerror(self.shutdown_actions, a)
def on_stop_or_crash(self, a):
def cleanup():
self.cancel_on_stop(cleanup)
self.actor.cancel_at_exit(cleanup)
a()
self.on_stop(cleanup)
self.actor.at_exit(cleanup)
return cleanup
def isinert(self):
return \
len(self.children) == 0 and \
len(self.handles) == 0 and \
len(self.linked_tasks) == 0 and \
self.inert_check_preventers == 0
def prevent_inert_check(self):
armed = True
self.inert_check_preventers = self.inert_check_preventers + 1
def disarm():
nonlocal armed
if not armed: return
armed = False
self.inert_check_preventers = self.inert_check_preventers - 1
return disarm
@property
def loop(self):
return self.actor._system.loop
def linked_task(self, coro_fn, run_in_executor=False):
task = None
if run_in_executor:
inner_coro_fn = coro_fn
async def outer_coro_fn(facet):
await self.loop.run_in_executor(None, lambda: inner_coro_fn(facet))
coro_fn = outer_coro_fn
@self.on_stop_or_crash
def cancel_linked_task():
nonlocal task
if task is not None:
remove_noerror(self.linked_tasks, task)
task.cancel()
task = None
async def guarded_task():
should_terminate_facet = True
try:
if await coro_fn(self) is True:
should_terminate_facet = False
except asyncio.CancelledError:
pass
except:
import traceback
traceback.print_exc()
finally:
if should_terminate_facet:
Turn.external(self, lambda: Turn.active.stop())
else:
Turn.external(self, cancel_linked_task)
task = self.loop.create_task(guarded_task())
self.linked_tasks.append(task)
def _terminate(self, orderly):
if not self.alive: return
self.log.debug('%s terminating %r', 'orderly' if orderly else 'disorderly', self)
self.alive = False
parent = self.parent
if parent:
parent.children.remove(self)
with ActiveFacet(self):
for child in list(self.children):
child._terminate(orderly)
if orderly:
with ActiveFacet(self.parent or self):
for h in self.shutdown_actions:
h()
turn = Turn.active
for h in self.handles:
# Optimization: don't clear from source facet, the source facet is us and we're
# about to clear our handles in one fell swoop.
turn._retract(self.actor._pop_outbound(h, clear_from_source_facet=False))
self.handles.clear()
if orderly:
if parent:
if parent.isinert():
parent._terminate(True)
else:
self.actor._terminate(True)
class ActiveFacet:
def __init__(self, facet):
self.turn = Turn.active
self.outer_facet = None
self.inner_facet = facet
def __enter__(self):
self.outer_facet = self.turn._facet
self.turn._facet = self.inner_facet
return None
def __exit__(self, t, v, tb):
self.turn._facet = self.outer_facet
self.outer_facet = None
def find_loop(loop = None):
return asyncio.get_running_loop() if loop is None else loop
class Turn:
@staticproperty
def active():
t = getattr(_active, 'turn', False)
if t is False:
t = _active.turn = None
return t
@classmethod
def run(cls, facet, action, zombie_turn = False):
if not zombie_turn:
if not facet.actor.alive: return
if not facet.alive: return
turn = cls(facet)
try:
saved = Turn.active
_active.turn = turn
try:
action()
facet.actor._repair_dataflow_graph()
finally:
_active.turn = saved
except:
ei = sys.exc_info()
facet.log.error('%s', ''.join(traceback.format_exception(*ei)))
Turn.run(facet.actor.root, lambda: facet.actor._terminate(ei[1]))
else:
turn._deliver()
@classmethod
def external(cls, facet, action, loop = None):
return facet.actor._system.queue_task_threadsafe(lambda: cls.run(facet, action))
def __init__(self, facet):
self._facet = facet
self._system = facet.actor._system
self.queues = {}
@property
def log(self):
return self._facet.log
def ref(self, entity):
return Ref(self._facet, entity)
# this actually can work as a decorator as well as a normal method!
def facet(self, boot_proc):
new_facet = Facet(self._facet.actor, self._facet)
with ActiveFacet(new_facet):
stop_if_inert_after(boot_proc)()
return new_facet
def prevent_inert_check(self):
return self._facet.prevent_inert_check()
# decorator
def linked_task(self, **kwargs):
return lambda coro_fn: self._facet.linked_task(coro_fn, **kwargs)
def stop(self, facet = None, continuation = None):
if facet is None:
facet = self._facet
if continuation is not None:
facet.on_stop(continuation)
facet._terminate(True)
# can also be used as a decorator
def on_stop(self, a):
self._facet.on_stop(a)
# can also be used as a decorator
def on_stop_or_crash(self, a):
self._facet.on_stop_or_crash(a)
def spawn(self, boot_proc, name = None, initial_handles = None, daemon = False):
def action():
new_outbound = {}
if initial_handles is not None:
for handle in initial_handles:
new_outbound[handle] = \
self._facet.actor._pop_outbound(handle, clear_from_source_facet=True)
self._system.queue_task(lambda: Actor(boot_proc,
system = self._system,
name = name,
initial_assertions = new_outbound,
daemon = daemon))
self._enqueue(self._facet, action)
def stop_actor(self):
self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(True))
def crash(self, exn):
self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(exn))
def field(self, initial_value=None, name=None):
return Field(self._facet.actor.dataflow_graph, initial_value, name)
# can also be used as a decorator
def dataflow(self, a):
f = self._facet
f.prevent_inert_check()
def subject():
if not f.alive: return
with ActiveFacet(f):
a()
f.on_stop(lambda: f.actor.dataflow_graph.forget_subject(subject))
f.actor.dataflow_graph.with_subject(subject, lambda: subject())
def publish_dataflow(self, assertion_function):
endpoint = DataflowPublication(assertion_function)
self.dataflow(lambda: endpoint.update())
def publish(self, ref, assertion):
handle = next(_next_handle)
self._publish(ref, assertion, handle)
return handle
def _publish(self, ref, assertion, handle):
# TODO: attenuation
assertion = preserve(assertion)
facet = self._facet
e = OutboundAssertion(facet, handle, ref)
facet.actor.outbound[handle] = e
facet.handles.add(handle)
def action():
e.established = True
self.log.debug('%r <-- publish %r handle %r', ref, assertion, handle)
ref.entity.on_publish(assertion, handle)
self._enqueue(ref.facet, action)
def retract(self, handle):
if handle is not None:
e = self._facet.actor._pop_outbound(handle, clear_from_source_facet=True)
if e is not None:
self._retract(e)
def replace(self, ref, handle, assertion):
if assertion is None or ref is None:
new_handle = None
else:
new_handle = self.publish(ref, assertion)
self.retract(handle)
return new_handle
def _retract(self, e):
# Assumes e has already been removed from self._facet.actor.outbound and the
# appropriate set of handles
def action():
if e.established:
e.established = False
self.log.debug('%r <-- retract handle %r', e.ref, e.handle)
e.ref.entity.on_retract(e.handle)
self._enqueue(e.ref.facet, action)
def sync(self, ref, k):
class SyncContinuation(Entity):
def on_message(self, _value):
k()
self._sync(ref, self.ref(SyncContinuation()))
def _sync(self, ref, peer):
peer = preserve(peer)
def action():
self.log.debug('%r <-- sync peer %r', ref, peer)
ref.entity.on_sync(peer)
self._enqueue(ref.facet, action)
def send(self, ref, message):
# TODO: attenuation
message = preserve(message)
def action():
self.log.debug('%r <-- message %r', ref, message)
ref.entity.on_message(message)
self._enqueue(ref.facet, action)
# decorator
def after(self, delay_seconds):
def decorate(action):
@self.linked_task()
async def task(facet):
await asyncio.sleep(delay_seconds)
Turn.external(facet, action)
return decorate
def _enqueue(self, target_facet, action):
target_actor = target_facet.actor
if target_actor not in self.queues:
self.queues[target_actor] = []
self.queues[target_actor].append((target_facet, action))
def _deliver(self):
for (actor, q) in self.queues.items():
# Stupid python scoping bites again
def make_deliver_q(actor, q): # gratuitous
def deliver_q():
turn = Turn.active
saved_facet = turn._facet
for (facet, action) in q:
turn._facet = facet
action()
turn._facet = saved_facet
return lambda: Turn.run(actor.root, deliver_q)
self._system.queue_task(make_deliver_q(actor, q))
self.queues = {}
def stop_if_inert_after(action):
def wrapped_action():
turn = Turn.active
action()
def check_action():
if (turn._facet.parent is not None and not turn._facet.parent.alive) \
or turn._facet.isinert():
turn.stop()
turn._enqueue(turn._facet, check_action)
return wrapped_action
class DataflowPublication:
def __init__(self, assertion_function):
self.assertion_function = assertion_function
self.handle = None
self.target = None
self.assertion = None
def update(self):
(next_target, next_assertion) = self.assertion_function() or (None, None)
if next_target != self.target or next_assertion != self.assertion_function:
self.target = next_target
self.assertion = next_assertion
self.handle = Turn.active.replace(self.target, self.handle, self.assertion)
class Ref:
def __init__(self, facet, entity):
self.facet = facet
self.entity = entity
def __repr__(self):
return '<Ref:%s/%r>' % (self.facet._repr_labels(), self.entity)
class OutboundAssertion:
def __init__(self, source_facet, handle, ref):
self.source_facet = source_facet
self.handle = handle
self.ref = ref
self.established = False
def __repr__(self):
return '<OutboundAssertion src=%r handle=%s ref=%r%s>' % \
(self.source_facet, self.handle, self.ref, ' established' if self.established else '')
# Can act as a mixin
class Entity:
def on_publish(self, v, handle):
pass
def on_retract(self, handle):
pass
def on_message(self, v):
pass
def on_sync(self, peer):
Turn.active.send(peer, True)
_inert_actor = None
_inert_facet = None
_inert_ref = None
_inert_entity = Entity()
def __boot_inert():
global _inert_actor, _inert_facet, _inert_ref
_inert_actor = Turn.active._facet.actor
_inert_facet = Turn.active._facet
_inert_ref = Turn.active.ref(_inert_entity)
async def __run_inert():
Actor(__boot_inert, system = System(), name = '_inert_actor')
def __setup_inert():
def setup_main():
loop = asyncio.new_event_loop()
loop.run_until_complete(__run_inert())
loop.close()
t = threading.Thread(target=setup_main)
t.start()
t.join()
__setup_inert()