Support connection_timeout
This commit is contained in:
parent
0429e59ad1
commit
561aa01fea
|
@ -71,6 +71,7 @@ class TunnelRelay:
|
||||||
publish_oid = 0,
|
publish_oid = 0,
|
||||||
on_connected = None,
|
on_connected = None,
|
||||||
on_disconnected = None,
|
on_disconnected = None,
|
||||||
|
connection_timeout = None,
|
||||||
):
|
):
|
||||||
self.facet = turn.active_facet()
|
self.facet = turn.active_facet()
|
||||||
self.facet.on_stop(self._shutdown)
|
self.facet.on_stop(self._shutdown)
|
||||||
|
@ -79,6 +80,7 @@ class TunnelRelay:
|
||||||
self.gatekeeper_oid = gatekeeper_oid
|
self.gatekeeper_oid = gatekeeper_oid
|
||||||
self.publish_service = publish_service
|
self.publish_service = publish_service
|
||||||
self.publish_oid = publish_oid
|
self.publish_oid = publish_oid
|
||||||
|
self.connection_timeout = connection_timeout
|
||||||
self._reset()
|
self._reset()
|
||||||
self.facet.linked_task(
|
self.facet.linked_task(
|
||||||
lambda facet: self._reconnecting_main(facet.actor._system,
|
lambda facet: self._reconnecting_main(facet.actor._system,
|
||||||
|
@ -341,10 +343,6 @@ class _StreamTunnelRelay(TunnelRelay, asyncio.Protocol):
|
||||||
def connection_lost(self, exc):
|
def connection_lost(self, exc):
|
||||||
self._on_disconnected()
|
self._on_disconnected()
|
||||||
|
|
||||||
def connection_made(self, transport):
|
|
||||||
self.transport = transport
|
|
||||||
self._on_connected()
|
|
||||||
|
|
||||||
def data_received(self, chunk):
|
def data_received(self, chunk):
|
||||||
self.decoder.extend(chunk)
|
self.decoder.extend(chunk)
|
||||||
while True:
|
while True:
|
||||||
|
@ -375,16 +373,25 @@ class _StreamTunnelRelay(TunnelRelay, asyncio.Protocol):
|
||||||
self.decoder = Decoder(decode_embedded = sturdy.WireRef.decode)
|
self.decoder = Decoder(decode_embedded = sturdy.WireRef.decode)
|
||||||
self.stop_signal = system.loop.create_future()
|
self.stop_signal = system.loop.create_future()
|
||||||
try:
|
try:
|
||||||
_transport, _protocol = await self._create_connection(system)
|
try:
|
||||||
|
transport, _protocol = await asyncio.wait_for(
|
||||||
|
self._create_connection(system), timeout=self.connection_timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self.facet.log.error(
|
||||||
|
'%s: Timeout connecting to server' % (self.__class__.__qualname__,))
|
||||||
|
return False
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
self.facet.log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
|
self.facet.log.error(
|
||||||
|
'%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
self.transport = transport
|
||||||
|
self._on_connected()
|
||||||
if on_connected: await on_connected(self)
|
if on_connected: await on_connected(self)
|
||||||
await self.stop_signal
|
await self.stop_signal
|
||||||
return True
|
return True
|
||||||
finally:
|
finally:
|
||||||
|
if self.transport:
|
||||||
self.transport.close()
|
self.transport.close()
|
||||||
self.transport = None
|
self.transport = None
|
||||||
self.stop_signal = None
|
self.stop_signal = None
|
||||||
|
@ -434,7 +441,10 @@ class WebsocketTunnelRelay(TunnelRelay):
|
||||||
self.system = system
|
self.system = system
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.ws = await websockets.connect(self.address.url)
|
self.ws = await websockets.connect(
|
||||||
|
self.address.url, open_timeout=self.connection_timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
return self.__connection_error('timeout')
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
return self.__connection_error(e)
|
return self.__connection_error(e)
|
||||||
except websockets.exceptions.InvalidHandshake as e:
|
except websockets.exceptions.InvalidHandshake as e:
|
||||||
|
|
Loading…
Reference in New Issue