90 lines
2.9 KiB
TypeScript
90 lines
2.9 KiB
TypeScript
/// SPDX-License-Identifier: GPL-3.0-or-later
|
|
/// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
|
|
|
import { Assertion, Facet, Ref, Turn } from '../runtime/actor.js';
|
|
import { BytesLike, Decoder, encode, Embedded, stringify, underlying } from '@preserves/core';
|
|
import * as IO from '../gen/protocol.js';
|
|
import { wireRefEmbeddedType } from './protocol.js';
|
|
import { WireRef } from '../gen/sturdy.js';
|
|
import { LayerBoundary } from './membrane.js';
|
|
|
|
const FLUSH = Symbol.for('flush');
|
|
|
|
export type PacketWriter = (bs: Uint8Array) => void;
|
|
|
|
export interface RelayOptions {
|
|
packetWriter: PacketWriter;
|
|
setup(r: Relay): void;
|
|
debug?: boolean;
|
|
trustPeer?: boolean;
|
|
initialOid?: IO.Oid;
|
|
initialRef?: Ref;
|
|
nextLocalOid?: IO.Oid;
|
|
}
|
|
|
|
export class Relay extends LayerBoundary {
|
|
readonly facet: Facet;
|
|
readonly selfRef: Ref;
|
|
readonly w: PacketWriter;
|
|
readonly peer: Ref | null;
|
|
pendingTurn: IO.Turn<Embedded<WireRef>> = [];
|
|
debug: boolean;
|
|
|
|
readonly decoder = new Decoder(void 0, {
|
|
includeAnnotations: false,
|
|
embeddedDecode: wireRefEmbeddedType,
|
|
});
|
|
|
|
constructor(options: RelayOptions) {
|
|
super(options.trustPeer, options.nextLocalOid);
|
|
|
|
this.facet = Turn.activeFacet;
|
|
this.selfRef = Turn.ref(this);
|
|
this.w = options.packetWriter;
|
|
this.debug = options.debug ?? false;
|
|
|
|
this.facet.preventInertCheck();
|
|
options.setup(this);
|
|
|
|
if (options.initialRef !== void 0) {
|
|
this.rewriteRefOut(options.initialRef, false, []);
|
|
}
|
|
|
|
this.peer = (options.initialOid !== void 0)
|
|
? this.rewriteRefIn(new Embedded<WireRef>(WireRef.mine(options.initialOid)), [])
|
|
: null;
|
|
}
|
|
|
|
message(body: Assertion) {
|
|
if (body === FLUSH) {
|
|
if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn)));
|
|
this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
|
|
canonical: true,
|
|
embeddedEncode: wireRefEmbeddedType,
|
|
})));
|
|
this.pendingTurn = [];
|
|
}
|
|
}
|
|
|
|
send(remoteOid: IO.Oid, m: IO.Event<Embedded<WireRef>>): void {
|
|
if (this.pendingTurn.length === 0) {
|
|
Turn.active.message(this.selfRef, FLUSH);
|
|
}
|
|
this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m }));
|
|
}
|
|
|
|
accept(bs: BytesLike): void {
|
|
this.facet.turn(() => {
|
|
this.decoder.write(bs);
|
|
while (true) {
|
|
const rawPacket = this.decoder.try_next();
|
|
if (rawPacket === void 0) break;
|
|
const wirePacket = IO.toPacket(rawPacket);
|
|
if (wirePacket === void 0) throw new Error("Bad IO.Packet");
|
|
if (this.debug) console.log('IN', stringify(rawPacket));
|
|
this.proxyPacket(wirePacket);
|
|
}
|
|
});
|
|
}
|
|
}
|