diff --git a/src/actor.ts b/src/actor.ts index 2688b76..a597e23 100644 --- a/src/actor.ts +++ b/src/actor.ts @@ -1,4 +1,5 @@ import { Dictionary, IdentitySet, Record, Tuple, Value, is, preserves } from 'preserves'; +import { queueTask } from './task.js'; //--------------------------------------------------------------------------- @@ -126,13 +127,13 @@ export class Actor { console.error(`Actor ${this.id} crashed:`, this.exitReason.err); } // this.exitHooks.forEach(hook => hook(t)); - queueMicrotask(() => + queueTask(() => t.freshen(t => this.outbound.forEach((peer, h) => t._retract(peer, h)))); } execute(proc: () => void): void { - queueMicrotask(() => { + queueTask(() => { if (this.exitReason !== null) return; try { proc(); diff --git a/src/relay.ts b/src/relay.ts index 5f45f01..df80b87 100644 --- a/src/relay.ts +++ b/src/relay.ts @@ -15,6 +15,7 @@ import { WireRef, TurnMessage, } from './protocol.js'; +import { queueTask } from './task.js'; export class SyncPeerEntity implements Entity { readonly relay: Relay; @@ -205,7 +206,7 @@ export class Relay { send(remoteOid: Oid, m: EntityMessage): void { if (this.pendingTurn.length === 0) { - queueMicrotask(() => { + queueTask(() => { if (this.debug) console.log('OUT', this.pendingTurn.asPreservesText()); this.w(underlying(encode(this.pendingTurn, { canonical: true, diff --git a/src/task.ts b/src/task.ts new file mode 100644 index 0000000..c7fa299 --- /dev/null +++ b/src/task.ts @@ -0,0 +1,14 @@ +const LIMIT = 25000; + +let taskCounter = 0; +export function queueTask(f: () => void) { + taskCounter++; + if (taskCounter === LIMIT) { + setTimeout(() => taskCounter = 0, 0); + } + if (taskCounter >= LIMIT) { + setTimeout(f, 0); + } else { + queueMicrotask(f); + } +}