2021-08-19 22:40:05 +00:00
|
|
|
import sys
|
2021-08-19 02:59:04 +00:00
|
|
|
import asyncio
|
|
|
|
|
|
|
|
from preserves import Embedded, stringify
|
|
|
|
from preserves.fold import map_embeddeds
|
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
from . import actor, encode, transport, Decoder, gatekeeper, turn
|
2021-08-19 21:04:35 +00:00
|
|
|
from .during import During
|
2021-12-25 21:59:21 +00:00
|
|
|
from .actor import _inert_ref
|
2021-08-19 02:59:04 +00:00
|
|
|
from .idgen import IdGenerator
|
2021-10-20 12:37:58 +00:00
|
|
|
from .schema import protocol, sturdy, transportAddress
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
class InboundAssertion:
|
2021-09-07 12:57:24 +00:00
|
|
|
def __init__(self, remote_handle, local_handle, pins):
|
2021-08-19 02:59:04 +00:00
|
|
|
self.remote_handle = remote_handle
|
|
|
|
self.local_handle = local_handle
|
2021-09-07 12:57:24 +00:00
|
|
|
self.pins = pins
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
class WireSymbol:
|
2021-09-07 12:57:24 +00:00
|
|
|
def __init__(self, oid, ref, membrane):
|
2021-08-19 02:59:04 +00:00
|
|
|
self.oid = oid
|
|
|
|
self.ref = ref
|
2021-09-07 12:57:24 +00:00
|
|
|
self.membrane = membrane
|
2021-08-19 02:59:04 +00:00
|
|
|
self.count = 0
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return '<ws:%d/%d:%r>' % (self.oid, self.count, self.ref)
|
|
|
|
|
2021-09-07 12:57:24 +00:00
|
|
|
def grab(self, pins):
|
|
|
|
self.count = self.count + 1
|
|
|
|
pins.append(self)
|
|
|
|
|
|
|
|
def drop(self):
|
|
|
|
self.count = self.count - 1
|
|
|
|
if self.count == 0:
|
|
|
|
del self.membrane.oid_map[self.oid]
|
|
|
|
del self.membrane.ref_map[self.ref]
|
|
|
|
|
2021-08-19 02:59:04 +00:00
|
|
|
class Membrane:
|
|
|
|
def __init__(self):
|
|
|
|
self.oid_map = {}
|
|
|
|
self.ref_map = {}
|
|
|
|
|
2021-09-07 12:57:24 +00:00
|
|
|
def _get(self, pins, map, key, is_transient, ws_maker):
|
2021-08-19 02:59:04 +00:00
|
|
|
ws = map.get(key, None)
|
2021-09-07 12:57:24 +00:00
|
|
|
if ws is None and ws_maker is not None:
|
2021-08-19 02:59:04 +00:00
|
|
|
ws = ws_maker()
|
|
|
|
self.oid_map[ws.oid] = ws
|
|
|
|
self.ref_map[ws.ref] = ws
|
2021-09-07 12:57:24 +00:00
|
|
|
if not is_transient and ws is not None:
|
|
|
|
ws.grab(pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
return ws
|
|
|
|
|
2021-09-07 12:57:24 +00:00
|
|
|
def get_ref(self, pins, local_ref, is_transient, ws_maker):
|
|
|
|
return self._get(pins, self.ref_map, local_ref, is_transient, ws_maker)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-09-07 12:57:24 +00:00
|
|
|
def get_oid(self, pins, remote_oid, ws_maker):
|
|
|
|
return self._get(pins, self.oid_map, remote_oid, False, ws_maker)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-09-07 12:57:24 +00:00
|
|
|
def drop_all(wss):
|
|
|
|
for ws in wss:
|
|
|
|
ws.drop()
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
# There are other kinds of relay. This one has exactly two participants connected to each other.
|
|
|
|
class TunnelRelay:
|
|
|
|
def __init__(self,
|
|
|
|
address,
|
|
|
|
gatekeeper_peer = None,
|
|
|
|
gatekeeper_oid = 0,
|
2021-11-22 15:31:30 +00:00
|
|
|
publish_service = None,
|
|
|
|
publish_oid = 0,
|
2021-08-19 02:59:04 +00:00
|
|
|
on_connected = None,
|
|
|
|
on_disconnected = None,
|
|
|
|
):
|
2021-12-25 21:59:21 +00:00
|
|
|
self.facet = turn.active_facet()
|
2021-08-19 02:59:04 +00:00
|
|
|
self.facet.on_stop(self._shutdown)
|
|
|
|
self.address = address
|
|
|
|
self.gatekeeper_peer = gatekeeper_peer
|
|
|
|
self.gatekeeper_oid = gatekeeper_oid
|
2021-11-22 15:31:30 +00:00
|
|
|
self.publish_service = publish_service
|
|
|
|
self.publish_oid = publish_oid
|
2021-08-19 02:59:04 +00:00
|
|
|
self._reset()
|
2021-08-19 21:04:35 +00:00
|
|
|
self.facet.linked_task(
|
2023-02-12 21:02:08 +00:00
|
|
|
lambda facet: self._reconnecting_main(facet.actor._system,
|
2021-08-19 21:04:35 +00:00
|
|
|
on_connected = on_connected,
|
|
|
|
on_disconnected = on_disconnected))
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
def _reset(self):
|
|
|
|
self.inbound_assertions = {} # map remote handle to InboundAssertion
|
2021-09-07 12:57:24 +00:00
|
|
|
self.outbound_assertions = {} # map local handle to `WireSymbol`s
|
2021-08-19 02:59:04 +00:00
|
|
|
self.exported_references = Membrane()
|
|
|
|
self.imported_references = Membrane()
|
|
|
|
self.pending_turn = []
|
|
|
|
self._connected = False
|
|
|
|
self.gatekeeper_handle = None
|
2021-11-30 19:43:03 +00:00
|
|
|
if self.publish_service is None:
|
|
|
|
self.next_local_oid = IdGenerator(initial_value=0)
|
|
|
|
else:
|
2021-11-30 20:51:54 +00:00
|
|
|
self.next_local_oid = IdGenerator(initial_value=(self.publish_oid + 1))
|
2021-11-22 15:31:30 +00:00
|
|
|
# Very specific specialization of logic in rewrite_ref_out
|
|
|
|
ws = WireSymbol(self.publish_oid, self.publish_service, self.exported_references)
|
|
|
|
self.exported_references.get_ref([], self.publish_service, False, lambda: ws)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def connected(self):
|
|
|
|
return self._connected
|
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def _shutdown(self):
|
2021-08-19 02:59:04 +00:00
|
|
|
self._disconnect()
|
|
|
|
|
|
|
|
def deregister(self, handle):
|
2021-09-07 12:57:24 +00:00
|
|
|
drop_all(self.outbound_assertions.pop(handle, ()))
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-09-07 12:57:24 +00:00
|
|
|
def _lookup_exported_oid(self, local_oid, pins):
|
|
|
|
ws = self.exported_references.get_oid(pins, local_oid, None)
|
|
|
|
if ws is None:
|
|
|
|
return _inert_ref
|
|
|
|
return ws.ref
|
|
|
|
|
|
|
|
def register_imported_oid(self, remote_oid, pins):
|
|
|
|
self.imported_references.get_oid(pins, remote_oid, None)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-09-07 12:57:24 +00:00
|
|
|
def register(self, target_oid, assertion, maybe_handle):
|
|
|
|
pins = []
|
|
|
|
self.register_imported_oid(target_oid, pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
rewritten = map_embeddeds(
|
2021-09-07 12:57:24 +00:00
|
|
|
lambda r: Embedded(self.rewrite_ref_out(r, maybe_handle is None, pins)),
|
2021-08-19 02:59:04 +00:00
|
|
|
assertion)
|
|
|
|
if maybe_handle is not None:
|
2021-09-07 12:57:24 +00:00
|
|
|
self.outbound_assertions[maybe_handle] = pins
|
2021-08-19 02:59:04 +00:00
|
|
|
return rewritten
|
|
|
|
|
2021-09-07 12:57:24 +00:00
|
|
|
def rewrite_ref_out(self, r, is_transient, pins):
|
2021-08-19 02:59:04 +00:00
|
|
|
if isinstance(r.entity, RelayEntity) and r.entity.relay == self:
|
|
|
|
# TODO attenuation
|
|
|
|
return sturdy.WireRef.yours(sturdy.Oid(r.entity.oid), ())
|
|
|
|
else:
|
|
|
|
ws = self.exported_references.get_ref(
|
2021-11-30 19:43:03 +00:00
|
|
|
pins, r, is_transient, lambda: WireSymbol(next(self.next_local_oid), r,
|
2021-09-07 12:57:24 +00:00
|
|
|
self.exported_references))
|
2021-08-19 02:59:04 +00:00
|
|
|
return sturdy.WireRef.mine(sturdy.Oid(ws.oid))
|
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def rewrite_in(self, assertion, pins):
|
2021-08-19 02:59:04 +00:00
|
|
|
rewritten = map_embeddeds(
|
2021-12-25 21:59:21 +00:00
|
|
|
lambda wire_ref: Embedded(self.rewrite_ref_in(wire_ref, pins)),
|
2021-08-19 02:59:04 +00:00
|
|
|
assertion)
|
2021-09-07 12:57:24 +00:00
|
|
|
return rewritten
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def rewrite_ref_in(self, wire_ref, pins):
|
2021-08-19 02:59:04 +00:00
|
|
|
if wire_ref.VARIANT.name == 'mine':
|
|
|
|
oid = wire_ref.oid.value
|
|
|
|
ws = self.imported_references.get_oid(
|
2021-09-07 12:57:24 +00:00
|
|
|
pins, oid, lambda: WireSymbol(oid, turn.ref(RelayEntity(self, oid)),
|
|
|
|
self.imported_references))
|
2021-08-19 02:59:04 +00:00
|
|
|
return ws.ref
|
|
|
|
else:
|
|
|
|
oid = wire_ref.oid.value
|
2021-09-07 12:57:24 +00:00
|
|
|
local_ref = self._lookup_exported_oid(oid, pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
attenuation = wire_ref.attenuation
|
|
|
|
if len(attenuation) > 0:
|
|
|
|
raise NotImplementedError('Non-empty attenuations not yet implemented') # TODO
|
|
|
|
return local_ref
|
|
|
|
|
|
|
|
def _on_disconnected(self):
|
|
|
|
self._connected = False
|
2021-12-25 21:59:21 +00:00
|
|
|
def retract_inbound():
|
2021-08-19 02:59:04 +00:00
|
|
|
for ia in self.inbound_assertions.values():
|
|
|
|
turn.retract(ia.local_handle)
|
|
|
|
if self.gatekeeper_handle is not None:
|
|
|
|
turn.retract(self.gatekeeper_handle)
|
|
|
|
self._reset()
|
2021-12-25 21:59:21 +00:00
|
|
|
turn.run(self.facet, retract_inbound)
|
2021-08-19 02:59:04 +00:00
|
|
|
self._disconnect()
|
|
|
|
|
|
|
|
def _on_connected(self):
|
|
|
|
self._connected = True
|
|
|
|
if self.gatekeeper_peer is not None:
|
2021-12-25 21:59:21 +00:00
|
|
|
def connected_action():
|
|
|
|
gk = self.rewrite_ref_in(sturdy.WireRef.mine(sturdy.Oid(self.gatekeeper_oid)), [])
|
2021-08-19 02:59:04 +00:00
|
|
|
self.gatekeeper_handle = turn.publish(self.gatekeeper_peer, Embedded(gk))
|
2021-12-25 21:59:21 +00:00
|
|
|
turn.run(self.facet, connected_action)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
def _on_event(self, v):
|
2021-12-25 21:59:21 +00:00
|
|
|
turn.run(self.facet, lambda: self._handle_event(v))
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def _handle_event(self, v):
|
2021-08-19 02:59:04 +00:00
|
|
|
packet = protocol.Packet.decode(v)
|
2023-02-12 21:02:08 +00:00
|
|
|
# self.facet.log.info('IN: %r', packet)
|
2021-08-19 02:59:04 +00:00
|
|
|
variant = packet.VARIANT.name
|
2021-12-25 21:59:21 +00:00
|
|
|
if variant == 'Turn': self._handle_turn_events(packet.value.value)
|
|
|
|
elif variant == 'Error': self._on_error(packet.value.message, packet.value.detail)
|
2022-01-16 23:20:17 +00:00
|
|
|
elif variant == 'Extension': pass
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def _on_error(self, message, detail):
|
2021-08-19 02:59:04 +00:00
|
|
|
self.facet.log.error('Error from server: %r (detail: %r)', message, detail)
|
|
|
|
self._disconnect()
|
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def _handle_turn_events(self, events):
|
2021-08-19 02:59:04 +00:00
|
|
|
for e in events:
|
2021-09-07 12:57:24 +00:00
|
|
|
pins = []
|
|
|
|
ref = self._lookup_exported_oid(e.oid.value, pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
event = e.event
|
|
|
|
variant = event.VARIANT.name
|
|
|
|
if variant == 'Assert':
|
2021-12-25 21:59:21 +00:00
|
|
|
self._handle_publish(pins, ref, event.value.assertion.value, event.value.handle.value)
|
2021-08-19 02:59:04 +00:00
|
|
|
elif variant == 'Retract':
|
2021-12-25 21:59:21 +00:00
|
|
|
self._handle_retract(pins, ref, event.value.handle.value)
|
2021-08-19 02:59:04 +00:00
|
|
|
elif variant == 'Message':
|
2021-12-25 21:59:21 +00:00
|
|
|
self._handle_message(pins, ref, event.value.body.value)
|
2021-08-19 02:59:04 +00:00
|
|
|
elif variant == 'Sync':
|
2021-12-25 21:59:21 +00:00
|
|
|
self._handle_sync(pins, ref, event.value.peer)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def _handle_publish(self, pins, ref, assertion, remote_handle):
|
|
|
|
assertion = self.rewrite_in(assertion, pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
self.inbound_assertions[remote_handle] = \
|
2021-09-07 12:57:24 +00:00
|
|
|
InboundAssertion(remote_handle, turn.publish(ref, assertion), pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def _handle_retract(self, pins, ref, remote_handle):
|
2021-08-19 02:59:04 +00:00
|
|
|
ia = self.inbound_assertions.pop(remote_handle, None)
|
|
|
|
if ia is None:
|
|
|
|
raise ValueError('Peer retracted invalid handle %s' % (remote_handle,))
|
2021-09-07 12:57:24 +00:00
|
|
|
drop_all(ia.pins)
|
|
|
|
drop_all(pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
turn.retract(ia.local_handle)
|
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def _handle_message(self, pins, ref, message):
|
|
|
|
message = self.rewrite_in(message, pins)
|
2021-09-07 12:57:24 +00:00
|
|
|
for ws in pins:
|
|
|
|
if ws.count == 1:
|
|
|
|
raise ValueError('Cannot receive transient reference')
|
2021-08-19 02:59:04 +00:00
|
|
|
turn.send(ref, message)
|
2021-09-07 12:57:24 +00:00
|
|
|
drop_all(pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def _handle_sync(self, pins, ref, wire_peer):
|
|
|
|
peer = self.rewrite_ref_in(wire_peer, pins)
|
|
|
|
def done():
|
2021-08-19 02:59:04 +00:00
|
|
|
turn.send(peer, True)
|
2021-09-07 12:57:24 +00:00
|
|
|
drop_all(pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
turn.sync(ref, done)
|
|
|
|
|
|
|
|
def _send(self, remote_oid, turn_event):
|
|
|
|
if len(self.pending_turn) == 0:
|
2021-12-25 21:59:21 +00:00
|
|
|
def flush_pending():
|
2021-08-19 02:59:04 +00:00
|
|
|
packet = protocol.Packet.Turn(protocol.Turn(self.pending_turn))
|
|
|
|
self.pending_turn = []
|
2023-02-12 21:02:08 +00:00
|
|
|
# self.facet.log.info('OUT: %r', packet)
|
2021-08-19 02:59:04 +00:00
|
|
|
self._send_bytes(encode(packet))
|
2023-02-12 21:02:08 +00:00
|
|
|
self.facet.actor._system.queue_task(lambda: turn.run(self.facet, flush_pending))
|
2021-08-19 02:59:04 +00:00
|
|
|
self.pending_turn.append(protocol.TurnEvent(protocol.Oid(remote_oid), turn_event))
|
|
|
|
|
|
|
|
def _send_bytes(self, bs):
|
|
|
|
raise Exception('subclassresponsibility')
|
|
|
|
|
|
|
|
def _disconnect(self):
|
|
|
|
raise Exception('subclassresponsibility')
|
|
|
|
|
2023-02-12 21:02:08 +00:00
|
|
|
async def _reconnecting_main(self, system, on_connected=None, on_disconnected=None):
|
2021-08-19 02:59:04 +00:00
|
|
|
should_run = True
|
|
|
|
while should_run and self.facet.alive:
|
2023-02-12 21:02:08 +00:00
|
|
|
did_connect = await self.main(system, on_connected=(on_connected or _default_on_connected))
|
2021-08-19 02:59:04 +00:00
|
|
|
should_run = await (on_disconnected or _default_on_disconnected)(self, did_connect)
|
|
|
|
|
|
|
|
@staticmethod
|
2021-12-25 21:59:21 +00:00
|
|
|
def from_str(conn_str, **kwargs):
|
|
|
|
return transport.connection_from_str(conn_str, **kwargs)
|
2021-08-19 21:04:35 +00:00
|
|
|
|
|
|
|
# decorator
|
2024-03-29 13:01:27 +00:00
|
|
|
def connect(conn_str, cap = None, **kwargs):
|
2021-08-19 21:04:35 +00:00
|
|
|
def prepare_resolution_handler(handler):
|
|
|
|
@During().add_handler
|
2021-12-25 21:59:21 +00:00
|
|
|
def handle_gatekeeper(gk):
|
2024-03-29 13:01:27 +00:00
|
|
|
if cap is None:
|
|
|
|
handler(gk.embeddedValue)
|
|
|
|
else:
|
|
|
|
gatekeeper.resolve(gk.embeddedValue, cap)(handler)
|
2021-08-19 21:04:35 +00:00
|
|
|
return transport.connection_from_str(
|
|
|
|
conn_str,
|
|
|
|
gatekeeper_peer = turn.ref(handle_gatekeeper),
|
|
|
|
**kwargs)
|
|
|
|
return prepare_resolution_handler
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
class RelayEntity(actor.Entity):
|
|
|
|
def __init__(self, relay, oid):
|
|
|
|
self.relay = relay
|
|
|
|
self.oid = oid
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return '<Relay %s %s>' % (stringify(self.relay.address), self.oid)
|
|
|
|
|
|
|
|
def _send(self, e):
|
|
|
|
self.relay._send(self.oid, e)
|
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def on_publish(self, assertion, handle):
|
2021-08-19 02:59:04 +00:00
|
|
|
self._send(protocol.Event.Assert(protocol.Assert(
|
2021-09-07 12:57:24 +00:00
|
|
|
protocol.Assertion(self.relay.register(self.oid, assertion, handle)),
|
2021-08-19 02:59:04 +00:00
|
|
|
protocol.Handle(handle))))
|
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def on_retract(self, handle):
|
2021-08-19 02:59:04 +00:00
|
|
|
self.relay.deregister(handle)
|
|
|
|
self._send(protocol.Event.Retract(protocol.Retract(protocol.Handle(handle))))
|
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def on_message(self, message):
|
2021-08-19 02:59:04 +00:00
|
|
|
self._send(protocol.Event.Message(protocol.Message(
|
2021-09-07 12:57:24 +00:00
|
|
|
protocol.Assertion(self.relay.register(self.oid, message, None)))))
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def on_sync(self, peer):
|
2021-09-07 12:57:24 +00:00
|
|
|
pins = []
|
|
|
|
self.relay.register_imported_oid(self.oid, pins)
|
|
|
|
entity = SyncPeerEntity(self.relay, peer, pins)
|
|
|
|
rewritten = Embedded(self.relay.rewrite_ref_out(turn.ref(entity), False, pins))
|
2021-08-19 02:59:04 +00:00
|
|
|
self._send(protocol.Event.Sync(protocol.Sync(rewritten)))
|
|
|
|
|
|
|
|
class SyncPeerEntity(actor.Entity):
|
2021-09-07 12:57:24 +00:00
|
|
|
def __init__(self, relay, peer, pins):
|
2021-08-19 02:59:04 +00:00
|
|
|
self.relay = relay
|
|
|
|
self.peer = peer
|
2021-09-07 12:57:24 +00:00
|
|
|
self.pins = pins
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def on_message(self, body):
|
2021-09-07 12:57:24 +00:00
|
|
|
drop_all(self.pins)
|
2021-08-19 02:59:04 +00:00
|
|
|
turn.send(self.peer, body)
|
|
|
|
|
|
|
|
async def _default_on_connected(relay):
|
|
|
|
relay.facet.log.info('Connected')
|
|
|
|
|
|
|
|
async def _default_on_disconnected(relay, did_connect):
|
|
|
|
if did_connect:
|
|
|
|
# Reconnect immediately
|
|
|
|
relay.facet.log.info('Disconnected')
|
|
|
|
else:
|
|
|
|
await asyncio.sleep(2)
|
|
|
|
return True
|
|
|
|
|
|
|
|
class _StreamTunnelRelay(TunnelRelay, asyncio.Protocol):
|
2021-12-25 21:59:21 +00:00
|
|
|
def __init__(self, address, **kwargs):
|
|
|
|
super().__init__(address, **kwargs)
|
2021-08-19 02:59:04 +00:00
|
|
|
self.decoder = None
|
|
|
|
self.stop_signal = None
|
|
|
|
self.transport = None
|
|
|
|
|
|
|
|
def connection_lost(self, exc):
|
|
|
|
self._on_disconnected()
|
|
|
|
|
|
|
|
def connection_made(self, transport):
|
|
|
|
self.transport = transport
|
|
|
|
self._on_connected()
|
|
|
|
|
|
|
|
def data_received(self, chunk):
|
|
|
|
self.decoder.extend(chunk)
|
|
|
|
while True:
|
|
|
|
v = self.decoder.try_next()
|
|
|
|
if v is None: break
|
|
|
|
self._on_event(v)
|
|
|
|
|
|
|
|
def _send_bytes(self, bs):
|
|
|
|
if self.transport:
|
|
|
|
self.transport.write(bs)
|
|
|
|
|
|
|
|
def _disconnect(self):
|
|
|
|
if self.stop_signal:
|
2021-08-19 21:27:33 +00:00
|
|
|
def set_stop_signal():
|
|
|
|
try:
|
|
|
|
self.stop_signal.set_result(True)
|
|
|
|
except:
|
|
|
|
pass
|
|
|
|
self.stop_signal.get_loop().call_soon_threadsafe(set_stop_signal)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
2023-02-12 21:02:08 +00:00
|
|
|
async def _create_connection(self, system):
|
2021-08-19 02:59:04 +00:00
|
|
|
raise Exception('subclassresponsibility')
|
|
|
|
|
2023-02-12 21:02:08 +00:00
|
|
|
async def main(self, system, on_connected=None):
|
2021-08-19 02:59:04 +00:00
|
|
|
if self.transport is not None:
|
|
|
|
raise Exception('Cannot run connection twice!')
|
|
|
|
|
|
|
|
self.decoder = Decoder(decode_embedded = sturdy.WireRef.decode)
|
2023-02-12 21:02:08 +00:00
|
|
|
self.stop_signal = system.loop.create_future()
|
2021-08-19 02:59:04 +00:00
|
|
|
try:
|
2023-02-12 21:02:08 +00:00
|
|
|
_transport, _protocol = await self._create_connection(system)
|
2021-08-19 02:59:04 +00:00
|
|
|
except OSError as e:
|
|
|
|
log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
|
|
|
|
return False
|
|
|
|
|
|
|
|
try:
|
|
|
|
if on_connected: await on_connected(self)
|
|
|
|
await self.stop_signal
|
|
|
|
return True
|
|
|
|
finally:
|
|
|
|
self.transport.close()
|
|
|
|
self.transport = None
|
|
|
|
self.stop_signal = None
|
|
|
|
self.decoder = None
|
|
|
|
|
|
|
|
@transport.address(transportAddress.Tcp)
|
|
|
|
class TcpTunnelRelay(_StreamTunnelRelay):
|
2023-02-12 21:02:08 +00:00
|
|
|
async def _create_connection(self, system):
|
|
|
|
return await system.loop.create_connection(lambda: self, self.address.host, self.address.port)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
@transport.address(transportAddress.Unix)
|
|
|
|
class UnixSocketTunnelRelay(_StreamTunnelRelay):
|
2023-02-12 21:02:08 +00:00
|
|
|
async def _create_connection(self, system):
|
|
|
|
return await system.loop.create_unix_connection(lambda: self, self.address.path)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
@transport.address(transportAddress.WebSocket)
|
|
|
|
class WebsocketTunnelRelay(TunnelRelay):
|
2021-12-25 21:59:21 +00:00
|
|
|
def __init__(self, address, **kwargs):
|
|
|
|
super().__init__(address, **kwargs)
|
2023-02-12 21:02:08 +00:00
|
|
|
self.system = None
|
2021-08-19 02:59:04 +00:00
|
|
|
self.ws = None
|
|
|
|
|
|
|
|
def _send_bytes(self, bs):
|
2023-02-12 21:02:08 +00:00
|
|
|
if self.system:
|
2021-08-19 02:59:04 +00:00
|
|
|
def _do_send():
|
|
|
|
if self.ws:
|
2023-02-12 21:02:08 +00:00
|
|
|
self.system.queue_task(lambda: self.ws.send(bs))
|
|
|
|
self.system.loop.call_soon_threadsafe(_do_send)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
def _disconnect(self):
|
2023-02-12 21:02:08 +00:00
|
|
|
if self.system:
|
2021-08-19 02:59:04 +00:00
|
|
|
def _do_disconnect():
|
|
|
|
if self.ws:
|
2023-02-12 21:02:08 +00:00
|
|
|
self.system.queue_task(lambda: self.ws.close())
|
|
|
|
self.system.loop.call_soon_threadsafe(_do_disconnect)
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
def __connection_error(self, e):
|
|
|
|
self.facet.log.error('Could not connect to server: %s' % (e,))
|
|
|
|
return False
|
|
|
|
|
2023-02-12 21:02:08 +00:00
|
|
|
async def main(self, system, on_connected=None):
|
2024-03-29 11:41:10 +00:00
|
|
|
import websockets
|
|
|
|
|
2021-08-19 02:59:04 +00:00
|
|
|
if self.ws is not None:
|
|
|
|
raise Exception('Cannot run connection twice!')
|
|
|
|
|
2023-02-12 21:02:08 +00:00
|
|
|
self.system = system
|
2021-08-19 02:59:04 +00:00
|
|
|
|
|
|
|
try:
|
|
|
|
self.ws = await websockets.connect(self.address.url)
|
|
|
|
except OSError as e:
|
|
|
|
return self.__connection_error(e)
|
|
|
|
except websockets.exceptions.InvalidHandshake as e:
|
|
|
|
return self.__connection_error(e)
|
|
|
|
|
|
|
|
try:
|
|
|
|
if on_connected: await on_connected(self)
|
|
|
|
self._on_connected()
|
|
|
|
while True:
|
|
|
|
chunk = await self.ws.recv()
|
|
|
|
self._on_event(Decoder(chunk, decode_embedded = sturdy.WireRef.decode).next())
|
|
|
|
except websockets.exceptions.WebSocketException:
|
|
|
|
pass
|
|
|
|
finally:
|
|
|
|
self._on_disconnected()
|
|
|
|
|
|
|
|
if self.ws:
|
|
|
|
await self.ws.close()
|
2023-02-12 21:02:08 +00:00
|
|
|
self.system = None
|
2021-08-19 02:59:04 +00:00
|
|
|
self.ws = None
|
|
|
|
return True
|
2021-08-19 22:40:05 +00:00
|
|
|
|
|
|
|
@transport.address(transportAddress.Stdio)
|
|
|
|
class PipeTunnelRelay(_StreamTunnelRelay):
|
2021-12-25 21:59:21 +00:00
|
|
|
def __init__(self, address, input_fileobj = sys.stdin, output_fileobj = sys.stdout, **kwargs):
|
|
|
|
super().__init__(address, **kwargs)
|
2021-08-19 22:40:05 +00:00
|
|
|
self.input_fileobj = input_fileobj
|
|
|
|
self.output_fileobj = output_fileobj
|
|
|
|
self.reader = asyncio.StreamReader()
|
|
|
|
|
2023-02-12 21:02:08 +00:00
|
|
|
async def _create_connection(self, system):
|
|
|
|
return await system.loop.connect_read_pipe(lambda: self, self.input_fileobj)
|
2021-08-19 22:40:05 +00:00
|
|
|
|
|
|
|
def _send_bytes(self, bs):
|
|
|
|
self.output_fileobj.buffer.write(bs)
|
|
|
|
self.output_fileobj.buffer.flush()
|
2021-11-22 15:31:30 +00:00
|
|
|
|
2021-12-25 21:59:21 +00:00
|
|
|
def run_stdio_service(entity):
|
|
|
|
PipeTunnelRelay(transportAddress.Stdio(), publish_service=turn.ref(entity))
|
2021-11-22 15:31:30 +00:00
|
|
|
|
|
|
|
# decorator
|
|
|
|
def service(**kwargs):
|
2023-03-06 22:24:36 +00:00
|
|
|
return lambda entity: actor.run_system(**kwargs)(lambda: run_stdio_service(entity))
|