From dbfd5dc8e61d6a6c6340605efcb70c77432e1d58 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 20 Jun 2019 14:39:05 +0100 Subject: [PATCH] Connection backoff --- packages/driver-websocket/package.json | 1 + packages/driver-websocket/src/index.js | 47 +++++++++++++++++++++----- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/packages/driver-websocket/package.json b/packages/driver-websocket/package.json index 207d996..79866c7 100644 --- a/packages/driver-websocket/package.json +++ b/packages/driver-websocket/package.json @@ -21,6 +21,7 @@ "dependencies": { "@syndicate-lang/core": "^0.2.7", "@syndicate-lang/driver-timer": "^0.2.7", + "debug": "^4.1.1", "isomorphic-ws": "^4.0.1", "ws": "^6.1.2" } diff --git a/packages/driver-websocket/src/index.js b/packages/driver-websocket/src/index.js index 97693f0..1e0c0fd 100644 --- a/packages/driver-websocket/src/index.js +++ b/packages/driver-websocket/src/index.js @@ -18,6 +18,7 @@ import { currentFacet, Bytes, Observe, Dataspace } from "@syndicate-lang/core"; const { sleep } = activate require("@syndicate-lang/driver-timer"); +const debugFactory = require('debug'); const _WebSocket = require('isomorphic-ws'); @@ -32,26 +33,56 @@ Object.assign(module.exports, { spawn named 'WebSocketFactory' { during Observe(WebSocket($id, $url, $options)) spawn named ['WebSocket', id, url] { const facet = currentFacet(); + const debug = debugFactory('syndicate/driver-websocket:' + (id && id.toString())); let finish = Dataspace.backgroundTask(); on stop finish(); + const DEFAULT_RECONNECT_DELAY = 1000; // milliseconds + const MAX_RECONNECT_DELAY = 30000; // milliseconds + let ws = null; field this.connected = false; + field this.reconnectDelay = DEFAULT_RECONNECT_DELAY; + + const guard = (context, f) => { + try { + f() + } catch (e) { + // Swallow e, which will be some kind of websocket-related exception. + debug('exception in actor '+facet.actor.toString()+' during '+context+':', e.message); + facet.stop(); + } + }; const connect = () => { disconnect(); - console.log('WebSocket', id, url, 'connecting'); + debug(url, 'connecting'); ws = new _WebSocket(url, [], options.toJS()); ws.onerror = Dataspace.wrapExternal((e) => { - console.error('WebSocket', id, url, e); + debug(url, e.message); disconnect(); - sleep(1000, connect); + if (this.reconnectDelay !== DEFAULT_RECONNECT_DELAY) { + debug('Will reconnect in '+Math.floor(this.reconnectDelay)+' milliseconds'); + } + sleep(Math.floor(this.reconnectDelay), connect); + this.reconnectDelay = this.reconnectDelay * 1.618 + (Math.random() * 1000); + this.reconnectDelay = + this.reconnectDelay > MAX_RECONNECT_DELAY + ? MAX_RECONNECT_DELAY + (Math.random() * 1000) + : this.reconnectDelay; }); - ws.onopen = Dataspace.wrapExternal(() => { this.connected = true; }); - ws.onclose = Dataspace.wrapExternal(() => { if (this.connected) { connect(); }}); + ws.onopen = Dataspace.wrapExternal(() => { + this.connected = true; + this.reconnectDelay = DEFAULT_RECONNECT_DELAY; + }); + ws.onclose = Dataspace.wrapExternal(() => { + if (this.connected) { + connect(); + } + }); ws.onmessage = Dataspace.wrapExternal((data) => { if (typeof Blob !== 'undefined' && data.data instanceof Blob) { var reader = new FileReader(); @@ -67,8 +98,8 @@ spawn named 'WebSocketFactory' { const disconnect = () => { if (ws) { - console.log('WebSocket', id, url, 'disconnecting'); - try { ws.close(); } catch (_e) {} + debug(url, 'disconnecting'); + guard('close', () => ws.close()); ws = null; } this.connected = false; @@ -81,7 +112,7 @@ spawn named 'WebSocketFactory' { on message DataOut(id, $data) { if (this.connected) { - ws.send(Bytes.toIO(data)); + guard('send', () => ws.send(Bytes.toIO(data))); } } }