Reuse decoder, allowing multi- or partial-packet input byte chunks
This commit is contained in:
parent
abcf0dd933
commit
12103e43d2
68
src/relay.ts
68
src/relay.ts
|
@ -1,5 +1,5 @@
|
||||||
import { Actor, Assertion, attenuate, Entity, Handle, Ref, Turn } from './actor.js';
|
import { Actor, Assertion, attenuate, Entity, Handle, Ref, Turn } from './actor.js';
|
||||||
import { Bytes, canonicalString, decode, encode, FlexMap, IdentityMap, mapPointers, underlying, Value } from 'preserves';
|
import { BytesLike, canonicalString, Decoder, encode, FlexMap, IdentityMap, mapPointers, underlying, Value } from 'preserves';
|
||||||
import {
|
import {
|
||||||
EncodedAttenuation,
|
EncodedAttenuation,
|
||||||
EntityMessage,
|
EntityMessage,
|
||||||
|
@ -146,6 +146,30 @@ export class Relay {
|
||||||
debug: boolean;
|
debug: boolean;
|
||||||
trustPeer: boolean;
|
trustPeer: boolean;
|
||||||
|
|
||||||
|
readonly decoder = new Decoder<WireRef>(void 0, {
|
||||||
|
includeAnnotations: false,
|
||||||
|
decodePointer: v => {
|
||||||
|
function complain(): never {
|
||||||
|
throw new Error(
|
||||||
|
`Received invalid object reference ${v.asPreservesText()} from peer`);
|
||||||
|
}
|
||||||
|
if (!(Array.isArray(v) && v.length >= 2 && typeof v[1] === 'number')) {
|
||||||
|
complain();
|
||||||
|
}
|
||||||
|
const oid = v[1] as Oid;
|
||||||
|
switch (v[0]) {
|
||||||
|
case 0:
|
||||||
|
if (v.length > 2) complain();
|
||||||
|
return myRef(oid);
|
||||||
|
case 1:
|
||||||
|
// TODO: check EncodedAttenuation
|
||||||
|
return yourRef(oid, v.slice(2) as EncodedAttenuation);
|
||||||
|
default:
|
||||||
|
complain();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
constructor(t: Turn, options: RelayOptions) {
|
constructor(t: Turn, options: RelayOptions) {
|
||||||
this.actor = t.actor;
|
this.actor = t.actor;
|
||||||
this.w = options.packetWriter;
|
this.w = options.packetWriter;
|
||||||
|
@ -255,37 +279,19 @@ export class Relay {
|
||||||
return this.exported.byOid.get(localOid)?.ref ?? INERT_REF;
|
return this.exported.byOid.get(localOid)?.ref ?? INERT_REF;
|
||||||
}
|
}
|
||||||
|
|
||||||
accept(bs0: Uint8Array): void {
|
accept(bs: BytesLike): void {
|
||||||
Turn.for(this.actor, t => {
|
Turn.for(this.actor, t => {
|
||||||
const bs = Bytes.from(bs0);
|
this.decoder.write(bs);
|
||||||
const wireTurn = decode<WireRef>(bs, {
|
while (true) {
|
||||||
decodePointer: v => {
|
const wireTurn = this.decoder.try_next() as (TurnMessage<WireRef> | undefined);
|
||||||
function complain(): never {
|
if (wireTurn === void 0) break;
|
||||||
throw new Error(
|
// TODO: deep check that wireTurn really is a TurnMessage
|
||||||
`Received invalid object reference ${v.asPreservesText()} from peer`);
|
if (this.debug) console.log('IN', wireTurn.asPreservesText());
|
||||||
}
|
wireTurn.forEach(v => {
|
||||||
if (!(Array.isArray(v) && v.length >= 2 && typeof v[1] === 'number')) {
|
const [localOid, m] = v;
|
||||||
complain();
|
this.handle(t, this.lookupLocal(localOid), m);
|
||||||
}
|
});
|
||||||
const oid = v[1] as Oid;
|
}
|
||||||
switch (v[0]) {
|
|
||||||
case 0:
|
|
||||||
if (v.length > 2) complain();
|
|
||||||
return myRef(oid);
|
|
||||||
case 1:
|
|
||||||
// TODO: check EncodedAttenuation
|
|
||||||
return yourRef(oid, v.slice(2) as EncodedAttenuation);
|
|
||||||
default:
|
|
||||||
complain();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}) as TurnMessage<WireRef>;
|
|
||||||
// ^ TODO: deep check that v is a TurnMessage
|
|
||||||
if (this.debug) console.log('IN', wireTurn.asPreservesText());
|
|
||||||
wireTurn.forEach(v => {
|
|
||||||
const [localOid, m] = v;
|
|
||||||
this.handle(t, this.lookupLocal(localOid), m);
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue