2021-08-18 17:35:49 +00:00
|
|
|
import asyncio
|
|
|
|
import inspect
|
|
|
|
import logging
|
|
|
|
import sys
|
|
|
|
import traceback
|
2021-12-25 21:59:21 +00:00
|
|
|
import threading
|
2021-08-18 17:35:49 +00:00
|
|
|
|
2021-08-19 02:59:04 +00:00
|
|
|
from preserves import Embedded, preserve
|
|
|
|
|
2021-08-18 17:35:49 +00:00
|
|
|
from .idgen import IdGenerator
|
2021-12-25 21:59:21 +00:00
|
|
|
from .metapy import staticproperty
|
2021-12-25 16:35:11 +00:00
|
|
|
from .dataflow import Graph, Field
|
2021-08-18 17:35:49 +00:00
|
|
|
|
|
|
|
# Module-level logger; per-actor and per-facet loggers are created lazily elsewhere.
log = logging.getLogger(__name__)

# Independent id sequences: actor names, assertion handles, and facet ids.
_next_actor_number = IdGenerator()
_next_handle = IdGenerator()
_next_facet_id = IdGenerator()

# Per-thread record of the turn currently being executed (None when idle).
# The attribute is only initialised here for the importing thread.
_active = threading.local()
_active.turn = None
|
|
|
|
|
2021-08-19 21:09:07 +00:00
|
|
|
# decorator
|
|
|
|
def run_system(**kwargs):
    """Return a decorator that starts an actor system booted by the decorated function.

    The keyword arguments given here are forwarded unchanged to
    `start_actor_system` at the moment the decorator is applied.
    """
    def boot_with(boot_proc):
        return start_actor_system(boot_proc, **kwargs)
    return boot_with
|
|
|
|
|
2021-08-19 02:59:04 +00:00
|
|
|
def start_actor_system(boot_proc, debug = False, name = None, configure_logging = True):
    """Run an actor system on the current thread until the event loop is stopped.

    boot_proc         -- boot procedure handed to the initial `Actor`.
    debug             -- when true, selects DEBUG log level (if logging is being
                         configured here) and enables asyncio debug mode.
    name              -- optional name for the initial actor.
    configure_logging -- when true, call `logging.basicConfig` before starting.

    The event loop is closed before this function returns.
    """
    if configure_logging:
        logging.basicConfig(level = logging.DEBUG if debug else logging.INFO)

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop for this thread (non-main threads, and recent
        # Python versions that no longer create one implicitly): make our own.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    if debug:
        loop.set_debug(True)

    # The initial actor is created from inside the loop, as a queued task.
    queue_task(lambda: Actor(boot_proc, name = name), loop = loop)
    loop.run_forever()

    # Drain leftover tasks: stop() before run_forever() makes the loop run a
    # single iteration and return, so repeat until no tasks remain.
    while asyncio.all_tasks(loop):
        loop.stop()
        loop.run_forever()
    loop.close()
|
|
|
|
|
|
|
|
def adjust_engine_inhabitant_count(delta):
    """Add `delta` to the inhabitant counter kept on the running event loop.

    The counter is stored as an attribute of the loop object and starts at
    zero the first time it is touched. When an adjustment leaves it at
    exactly zero, the loop is asked to stop. Must be called while an event
    loop is running.
    """
    loop = asyncio.get_running_loop()
    updated = getattr(loop, '__syndicate_inhabitant_count', 0) + delta
    loop.__syndicate_inhabitant_count = updated
    if updated != 0:
        return
    log.debug('Inhabitant count reached zero')
    loop.stop()
|
|
|
|
|
2021-08-19 18:04:38 +00:00
|
|
|
def remove_noerror(collection, item):
    """Remove `item` from `collection`, doing nothing if it is not present.

    Works with any container exposing `remove(item)`: lists signal a missing
    item with ValueError, sets with KeyError; both are treated as "nothing
    to do". Any other exception propagates.
    """
    try:
        collection.remove(item)
    except (ValueError, KeyError):
        pass
|
|
|
|
|
2021-08-18 17:35:49 +00:00
|
|
|
class Actor:
    """An actor: a root facet, a table of outbound assertions, and exit hooks.

    Constructing an Actor immediately runs `boot_proc` in a fresh turn on a
    new child facet of the actor's root facet. Non-daemon actors count
    towards the event loop's inhabitant count for as long as they live.
    """

    def __init__(self, boot_proc, name = None, initial_assertions = None, daemon = False):
        """Create the actor and run its boot procedure.

        boot_proc          -- callable run in the actor's first turn.
        name               -- actor name; a generated 'a<N>' name is used if falsy.
        initial_assertions -- mapping of handle -> outbound assertion entry the
                              actor starts with; a fresh dict is used if falsy.
        daemon             -- daemon actors are not counted as inhabitants.
        """
        self.name = name or 'a' + str(next(_next_actor_number))
        self._daemon = daemon
        if not daemon:
            adjust_engine_inhabitant_count(1)
        self.root = Facet(self, None)
        self.outbound = initial_assertions or {}
        self.exit_reason = None # None -> running, True -> terminated OK, exn -> error
        self.exit_hooks = []
        self._log = None
        self._dataflow_graph = None
        # The boot facet starts out owning the handles of any initial assertions.
        boot_facet = Facet(self, self.root, set(self.outbound.keys()))
        Turn.run(boot_facet, stop_if_inert_after(boot_proc))

    def __repr__(self):
        return '<Actor:%s>' % (self.name,)

    @property
    def daemon(self):
        """Whether this actor is excluded from the inhabitant count."""
        return self._daemon

    @daemon.setter
    def daemon(self, value):
        # Only an actual change of status adjusts the inhabitant count.
        if self._daemon != value:
            self._daemon = value
            adjust_engine_inhabitant_count(-1 if value else 1)

    @property
    def alive(self):
        """True until `_terminate` has recorded an exit reason."""
        return self.exit_reason is None

    @property
    def log(self):
        """Logger named after this actor, created on first use."""
        if self._log is None:
            self._log = logging.getLogger('syndicate.Actor.%s' % (self.name,))
        return self._log

    @property
    def dataflow_graph(self):
        """This actor's dataflow `Graph`, created on first use."""
        if self._dataflow_graph is None:
            self._dataflow_graph = Graph()
        return self._dataflow_graph

    def at_exit(self, hook):
        """Register a zero-argument callable to be run when the actor terminates."""
        self.exit_hooks.append(hook)

    def cancel_at_exit(self, hook):
        """Unregister an exit hook; unknown hooks are ignored."""
        remove_noerror(self.exit_hooks, hook)

    def _repair_dataflow_graph(self):
        """Re-run damaged dataflow subjects, if a graph was ever created."""
        if self._dataflow_graph is not None:
            self._dataflow_graph.repair_damage(lambda a: a())

    def _terminate(self, exit_reason):
        """Terminate the actor; only the first call has any effect.

        exit_reason -- True for a normal exit; anything else is logged as a crash.
        """
        if self.exit_reason is not None: return
        self.log.debug('Terminating %r with exit_reason %r', self, exit_reason)
        self.exit_reason = exit_reason
        if exit_reason != True:
            self.log.error('crashed: %s', exit_reason)
        # Hooks may unregister themselves (or others) while running. Iterate a
        # snapshot so that no hook is skipped, and honour cancellations made
        # by hooks that ran earlier in this same pass.
        for h in list(self.exit_hooks):
            if h in self.exit_hooks:
                h()
        self.root._terminate(exit_reason == True)
        if not self._daemon:
            adjust_engine_inhabitant_count(-1)

    def _pop_outbound(self, handle, clear_from_source_facet):
        """Remove and return the outbound entry for `handle`.

        Raises KeyError if the handle is not in the outbound table. When
        `clear_from_source_facet` is true, the handle is also dropped from
        the handle set of the facet recorded as the entry's source.
        """
        e = self.outbound.pop(handle)
        if e and clear_from_source_facet:
            try:
                e.source_facet.handles.remove(handle)
            except KeyError:
                pass
        return e
|
|
|
|
|
2021-08-18 17:35:49 +00:00
|
|
|
class Facet:
    """A node in an actor's facet tree.

    A facet tracks its child facets, the handles of assertions published
    from it, callbacks to run on orderly shutdown, and asyncio tasks linked
    to its lifetime.
    """

    @staticproperty
    def active():
        # The facet of the turn currently running on this thread.
        return _active.turn._facet

    def __init__(self, actor, parent, initial_handles=None):
        """Create a facet of `actor` under `parent` (None for a root facet).

        initial_handles -- set of handles the facet starts out owning; a new
                           empty set is used if falsy.
        """
        self.id = next(_next_facet_id)
        self.actor = actor
        self.parent = parent
        if parent:
            parent.children.add(self)
        self.children = set()
        self.handles = initial_handles or set()
        self.shutdown_actions = []
        self.linked_tasks = []
        self.alive = True
        self.inert_check_preventers = 0
        self._log = None

    @property
    def log(self):
        """Logger for this facet: a child of the parent facet's (or actor's) logger."""
        if self._log is None:
            if self.parent is None:
                p = self.actor.log
            else:
                p = self.parent.log
            self._log = p.getChild(str(self.id))
        return self._log

    def _repr_labels(self):
        """Return 'actorname:id:id...' naming the path of facet ids down to this facet.

        The root facet's own id is not included.
        """
        pieces = []
        f = self
        while f.parent is not None:
            pieces.append(str(f.id))
            f = f.parent
        pieces.append(self.actor.name)
        pieces.reverse()
        return ':'.join(pieces)

    def __repr__(self):
        return '<Facet:%s>' % (self._repr_labels(),)

    def on_stop(self, a):
        """Register `a` to run when this facet stops in an orderly fashion.

        Returns `a`, so the method can be used as a decorator without
        rebinding the decorated name to None.
        """
        self.shutdown_actions.append(a)
        return a

    def cancel_on_stop(self, a):
        """Unregister a shutdown action; unknown actions are ignored."""
        remove_noerror(self.shutdown_actions, a)

    def on_stop_or_crash(self, a):
        """Register `a` both as a shutdown action and as an actor exit hook.

        Returns the wrapper that was registered. Calling the wrapper
        unregisters it from both places and then runs `a`, so `a` runs at
        most once through these two registrations.
        """
        def cleanup():
            self.cancel_on_stop(cleanup)
            self.actor.cancel_at_exit(cleanup)
            a()
        self.on_stop(cleanup)
        self.actor.at_exit(cleanup)
        return cleanup

    def isinert(self):
        """True if the facet has no children, handles, linked tasks or inert-check preventers."""
        return \
            len(self.children) == 0 and \
            len(self.handles) == 0 and \
            len(self.linked_tasks) == 0 and \
            self.inert_check_preventers == 0

    def prevent_inert_check(self):
        """Make `isinert` report False until the returned callable is invoked.

        The returned `disarm` callable undoes this one prevention; calling it
        more than once has no further effect.
        """
        armed = True
        self.inert_check_preventers = self.inert_check_preventers + 1
        def disarm():
            nonlocal armed
            if not armed: return
            armed = False
            self.inert_check_preventers = self.inert_check_preventers - 1
        return disarm

    def linked_task(self, coro_fn, loop = None):
        """Start `coro_fn(self)` as an asyncio task tied to this facet.

        The task is cancelled when the facet stops or the actor exits. When
        the coroutine finishes for any reason, an external turn is queued
        that runs the same cancellation/cleanup routine.
        """
        task = None
        @self.on_stop_or_crash
        def cancel_linked_task():
            nonlocal task
            if task is not None:
                remove_noerror(self.linked_tasks, task)
                task.cancel()
                task = None
        async def guarded_task():
            try:
                await coro_fn(self)
            finally:
                Turn.external(self, cancel_linked_task)
        task = find_loop(loop).create_task(guarded_task())
        self.linked_tasks.append(task)

    def _terminate(self, orderly):
        """Stop this facet and its descendants; only the first call has any effect.

        orderly -- when true, shutdown actions run and termination may
                   propagate to an inert parent, or to the actor if this is
                   a root facet.
        """
        if not self.alive: return
        self.log.debug('%s terminating %r', 'orderly' if orderly else 'disorderly', self)
        self.alive = False

        parent = self.parent
        if parent:
            parent.children.remove(self)

        with ActiveFacet(self):
            for child in list(self.children):
                child._terminate(orderly)
            if orderly:
                with ActiveFacet(self.parent or self):
                    # Shutdown actions may unregister themselves (or others)
                    # while running. Iterate a snapshot so none is skipped,
                    # and honour cancellations made earlier in this pass.
                    for h in list(self.shutdown_actions):
                        if h in self.shutdown_actions:
                            h()
            turn = Turn.active
            for h in self.handles:
                # Optimization: don't clear from source facet, the source facet is us and we're
                # about to clear our handles in one fell swoop.
                turn._retract(self.actor._pop_outbound(h, clear_from_source_facet=False))
            self.handles.clear()

            if orderly:
                if parent:
                    if parent.isinert():
                        parent._terminate(True)
                else:
                    self.actor._terminate(True)
|
2021-08-18 17:35:49 +00:00
|
|
|
|
|
|
|
class ActiveFacet:
    """Context manager that temporarily switches the facet of a turn.

    The turn is captured from `Turn.active` at construction time. On entry
    its `_facet` is replaced by the given facet; on exit the previous facet
    is put back.
    """

    def __init__(self, facet):
        self.turn = Turn.active
        self.inner_facet = facet
        self.outer_facet = None

    def __enter__(self):
        # Remember what was active, then install the requested facet.
        turn = self.turn
        self.outer_facet = turn._facet
        turn._facet = self.inner_facet
        return None

    def __exit__(self, t, v, tb):
        # Restore the remembered facet and forget it; exceptions are not suppressed.
        previous = self.outer_facet
        self.turn._facet = previous
        self.outer_facet = None
|
|
|
|
|
|
|
|
async def ensure_awaitable(value):
    """Return `value`, awaiting it first if it is awaitable."""
    if not inspect.isawaitable(value):
        return value
    return await value
|
|
|
|
|
2021-08-19 18:04:38 +00:00
|
|
|
def find_loop(loop = None):
    """Return `loop` if one was given, otherwise the currently running event loop."""
    if loop is not None:
        return loop
    return asyncio.get_running_loop()
|
|
|
|
|
|
|
|
def queue_task(thunk, loop = None):
    """Schedule `thunk` to be called from a task on `loop` (default: the running loop).

    If the thunk returns an awaitable, the task awaits it. Returns the
    created task.
    """
    async def run_thunk():
        outcome = thunk()
        await ensure_awaitable(outcome)
    target_loop = find_loop(loop)
    return target_loop.create_task(run_thunk())
|
2021-08-18 17:35:49 +00:00
|
|
|
|
2021-08-19 18:04:38 +00:00
|
|
|
def queue_task_threadsafe(thunk, loop = None):
    """Like `queue_task`, but submits via `asyncio.run_coroutine_threadsafe`.

    Returns whatever `run_coroutine_threadsafe` returns for the submitted
    coroutine.
    """
    async def run_thunk():
        outcome = thunk()
        await ensure_awaitable(outcome)
    # The coroutine object is created before the loop is looked up.
    pending = run_thunk()
    return asyncio.run_coroutine_threadsafe(pending, find_loop(loop))
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-08-18 17:35:49 +00:00
|
|
|
class Turn:
    """A single unit of actor activity.

    While a turn runs, it is recorded as the active turn for the current
    thread. Effects aimed at entities (publish, retract, message, sync) and
    deferred actions are queued per target actor and only handed over, as
    new turns, once the turn's action has completed without raising.
    """

    @staticproperty
    def active():
        # Threads other than the importing one have no 'turn' attribute on the
        # thread-local yet; initialise it to None on first access.
        t = getattr(_active, 'turn', False)
        if t is False:
            t = _active.turn = None
        return t

    @classmethod
    def run(cls, facet, action, zombie_turn = False):
        """Run `action` in a new turn whose facet is `facet`.

        Unless `zombie_turn` is true, nothing happens if the facet or its
        actor is no longer alive. If the action (or the dataflow repair that
        follows it) raises, the traceback is logged, the actor is terminated
        with the exception as exit reason, and queued effects are discarded.
        """
        if not zombie_turn:
            if not facet.actor.alive: return
            if not facet.alive: return
        turn = cls(facet)
        try:
            saved = Turn.active
            _active.turn = turn
            try:
                action()
                facet.actor._repair_dataflow_graph()
            finally:
                _active.turn = saved
        except:
            # Deliberately broad: any exception escaping a turn crashes the actor.
            ei = sys.exc_info()
            facet.log.error('%s', ''.join(traceback.format_exception(*ei)))
            Turn.run(facet.actor.root, lambda: facet.actor._terminate(ei[1]))
        else:
            turn._deliver()

    @classmethod
    def external(cls, facet, action, loop = None):
        """Queue a turn for `facet` via `queue_task_threadsafe`; returns its result."""
        return queue_task_threadsafe(lambda: cls.run(facet, action), loop)

    def __init__(self, facet):
        self._facet = facet
        self.queues = {}  # target actor -> list of (target facet, action)

    @property
    def log(self):
        """Logger of the turn's current facet."""
        return self._facet.log

    def ref(self, entity):
        """Create a `Ref` to `entity` located at the turn's current facet."""
        return Ref(self._facet, entity)

    # this actually can work as a decorator as well as a normal method!
    def facet(self, boot_proc):
        """Create a child of the current facet, run `boot_proc` in it, and return it."""
        new_facet = Facet(self._facet.actor, self._facet)
        with ActiveFacet(new_facet):
            stop_if_inert_after(boot_proc)()
        return new_facet

    def prevent_inert_check(self):
        """Delegate to the current facet's `prevent_inert_check`."""
        return self._facet.prevent_inert_check()

    # decorator
    def linked_task(self, loop = None):
        """Return a decorator that starts the decorated coroutine function as
        a task linked to the current facet."""
        return lambda coro_fn: self._facet.linked_task(coro_fn, loop = loop)

    def stop(self, facet = None, continuation = None):
        """Stop `facet` (default: the current facet) in an orderly fashion.

        If given, `continuation` is registered as a shutdown action of the
        facet before it is terminated.
        """
        if facet is None:
            facet = self._facet
        if continuation is not None:
            facet.on_stop(continuation)
        facet._terminate(True)

    # can also be used as a decorator
    def on_stop(self, a):
        """Register `a` as a shutdown action of the current facet.

        Returns `a`, so decorator use leaves the decorated name bound to the
        function rather than to None.
        """
        self._facet.on_stop(a)
        return a

    # can also be used as a decorator
    def on_stop_or_crash(self, a):
        """Register `a` to run when the current facet stops or its actor exits.

        Returns whatever the facet's `on_stop_or_crash` returns (the
        registered wrapper), instead of discarding it.
        """
        return self._facet.on_stop_or_crash(a)

    def spawn(self, boot_proc, name = None, initial_handles = None, daemon = False):
        """Queue the creation of a new actor running `boot_proc`.

        Outbound assertions named by `initial_handles` are removed from the
        current actor and passed to the new actor as its initial assertions.
        """
        def action():
            new_outbound = {}
            if initial_handles is not None:
                for handle in initial_handles:
                    new_outbound[handle] = \
                        self._facet.actor._pop_outbound(handle, clear_from_source_facet=True)
            queue_task(lambda: Actor(boot_proc,
                                     name = name,
                                     initial_assertions = new_outbound,
                                     daemon = daemon))
        self._enqueue(self._facet, action)

    def stop_actor(self):
        """Queue normal termination of the current actor."""
        self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(True))

    def crash(self, exn):
        """Queue termination of the current actor with `exn` as exit reason."""
        self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(exn))

    def field(self, initial_value=None, name=None):
        """Create a dataflow `Field` in the current actor's dataflow graph."""
        return Field(self._facet.actor.dataflow_graph, initial_value, name)

    # can also be used as a decorator
    def dataflow(self, a):
        """Register `a` as a dataflow subject of the current facet and run it once.

        The subject does nothing once the facet is no longer alive, and is
        forgotten by the graph when the facet stops in an orderly fashion.
        """
        f = self._facet
        # The returned disarm callable is intentionally dropped.
        f.prevent_inert_check()
        def subject():
            if not f.alive: return
            with ActiveFacet(f):
                a()
        f.on_stop(lambda: f.actor.dataflow_graph.forget_subject(subject))
        f.actor.dataflow_graph.with_subject(subject, lambda: subject())

    def publish_dataflow(self, assertion_function):
        """Keep a publication up to date with the result of `assertion_function`."""
        endpoint = DataflowPublication(assertion_function)
        self.dataflow(lambda: endpoint.update())

    def publish(self, ref, assertion):
        """Publish `assertion` at `ref`; returns the newly allocated handle."""
        handle = next(_next_handle)
        self._publish(ref, assertion, handle)
        return handle

    def _publish(self, ref, assertion, handle):
        """Record an outbound assertion under `handle` and queue its delivery."""
        # TODO: attenuation
        assertion = preserve(assertion)
        facet = self._facet
        e = OutboundAssertion(facet, handle, ref)
        facet.actor.outbound[handle] = e
        facet.handles.add(handle)
        def action():
            e.established = True
            self.log.debug('%r <-- publish %r handle %r', ref, assertion, handle)
            ref.entity.on_publish(assertion, handle)
        self._enqueue(ref.facet, action)

    def retract(self, handle):
        """Retract the assertion published under `handle`; None is ignored."""
        if handle is not None:
            e = self._facet.actor._pop_outbound(handle, clear_from_source_facet=True)
            if e is not None:
                self._retract(e)

    def replace(self, ref, handle, assertion):
        """Publish `assertion` at `ref` (unless either is None), then retract `handle`.

        Returns the new handle, or None if nothing was published.
        """
        if assertion is None or ref is None:
            new_handle = None
        else:
            new_handle = self.publish(ref, assertion)
        self.retract(handle)
        return new_handle

    def _retract(self, e):
        # Assumes e has already been removed from self._facet.actor.outbound and the
        # appropriate set of handles
        def action():
            # Only notify the target if the publication was actually delivered.
            if e.established:
                e.established = False
                self.log.debug('%r <-- retract handle %r', e.ref, e.handle)
                e.ref.entity.on_retract(e.handle)
        self._enqueue(e.ref.facet, action)

    def sync(self, ref, k):
        """Send a sync request to `ref`; `k` is called when a message arrives
        at the continuation entity created for this request."""
        class SyncContinuation(Entity):
            def on_message(self, _value):
                k()
        self._sync(ref, self.ref(SyncContinuation()))

    def _sync(self, ref, peer):
        """Queue delivery of a sync request for `peer` to the entity at `ref`."""
        peer = preserve(peer)
        def action():
            self.log.debug('%r <-- sync peer %r', ref, peer)
            ref.entity.on_sync(peer)
        self._enqueue(ref.facet, action)

    def send(self, ref, message):
        """Queue delivery of `message` to the entity at `ref`."""
        # TODO: attenuation
        message = preserve(message)
        def action():
            self.log.debug('%r <-- message %r', ref, message)
            ref.entity.on_message(message)
        self._enqueue(ref.facet, action)

    # decorator
    def after(self, delay_seconds):
        """Return a decorator that runs the decorated action in an external
        turn after sleeping for `delay_seconds` in a task linked to the
        current facet. The decorator itself returns None."""
        def decorate(action):
            @self.linked_task()
            async def task(facet):
                await asyncio.sleep(delay_seconds)
                Turn.external(facet, action)
        return decorate

    def _enqueue(self, target_facet, action):
        """Append `action` to the queue for the actor owning `target_facet`."""
        target_actor = target_facet.actor
        if target_actor not in self.queues:
            self.queues[target_actor] = []
        self.queues[target_actor].append((target_facet, action))

    def _deliver(self):
        """Hand each per-actor queue to a new turn on that actor, then reset the queues."""
        for (actor, q) in self.queues.items():
            # Stupid python scoping bites again
            def make_deliver_q(actor, q): # gratuitous
                def deliver_q():
                    turn = Turn.active
                    saved_facet = turn._facet
                    for (facet, action) in q:
                        # Each action runs with its own target facet current.
                        turn._facet = facet
                        action()
                    turn._facet = saved_facet
                return lambda: Turn.run(actor.root, deliver_q)
            queue_task(make_deliver_q(actor, q))
        self.queues = {}
|
|
|
|
|
|
|
|
def stop_if_inert_after(action):
    """Wrap `action` so that an inertness check is queued after it has run.

    The wrapper captures the active turn, runs `action`, then enqueues a
    check on that turn's facet. When the check later runs, the captured turn
    is stopped if its facet is inert or if the facet's parent exists but is
    no longer alive.
    """
    def wrapped_action():
        turn = Turn.active
        action()

        def check_action():
            facet = turn._facet
            parent = facet.parent
            orphaned = parent is not None and not parent.alive
            if orphaned or facet.isinert():
                turn.stop()

        turn._enqueue(turn._facet, check_action)

    return wrapped_action
|
|
|
|
|
2021-12-25 16:35:11 +00:00
|
|
|
class DataflowPublication:
    """Tracks one publication whose target and content are computed by a function.

    `assertion_function` is called with no arguments and returns either a
    `(target, assertion)` pair or a falsy value, which is treated as
    `(None, None)`.
    """

    def __init__(self, assertion_function):
        self.assertion_function = assertion_function
        self.handle = None      # handle of the current publication, if any
        self.target = None      # target of the most recent update
        self.assertion = None   # assertion of the most recent update

    def update(self):
        """Recompute the publication and replace it only if something changed.

        The new values are compared with the previously recorded target and
        assertion; when both are unchanged, nothing is republished.
        """
        (next_target, next_assertion) = self.assertion_function() or (None, None)
        if next_target != self.target or next_assertion != self.assertion:
            self.target = next_target
            self.assertion = next_assertion
            self.handle = Turn.active.replace(self.target, self.handle, self.assertion)
|
2021-12-25 16:35:11 +00:00
|
|
|
|
2021-08-18 17:35:49 +00:00
|
|
|
class Ref:
    """Pairs an entity with the facet it is located at."""

    def __init__(self, facet, entity):
        self.entity = entity
        self.facet = facet

    def __repr__(self):
        labels = self.facet._repr_labels()
        return '<Ref:%s/%r>' % (labels, self.entity)
|
|
|
|
|
|
|
|
class OutboundAssertion:
    """Record of one assertion: the facet it came from, its handle, its
    target ref, and an `established` flag that starts out False."""

    def __init__(self, source_facet, handle, ref):
        self.established = False
        self.source_facet = source_facet
        self.handle = handle
        self.ref = ref

    def __repr__(self):
        if self.established:
            suffix = ' established'
        else:
            suffix = ''
        return '<OutboundAssertion src=%r handle=%s ref=%r%s>' % \
            (self.source_facet, self.handle, self.ref, suffix)
|
2021-08-19 21:04:35 +00:00
|
|
|
|
2021-08-18 17:35:49 +00:00
|
|
|
# Can act as a mixin
|
|
|
|
class Entity:
    """Base entity whose event handlers do nothing, except that sync
    requests are answered by sending True to the peer."""

    def on_publish(self, v, handle):
        """Called with a published value and its handle; default ignores it."""
        return None

    def on_retract(self, handle):
        """Called with the handle of a retracted assertion; default ignores it."""
        return None

    def on_message(self, v):
        """Called with a delivered message; default ignores it."""
        return None

    def on_sync(self, peer):
        """Answer a sync request by sending True to `peer` in the active turn."""
        turn = Turn.active
        turn.send(peer, True)
|
2021-08-18 17:35:49 +00:00
|
|
|
|
2021-08-19 02:59:04 +00:00
|
|
|
# Placeholders filled in by __boot_inert when the inert actor boots.
_inert_actor = _inert_facet = _inert_ref = None

# The entity that _inert_ref will point at: a plain Entity with default handlers.
_inert_entity = Entity()
|
2021-12-25 21:59:21 +00:00
|
|
|
def __boot_inert():
    """Boot procedure of the inert actor: publish its actor, facet and a ref
    to `_inert_entity` through the module-level `_inert_*` globals."""
    global _inert_actor, _inert_facet, _inert_ref
    turn = Turn.active
    boot_facet = turn._facet
    _inert_actor = boot_facet.actor
    _inert_facet = boot_facet
    _inert_ref = turn.ref(_inert_entity)
|
2021-08-19 02:59:04 +00:00
|
|
|
async def __run_inert():
    """Create the inert actor; a coroutine so that it runs inside an event loop."""
    Actor(__boot_inert, name='_inert_actor')
|
2022-01-11 19:19:40 +00:00
|
|
|
def __setup_inert():
    """Create the inert actor on a private event loop in a short-lived thread.

    A fresh loop is created, used to run `__run_inert` to completion, and
    closed, all on a helper thread that is joined before returning.
    """
    def setup_main():
        private_loop = asyncio.new_event_loop()
        private_loop.run_until_complete(__run_inert())
        private_loop.close()

    worker = threading.Thread(target=setup_main)
    worker.start()
    worker.join()

# Runs once, at import time.
__setup_inert()
|