diff --git a/packages/default.js.do b/packages/default.js.do index 1a46810..39f9e47 100644 --- a/packages/default.js.do +++ b/packages/default.js.do @@ -18,8 +18,20 @@ case "$1" in redo-ifchange ../syntax/all file=$(basename "$1") redo-ifchange "src/$file" - npx syndicate-babel "src/$file" - # curl -fsS --data-binary @"src/$file" http://localhost:14641/compile/"$file" + if [ -n "$SYNDICATE_COMPILE_SERVER" ] + then + if wget -q -O - --content-on-error --post-file="src/$file" \ + ${SYNDICATE_COMPILE_SERVER}/"$file" \ + > ${targettempfile} 2>/dev/null + then + : + else + cat ${targettempfile} >&2 + rm -f ${targettempfile} + fi + else + npx syndicate-babel "src/$file" + fi ;; */dist/*) # Conservatively assume the distribution depends on ALL the diff --git a/packages/syntax-server/bin/syndicate-babel-server.js b/packages/syntax-server/bin/syndicate-babel-server.js index f7b3b1a..91c9a7e 100755 --- a/packages/syntax-server/bin/syndicate-babel-server.js +++ b/packages/syntax-server/bin/syndicate-babel-server.js @@ -1,4 +1,5 @@ -#!/usr/bin/env node +#!/bin/sh +//bin/true; exec node --experimental-worker "$0" const Program = require("../lib/index"); const Syndicate = require("@syndicate-lang/core"); Syndicate.bootModule(Program); diff --git a/packages/syntax-server/src/compiler.js b/packages/syntax-server/src/compiler.js new file mode 100644 index 0000000..3c762ca --- /dev/null +++ b/packages/syntax-server/src/compiler.js @@ -0,0 +1,30 @@ +"use strict"; + +require("@syndicate-lang/syntax/lib/index"); // patches babel -- load before any of babel loads!! +const BabelTransform = require("@babel/core/lib/transform"); + +import { Observe, Dataspace, genUuid, Inbound, Outbound } from "@syndicate-lang/core"; + +import { WorkItem, JobResult, JobError } from "./job"; +assertion type CompilationOptions(options); +assertion type Compilation(filename, input); + +export { + CompilationOptions, + Compilation, +}; + +spawn named 'compiler' { + const worker = genUuid('worker'); + during Inbound(CompilationOptions($options)) { + during Inbound(Observe(WorkItem(worker, Compilation($filename, $input), _))) { + const finalOptions = Object.assign({filename: '/' + filename}, options.toJS()); + console.log(worker, 'compiling', filename, '...'); + BabelTransform.transform(input, finalOptions, Dataspace.wrapExternal((err, output) => { + react assert Outbound(WorkItem(worker, + Compilation(filename, input), + err ? JobError(err.toString()) : JobResult(output.code))); + })); + } + } +} diff --git a/packages/syntax-server/src/index.js b/packages/syntax-server/src/index.js index d552c51..a46d363 100644 --- a/packages/syntax-server/src/index.js +++ b/packages/syntax-server/src/index.js @@ -1,19 +1,13 @@ "use strict"; -require("@syndicate-lang/syntax/lib/index"); // patches babel -- load before any of babel loads!! -const BabelTransform = require("@babel/core/lib/transform"); - -const UI = require("@syndicate-lang/driver-browser-ui"); -// @jsx UI.html -// @jsxFrag UI.htmlFragment - +import { Bytes, spawnWorker } from "@syndicate-lang/core"; const Http = activate require("@syndicate-lang/driver-http-node"); const S = activate require("@syndicate-lang/driver-streams-node"); +const C = require("./compiler"); +const J = activate require("./job"); const fs = require('fs'); -import { Dataspace, Bytes, genUuid } from "@syndicate-lang/core"; - const options = { "port": 14641, "babel-options": JSON.parse(fs.readFileSync(__dirname + '/../.babelrc')), @@ -60,19 +54,21 @@ console.info(`http://localhost:${options.port}/compile/FILENAME`); console.info(options); spawn named 'rootServer' { + assert C.CompilationOptions(options["babel-options"]); + const server = Http.HttpServer(null, options.port); during Http.Request($reqId, server, 'post', ['compile', $file], _, $reqSeal) spawn named reqId { stop on retracted S.Readable(reqId); _collectSource.call(this, reqId, (source) => { - const finalOptions = Object.assign({filename: '/' + file}, options["babel-options"]); - BabelTransform.transform(source, finalOptions, Dataspace.wrapExternal((err, output) => { - if (err) { + react { + stop on asserted J.Job(C.Compilation(file, source), J.JobError($errmsg)) { react assert Http.Response( - reqId, 400, "Error", {"Content-Type": "text/plain"}, err.toString()); - } else { - react assert Http.Response(reqId, 200, "OK", {}, output.code); + reqId, 400, "Error", {"Content-Type": "text/plain"}, errmsg); } - })); + stop on asserted J.Job(C.Compilation(file, source), J.JobResult($output)) { + react assert Http.Response(reqId, 200, "OK", {}, output); + } + } }); } } @@ -85,3 +81,11 @@ function _collectSource(streamId, cb) { cb(source); } } + +{ + const nCPUs = require('os').cpus().length; + for (let i = 0; i < nCPUs; i++) { + // spawn dataspace activate C; + spawnWorker(__dirname + '/compiler.js'); + } +} diff --git a/packages/syntax-server/src/job.js b/packages/syntax-server/src/job.js new file mode 100644 index 0000000..e8b66b1 --- /dev/null +++ b/packages/syntax-server/src/job.js @@ -0,0 +1,62 @@ +const { currentFacet, Observe, List, Set } = require("@syndicate-lang/core"); + +assertion type WorkItem(worker, item, result); +assertion type Job(item, result); +assertion type JobResult(output); +assertion type JobError(err); + +export { + WorkItem, Job, + JobResult, JobError, +}; + +spawn named 'JobDispatcher' { + field this.readyWorkers = Set(); + field this.runningJobs = 0; + // dataflow { + // console.log(this.runningJobs, 'running jobs', this.readyWorkers.size, 'idle workers'); + // } + + on asserted Observe(Observe(WorkItem($w, _, _))) this.readyWorkers = this.readyWorkers.add(w); + on retracted Observe(Observe(WorkItem($w, _, _))) this.readyWorkers = this.readyWorkers.remove(w); + + during Observe(Job($item, _)) { + on start this.runningJobs++; + on stop this.runningJobs--; + + const waitForWorker = (retryCount) => { + if (retryCount === 3) { + console.error('Failed job, too many retries', item); + react { + assert Job(item, JobError(new Error("Too many retries"))); + } + } else { + react { + const facet = currentFacet(); + dataflow { + const worker = this.readyWorkers.first(); + if (worker !== void 0) { + this.readyWorkers = this.readyWorkers.remove(worker); + facet.stop(() => { + react { + stop on retracted Observe(Observe(WorkItem(worker, _, _))) { + console.warn('Worker withdrew before answering', worker); + waitForWorker(retryCount + 1); + } + stop on asserted WorkItem(worker, item, $result) { + this.readyWorkers = this.readyWorkers.add(worker); + react { + assert Job(item, result); + } + } + } + }); + } + } + } + } + }; + + on start waitForWorker(0); + } +}