Inferior service support

This commit is contained in:
Tony Garnock-Jones 2021-11-22 16:31:30 +01:00
parent 78b8bae580
commit f6f97b17b8
2 changed files with 25 additions and 0 deletions

9
inf.py Normal file
View File

@ -0,0 +1,9 @@
from syndicate import relay
from syndicate.during import During
import logging
@relay.service(name='inf', debug=True)
@During().add_handler
def main(turn, args):
logging.info(f'in main {turn}, {args}')
turn.on_stop(lambda turn: logging.info(f'args retracted {args}'))

View File

@ -72,6 +72,8 @@ class TunnelRelay:
address,
gatekeeper_peer = None,
gatekeeper_oid = 0,
publish_service = None,
publish_oid = 0,
on_connected = None,
on_disconnected = None,
):
@ -80,6 +82,8 @@ class TunnelRelay:
self.address = address
self.gatekeeper_peer = gatekeeper_peer
self.gatekeeper_oid = gatekeeper_oid
self.publish_service = publish_service
self.publish_oid = publish_oid
self._reset()
self.facet.linked_task(
lambda facet: self._reconnecting_main(asyncio.get_running_loop(),
@ -94,6 +98,10 @@ class TunnelRelay:
self.pending_turn = []
self._connected = False
self.gatekeeper_handle = None
if self.publish_service is not None:
# Very specific specialization of logic in rewrite_ref_out
ws = WireSymbol(self.publish_oid, self.publish_service, self.exported_references)
self.exported_references.get_ref([], self.publish_service, False, lambda: ws)
@property
def connected(self):
@ -460,3 +468,11 @@ class PipeTunnelRelay(_StreamTunnelRelay):
def _send_bytes(self, bs):
self.output_fileobj.buffer.write(bs)
self.output_fileobj.buffer.flush()
def run_stdio_service(turn, entity):
PipeTunnelRelay(turn, transportAddress.Stdio(), publish_service=turn.ref(entity))
# decorator
def service(**kwargs):
return lambda entity: \
actor.start_actor_system(lambda turn: run_stdio_service(turn, entity), **kwargs)