Membrane-tracing infrastructure

This commit is contained in:
Tony Garnock-Jones 2024-05-30 13:11:42 +02:00
parent c59506fe0e
commit a0ba43cfae
2 changed files with 36 additions and 1 deletions

View File

@ -23,6 +23,7 @@ export * from './runtime/supervise.js';
export * as SaltyCrypto from 'salty-crypto';
export * as Cryptography from './transport/cryptography.js';
export * as WireProtocol from './transport/protocol.js';
export * as Membrane from './transport/membrane.js';
export * as Relay from './transport/relay.js';
export * as Sturdy from './transport/sturdy.js';

View File

@ -7,6 +7,21 @@ import * as IO from '../gen/protocol.js';
import { fromCaveat, WireRef } from '../gen/sturdy.js';
import { attenuate } from '../runtime/rewrite.js';
export type MembraneTraceEvent = { type: 'grab' | 'drop', oid: IO.Oid, ref: Ref, delta: number, newCount: number };
export type LayerMembraneTraceEvent = MembraneTraceEvent & { table: 'exported' | 'imported' };
export type LayerTraceEvent =
| LayerMembraneTraceEvent
| { type: 'outbound' | 'inbound', target: any, event: 'assert', assertion: Value<any>, handle: Handle }
| { type: 'outbound' | 'inbound', event: 'retract', handle: Handle }
| { type: 'outbound' | 'inbound', event: 'message', assertion: Value<any> }
| { type: 'outbound' | 'inbound', target: any, event: 'sync', peer: any }
;
export type LayerTracer = (e: LayerTraceEvent) => void;
let defaultLayerTracer: LayerTracer | undefined = void 0;
export function getDefaultLayerTracer(): LayerTracer | undefined { return defaultLayerTracer; }
export function setDefaultLayerTracer(t: LayerTracer | undefined) { defaultLayerTracer = t; }
export class WireSymbol {
count = 0;
@ -18,6 +33,7 @@ export class WireSymbol {
drop(): void {
this.count--;
this.side.tracer?.({ type: 'drop', oid: this.oid, ref: this.ref, delta: -1, newCount: this.count });
if (this.count === 0) {
this.side.byOid.delete(this.oid);
this.side.byRef.delete(this.ref);
@ -31,6 +47,8 @@ export class Membrane {
readonly byOid = new IdentityMap<IO.Oid, WireSymbol>();
readonly byRef = new IdentityMap<Ref, WireSymbol>();
tracer?: (e: MembraneTraceEvent) => void;
grab<Table extends WhichTable>(table: Table,
key: Parameters<Membrane[Table]['get']>[0],
transient: boolean,
@ -51,6 +69,7 @@ export class Membrane {
this.byOid.set(e.oid, e);
}
if (!transient) e.count++;
this.tracer?.({ type: 'grab', oid: e.oid, ref: e.ref, delta: transient ? 0 : +1, newCount: e.count });
return e;
}
}
@ -80,11 +99,19 @@ export abstract class LayerBoundary implements ProxyOutbound, ProxyInbound {
readonly exported = new Membrane();
readonly imported = new Membrane();
constructor(public trustPeer = true, public nextLocalOid: IO.Oid = 0) {}
readonly tracer: LayerTracer | undefined = defaultLayerTracer;
constructor(public trustPeer = true, public nextLocalOid: IO.Oid = 0) {
if (this.tracer) {
this.exported.tracer = e => { const f = e as LayerMembraneTraceEvent; f.table = 'exported'; this.tracer?.(f); };
this.imported.tracer = e => { const f = e as LayerMembraneTraceEvent; f.table = 'imported'; this.tracer?.(f); };
}
}
abstract send(remoteOid: IO.Oid, event: IO.Event<Embedded<WireRef>>): void;
proxyAssertion(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value<Embedded<WireRef>> {
this.tracer?.({ type: 'outbound', target: targetRemoteOid, event: 'assert', assertion, handle });
const pins: Array<WireSymbol> = [];
const rewritten = mapEmbeddeds(assertion, r => this.rewriteRefOut(r, false, pins));
this.grabImportedOid(targetRemoteOid, pins);
@ -93,16 +120,19 @@ export abstract class LayerBoundary implements ProxyOutbound, ProxyInbound {
}
proxyRetract(handle: Handle): void {
this.tracer?.({ type: 'outbound', event: 'retract', handle });
(this.outboundAssertions.get(handle) ?? []).forEach(e => e.drop());
this.outboundAssertions.delete(handle);
}
proxyMessage(assertion: Assertion): Value<Embedded<WireRef>> {
this.tracer?.({ type: 'outbound', event: 'message', assertion });
const pins: Array<WireSymbol> = [];
return mapEmbeddeds(assertion, r => this.rewriteRefOut(r, true, pins));
}
proxySync(targetRemoteOid: IO.Oid, peer: Ref): Embedded<WireRef> {
this.tracer?.({ type: 'outbound', target: targetRemoteOid, event: 'sync', peer });
const peerEntity = new SyncPeerEntity(peer);
this.grabImportedOid(targetRemoteOid, peerEntity.pins);
return this.rewriteRefOut(Turn.ref(peerEntity), false, peerEntity.pins);
@ -192,6 +222,7 @@ export abstract class LayerBoundary implements ProxyOutbound, ProxyInbound {
handle(localOid: IO.Oid, m: IO.Event<Embedded<WireRef>>): void {
switch (m._variant) {
case 'Assert': {
this.tracer?.({ type: 'inbound', event: 'assert', target: localOid, assertion: m.value.assertion, handle: m.value.handle });
const [a, pins] = this.rewriteIn(m.value.assertion);
const r = this.grabExportedOid(localOid, pins);
this.inboundAssertions.set(m.value.handle, {
@ -201,6 +232,7 @@ export abstract class LayerBoundary implements ProxyOutbound, ProxyInbound {
break;
}
case 'Retract': {
this.tracer?.({ type: 'inbound', event: 'retract', handle: m.value.handle });
const remoteHandle = m.value.handle;
const h = this.inboundAssertions.get(remoteHandle);
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
@ -210,6 +242,7 @@ export abstract class LayerBoundary implements ProxyOutbound, ProxyInbound {
break;
}
case 'Message': {
this.tracer?.({ type: 'inbound', event: 'message', assertion: m.value.body });
const [a, pins] = this.rewriteIn(m.value.body);
pins.forEach(e => {
e.drop();
@ -220,6 +253,7 @@ export abstract class LayerBoundary implements ProxyOutbound, ProxyInbound {
break;
}
case 'Sync': {
this.tracer?.({ type: 'inbound', event: 'sync', target: localOid, peer: m.value.peer });
const pins: Array<WireSymbol> = [];
const r = this.grabExportedOid(localOid, pins);
const k = this.rewriteRefIn(m.value.peer, pins);