novy-syndicate/src/distributed/server.ts

80 lines
3.4 KiB
TypeScript
Raw Normal View History

2021-10-11 10:55:45 +00:00
import { Actor, Handle, Ref, Turn } from '../runtime/actor.js';
import { Dataspace, during, observe, P } from '../runtime/dataspace.js';
import { Relay, spawnRelay } from '../transport/relay.js';
2021-03-04 10:29:37 +00:00
import * as net from 'net';
import { mint, sturdyEncode, validate } from '../transport/sturdy.js';
import { KEY_LENGTH } from '../transport/cryptography.js';
import { attenuate } from '../runtime/rewrite.js';
2021-10-11 10:55:45 +00:00
import { Bytes, embed, Embedded } from '@preserves/core';
import { Attenuation, fromSturdyRef } from '../gen/sturdy.js';
import { $bind, Bind, fromBind, toResolve } from '../gen/gatekeeper.js';
2021-03-04 21:08:49 +00:00
2021-04-16 18:29:16 +00:00
new Actor(t => {
// Root-actor setup: create the shared dataspace, publish a gatekeeper `Bind`
// record for it, and print the matching sturdy reference for clients to use.
// NOTE(review): preventInertCheck() presumably stops the runtime from
// terminating this facet while it appears idle — confirm against the runtime.
t.activeFacet.preventInertCheck();

const ds = t.ref(new Dataspace());
const dsOid = 'syndicate';
// NOTE(review): the key is constructed from a length only, so it is presumably
// a fixed (zero-filled) secret; acceptable for development, not for deployment
// — confirm.
const dsKey = new Bytes(KEY_LENGTH);

// The binding is asserted into the dataspace itself, so the per-connection
// resolver can find `ds` by observing bind records stored in `ds`.
t.assert(ds, fromBind(Bind({
    oid: dsOid,
    key: dsKey,
    target: ds,
})));

// Minting is asynchronous. Print the reference both as Preserves text and as
// hex, and report a failure instead of leaving the promise rejection unhandled.
mint(dsOid, dsKey).then(v => {
    const sturdyRef = fromSturdyRef(v);
    console.log(sturdyRef.asPreservesText());
    console.log(sturdyEncode(sturdyRef).toHex());
}).catch(err => {
    console.error('failed to mint sturdy reference for', dsOid, err);
});
2021-03-04 10:29:37 +00:00
/**
 * Serves one accepted TCP connection by spawning a relay wired to `socket`.
 *
 * The relay's initial ref is a gatekeeper: for every assertion that
 * `toResolve` can parse, it watches the dataspace `ds` for `bind` records whose
 * oid equals the requested sturdy ref's oid. When `validate` accepts the ref
 * against a record's key, the bound target is attenuated by the ref's caveats
 * and asserted to the requester's observer.
 *
 * @param t - turn used to spawn the relay and to create its initial ref
 * @param socket - the accepted connection; destroyed when the relay actor exits
 */
function spawnConnection(t: Turn, socket: net.Socket) {
    console.log('connection', socket.remoteAddress, socket.remotePort);

    // Handler behind the initial ref. Each `during` callback returns either
    // null or a cleanup action (presumably run on retraction — confirm).
    const gatekeeper = during(async (resolveTurn, assertion) => {
        const request = toResolve(assertion);
        if (request === void 0) return null;
        const sturdyRef = request.sturdyref;

        const lookupFacet = resolveTurn.facet(facetTurn => {
            const bindPattern =
                P.rec($bind, P.lit(sturdyRef.oid), P.bind('key'), P.bind('target'));
            observe(facetTurn, ds, bindPattern, during(async (bindTurn, bindings) => {
                const [key, embeddedTarget] = bindings as [Bytes, Embedded<Ref>];
                const target = embeddedTarget.embeddedValue;

                const isValid = await validate(sturdyRef, key);
                if (!isValid) return null;

                // Flatten the caveat chain into a single attenuation list.
                const caveats: Attenuation = [];
                for (const link of sturdyRef.caveatChain) {
                    caveats.push(...link);
                }
                const attenuatedTarget = attenuate(target, ...caveats);

                // We are past an `await`, so the reply is asserted from a
                // freshened turn rather than from `bindTurn` directly.
                let replyHandle: Handle | undefined;
                bindTurn.freshen(fresh =>
                    replyHandle = fresh.assert(request.observer, embed(attenuatedTarget)));
                return retractTurn => retractTurn.retract(replyHandle);
            }));
        });

        return stopTurn => stopTurn.stop(lookupFacet);
    });

    spawnRelay(t, {
        packetWriter: bs => socket.write(bs),
        setup(relayTurn: Turn, relay: Relay) {
            // Both 'close' and 'end' simply stop the relay actor.
            const stopOnHangup = () => relayTurn.freshen(fresh => fresh.stopActor());

            // A peer reset is treated as a normal stop; any other socket error
            // crashes the actor.
            socket.on('error', err => relayTurn.freshen(fresh =>
                (err as { code?: unknown }).code === 'ECONNRESET'
                    ? fresh.stopActor()
                    : fresh.crash(err)));
            socket.on('close', stopOnHangup);
            socket.on('end', stopOnHangup);
            socket.on('data', data => relay.accept(data));

            relayTurn.activeFacet.actor.atExit(() => socket.destroy());
        },
        initialRef: t.ref(gatekeeper),
        // debug: true,
    });
}
// Listener actor: accepts TCP connections on port 5999 (all IPv4 interfaces,
// backlog 512) and hands each accepted socket to spawnConnection.
t.spawn(serverTurn => {
    const onConnection = (socket: net.Socket) =>
        serverTurn.freshen(fresh => spawnConnection(fresh, socket));

    const server = net.createServer(onConnection);
    // A listener-level error crashes this actor.
    server.on('error', err => serverTurn.freshen(fresh => fresh.crash(err)));
    server.listen(5999, '0.0.0.0', 512);

    const listenerFacet = serverTurn.activeFacet;
    listenerFacet.preventInertCheck();
    listenerFacet.actor.atExit(() => {
        // Closing is best-effort: a failure here is logged, not rethrown.
        try {
            server.close();
        } catch (e) {
            console.error(e);
        }
    });
});
});