From f6f97b17b8eb07576713a5eefe116709627a362a Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 22 Nov 2021 16:31:30 +0100 Subject: [PATCH] Inferior service support --- inf.py | 9 +++++++++ syndicate/relay.py | 16 ++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 inf.py diff --git a/inf.py b/inf.py new file mode 100644 index 0000000..2ed092b --- /dev/null +++ b/inf.py @@ -0,0 +1,9 @@ +from syndicate import relay +from syndicate.during import During +import logging + +@relay.service(name='inf', debug=True) +@During().add_handler +def main(turn, args): + logging.info(f'in main {turn}, {args}') + turn.on_stop(lambda turn: logging.info(f'args retracted {args}')) diff --git a/syndicate/relay.py b/syndicate/relay.py index 35fb3c1..e848f71 100644 --- a/syndicate/relay.py +++ b/syndicate/relay.py @@ -72,6 +72,8 @@ class TunnelRelay: address, gatekeeper_peer = None, gatekeeper_oid = 0, + publish_service = None, + publish_oid = 0, on_connected = None, on_disconnected = None, ): @@ -80,6 +82,8 @@ class TunnelRelay: self.address = address self.gatekeeper_peer = gatekeeper_peer self.gatekeeper_oid = gatekeeper_oid + self.publish_service = publish_service + self.publish_oid = publish_oid self._reset() self.facet.linked_task( lambda facet: self._reconnecting_main(asyncio.get_running_loop(), @@ -94,6 +98,10 @@ class TunnelRelay: self.pending_turn = [] self._connected = False self.gatekeeper_handle = None + if self.publish_service is not None: + # Very specific specialization of logic in rewrite_ref_out + ws = WireSymbol(self.publish_oid, self.publish_service, self.exported_references) + self.exported_references.get_ref([], self.publish_service, False, lambda: ws) @property def connected(self): @@ -460,3 +468,11 @@ class PipeTunnelRelay(_StreamTunnelRelay): def _send_bytes(self, bs): self.output_fileobj.buffer.write(bs) self.output_fileobj.buffer.flush() + +def run_stdio_service(turn, entity): + PipeTunnelRelay(turn, transportAddress.Stdio(), publish_service=turn.ref(entity)) + +# decorator +def service(**kwargs): + return lambda entity: \ + actor.start_actor_system(lambda turn: run_stdio_service(turn, entity), **kwargs)