Experimental node.js worker_threads support

This commit is contained in:
Tony Garnock-Jones 2018-12-13 16:39:56 +00:00
parent 621c6b3737
commit d1877a890c
6 changed files with 361 additions and 1 deletions

View File

@ -205,7 +205,7 @@ Dataspace.prototype.applyPatch = function (ac, delta) {
} else {
removals.push([count, a]);
}
ac.cleanupChanges.change(a, -count);
if (ac) ac.cleanupChanges.change(a, -count);
}
});
removals.forEach(([count, a]) => {

View File

@ -2,6 +2,7 @@
const Immutable = require('immutable');
const Dataspace = require('./dataspace.js').Dataspace;
const Worker = require('./worker');
function Ground(bootProc) {
Dataspace.call(this, bootProc);
@ -89,6 +90,7 @@ Ground.prototype.addStopHandler = function (h) {
function bootModule(mod) {
let g = new Ground(() => {
Worker.spawnWorkerRelay();
if (Dataspace.BootSteps in mod) {
// It's really an exports dict, not a module.
Dataspace.activate(mod);

View File

@ -24,6 +24,7 @@ const Ground = require('./ground.js');
const Assertions = require('./assertions.js');
const Relay = require('./relay.js');
const Bag = require('./bag.js');
const Worker = require('./worker.js');
Object.assign(module.exports, require("preserves"));
@ -52,6 +53,7 @@ module.exports.NestedDataspace = Relay.NestedDataspace;
module.exports.inNestedDataspace = Relay.inNestedDataspace;
module.exports.bootModule = Ground.bootModule;
module.exports.spawnWorker = Worker.spawnWorker;
// These aren't so much "Universal" as they are "VM-wide-unique".
let uuidIndex = 0;

264
packages/core/src/worker.js Normal file
View File

@ -0,0 +1,264 @@
"use strict";
//---------------------------------------------------------------------------
// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS.
// Copyright (C) 2016-2018 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
const { Dataspace } = require('./dataspace.js');
const { Observe, Outbound, Inbound, Capture, Discard } = require('./assertions.js');
const { $QuitDataspace } = require('./relay.js');
const Skeleton = require('./skeleton.js');
const RandomID = require('./randomid.js');
const { List, Map } = require('immutable');
const { Bytes, Record, Encoder, Decoder } = require("preserves");
const WorkerEvent = Record.makeConstructor('--worker-event', ['epId', 'event', 'vs']);
const worker_threads = (function () {
try {
return require('worker_threads');
} catch (_e) {
return {
__isDummyStub: true
};
}
})();
function encodePacket(p) {
return Bytes.toIO(new Encoder().push(p).contents());
}
function decodePacket(m) {
return new Decoder(m).next();
}
function sendPacket(ch, p) {
ch.postMessage(encodePacket(p));
}
function spawnWorker(workerSourceFilename, workerData) {
if (worker_threads.__isDummyStub) {
throw new Error("Cannot spawnWorker without --experimental-worker flag on node.js command line");
}
Dataspace.spawn([workerSourceFilename, workerData], function () {
const outerFacet = Dataspace.currentFacet();
outerFacet.addDataflow(function () {});
// ^ eww! Dummy endpoint to keep the root facet of the relay alive.
let endpoints = Map();
const w = new worker_threads.Worker(workerSourceFilename, {
workerData: encodePacket(workerData)
});
w.on('error', Dataspace.wrapExternal((err) => {
throw err;
}));
w.on('exit', Dataspace.wrapExternal(() => {
outerFacet.stop();
}));
w.on('message', Dataspace.wrapExternal((m) => {
m = decodePacket(m);
switch (m.get(0)) {
case 'assert': {
const ep = m.get(1);
const a = m.get(2);
if (!endpoints.includes(ep)) {
outerFacet.actor.addFacet(outerFacet, function () {
const epFacet = Dataspace.currentFacet();
endpoints = endpoints.set(ep, epFacet);
epFacet.addStopScript(() => { endpoints = endpoints.remove(ep); });
Dataspace.declareField(this, 'assertion', a);
epFacet.addEndpoint(() => {
if (Observe.isClassOf(this.assertion)) {
const spec = this.assertion.get(0);
const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = Dataspace.wrap((evt, vs) => {
epFacet.actor.scheduleScript(() => {
sendPacket(w, ['event', ep, evt, vs]);
});
});
return [Observe(spec), analysis];
} else {
return [this.assertion, null];
}
}, true);
}, true);
} else {
endpoints.get(ep).fields.assertion = a;
}
break;
}
case 'clear': {
const ep = m.get(1);
const epFacet = endpoints.get(ep, false);
if (epFacet) {
epFacet.stop(() => {
endpoints = endpoints.remove(ep);
});
}
break;
}
case 'message': {
const body = m.get(1);
Dataspace.send(body);
break;
}
default: {
const err = new Error("Invalid Worker protocol message from Worker");
err.irritant = m;
throw err;
}
}
}));
}, null);
}
function spawnWorkerRelay() {
if (worker_threads.__isDummyStub) return;
if (worker_threads.isMainThread) return;
worker_threads.workerData = decodePacket(worker_threads.workerData);
Dataspace.currentFacet().actor.dataspace.addStopHandler(() => {
process.exit();
});
Dataspace.currentFacet().addStartScript(function () {
Dataspace.spawn('WorkerRelay', function () {
const outerFacet = Dataspace.currentFacet();
let outboundEndpoints = Map();
let inboundEndpoints = Map();
let nextId = 0;
const parentPort = worker_threads.parentPort;
function sendToParent(m) {
sendPacket(parentPort, m);
}
parentPort.on('message', Dataspace.wrapExternal(function (m) {
m = decodePacket(m);
if (List.isList(m) && m.get(0) === 'event') {
const epId = m.get(1);
const evt = m.get(2);
const vs = m.get(3);
Dataspace.send(WorkerEvent(epId, evt, vs));
} else {
const err = new Error("Invalid Worker protocol message from parent");
err.irritant = m;
throw err;
}
}));
outerFacet.addEndpoint(function () {
const analysis = Skeleton.analyzeAssertion(Outbound(Capture(Discard._instance)));
analysis.callback = Dataspace.wrap(function (evt, vs) {
outerFacet.actor.scheduleScript(function () {
switch (evt) {
case Skeleton.EVENT_ADDED: {
const epId = nextId++;
outboundEndpoints = outboundEndpoints.set(vs.get(0), epId);
sendToParent(['assert', epId, vs.get(0)]);
break;
}
case Skeleton.EVENT_REMOVED: {
const epId = outboundEndpoints.get(vs.get(0));
outboundEndpoints = outboundEndpoints.remove(vs.get(0));
sendToParent(['clear', epId]);
break;
}
case Skeleton.EVENT_MESSAGE: {
sendToParent(['message', vs.get(0)]);
break;
}
}
});
});
return [analysis.assertion, analysis];
}, false);
outerFacet.addEndpoint(function () {
const analysis = Skeleton.analyzeAssertion(Observe(Inbound(Capture(Discard._instance))));
analysis.callback = Dataspace.wrap(function (evt, vs) {
outerFacet.actor.scheduleScript(function () {
const spec = vs.get(0);
switch (evt) {
case Skeleton.EVENT_ADDED: {
const epId = nextId++;
outerFacet.actor.addFacet(outerFacet, function () {
const innerFacet = Dataspace.currentFacet();
inboundEndpoints = inboundEndpoints.set(spec, { epId, facet: innerFacet });
innerFacet.addEndpoint(function () {
const analysis = Skeleton.analyzeAssertion(
WorkerEvent(epId, Capture(Discard._instance), Capture(Discard._instance)));
analysis.callback = Dataspace.wrap(function (evt, vs) {
if (evt === Skeleton.EVENT_MESSAGE) {
evt = vs.get(0);
vs = vs.get(1);
const a = Skeleton.instantiateAssertion(Inbound(spec), vs);
innerFacet.actor.scheduleScript(function () {
switch (evt) {
case Skeleton.EVENT_ADDED:
innerFacet.actor.addFacet(innerFacet, function () {
const assertionFacet = Dataspace.currentFacet();
assertionFacet.addEndpoint(function () {
return [a, null];
}, false);
assertionFacet.addEndpoint(function () {
const analysis = Skeleton.analyzeAssertion(
WorkerEvent(epId, Skeleton.EVENT_REMOVED, vs));
analysis.callback = Dataspace.wrap(function (evt, vs) {
assertionFacet.actor.scheduleScript(function () {
assertionFacet.stop();
});
});
return [analysis.assertion, analysis];
}, false);
}, true);
break;
case Skeleton.EVENT_MESSAGE:
Dataspace.send(a);
break;
}
});
}
});
return [analysis.assertion, analysis];
}, false);
}, true);
sendToParent(['assert', epId, Observe(spec)]);
break;
}
case Skeleton.EVENT_REMOVED: {
const { epId, facet } = inboundEndpoints.get(spec);
inboundEndpoints = inboundEndpoints.remove(spec);
facet.stop();
sendToParent(['clear', epId]);
break;
}
}
});
});
return [analysis.assertion, analysis];
}, false);
}, null);
});
}
module.exports.spawnWorker = spawnWorker;
module.exports.spawnWorkerRelay = spawnWorkerRelay;

View File

@ -0,0 +1,53 @@
//---------------------------------------------------------------------------
// @syndicate-lang/syntax-test, a demo of Syndicate extensions to JS.
// Copyright (C) 2016-2018 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
const { Inbound, Outbound } = require("@syndicate-lang/core");
const { PeriodicTick } = activate require("@syndicate-lang/driver-timer");
assertion type Tick(who, n);
assertion type Tock(msg);
assertion type Tack(who);
spawn named 'workerMain' {
const myData = require('worker_threads').workerData;
const limit = myData.get(0) === 1 ? 2 : 3;
const me = myData.toString();
console.log('In worker', me);
field this.count = 0;
stop on (this.count == limit) {
console.log(me, 'stopped!');
}
assert Outbound(Tack(me + ' ' + this.count));
on message PeriodicTick(1000) {
console.log('tick', me, this.count);
send Outbound(Tick(me, this.count++));
}
on message Inbound(Tock($msg)) {
console.log(me, 'saw:', msg);
}
on asserted Inbound(Tack($who)) console.log(me, '+++ tack:', who);
on retracted Inbound(Tack($who)) console.log(me, '--- tack:', who);
}

View File

@ -0,0 +1,39 @@
//---------------------------------------------------------------------------
// @syndicate-lang/syntax-test, a demo of Syndicate extensions to JS.
// Copyright (C) 2016-2018 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
const { Observe, spawnWorker } = activate require("@syndicate-lang/core");
assertion type Tick(who, n);
assertion type Tock(msg);
assertion type Tack(who);
assertion type Employee(id);
spawnWorker(__dirname + '/worker_employee.js', Employee(1));
spawnWorker(__dirname + '/worker_employee.js', Employee(2));
spawn {
on message Tick($who, $n) {
console.log('manager saw', who, 'tick', n);
send Tock(who + ' ticked ' + n);
}
on asserted Observe(Tock(_)) console.log('Someone is watching for Tock!');
on retracted Observe(Tock(_)) console.log('No-one is watching for Tock!');
on asserted Tack($who) console.log('manager + tack:', who);
on retracted Tack($who) console.log('manager - tack:', who);
}