From ede0e293709f09c1c5b5b27e39ffe78f1fc4ee92 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 3 Jul 2021 09:03:52 +0200 Subject: [PATCH] A few days' work redoing syndicate-rs - still WIP --- .gitignore | 1 + Cargo.lock | 541 +++++++++++------------------ Cargo.toml | 19 +- src/.gitignore | 1 + src/actor.rs | 389 +++++++++++++++++++++ src/bin/syndicate-server.rs | 256 +++++++------- src/config.rs | 7 - src/dataspace.rs | 219 ++++-------- src/error.rs | 41 +++ src/lib.rs | 65 ++-- src/packets.rs | 92 +---- src/pattern.rs | 81 +++++ src/peer.rs | 134 ++------ src/skeleton.rs | 667 ++++++++++++++---------------------- src/spaces.rs | 54 --- 15 files changed, 1259 insertions(+), 1308 deletions(-) create mode 100644 src/.gitignore create mode 100644 src/actor.rs create mode 100644 src/error.rs create mode 100644 src/pattern.rs delete mode 100644 src/spaces.rs diff --git a/.gitignore b/.gitignore index 2852c96..0b083e1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target **/*.rs.bk scratch/ +src/gen/**/*.rs diff --git a/Cargo.lock b/Cargo.lock index 357dc99..15d8d05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,13 +2,22 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + [[package]] name = "ansi_term" version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -17,7 +26,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -28,7 +37,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -39,9 +48,9 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] name = "base64" -version = "0.11.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "bitflags" @@ -51,25 +60,13 @@ checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] name = "block-buffer" -version = "0.7.3" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" dependencies = [ - "block-padding", - "byte-tools", - "byteorder", "generic-array", ] -[[package]] -name = "block-padding" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" -dependencies = [ - "byte-tools", -] - [[package]] name = "bstr" version = "0.2.16" @@ -84,15 +81,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.6.1" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" - -[[package]] -name = "byte-tools" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" +checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" [[package]] name = "byteorder" @@ -100,12 +91,6 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" -[[package]] -name = "bytes" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" - [[package]] name = "bytes" version = "1.0.1" @@ -123,15 +108,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.67" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" - -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +checksum = "4a72c244c1ff497a746a7e1fb3d14bd08420ecda70c8f25c7112f2781652d787" [[package]] name = "cfg-if" @@ -148,7 +127,7 @@ dependencies = [ "libc", "num-integer", "num-traits", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -166,6 +145,12 @@ dependencies = [ "vec_map", ] +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "core-foundation" version = "0.9.1" @@ -182,6 +167,15 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" +[[package]] +name = "cpufeatures" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66c99696f6c9dd7f35d486b9d04d7e6e202aa3e8c40d553f2fdf5e7e0c6a71ef" +dependencies = [ + "libc", +] + [[package]] name = "criterion" version = "0.3.4" @@ -193,7 +187,7 @@ dependencies = [ "clap", "criterion-plot", "csv", - "itertools 0.10.0", + "itertools 0.10.1", "lazy_static", "num-traits", "oorandom", @@ -224,7 +218,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", ] @@ -234,18 +228,18 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-epoch", "crossbeam-utils", ] [[package]] name = "crossbeam-epoch" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52fb27eab85b17fbb9f6fd667089e07d6a2eb8743d02639ee7f6a7a7729c9c94" +checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", "lazy_static", "memoffset", @@ -254,12 +248,11 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4feb231f0d4d6af81aed15928e58ecf5816aa62a2393e2c82f46973e92a9a278" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" dependencies = [ - "autocfg", - "cfg-if 1.0.0", + "cfg-if", "lazy_static", ] @@ -287,9 +280,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.8.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" dependencies = [ "generic-array", ] @@ -300,12 +293,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" -[[package]] -name = "fake-simd" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" - [[package]] name = "fnv" version = "1.0.7" @@ -337,22 +324,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fuchsia-zircon" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" -dependencies = [ - "bitflags", - "fuchsia-zircon-sys", -] - -[[package]] -name = "fuchsia-zircon-sys" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" - [[package]] name = "futures" version = "0.3.15" @@ -440,7 +411,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.6", + "pin-project-lite", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -449,34 +420,30 @@ dependencies = [ [[package]] name = "generic-array" -version = "0.12.4" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" +checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817" dependencies = [ "typenum", + "version_check", ] [[package]] name = "getrandom" -version = "0.1.16" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", - "wasi 0.9.0+wasi-snapshot-preview1", + "wasi", ] [[package]] -name = "getrandom" -version = "0.2.2" +name = "glob" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "wasi 0.10.2+wasi-snapshot-preview1", -] +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" [[package]] name = "half" @@ -486,9 +453,9 @@ checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3" [[package]] name = "heck" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" dependencies = [ "unicode-segmentation", ] @@ -508,7 +475,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" dependencies = [ - "bytes 1.0.1", + "bytes", "fnv", "itoa", ] @@ -532,20 +499,11 @@ dependencies = [ [[package]] name = "input_buffer" -version = "0.3.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" +checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" dependencies = [ - "bytes 0.5.6", -] - -[[package]] -name = "iovec" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" -dependencies = [ - "libc", + "bytes", ] [[package]] @@ -559,9 +517,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37d572918e350e82412fe766d24b15e6682fb2ed2bbe018280caa810397cb319" +checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" dependencies = [ "either", ] @@ -581,16 +539,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "kernel32-sys" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" -dependencies = [ - "winapi 0.2.8", - "winapi-build", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -599,9 +547,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.94" +version = "0.2.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18794a8ad5b29321f790b55d93dfba91e125cb1a9edbd4f8e3150acc771c1a5e" +checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6" [[package]] name = "log" @@ -609,7 +557,7 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -635,53 +583,33 @@ checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" [[package]] name = "memoffset" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83fb6581e8ed1f85fd45c116db8405483899489e38406156c25eb743554361d" +checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" dependencies = [ "autocfg", ] [[package]] name = "mio" -version = "0.6.23" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" +checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" dependencies = [ - "cfg-if 0.1.10", - "fuchsia-zircon", - "fuchsia-zircon-sys", - "iovec", - "kernel32-sys", "libc", "log", "miow", - "net2", - "slab", - "winapi 0.2.8", -] - -[[package]] -name = "mio-uds" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" -dependencies = [ - "iovec", - "libc", - "mio", + "ntapi", + "winapi", ] [[package]] name = "miow" -version = "0.2.2" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" dependencies = [ - "kernel32-sys", - "net2", - "winapi 0.2.8", - "ws2_32-sys", + "winapi", ] [[package]] @@ -703,14 +631,12 @@ dependencies = [ ] [[package]] -name = "net2" -version = "0.2.37" +name = "ntapi" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" dependencies = [ - "cfg-if 0.1.10", - "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -802,9 +728,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.7.2" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" +checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" [[package]] name = "oorandom" @@ -814,18 +740,18 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opaque-debug" -version = "0.2.3" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.34" +version = "0.10.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d7830286ad6a3973c0f1d9b73738f69c76b739301d0229c4b96501695cbe4c8" +checksum = "549430950c79ae24e6d02e0b7404534ecf311d94cc9f861e9e4020187d13d885" dependencies = [ "bitflags", - "cfg-if 1.0.0", + "cfg-if", "foreign-types", "libc", "once_cell", @@ -849,9 +775,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.63" +version = "0.9.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6b0d6fb7d80f877617dfcb014e605e2b5ab2fb0afdf27935219bb6bd984cb98" +checksum = "7a7907e3bfa08bb85105209cdfcb6c63d109f8f6c1ed6ca318fff5c1853fbc1d" dependencies = [ "autocfg", "cc", @@ -876,33 +802,13 @@ dependencies = [ "ucd-trie", ] -[[package]] -name = "pin-project" -version = "0.4.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "918192b5c59119d51e0cd221f4d49dde9112824ba717369e903c97d076083d0f" -dependencies = [ - "pin-project-internal 0.4.28", -] - [[package]] name = "pin-project" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4" dependencies = [ - "pin-project-internal 1.0.7", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "pin-project-internal", ] [[package]] @@ -916,12 +822,6 @@ dependencies = [ "syn", ] -[[package]] -name = "pin-project-lite" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" - [[package]] name = "pin-project-lite" version = "0.2.6" @@ -942,9 +842,9 @@ checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" [[package]] name = "plotters" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45ca0ae5f169d0917a7c7f5a9c1a3d3d9598f18f529dd2b8373ed988efea307a" +checksum = "32a3fd9ec30b9749ce28cd91f255d569591cdf937fe280c312143e3c4bad6f2a" dependencies = [ "num-traits", "plotters-backend", @@ -976,7 +876,7 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "preserves" -version = "0.13.0" +version = "0.15.0" dependencies = [ "lazy_static", "num", @@ -984,6 +884,18 @@ dependencies = [ "serde_bytes", ] +[[package]] +name = "preserves-schema" +version = "0.2.0" +dependencies = [ + "convert_case", + "glob", + "lazy_static", + "preserves", + "regex", + "structopt", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1022,9 +934,9 @@ checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" [[package]] name = "proc-macro2" -version = "1.0.26" +version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a152013215dca273577e18d2bf00fa862b89b24169fb78c4c95aeb07992c9cec" +checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038" dependencies = [ "unicode-xid", ] @@ -1040,90 +952,49 @@ dependencies = [ [[package]] name = "rand" -version = "0.7.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc 0.2.0", -] - -[[package]] -name = "rand" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" dependencies = [ "libc", - "rand_chacha 0.3.0", - "rand_core 0.6.2", - "rand_hc 0.3.0", + "rand_chacha", + "rand_core", + "rand_hc", ] [[package]] name = "rand_chacha" -version = "0.2.2" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.5.1", -] - -[[package]] -name = "rand_chacha" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.2", + "rand_core", ] [[package]] name = "rand_core" -version = "0.5.1" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ - "getrandom 0.1.16", -] - -[[package]] -name = "rand_core" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" -dependencies = [ - "getrandom 0.2.2", + "getrandom", ] [[package]] name = "rand_hc" -version = "0.2.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" dependencies = [ - "rand_core 0.5.1", -] - -[[package]] -name = "rand_hc" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" -dependencies = [ - "rand_core 0.6.2", + "rand_core", ] [[package]] name = "rayon" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b0d8e0819fadc20c74ea8373106ead0600e3a67ef1fe8da56e39b9ae7275674" +checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" dependencies = [ "autocfg", "crossbeam-deque", @@ -1133,9 +1004,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" dependencies = [ "crossbeam-channel", "crossbeam-deque", @@ -1146,9 +1017,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "742739e41cd49414de871ea5e549afb7e2a3ac77b589bcbebe8c82fab37147fc" +checksum = "5ab49abadf3f9e1c4bc499e8845e152ad87d2ad2d30371841171169e9d75feee" dependencies = [ "bitflags", ] @@ -1159,16 +1030,17 @@ version = "1.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" dependencies = [ + "aho-corasick", + "memchr", "regex-syntax", ] [[package]] name = "regex-automata" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" dependencies = [ - "byteorder", "regex-syntax", ] @@ -1184,7 +1056,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1218,7 +1090,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" dependencies = [ "lazy_static", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1229,9 +1101,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "security-framework" -version = "2.2.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3670b1d2fdf6084d192bc71ead7aabe6c06aa2ea3fbd9cc3ac111fa5c2b1bd84" +checksum = "23a2ac85147a3a11d77ecf1bc7166ec0b92febfa4461c37944e180f319ece467" dependencies = [ "bitflags", "core-foundation", @@ -1242,9 +1114,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3676258fd3cfe2c9a0ec99ce3038798d847ce3e4bb17746373eb9f0f1ac16339" +checksum = "7e4effb91b4b8b6fb7732e670b6cee160278ff8e6bf485c7805d9e319d76e284" dependencies = [ "core-foundation-sys", "libc", @@ -1320,13 +1192,14 @@ dependencies = [ [[package]] name = "sha-1" -version = "0.8.2" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df" +checksum = "8c4cfa741c5832d0ef7fab46cabed29c2aae926db0b11bb2069edd8db5e64e16" dependencies = [ "block-buffer", + "cfg-if", + "cpufeatures", "digest", - "fake-simd", "opaque-debug", ] @@ -1383,9 +1256,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.72" +version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e8cdbefb79a9a5a65e0db8b47b723ee907b7c7f8496c76a1770b5c310bab82" +checksum = "f71489ff30030d2ae598524f61326b902466f72a0fb1a8564c001cc63425bcc7" dependencies = [ "proc-macro2", "quote", @@ -1394,13 +1267,14 @@ dependencies = [ [[package]] name = "syndicate-rs" -version = "0.1.0" +version = "0.2.0" dependencies = [ - "bytes 0.5.6", + "bytes", "criterion", "futures", "openssl", "preserves", + "preserves-schema", "serde", "serde_bytes", "structopt", @@ -1419,12 +1293,12 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", - "rand 0.8.3", + "rand", "redox_syscall", "remove_dir_all", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1436,6 +1310,26 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thiserror" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa6f76457f59514c7eeb4e59d891395fab0b2fd1d40723ae737d64153392e9c6" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a36768c0fbf1bb15eca10defa29526bda730a2376c2ab4393ccfa16fb1a318d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.3" @@ -1472,30 +1366,26 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "0.2.25" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092" +checksum = "5fb2ed024293bb19f7a5dc54fe83bf86532a44c12a2bb8ba40d64a4509395ca2" dependencies = [ - "bytes 0.5.6", - "fnv", - "futures-core", - "iovec", - "lazy_static", + "autocfg", + "bytes", "libc", "memchr", "mio", - "mio-uds", "num_cpus", - "pin-project-lite 0.1.12", - "slab", + "pin-project-lite", "tokio-macros", + "winapi", ] [[package]] name = "tokio-macros" -version = "0.2.6" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" +checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" dependencies = [ "proc-macro2", "quote", @@ -1504,28 +1394,28 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.10.1" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b8fe88007ebc363512449868d7da4389c9400072a3f666f212c7280082882a" +checksum = "1e96bb520beab540ab664bd5a9cfeaa1fcd846fa68c830b42e2c8963071251d2" dependencies = [ - "futures", + "futures-util", "log", - "pin-project 0.4.28", + "pin-project", "tokio", "tungstenite", ] [[package]] name = "tokio-util" -version = "0.3.1" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" +checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" dependencies = [ - "bytes 0.5.6", + "bytes", "futures-core", "futures-sink", "log", - "pin-project-lite 0.1.12", + "pin-project-lite", "tokio", ] @@ -1535,8 +1425,8 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ - "cfg-if 1.0.0", - "pin-project-lite 0.2.6", + "cfg-if", + "pin-project-lite", "tracing-attributes", "tracing-core", ] @@ -1567,7 +1457,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project 1.0.7", + "pin-project", "tracing", ] @@ -1616,20 +1506,21 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.10.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfea31758bf674f990918962e8e5f07071a3161bd7c4138ed23e416e1ac4264e" +checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093" dependencies = [ "base64", "byteorder", - "bytes 0.5.6", + "bytes", "http", "httparse", "input_buffer", "log", "native-tls", - "rand 0.7.3", + "rand", "sha-1", + "thiserror", "url", "utf-8", ] @@ -1657,9 +1548,9 @@ dependencies = [ [[package]] name = "unicode-normalization" -version = "0.1.17" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07fbfce1c8a97d547e8b5334978438d9d6ec8c20e38f56d4a4374d181493eaef" +checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" dependencies = [ "tinyvec", ] @@ -1702,9 +1593,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "vcpkg" -version = "0.2.12" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] name = "vec_map" @@ -1725,16 +1616,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" dependencies = [ "same-file", - "winapi 0.3.9", + "winapi", "winapi-util", ] -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" @@ -1747,7 +1632,7 @@ version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "wasm-bindgen-macro", ] @@ -1805,12 +1690,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "winapi" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" - [[package]] name = "winapi" version = "0.3.9" @@ -1821,12 +1700,6 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] -[[package]] -name = "winapi-build" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" - [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -1839,7 +1712,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1847,13 +1720,3 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "ws2_32-sys" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" -dependencies = [ - "winapi 0.2.8", - "winapi-build", -] diff --git a/Cargo.toml b/Cargo.toml index eec1067..25f710f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,12 @@ [package] name = "syndicate-rs" -version = "0.1.0" +version = "0.2.0" authors = ["Tony Garnock-Jones "] edition = "2018" [patch.crates-io] preserves = { path = "/home/tonyg/src/preserves/implementations/rust/preserves" } +preserves-schema = { path = "/home/tonyg/src/preserves/implementations/rust/preserves-schema" } [features] vendored-openssl = ["openssl/vendored"] @@ -17,22 +18,26 @@ lto = true [lib] name = "syndicate" +[build-dependencies] +preserves-schema = "0.2.0" + [dependencies] -preserves = "0.13.0" +preserves = "0.15.0" +preserves-schema = "0.2.0" serde = { version = "1.0", features = ["derive", "rc"] } serde_bytes = "0.11" -tokio = { version = "0.2.21", features = ["macros", "rt-threaded", "sync", "dns", "tcp", "time", "stream"] } -tokio-util = { version = "0.3.1", features = ["codec"] } -bytes = "0.5.4" +tokio = { version = "1.7.1", features = ["macros", "sync", "net", "rt", "rt-multi-thread", "time"] } +tokio-util = { version = "0.6.7", features = ["codec"] } +bytes = "1.0.1" futures = "0.3.5" structopt = "0.3.14" -tungstenite = "0.10.1" -tokio-tungstenite = "0.10.1" +tungstenite = "0.13.0" +tokio-tungstenite = "0.14.0" tracing = "0.1.14" tracing-subscriber = "0.2.5" diff --git a/src/.gitignore b/src/.gitignore new file mode 100644 index 0000000..c75f615 --- /dev/null +++ b/src/.gitignore @@ -0,0 +1 @@ +schemas/ diff --git a/src/actor.rs b/src/actor.rs new file mode 100644 index 0000000..64dcd8a --- /dev/null +++ b/src/actor.rs @@ -0,0 +1,389 @@ +use super::Assertion; +use super::ActorId; +use super::Handle; +use super::schemas::internal_protocol::*; +use super::error::Error; + +use preserves::value::Domain; +use preserves::value::IOResult; +use preserves::value::IOValue; +use preserves::value::Map; +use preserves::value::NestedValue; + +use std::boxed::Box; +use std::cell::Cell; +use std::collections::hash_map::HashMap; +use std::future::Future; +use std::future::ready; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use tokio::select; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; +use tokio_util::sync::CancellationToken; + +use tracing::{Instrument, trace, error}; + +pub type ActorResult = Result<(), Error>; +pub type ActorHandle = tokio::task::JoinHandle; + +pub trait Entity { + fn assert(&mut self, _t: &mut Activation, _a: Assertion, _h: Handle) -> ActorResult { + Ok(()) + } + fn retract(&mut self, _t: &mut Activation, _h: Handle) -> ActorResult { + Ok(()) + } + fn message(&mut self, _t: &mut Activation, _m: Assertion) -> ActorResult { + Ok(()) + } + fn sync(&mut self, t: &mut Activation, peer: Arc) -> ActorResult { + t.message(peer, Assertion::new(true)); + Ok(()) + } +} + +type OutboundAssertions = Map>; + +// This is what other implementations call a "Turn", renamed here to +// avoid conflicts with schemas::internal_protocol::Turn. +pub struct Activation<'activation> { + outbound_assertions: &'activation mut OutboundAssertions, + queues: HashMap, Event)>>, +} + +enum SystemMessage { + Release, + ReleaseOid(Oid), + Turn(Turn), + Crash(Error), +} + +pub struct Mailbox { + pub actor_id: ActorId, + pub mailbox_id: u64, + tx: UnboundedSender, + pub queue_depth: Arc, + pub mailbox_count: Arc, +} + +pub struct Actor { + pub template_mailbox: Mailbox, + rx: UnboundedReceiver, + pub outbound_assertions: OutboundAssertions, + pub oid_map: Map>>, + pub next_task_id: u64, + pub linked_tasks: Map, +} + +#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct Ref { + pub relay: Mailbox, + pub target: Oid, + /* TODO: attenuation */ +} + +//--------------------------------------------------------------------------- + +impl<'activation> Activation<'activation> { + pub fn for_actor(actor: &'activation mut Actor) -> Self { + Self::for_actor_details(&mut actor.outbound_assertions) + } + + pub fn for_actor_details(outbound_assertions: &'activation mut OutboundAssertions) -> Self { + Activation { + outbound_assertions, + queues: HashMap::new(), + } + } + + pub fn assert(&mut self, r: Arc, a: M) -> Handle where M: Into { + let handle = crate::next_handle(); + self.queue_for(&r).push((Arc::clone(&r), Event::Assert(Box::new( + Assert { assertion: Assertion(a.into()), handle: handle.clone() })))); + self.outbound_assertions.insert(handle.clone(), r); + handle + } + + pub fn retract(&mut self, handle: Handle) { + if let Some(r) = self.outbound_assertions.remove(&handle) { + self.retract_known_ref(r, handle) + } + } + + pub fn retract_known_ref(&mut self, r: Arc, handle: Handle) { + self.queue_for(&r).push((r, Event::Retract(Box::new(Retract { handle })))); + } + + pub fn message(&mut self, r: Arc, m: M) where M: Into { + self.queue_for(&r).push((r, Event::Message(Box::new( + Message { body: Assertion(m.into()) })))) + } + + fn queue_for(&mut self, r: &Arc) -> &mut Vec<(Arc, Event)> { + self.queues.entry(r.relay.actor_id).or_default() + } + + pub fn deliver(&mut self) { + for (_actor_id, turn) in std::mem::take(&mut self.queues).into_iter() { + if turn.len() == 0 { continue; } + let first_ref = Arc::clone(&turn[0].0); + let target = &first_ref.relay; + target.send(Turn(turn.into_iter().map( + |(r, e)| TurnEvent { oid: r.target.clone(), event: e }).collect())); + } + } +} + +impl<'activation> Drop for Activation<'activation> { + fn drop(&mut self) { + self.deliver() + } +} + +impl Mailbox { + pub fn send(&self, t: Turn) { + let _ = self.tx.send(SystemMessage::Turn(t)); + self.queue_depth.fetch_add(1, Ordering::Relaxed); + } +} + +impl std::fmt::Debug for Mailbox { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + write!(f, "#", self.actor_id, self.mailbox_id) + } +} + +impl std::hash::Hash for Mailbox { + fn hash(&self, state: &mut H) { + self.mailbox_id.hash(state) + } +} + +impl Eq for Mailbox {} +impl PartialEq for Mailbox { + fn eq(&self, other: &Mailbox) -> bool { + self.mailbox_id == other.mailbox_id + } +} + +impl Ord for Mailbox { + fn cmp(&self, other: &Mailbox) -> std::cmp::Ordering { + return self.mailbox_id.cmp(&other.mailbox_id) + } +} + +impl PartialOrd for Mailbox { + fn partial_cmp(&self, other: &Mailbox) -> Option { + return Some(self.cmp(&other)) + } +} + +impl Clone for Mailbox { + fn clone(&self) -> Self { + let Mailbox { actor_id, tx, queue_depth, mailbox_count, .. } = self; + mailbox_count.fetch_add(1, Ordering::SeqCst); + Mailbox { + actor_id: *actor_id, + mailbox_id: crate::next_mailbox_id(), + tx: tx.clone(), + queue_depth: Arc::clone(queue_depth), + mailbox_count: Arc::clone(mailbox_count), + } + } +} + +impl Drop for Mailbox { + fn drop(&mut self) { + if self.mailbox_count.fetch_sub(1, Ordering::SeqCst) == 1 { + let _ = self.tx.send(SystemMessage::Release); + () + } + } +} + +impl Actor { + pub fn new() -> Self { + let (tx, rx) = unbounded_channel(); + Actor { + template_mailbox: Mailbox { + actor_id: crate::next_actor_id(), + mailbox_id: crate::next_mailbox_id(), + tx, + queue_depth: Arc::new(AtomicUsize::new(0)), + mailbox_count: Arc::new(AtomicUsize::new(0)), + }, + rx, + outbound_assertions: Map::new(), + oid_map: Map::new(), + next_task_id: 0, + linked_tasks: Map::new(), + } + } + + pub fn id(&self) -> ActorId { + self.template_mailbox.actor_id + } + + pub fn create(&mut self, e: E) -> Arc { + let r = Ref { + relay: self.template_mailbox.clone(), + target: crate::next_oid(), + }; + self.oid_map.insert(r.target.clone(), Cell::new(Box::new(e))); + Arc::new(r) + } + + pub fn boot + Send + 'static>( + mut self, + name: tracing::Span, + boot: F, + ) -> ActorHandle { + tokio::spawn(async move { + trace!("start"); + let run_future = self.run(boot); + let result = run_future.await; + match &result { + Ok(()) => trace!("normal stop"), + Err(e) => error!("{}", e), + } + result + }.instrument(name)) + } + + pub fn start(self, name: tracing::Span) -> ActorHandle { + self.boot(name, ready(Ok(()))) + } + + async fn run>(&mut self, boot: F) -> ActorResult { + boot.await?; + loop { + match self.rx.recv().await { + None => + Err(Error { + message: "Unexpected channel close".to_owned(), + detail: _Any::new(false), + })?, + Some(m) => { + if self.handle(m)? { + return Ok(()); + } + // We would have a loop calling try_recv until it answers "no more at + // present" here, to avoid decrementing queue_depth for every message + // (instead zeroing it on queue empty - it only needs to be approximate), + // but try_recv has been removed from mpsc at the time of writing. See + // https://github.com/tokio-rs/tokio/issues/3350 . + self.template_mailbox.queue_depth.fetch_sub(1, Ordering::Relaxed); + } + } + } + } + + fn handle(&mut self, m: SystemMessage) -> Result { + match m { + SystemMessage::Release => + Ok(true), + SystemMessage::ReleaseOid(oid) => { + self.oid_map.remove(&oid); + Ok(false) + } + SystemMessage::Turn(Turn(events)) => { + for TurnEvent { oid, event } in events.into_iter() { + if let Some(e) = self.oid_map.get_mut(&oid) { + let mut t = Activation::for_actor_details(&mut self.outbound_assertions); + let e = e.get_mut(); + match event { + Event::Assert(b) => { + let Assert { assertion: Assertion(assertion), handle } = *b; + e.assert(&mut t, assertion, handle)?; + } + Event::Retract(b) => { + let Retract { handle } = *b; + e.retract(&mut t, handle)?; + } + Event::Message(b) => { + let Message { body: Assertion(body) } = *b; + e.message(&mut t, body)?; + } + Event::Sync(b) => { + let Sync { peer } = *b; + e.sync(&mut t, peer)?; + } + } + } + } + Ok(false) + } + SystemMessage::Crash(e) => + Err(e)? + } + } + + pub fn linked_task + Send + 'static>( + &mut self, + name: tracing::Span, + boot: F, + ) { + let mailbox = self.template_mailbox.clone(); + let token = CancellationToken::new(); + let task_id = self.next_task_id; + self.next_task_id += 1; + { + let token = token.clone(); + tokio::spawn(async move { + trace!("linked task start"); + select! { + _ = token.cancelled() => (), + result = boot => match result { + Ok(()) => trace!("linked task normal stop"), + Err(e) => { + error!("linked task error: {}", e); + let _ = mailbox.tx.send(SystemMessage::Crash(e)); + () + } + } + } + }.instrument(name)); + } + self.linked_tasks.insert(task_id, token); + } +} + +impl Drop for Actor { + fn drop(&mut self) { + for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() { + token.cancel(); + } + + let to_clear = std::mem::take(&mut self.outbound_assertions); + let mut t = Activation::for_actor(self); + for (handle, r) in to_clear.into_iter() { + t.retract_known_ref(r, handle); + } + } +} + +impl Drop for Ref { + fn drop(&mut self) { + let _ = self.relay.tx.send(SystemMessage::ReleaseOid(self.target.clone())); + () + } +} + +impl Domain for Ref { + fn from_preserves(v: IOValue) -> IOResult { + panic!("aiee") + } + fn as_preserves(&self) -> IOValue { + panic!("aiee") + } +} + +impl Domain for super::schemas::sturdy::WireRef { + fn from_preserves(v: IOValue) -> IOResult { + panic!("aiee") + } + fn as_preserves(&self) -> IOValue { + panic!("aiee") + } +} diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index 930c5b1..c5d2759 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -1,133 +1,33 @@ -use syndicate::{config, spaces, packets, ConnId}; -use syndicate::peer::Peer; - -use std::sync::{Mutex, Arc}; use futures::{SinkExt, StreamExt}; -use tracing::{Level, error, info, trace}; -use tracing_futures::Instrument; +use preserves::value::PackedReader; +use preserves::value::PackedWriter; +use preserves::value::Reader; +use preserves::value::Writer; + +use std::convert::TryFrom; +use std::future::Ready; +use std::future::ready; +use std::sync::Arc; + +use structopt::StructOpt; // for from_args in main + +use syndicate::actor::*; +use syndicate::dataspace::*; +use syndicate::error::Error; +use syndicate::error::error; +use syndicate::config; +use syndicate::packets; +use syndicate::peer::Peer; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_util::codec::Framed; +use tracing::{Level, info, trace}; + use tungstenite::Message; -use structopt::StructOpt; // for from_args in main - -type UnitAsyncResult = Result<(), std::io::Error>; - -fn message_error(e: E) -> packets::Error { - packets::Error::Message(e.to_string()) -} - -fn encode_message(p: packets::S2C) -> - Result -{ - let mut bs = Vec::with_capacity(128); - preserves::ser::to_writer(&mut preserves::value::PackedWriter::new(&mut bs), &p)?; - Ok(Message::Binary(bs)) -} - -fn message_encoder(p: packets::S2C) -> futures::future::Ready> -{ - futures::future::ready(encode_message(p)) -} - -async fn message_decoder(r: Result) -> Option> -{ - match r { - Ok(ref m) => match m { - Message::Text(_) => - Some(Err(preserves::error::syntax_error("Text websocket frames are not accepted"))), - Message::Binary(ref bs) => - match preserves::de::from_bytes(bs) { - Ok(p) => Some(Ok(p)), - Err(e) => Some(Err(e.into())), - }, - Message::Ping(_) => - None, // pings are handled by tungstenite before we see them - Message::Pong(_) => - None, // unsolicited pongs are to be ignored - Message::Close(_) => - Some(Err(preserves::error::eof())), - } - Err(tungstenite::Error::Io(e)) => - Some(Err(e.into())), - Err(e) => - Some(Err(message_error(e))), - } -} - -async fn run_connection(connid: ConnId, - mut stream: TcpStream, - spaces: Arc>, - addr: std::net::SocketAddr, - config: config::ServerConfigRef) -> - UnitAsyncResult -{ - let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect - match stream.peek(&mut buf).await? { - 1 => match buf[0] { - 71 /* ASCII 'G' for "GET" */ => { - info!(protocol = display("websocket"), peer = debug(addr)); - let s = tokio_tungstenite::accept_async(stream).await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - let (o, i) = s.split(); - let i = i.filter_map(message_decoder); - let o = o.sink_map_err(message_error).with(message_encoder); - let mut p = Peer::new(connid, i, o); - p.run(spaces, &config).await? - }, - _ => { - info!(protocol = display("raw"), peer = debug(addr)); - let (o, i) = Framed::new(stream, packets::Codec::new()).split(); - let mut p = Peer::new(connid, i, o); - p.run(spaces, &config).await? - } - } - 0 => return Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, - "closed before starting")), - _ => unreachable!() - } - Ok(()) -} - -static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1); - -async fn run_listener(spaces: Arc>, port: u16, config: config::ServerConfigRef) -> - UnitAsyncResult -{ - let mut listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?; - loop { - let (stream, addr) = listener.accept().await?; - let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let spaces = Arc::clone(&spaces); - let config = Arc::clone(&config); - if let Some(n) = config.recv_buffer_size { stream.set_recv_buffer_size(n)?; } - if let Some(n) = config.send_buffer_size { stream.set_send_buffer_size(n)?; } - tokio::spawn(async move { - match run_connection(id, stream, spaces, addr, config).await { - Ok(()) => info!("closed"), - Err(e) => info!(error = display(e), "closed"), - } - }.instrument(tracing::info_span!("connection", id))); - } -} - -async fn periodic_tasks(spaces: Arc>) -> UnitAsyncResult { - let interval = core::time::Duration::from_secs(10); - let mut delay = tokio::time::interval(interval); - loop { - delay.next().await.unwrap(); - { - let mut spaces = spaces.lock().unwrap(); - spaces.cleanup(); - spaces.dump_stats(interval); - } - } -} - #[tokio::main] async fn main() -> Result<(), Box> { let filter = tracing_subscriber::filter::EnvFilter::from_default_env() @@ -175,30 +75,112 @@ async fn main() -> Result<(), Box> { let config = Arc::new(config::ServerConfig::from_args()); - let spaces = Arc::new(Mutex::new(spaces::Spaces::new())); let mut daemons = Vec::new(); - { - let spaces = Arc::clone(&spaces); - tokio::spawn(async move { - periodic_tasks(spaces).await - }); - } - trace!("startup"); + let ds = { + let ac = Actor::new(); + let ds = ac.create(Dataspace::new()); + daemons.push(ac.start(tracing::info_span!("dataspace"))); + ds + }; + for port in config.ports.clone() { - let spaces = Arc::clone(&spaces); + let ds = Arc::clone(&ds); let config = Arc::clone(&config); - daemons.push(tokio::spawn(async move { - info!(port, "listening"); - match run_listener(spaces, port, config).await { - Ok(()) => (), - Err(e) => error!("{}", e), - } - }.instrument(tracing::info_span!("listener", port)))); + let ac = Actor::new(); + ac.linked_task(tracing::info_span!("listener", port), run_listener(ds, port, config)); } futures::future::join_all(daemons).await; Ok(()) } + +//--------------------------------------------------------------------------- + +fn message_error(e: E) -> Error { + error(&e.to_string(), false) +} + +fn encode_message(p: packets::Packet) -> Result { + let mut bs = Vec::with_capacity(128); + PackedWriter::new(&mut bs).write(&(&p).into())?; + Ok(Message::Binary(bs)) +} + +fn message_encoder(p: packets::Packet) -> Ready> +{ + ready(encode_message(p)) +} + +fn message_decoder_inner( + r: Result, +) -> Result, Error> { + match r { + Ok(m) => match m { + Message::Text(_) => + Err("Text websocket frames are not accepted")?, + Message::Binary(bs) => { + let iov = PackedReader::decode_bytes(&bs).demand_next(false)?; + let p = packets::Packet::try_from(&iov)?; + Ok(Some(p)) + } + Message::Ping(_) => + Ok(None), // pings are handled by tungstenite before we see them + Message::Pong(_) => + Ok(None), // unsolicited pongs are to be ignored + Message::Close(_) => + Err("EOF")?, + }, + Err(e) => Err(message_error(e)), + } +} + +fn message_decoder(r: Result) -> Ready>> { + ready(message_decoder_inner(r).transpose()) +} + +async fn run_connection( + mut stream: TcpStream, + ds: Arc, + addr: std::net::SocketAddr, + config: Arc, +) -> ActorResult { + let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect + match stream.peek(&mut buf).await? { + 1 => match buf[0] { + 71 /* ASCII 'G' for "GET" */ => { + info!(protocol = display("websocket"), peer = debug(addr)); + let s = tokio_tungstenite::accept_async(stream).await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + let (o, i) = s.split(); + let i = i.filter_map(message_decoder); + let o = o.sink_map_err(message_error).with(message_encoder); + let mut p = Peer::new(i, o, ds, config); + p.run().await? + }, + _ => { + info!(protocol = display("raw"), peer = debug(addr)); + let (o, i) = Framed::new(stream, packets::Codec).split(); + let mut p = Peer::new(i, o, ds, config); + p.run().await? + } + } + 0 => Err(error("closed before starting", false))?, + _ => unreachable!() + } + Ok(()) +} + +async fn run_listener(ds: Arc, port: u16, config: Arc) -> ActorResult { + let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?; + loop { + let (stream, addr) = listener.accept().await?; + let mut ac = Actor::new(); + let ds = Arc::clone(&ds); + let config = Arc::clone(&config); + ac.linked_task(tracing::info_span!("connection", id = (ac.id())), + run_connection(stream, ds, addr, config)); + } +} diff --git a/src/config.rs b/src/config.rs index 192488b..b76f6d9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,15 +5,8 @@ pub struct ServerConfig { #[structopt(short = "p", long = "port", default_value = "8001")] pub ports: Vec, - #[structopt(long)] - pub recv_buffer_size: Option, - #[structopt(long)] - pub send_buffer_size: Option, - #[structopt(long, default_value = "10000")] pub overload_threshold: usize, #[structopt(long, default_value = "5")] pub overload_turn_limit: usize, } - -pub type ServerConfigRef = std::sync::Arc; diff --git a/src/dataspace.rs b/src/dataspace.rs index 516ea69..0f370e9 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -1,36 +1,21 @@ -use super::V; -use super::ConnId; -use super::packets::{self, Assertion, EndpointName}; +use super::Assertion; +use super::Handle; use super::skeleton; +use super::actor::*; +use super::schemas::dataspace::*; -use preserves::value::{self, Map, NestedValue}; -use std::sync::{Arc, RwLock, atomic::{AtomicUsize, Ordering}}; -use tokio::sync::mpsc::UnboundedSender; +use preserves::value::Map; -pub type DataspaceRef = Arc>; -pub type DataspaceError = (String, V); - -#[derive(Debug)] -struct Actor { - tx: UnboundedSender, - queue_depth: Arc, - endpoints: Map, -} - -#[derive(Debug)] -struct ActorEndpoint { - analysis_results: Option, - assertion: Assertion, -} +use std::convert::TryFrom; #[derive(Debug)] pub struct Churn { - pub peers_added: usize, - pub peers_removed: usize, pub assertions_added: usize, pub assertions_removed: usize, pub endpoints_added: usize, pub endpoints_removed: usize, + pub observers_added: usize, + pub observers_removed: usize, pub messages_injected: usize, pub messages_delivered: usize, } @@ -38,167 +23,38 @@ pub struct Churn { impl Churn { pub fn new() -> Self { Self { - peers_added: 0, - peers_removed: 0, assertions_added: 0, assertions_removed: 0, endpoints_added: 0, endpoints_removed: 0, + observers_added: 0, + observers_removed: 0, messages_injected: 0, messages_delivered: 0, } } pub fn reset(&mut self) { - self.peers_added = 0; - self.peers_removed = 0; - self.assertions_added = 0; - self.assertions_removed = 0; - self.endpoints_added = 0; - self.endpoints_removed = 0; - self.messages_injected = 0; - self.messages_delivered = 0; + *self = Churn::new() } } #[derive(Debug)] pub struct Dataspace { - name: V, - peers: Map, - index: skeleton::Index, + pub index: skeleton::Index, + pub handle_map: Map)>, pub churn: Churn, } impl Dataspace { - pub fn new(name: &V) -> Self { + pub fn new() -> Self { Self { - name: name.clone(), - peers: Map::new(), index: skeleton::Index::new(), + handle_map: Map::new(), churn: Churn::new(), } } - pub fn new_ref(name: &V) -> DataspaceRef { - Arc::new(RwLock::new(Self::new(name))) - } - - pub fn register(&mut self, id: ConnId, - tx: UnboundedSender, - queue_depth: Arc) - { - assert!(!self.peers.contains_key(&id)); - self.peers.insert(id, Actor { - tx, - queue_depth, - endpoints: Map::new(), - }); - self.churn.peers_added += 1; - } - - pub fn deregister(&mut self, id: ConnId) { - let ac = self.peers.remove(&id).unwrap(); - self.churn.peers_removed += 1; - let mut outbound_turns: Map> = Map::new(); - for (epname, ep) in ac.endpoints { - self.remove_endpoint(&mut outbound_turns, id, &epname, ep); - } - outbound_turns.remove(&id); - self.deliver_outbound_turns(outbound_turns); - } - - fn remove_endpoint(&mut self, - mut outbound_turns: &mut Map>, - id: ConnId, - epname: &EndpointName, - ep: ActorEndpoint) - { - let ActorEndpoint{ analysis_results, assertion } = ep; - if let Some(ar) = analysis_results { - self.index.remove_endpoint(&ar, skeleton::Endpoint { - connection: id, - name: epname.clone(), - }); - } - let old_assertions = self.index.assertion_count(); - self.index.remove((&assertion).into(), &mut outbound_turns); - self.churn.assertions_removed += old_assertions - self.index.assertion_count(); - self.churn.endpoints_removed += 1; - } - - pub fn turn(&mut self, id: ConnId, actions: Vec) -> - Result<(), DataspaceError> - { - let mut outbound_turns: Map> = Map::new(); - for a in actions { - tracing::trace!(action = debug(&a), "turn"); - match a { - packets::Action::Assert(ref epname, ref assertion) => { - let ac = self.peers.get_mut(&id).unwrap(); - if ac.endpoints.contains_key(&epname) { - return Err(("Duplicate endpoint name".to_string(), value::to_value(a))); - } - - let ar = - if let Some(fs) = assertion.value().as_simple_record("observe", Some(1)) { - let ar = skeleton::analyze(&fs[0]); - let events = self.index.add_endpoint(&ar, skeleton::Endpoint { - connection: id, - name: epname.clone(), - }); - outbound_turns.entry(id).or_insert_with(Vec::new).extend(events); - Some(ar) - } else { - None - }; - - let old_assertions = self.index.assertion_count(); - self.index.insert(assertion.into(), &mut outbound_turns); - self.churn.assertions_added += self.index.assertion_count() - old_assertions; - self.churn.endpoints_added += 1; - - ac.endpoints.insert(epname.clone(), ActorEndpoint { - analysis_results: ar, - assertion: assertion.clone() - }); - } - packets::Action::Clear(ref epname) => { - let ac = self.peers.get_mut(&id).unwrap(); - match ac.endpoints.remove(epname) { - None => { - return Err(("Nonexistent endpoint name".to_string(), value::to_value(a))); - } - Some(ep) => { - self.remove_endpoint(&mut outbound_turns, id, epname, ep); - outbound_turns.entry(id).or_insert_with(Vec::new) - .push(packets::Event::End(epname.clone())); - } - } - } - packets::Action::Message(ref assertion) => { - self.index.send(assertion.into(), - &mut outbound_turns, - &mut self.churn.messages_delivered); - self.churn.messages_injected += 1; - } - } - } - self.deliver_outbound_turns(outbound_turns); - Ok(()) - } - - fn deliver_outbound_turns(&mut self, outbound_turns: Map>) { - for (target, events) in outbound_turns { - let actor = self.peers.get_mut(&target).unwrap(); - let _ = actor.tx.send(packets::S2C::Turn(events)); - actor.queue_depth.fetch_add(1, Ordering::Relaxed); - } - } - - pub fn peer_count(&self) -> usize { - self.peers.len() - } - pub fn assertion_count(&self) -> usize { self.index.assertion_count() } @@ -206,4 +62,49 @@ impl Dataspace { pub fn endpoint_count(&self) -> isize { self.index.endpoint_count() } + + pub fn observer_count(&self) -> usize { + self.index.observer_count() + } +} + +impl Entity for Dataspace { + fn assert(&mut self, t: &mut Activation, a: Assertion, h: Handle) -> ActorResult { + tracing::trace!(action = debug(&a), "assert"); + + let old_assertions = self.index.assertion_count(); + self.index.insert(t, &a); + self.churn.assertions_added += self.index.assertion_count() - old_assertions; + self.churn.endpoints_added += 1; + + if let Ok(o) = Observe::try_from(&a) { + self.index.add_observer(t, &o.pattern, &o.observer); + self.churn.observers_added += 1; + self.handle_map.insert(h, (a, Some(o))); + } else { + self.handle_map.insert(h, (a, None)); + } + Ok(()) + } + + fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { + if let Some((a, maybe_o)) = self.handle_map.remove(&h) { + if let Some(o) = maybe_o { + self.index.remove_observer(o.pattern, &o.observer); + self.churn.observers_removed += 1; + } + + let old_assertions = self.index.assertion_count(); + self.index.remove(t, &a); + self.churn.assertions_removed += old_assertions - self.index.assertion_count(); + self.churn.endpoints_removed += 1; + } + Ok(()) + } + + fn message(&mut self, t: &mut Activation, m: Assertion) -> ActorResult { + self.index.send(t, &m, &mut self.churn.messages_delivered); + self.churn.messages_injected += 1; + Ok(()) + } } diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..038b870 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,41 @@ +pub use super::schemas::internal_protocol::_Any; +pub use super::schemas::internal_protocol::_Ptr; +pub use super::schemas::internal_protocol::Error; + +use preserves::value::NestedValue; +use preserves::value::Value; +use preserves_schema::support::ParseError; + +impl std::error::Error for Error {} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + write!(f, "Error: {}; detail: {:?}", self.message, self.detail) + } +} + +pub fn error(message: &str, detail: Detail) -> Error where Value<_Any, _Ptr>: From { + Error { + message: message.to_owned(), + detail: _Any::new(detail), + } +} + +impl From<&str> for Error { + fn from(v: &str) -> Self { + error(v, false) + } +} + +impl From for Error { + fn from(v: std::io::Error) -> Self { + error(&format!("{}", v), false) + } +} + +impl From for Error { + fn from(v: ParseError) -> Self { + error(&format!("{}", v), false) + } +} + diff --git a/src/lib.rs b/src/lib.rs index 1932afc..5b363f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,30 +1,45 @@ #![recursion_limit="512"] -pub mod bag; -pub mod config; -pub mod dataspace; -pub mod packets; -pub mod peer; -pub mod skeleton; -pub mod spaces; - pub use preserves::value; -// use std::sync::atomic::{AtomicUsize, Ordering}; -// -// #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -// pub enum Syndicate { -// Placeholder(usize), -// } -// -// impl value::Domain for Syndicate {} -// -// static NEXT_PLACEHOLDER: AtomicUsize = AtomicUsize::new(0); -// impl Syndicate { -// pub fn new_placeholder() -> Self { -// Self::Placeholder(NEXT_PLACEHOLDER.fetch_add(1, Ordering::SeqCst)) -// } -// } +pub use schemas::internal_protocol::Handle; +pub use schemas::internal_protocol::Oid; -pub type ConnId = u64; -pub type V = value::IOValue; // value::ArcValue; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +pub mod actor; +pub mod bag; +pub mod config; +pub mod dataspace; +pub mod error; +pub mod schemas; +pub mod packets; +pub mod pattern; +pub mod peer; +pub mod skeleton; + +pub type Assertion = schemas::dataspace::_Any; + +pub type ActorId = u64; +static NEXT_ACTOR_ID: AtomicU64 = AtomicU64::new(0); +pub fn next_actor_id() -> ActorId { + NEXT_ACTOR_ID.fetch_add(1, Ordering::Relaxed) +} + +static NEXT_OID: AtomicU64 = AtomicU64::new(0); +pub fn next_oid() -> Oid { + Oid(value::signed_integer::SignedInteger::from( + NEXT_OID.fetch_add(1, Ordering::Relaxed) as u128)) +} + +static NEXT_HANDLE: AtomicU64 = AtomicU64::new(0); +pub fn next_handle() -> Handle { + Handle(value::signed_integer::SignedInteger::from( + NEXT_HANDLE.fetch_add(1, Ordering::Relaxed) as u128)) +} + +static NEXT_MAILBOX_ID: AtomicU64 = AtomicU64::new(0); +pub fn next_mailbox_id() -> u64 { + NEXT_MAILBOX_ID.fetch_add(1, Ordering::Relaxed) +} diff --git a/src/packets.rs b/src/packets.rs index 3321811..1200f01 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -1,91 +1,35 @@ -use super::V; +pub use crate::schemas::internal_protocol::*; -use bytes::{Buf, buf::BufMutExt, BytesMut}; -use std::sync::Arc; -use std::marker::PhantomData; +use bytes::{Buf, BufMut, BytesMut}; -use preserves::{ - de::Deserializer, - error, - ser::to_writer, - value::{PackedReader, PackedWriter}, -}; +use std::convert::TryFrom; -pub type EndpointName = V; -pub type Assertion = V; -pub type Captures = Arc>; +use preserves::value::PackedReader; +use preserves::value::PackedWriter; +use preserves::value::Reader; +use preserves::value::Writer; -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub enum Action { - Assert(EndpointName, Assertion), - Clear(EndpointName), - Message(Assertion), -} +pub struct Codec; -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub enum Event { - Add(EndpointName, Captures), - Del(EndpointName, Captures), - Msg(EndpointName, Captures), - End(EndpointName), -} - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub enum C2S { - Connect(V), - Turn(Vec), - Ping(), - Pong(), -} - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub enum S2C { - Err(String, V), - Turn(Vec), - Ping(), - Pong(), -} - -//--------------------------------------------------------------------------- - -pub type Error = error::Error; - -pub struct Codec { - ph_in: PhantomData, - ph_out: PhantomData, -} - -pub type ServerCodec = Codec; -pub type ClientCodec = Codec; - -impl Codec { - pub fn new() -> Self { - Codec { ph_in: PhantomData, ph_out: PhantomData } - } -} - -impl tokio_util::codec::Decoder for Codec { - type Item = InT; +impl tokio_util::codec::Decoder for Codec { + type Item = Packet; type Error = Error; fn decode(&mut self, bs: &mut BytesMut) -> Result, Self::Error> { let mut r = PackedReader::decode_bytes(bs); - let mut d = Deserializer::from_reader(&mut r); - match Self::Item::deserialize(&mut d) { - Err(e) if error::is_eof_error(&e) => Ok(None), - Err(e) => Err(e), - Ok(item) => { - let count = d.read.source.index; + match r.next(false)? { + None => Ok(None), + Some(item) => { + let count = r.source.index; bs.advance(count); - Ok(Some(item)) + Ok(Some(Packet::try_from(&item)?)) } } } } -impl tokio_util::codec::Encoder for Codec -{ +impl tokio_util::codec::Encoder<&Packet> for Codec { type Error = Error; - fn encode(&mut self, item: OutT, bs: &mut BytesMut) -> Result<(), Self::Error> { - to_writer(&mut PackedWriter::new(&mut bs.writer()), &item) + fn encode(&mut self, item: &Packet, bs: &mut BytesMut) -> Result<(), Self::Error> { + Ok(PackedWriter::new(&mut bs.writer()).write(&item.into())?) } } diff --git a/src/pattern.rs b/src/pattern.rs new file mode 100644 index 0000000..61f5da7 --- /dev/null +++ b/src/pattern.rs @@ -0,0 +1,81 @@ +use crate::Assertion; +use crate::schemas::dataspace_patterns::*; + +use preserves::value::NestedValue; + +use std::convert::TryFrom; + +#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)] +pub enum PathStep { + Index(usize), + Key(Assertion), +} + +pub type Path = Vec; +pub type Paths = Vec; + +struct Analyzer { + pub const_paths: Paths, + pub const_values: Vec<_Any>, + pub capture_paths: Paths, +} + +pub struct PatternAnalysis { + pub const_paths: Paths, + pub const_values: _Any, + pub capture_paths: Paths, +} + +impl PatternAnalysis { + pub fn new(p: &Pattern) -> Self { + let mut analyzer = Analyzer { + const_paths: Vec::new(), + const_values: Vec::new(), + capture_paths: Vec::new(), + }; + analyzer.walk(&mut Vec::new(), p); + PatternAnalysis { + const_paths: analyzer.const_paths, + const_values: _Any::new(analyzer.const_values), + capture_paths: analyzer.capture_paths, + } + } +} + +impl Analyzer { + fn walk_step(&mut self, path: &mut Path, s: PathStep, p: &Pattern) { + path.push(s); + self.walk(path, p); + path.pop(); + } + + fn walk(&mut self, path: &mut Path, p: &Pattern) { + match p { + Pattern::DCompound(b) => match &**b { + DCompound::Rec { members, .. } | + DCompound::Arr { members, .. } => { + for (i, p) in members { + self.walk_step(path, PathStep::Index(usize::try_from(i).unwrap_or(0)), p); + } + } + DCompound::Dict { members, .. } => { + for (k, p) in members { + self.walk_step(path, PathStep::Key(k.clone()), p); + } + } + } + Pattern::DBind(b) => { + let DBind { pattern, .. } = &**b; + self.capture_paths.push(path.clone()); + self.walk(path, pattern) + } + Pattern::DDiscard(_) => + (), + Pattern::DLit(b) => { + let DLit { value } = &**b; + self.const_paths.push(path.clone()); + self.const_values.push(value.clone()); + } + } + } +} diff --git a/src/peer.rs b/src/peer.rs index b19c705..9191215 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,68 +1,40 @@ -use super::V; -use super::ConnId; -use super::dataspace; -use super::packets; -use super::spaces; -use super::config; - -use core::time::Duration; -use futures::{Sink, SinkExt, Stream}; use futures::FutureExt; +use futures::StreamExt; use futures::select; -use preserves::value; -use std::pin::Pin; -use std::sync::{Mutex, Arc, atomic::{AtomicUsize, Ordering}}; -use tokio::stream::StreamExt; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver, error::TryRecvError}; -use tokio::time::interval; +use futures::{Sink, SinkExt, Stream}; -pub type ResultC2S = Result; +use preserves::value; + +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use super::actor::*; +use super::config; +use super::error::Error; +use super::error::error; +use super::packets; pub struct Peer -where I: Stream + Send, - O: Sink, +where I: Stream> + Send, + O: Sink, { - id: ConnId, - tx: UnboundedSender, - rx: UnboundedReceiver, i: Pin>, o: Pin>, - space: Option, -} - -fn err(s: &str, ctx: V) -> packets::S2C { - packets::S2C::Err(s.into(), ctx) + ds: Arc, + config: Arc, } impl Peer -where I: Stream + Send, - O: Sink, +where I: Stream> + Send, + O: Sink, { - pub fn new(id: ConnId, i: I, o: O) -> Self { - let (tx, rx) = unbounded_channel(); - Peer{ id, tx, rx, i: Box::pin(i), o: Box::pin(o), space: None } + pub fn new(i: I, o: O, ds: Arc, config: Arc) -> Self { + Peer{ i: Box::pin(i), o: Box::pin(o), ds, config } } - pub async fn run(&mut self, spaces: Arc>, config: &config::ServerConfig) -> - Result<(), packets::Error> - { - let firstpacket = self.i.next().await; - let dsname = if let Some(Ok(packets::C2S::Connect(dsname))) = firstpacket { - dsname - } else { - let e = format!("Expected initial Connect, got {:?}", firstpacket); - self.o.send(err(&e, value::FALSE.clone())).await?; - return Err(preserves::error::syntax_error(&e)) - }; - - self.space = Some(spaces.lock().unwrap().lookup(&dsname)); + pub async fn run(mut self) -> Result<(), packets::Error> { let queue_depth = Arc::new(AtomicUsize::new(0)); - self.space.as_ref().unwrap().write().unwrap().register( - self.id, - self.tx.clone(), - Arc::clone(&queue_depth)); - - let mut ping_timer = interval(Duration::from_secs(60)); let mut running = true; let mut overloaded = None; @@ -71,12 +43,11 @@ where I: Stream + Send, let mut to_send = Vec::new(); let queue_depth_sample = queue_depth.load(Ordering::Relaxed); - if queue_depth_sample > config.overload_threshold { + if queue_depth_sample > self.config.overload_threshold { let n = overloaded.unwrap_or(0); tracing::warn!(turns=n, queue_depth=queue_depth_sample, "overloaded"); - if n == config.overload_turn_limit { - to_send.push(err("Overloaded", - value::Value::from(queue_depth_sample as u64).wrap())); + if n == self.config.overload_turn_limit { + to_send.push(error("Overloaded", queue_depth_sample as u128)); running = false; } else { if queue_depth_sample > previous_sample.unwrap_or(0) { @@ -94,50 +65,22 @@ where I: Stream + Send, previous_sample = Some(queue_depth_sample); select! { - _instant = ping_timer.next().boxed().fuse() => to_send.push(packets::S2C::Ping()), frame = self.i.next().fuse() => match frame { Some(res) => match res { Ok(p) => { tracing::trace!(packet = debug(&p), "input"); match p { - packets::C2S::Turn(actions) => { - match self.space.as_ref().unwrap().write().unwrap() - .turn(self.id, actions) - { - Ok(()) => (), - Err((msg, ctx)) => { - to_send.push(err(&msg, ctx)); - running = false; - } - } + packets::Packet::Turn(b) => { + let packets::Turn(actions) = &*b; + /* ... */ } - packets::C2S::Ping() => - to_send.push(packets::S2C::Pong()), - packets::C2S::Pong() => - (), - packets::C2S::Connect(_) => { - to_send.push(err("Unexpected Connect", value::to_value(p))); - running = false; + packets::Packet::Error(b) => { + let e = &*b; + /* ... */ } } } - Err(e) if preserves::error::is_eof_error(&e) => { - tracing::trace!("eof"); - running = false; - } - Err(e) if preserves::error::is_syntax_error(&e) => { - to_send.push(err(&e.to_string(), value::FALSE.clone())); - running = false; - } - Err(e) => { - if preserves::error::is_io_error(&e) { - return Err(e); - } else { - to_send.push(err(&format!("Packet deserialization error: {}", e), - value::FALSE.clone())); - running = false; - } - } + Err(e) => return Err(e), } None => { tracing::trace!("remote has closed"); @@ -167,7 +110,7 @@ where I: Stream + Send, } if !ok { /* weird. */ - to_send.push(err("Outbound channel closed unexpectedly", value::FALSE.clone())); + to_send.push(error("Outbound channel closed unexpectedly", value::FALSE.clone())); running = false; } }, @@ -185,14 +128,3 @@ where I: Stream + Send, Ok(()) } } - -impl Drop for Peer -where I: Stream + Send, - O: Sink, -{ - fn drop(&mut self) { - if let Some(ref s) = self.space { - s.write().unwrap().deregister(self.id); - } - } -} diff --git a/src/skeleton.rs b/src/skeleton.rs index 7c15aac..d776a1e 100644 --- a/src/skeleton.rs +++ b/src/skeleton.rs @@ -1,118 +1,99 @@ -use super::ConnId; +use super::Assertion; use super::bag; -use super::packets::Assertion; -use super::packets::Captures; -use super::packets::EndpointName; -use super::packets::Event; use preserves::value::{Map, Set, Value, NestedValue}; -use std::cmp::Ordering; use std::collections::btree_map::Entry; +use std::convert::TryFrom; +use std::convert::TryInto; use std::sync::Arc; +use crate::actor::Activation; +use crate::actor::Ref; +use crate::schemas::internal_protocol::Handle; +use crate::schemas::dataspace_patterns as ds; +use crate::pattern::{self, PathStep, Path, Paths}; + type Bag = bag::BTreeBag; -pub type Path = Vec; -pub type Paths = Vec; -pub type Events = Vec; -pub type TurnMap = Map; +type Captures = Assertion; #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] -pub struct Endpoint { - pub connection: ConnId, - pub name: EndpointName, -} - -#[derive(Debug)] -pub enum Skeleton { - Blank, - Guarded(Guard, Vec) -} - -#[derive(Debug)] -pub struct AnalysisResults { - pub skeleton: Skeleton, - pub const_paths: Paths, - pub const_vals: Captures, - pub capture_paths: Paths, - pub assertion: Assertion, +pub enum Guard { + Rec(Assertion, usize), + Seq(usize), + Map, } #[derive(Debug)] pub struct Index { - all_assertions: Bag, + all_assertions: Bag, + observer_count: usize, root: Node, } +#[derive(Debug)] +struct Node { + continuation: Continuation, + edges: Map>, +} + +#[derive(Debug)] +struct Continuation { + cached_assertions: Set, + leaf_map: Map>, +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +struct Selector { + pop_count: usize, + step: PathStep, +} + +#[derive(Debug)] +struct Leaf { // aka Topic + cached_assertions: Set, + endpoints_map: Map, +} + +#[derive(Debug)] +struct Endpoints { + cached_captures: Bag, + endpoints: Map, Map>, +} + +//--------------------------------------------------------------------------- + impl Index { pub fn new() -> Self { - Index{ all_assertions: Bag::new(), root: Node::new(Continuation::new(Set::new())) } - } - - pub fn add_endpoint(&mut self, analysis_results: &AnalysisResults, endpoint: Endpoint) -> Events - { - let continuation = self.root.extend(&analysis_results.skeleton); - let continuation_cached_assertions = &continuation.cached_assertions; - let const_val_map = - continuation.leaf_map.entry(analysis_results.const_paths.clone()).or_insert_with(|| { - let mut cvm = Map::new(); - for a in continuation_cached_assertions { - let key = project_paths(a.unscope(), &analysis_results.const_paths); - cvm.entry(key).or_insert_with(Leaf::new).cached_assertions.insert(a.clone()); - } - cvm - }); - let capture_paths = &analysis_results.capture_paths; - let leaf = const_val_map.entry(analysis_results.const_vals.clone()).or_insert_with(Leaf::new); - let leaf_cached_assertions = &leaf.cached_assertions; - let endpoints = leaf.endpoints_map.entry(capture_paths.clone()).or_insert_with(|| { - let mut b = Bag::new(); - for a in leaf_cached_assertions { - let (restriction_paths, term) = a.unpack(); - if is_unrestricted(&capture_paths, restriction_paths) { - let captures = project_paths(term, &capture_paths); - *b.entry(captures).or_insert(0) += 1; - } - } - Endpoints::new(b) - }); - let endpoint_name = endpoint.name.clone(); - endpoints.endpoints.insert(endpoint); - endpoints.cached_captures.into_iter() - .map(|(cs,_)| Event::Add(endpoint_name.clone(), cs.clone())) - .collect() - } - - pub fn remove_endpoint(&mut self, analysis_results: &AnalysisResults, endpoint: Endpoint) { - let continuation = self.root.extend(&analysis_results.skeleton); - if let Entry::Occupied(mut const_val_map_entry) - = continuation.leaf_map.entry(analysis_results.const_paths.clone()) - { - let const_val_map = const_val_map_entry.get_mut(); - if let Entry::Occupied(mut leaf_entry) - = const_val_map.entry(analysis_results.const_vals.clone()) - { - let leaf = leaf_entry.get_mut(); - if let Entry::Occupied(mut endpoints_entry) - = leaf.endpoints_map.entry(analysis_results.capture_paths.clone()) - { - let endpoints = endpoints_entry.get_mut(); - endpoints.endpoints.remove(&endpoint); - if endpoints.endpoints.is_empty() { - endpoints_entry.remove_entry(); - } - } - if leaf.is_empty() { - leaf_entry.remove_entry(); - } - } - if const_val_map.is_empty() { - const_val_map_entry.remove_entry(); - } + Index { + all_assertions: Bag::new(), + observer_count: 0, + root: Node::new(Continuation::new(Set::new())), } } - pub fn insert(&mut self, outer_value: CachedAssertion, outputs: &mut TurnMap) { + pub fn add_observer( + &mut self, + t: &mut Activation, + pat: &ds::Pattern, + observer: &Arc, + ) { + let analysis = pattern::PatternAnalysis::new(pat); + self.root.extend(pat).add_observer(t, &analysis, observer); + self.observer_count += 1; + } + + pub fn remove_observer( + &mut self, + pat: ds::Pattern, + observer: &Arc, + ) { + let analysis = pattern::PatternAnalysis::new(&pat); + self.root.extend(&pat).remove_observer(analysis, observer); + self.observer_count -= 1; + } + + pub fn insert(&mut self, t: &mut Activation, outer_value: &Assertion) { let net = self.all_assertions.change(outer_value.clone(), 1); match net { bag::Net::AbsentToPresent => { @@ -123,9 +104,8 @@ impl Index { |l, v| { l.cached_assertions.insert(v.clone()); }, |es, cs| { if es.cached_captures.change(cs.clone(), 1) == bag::Net::AbsentToPresent { - for ep in &es.endpoints { - outputs.entry(ep.connection).or_insert_with(Vec::new) - .push(Event::Add(ep.name.clone(), cs.clone())) + for (observer, capture_map) in &mut es.endpoints { + capture_map.insert(cs.clone(), t.assert(observer.clone(), cs.clone())); } } }) @@ -136,7 +116,7 @@ impl Index { } } - pub fn remove(&mut self, outer_value: CachedAssertion, outputs: &mut TurnMap) { + pub fn remove(&mut self, t: &mut Activation, outer_value: &Assertion) { let net = self.all_assertions.change(outer_value.clone(), -1); match net { bag::Net::PresentToAbsent => { @@ -147,9 +127,10 @@ impl Index { |l, v| { l.cached_assertions.remove(v); }, |es, cs| { if es.cached_captures.change(cs.clone(), -1) == bag::Net::PresentToAbsent { - for ep in &es.endpoints { - outputs.entry(ep.connection).or_insert_with(Vec::new) - .push(Event::Del(ep.name.clone(), cs.clone())) + for capture_map in es.endpoints.values_mut() { + if let Some(h) = capture_map.remove(&cs) { + t.retract(h); + } } } }) @@ -160,11 +141,7 @@ impl Index { } } - pub fn send(&mut self, - outer_value: CachedAssertion, - outputs: &mut TurnMap, - delivery_count: &mut usize) - { + pub fn send(&mut self, t: &mut Activation, outer_value: &Assertion, delivery_count: &mut usize) { Modification::new( false, &outer_value, @@ -172,9 +149,8 @@ impl Index { |_l, _v| (), |es, cs| { *delivery_count += es.endpoints.len(); - for ep in &es.endpoints { - outputs.entry(ep.connection).or_insert_with(Vec::new) - .push(Event::Msg(ep.name.clone(), cs.clone())) + for observer in es.endpoints.keys() { + t.message(observer.clone(), cs.clone()); } }).perform(&mut self.root); } @@ -186,12 +162,10 @@ impl Index { pub fn endpoint_count(&self) -> isize { return self.all_assertions.total() } -} -#[derive(Debug)] -struct Node { - continuation: Continuation, - edges: Map>, + pub fn observer_count(&self) -> usize { + return self.observer_count + } } impl Node { @@ -199,38 +173,60 @@ impl Node { Node { continuation, edges: Map::new() } } - fn extend(&mut self, skeleton: &Skeleton) -> &mut Continuation { - let (_pop_count, final_node) = self.extend_walk(&mut Vec::new(), 0, 0, skeleton); + fn extend(&mut self, pat: &ds::Pattern) -> &mut Continuation { + let (_pop_count, final_node) = self.extend_walk(&mut Vec::new(), 0, PathStep::Index(0), pat); &mut final_node.continuation } - fn extend_walk(&mut self, path: &mut Path, pop_count: usize, index: usize, skeleton: &Skeleton) - -> (usize, &mut Node) { - match skeleton { - Skeleton::Blank => (pop_count, self), - Skeleton::Guarded(cls, kids) => { - let selector = Selector { pop_count, index }; - let continuation = &self.continuation; - let table = self.edges.entry(selector).or_insert_with(Map::new); - let mut next_node = table.entry(cls.clone()).or_insert_with(|| { - Self::new(Continuation::new( - continuation.cached_assertions.iter() - .filter(|a| { - Some(cls) == class_of(project_path(a.unscope(), path)).as_ref() }) - .cloned() - .collect())) - }); - let mut pop_count = 0; - for (index, kid) in kids.iter().enumerate() { - path.push(index); - let (pc, nn) = next_node.extend_walk(path, pop_count, index, kid); - pop_count = pc; - next_node = nn; - path.pop(); - } - (pop_count + 1, next_node) + fn extend_walk( + &mut self, + path: &mut Path, + pop_count: usize, + step: PathStep, + pat: &ds::Pattern, + ) -> (usize, &mut Node) { + let (guard, members): (Guard, Vec<(PathStep, &ds::Pattern)>) = match pat { + ds::Pattern::DCompound(b) => match &**b { + ds::DCompound::Arr { ctor, members } => + (Guard::Seq(usize::try_from(&ctor.arity).unwrap_or(0)), + members.iter().map(|(i, p)| (PathStep::Index(i.try_into().unwrap_or(0)), p)).collect()), + ds::DCompound::Rec { ctor, members } => + (Guard::Rec(ctor.label.clone(), usize::try_from(&ctor.arity).unwrap_or(0)), + members.iter().map(|(i, p)| (PathStep::Index(i.try_into().unwrap_or(0)), p)).collect()), + ds::DCompound::Dict { members, .. } => + (Guard::Map, + members.iter().map(|(k, p)| (PathStep::Key(k.clone()), p)).collect()), } + ds::Pattern::DBind(b) => { + let ds::DBind { pattern, .. } = &**b; + return self.extend_walk(path, pop_count, step, pattern); + } + ds::Pattern::DDiscard(_) | ds::Pattern::DLit(_) => + return (pop_count, self), + }; + + let selector = Selector { pop_count, step }; + let continuation = &self.continuation; + let table = self.edges.entry(selector).or_insert_with(Map::new); + let mut next_node = table.entry(guard.clone()).or_insert_with(|| { + Self::new(Continuation::new( + continuation.cached_assertions.iter() + .filter(|a| match project_path(a, path) { + Some(v) => Some(&guard) == class_of(v).as_ref(), + None => false, + }) + .cloned() + .collect())) + }); + let mut pop_count = 0; + for (step, kid) in members.into_iter() { + path.push(step.clone()); + let (pc, nn) = next_node.extend_walk(path, pop_count, step, kid); + pop_count = pc; + next_node = nn; + path.pop(); } + (pop_count + 1, next_node) } } @@ -257,35 +253,31 @@ impl<'a, T> Stack<'a, T> { } struct Modification<'op, FCont, FLeaf, FEndpoints> -where FCont: FnMut(&mut Continuation, &CachedAssertion) -> (), - FLeaf: FnMut(&mut Leaf, &CachedAssertion) -> (), +where FCont: FnMut(&mut Continuation, &Assertion) -> (), + FLeaf: FnMut(&mut Leaf, &Assertion) -> (), FEndpoints: FnMut(&mut Endpoints, Captures) -> () { create_leaf_if_absent: bool, - outer_value: &'op CachedAssertion, - restriction_paths: Option<&'op Paths>, - outer_value_term: &'op Assertion, + outer_value: &'op Assertion, m_cont: FCont, m_leaf: FLeaf, m_endpoints: FEndpoints, } impl<'op, FCont, FLeaf, FEndpoints> Modification<'op, FCont, FLeaf, FEndpoints> -where FCont: FnMut(&mut Continuation, &CachedAssertion) -> (), - FLeaf: FnMut(&mut Leaf, &CachedAssertion) -> (), +where FCont: FnMut(&mut Continuation, &Assertion) -> (), + FLeaf: FnMut(&mut Leaf, &Assertion) -> (), FEndpoints: FnMut(&mut Endpoints, Captures) -> () { fn new(create_leaf_if_absent: bool, - outer_value: &'op CachedAssertion, + outer_value: &'op Assertion, m_cont: FCont, m_leaf: FLeaf, - m_endpoints: FEndpoints) -> Self { - let (restriction_paths, outer_value_term) = outer_value.unpack(); + m_endpoints: FEndpoints, + ) -> Self { Modification { create_leaf_if_absent, outer_value, - restriction_paths, - outer_value_term, m_cont, m_leaf, m_endpoints, @@ -293,7 +285,7 @@ where FCont: FnMut(&mut Continuation, &CachedAssertion) -> (), } fn perform(&mut self, n: &mut Node) { - self.node(n, &Stack::Item(&Value::from(vec![self.outer_value_term.clone()]).wrap(), &Stack::Empty)) + self.node(n, &Stack::Item(&Value::from(vec![self.outer_value.clone()]).wrap(), &Stack::Empty)) } fn node(&mut self, n: &mut Node, term_stack: &Stack<&Assertion>) { @@ -301,10 +293,11 @@ where FCont: FnMut(&mut Continuation, &CachedAssertion) -> (), for (selector, table) in &mut n.edges { let mut next_stack = term_stack; for _ in 0..selector.pop_count { next_stack = next_stack.pop() } - let next_value = step(next_stack.top(), selector.index); - if let Some(next_class) = class_of(next_value) { - if let Some(next_node) = table.get_mut(&next_class) { - self.node(next_node, &Stack::Item(next_value, next_stack)) + if let Some(next_value) = step(next_stack.top(), &selector.step) { + if let Some(next_class) = class_of(next_value) { + if let Some(next_node) = table.get_mut(&next_class) { + self.node(next_node, &Stack::Item(next_value, next_stack)) + } } } } @@ -314,24 +307,24 @@ where FCont: FnMut(&mut Continuation, &CachedAssertion) -> (), (self.m_cont)(c, self.outer_value); let mut empty_const_paths = Vec::new(); for (const_paths, const_val_map) in &mut c.leaf_map { - let const_vals = project_paths(self.outer_value_term, const_paths); - let leaf_opt = if self.create_leaf_if_absent { - Some(const_val_map.entry(const_vals.clone()).or_insert_with(Leaf::new)) - } else { - const_val_map.get_mut(&const_vals) - }; - if let Some(leaf) = leaf_opt { - (self.m_leaf)(leaf, self.outer_value); - for (capture_paths, endpoints) in &mut leaf.endpoints_map { - if is_unrestricted(&capture_paths, self.restriction_paths) { - (self.m_endpoints)(endpoints, - project_paths(self.outer_value_term, &capture_paths)); + if let Some(const_vals) = project_paths(self.outer_value, const_paths) { + let leaf_opt = if self.create_leaf_if_absent { + Some(const_val_map.entry(const_vals.clone()).or_insert_with(Leaf::new)) + } else { + const_val_map.get_mut(&const_vals) + }; + if let Some(leaf) = leaf_opt { + (self.m_leaf)(leaf, self.outer_value); + for (capture_paths, endpoints) in &mut leaf.endpoints_map { + if let Some(cs) = project_paths(self.outer_value, &capture_paths) { + (self.m_endpoints)(endpoints, cs); + } } - } - if leaf.is_empty() { - const_val_map.remove(&const_vals); - if const_val_map.is_empty() { - empty_const_paths.push(const_paths.clone()); + if leaf.is_empty() { + const_val_map.remove(&const_vals); + if const_val_map.is_empty() { + empty_const_paths.push(const_paths.clone()); + } } } } @@ -344,71 +337,124 @@ where FCont: FnMut(&mut Continuation, &CachedAssertion) -> (), fn class_of(v: &Assertion) -> Option { match v.value() { - Value::Sequence(ref vs) => Some(Guard::Seq(vs.len())), - Value::Record(ref r) => Some(Guard::Rec(r.label().clone(), r.arity())), + Value::Sequence(vs) => Some(Guard::Seq(vs.len())), + Value::Record(r) => Some(Guard::Rec(r.label().clone(), r.arity())), + Value::Dictionary(_) => Some(Guard::Map), _ => None, } } -fn project_path<'a>(v: &'a Assertion, p: &Path) -> &'a Assertion { +fn project_path<'a>(v: &'a Assertion, p: &Path) -> Option<&'a Assertion> { let mut v = v; for i in p { - v = step(v, *i); + match step(v, i) { + Some(w) => v = w, + None => return None, + } } - v + Some(v) } -fn project_paths<'a>(v: &'a Assertion, ps: &Paths) -> Captures { - Arc::new(ps.iter().map(|p| project_path(v, p)).cloned().collect()) -} - -fn step(v: &Assertion, i: usize) -> &Assertion { - match v.value() { - Value::Sequence(ref vs) => &vs[i], - Value::Record(ref r) => &r.fields()[i], - _ => panic!("step: non-sequence, non-record {:?}", v) +fn project_paths<'a>(v: &'a Assertion, ps: &Paths) -> Option { + let mut vs = Vec::new(); + for p in ps { + match project_path(v, p) { + Some(c) => vs.push(c.clone()), + None => return None, + } } + Some(Captures::new(vs)) } -#[derive(Debug)] -struct Continuation { - cached_assertions: Set, - leaf_map: Map>, +fn step<'a>(v: &'a Assertion, s: &PathStep) -> Option<&'a Assertion> { + match (v.value(), s) { + (Value::Sequence(vs), PathStep::Index(i)) => + if *i < vs.len() { Some(&vs[*i]) } else { None }, + (Value::Record(r), PathStep::Index(i)) => + if *i < r.arity() { Some(&r.fields()[*i]) } else { None }, + (Value::Dictionary(m), PathStep::Key(k)) => + m.get(k), + _ => + None, + } } impl Continuation { - fn new(cached_assertions: Set) -> Self { + fn new(cached_assertions: Set) -> Self { Continuation { cached_assertions, leaf_map: Map::new() } } -} -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] -struct Selector { - pop_count: usize, - index: usize, -} + pub fn add_observer( + &mut self, + t: &mut Activation, + analysis: &pattern::PatternAnalysis, + observer: &Arc, + ) { + let cached_assertions = &self.cached_assertions; + let const_val_map = + self.leaf_map.entry(analysis.const_paths.clone()).or_insert_with({ + || { + let mut cvm = Map::new(); + for a in cached_assertions { + if let Some(key) = project_paths(a, &analysis.const_paths) { + cvm.entry(key).or_insert_with(Leaf::new) + .cached_assertions.insert(a.clone()); + } + } + cvm + } + }); + let leaf = const_val_map.entry(analysis.const_values.clone()).or_insert_with(Leaf::new); + let leaf_cached_assertions = &leaf.cached_assertions; + let endpoints = leaf.endpoints_map.entry(analysis.capture_paths.clone()).or_insert_with(|| { + let mut b = Bag::new(); + for term in leaf_cached_assertions { + if let Some(captures) = project_paths(term, &analysis.capture_paths) { + *b.entry(captures).or_insert(0) += 1; + } + } + Endpoints { cached_captures: b, endpoints: Map::new() } + }); + let mut capture_map = Map::new(); + for cs in endpoints.cached_captures.keys() { + capture_map.insert(cs.clone(), t.assert(observer.clone(), cs.clone())); + } + endpoints.endpoints.insert(observer.clone(), capture_map); + } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] -pub enum Guard { - Rec(Assertion, usize), - Seq(usize), -} - -impl Guard { - fn arity(&self) -> usize { - match self { - Guard::Rec(_, s) => *s, - Guard::Seq(s) => *s + pub fn remove_observer( + &mut self, + analysis: pattern::PatternAnalysis, + observer: &Arc, + ) { + if let Entry::Occupied(mut const_val_map_entry) + = self.leaf_map.entry(analysis.const_paths) + { + let const_val_map = const_val_map_entry.get_mut(); + if let Entry::Occupied(mut leaf_entry) + = const_val_map.entry(analysis.const_values) + { + let leaf = leaf_entry.get_mut(); + if let Entry::Occupied(mut endpoints_entry) + = leaf.endpoints_map.entry(analysis.capture_paths) + { + let endpoints = endpoints_entry.get_mut(); + endpoints.endpoints.remove(observer); + if endpoints.endpoints.is_empty() { + endpoints_entry.remove_entry(); + } + } + if leaf.is_empty() { + leaf_entry.remove_entry(); + } + } + if const_val_map.is_empty() { + const_val_map_entry.remove_entry(); + } } } } -#[derive(Debug)] -struct Leaf { // aka Topic - cached_assertions: Set, - endpoints_map: Map, -} - impl Leaf { fn new() -> Self { Leaf { cached_assertions: Set::new(), endpoints_map: Map::new() } @@ -418,192 +464,3 @@ impl Leaf { self.cached_assertions.is_empty() && self.endpoints_map.is_empty() } } - -#[derive(Debug)] -struct Endpoints { - cached_captures: Bag, - endpoints: Set, -} - -impl Endpoints { - fn new(cached_captures: Bag) -> Self { - Endpoints { cached_captures, endpoints: Set::new() } - } -} - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] -pub enum CachedAssertion { - VisibilityRestricted(Paths, Assertion), - Unrestricted(Assertion), -} - -impl From<&Assertion> for CachedAssertion { - fn from(a: &Assertion) -> Self { - CachedAssertion::Unrestricted(a.clone()) - } -} - -impl CachedAssertion { - fn unscope(&self) -> &Assertion { - match self { - CachedAssertion::VisibilityRestricted(_, a) => a, - CachedAssertion::Unrestricted(a) => a, - } - } - - fn unpack(&self) -> (Option<&Paths>, &Assertion) { - match self { - CachedAssertion::VisibilityRestricted(ps, a) => (Some(ps), a), - CachedAssertion::Unrestricted(a) => (None, a), - } - } -} - -fn is_unrestricted(capture_paths: &Paths, restriction_paths: Option<&Paths>) -> bool { - // We are "unrestricted" if Set(capture_paths) ⊆ Set(restriction_paths). Since both - // variables really hold lists, we operate with awareness of the order the lists are - // built here. We know that the lists are built in fringe order; that is, they are - // sorted wrt `pathCmp`. - match restriction_paths { - None => true, // not visibility-restricted in the first place - Some(rpaths) => { - let mut rpi = rpaths.iter(); - 'outer: for c in capture_paths { - 'inner: loop { - match rpi.next() { - None => { - // there's at least one capture_paths entry (`c`) that does - // not appear in restriction_paths, so we are restricted - return false; - } - Some(r) => match c.cmp(r) { - Ordering::Less => { - // `c` is less than `r`, but restriction_paths is sorted, - // so `c` does not appear in restriction_paths, and we are - // thus restricted. - return false; - } - Ordering::Equal => { - // `c` is equal to `r`, so we may yet be unrestricted. - // Discard both `c` and `r` and continue. - continue 'outer; - } - Ordering::Greater => { - // `c` is greater than `r`, but capture_paths and - // restriction_paths are sorted, so while we might yet - // come to an `r` that is equal to `c`, we will never find - // another `c` that is less than this `c`. Discard this - // `r` then, keeping the `c`, and compare against the next - // `r`. - continue 'inner; - } - } - } - } - } - // We went all the way through capture_paths without finding any `c` not in - // restriction_paths. - true - } - } -} - -pub struct Analyzer { - const_paths: Paths, - const_vals: Vec, - capture_paths: Paths, - path: Path, -} - -impl Analyzer { - fn walk(&mut self, mut a: &Assertion) -> Skeleton { - while let Some(fields) = a.value().as_simple_record("capture", Some(1)) { - self.capture_paths.push(self.path.clone()); - a = &fields[0]; - } - - if a.value().is_simple_record("discard", Some(0)) { - Skeleton::Blank - } else { - match class_of(a) { - Some(cls) => { - let arity = cls.arity(); - Skeleton::Guarded(cls, - (0..arity).map(|i| { - self.path.push(i); - let s = self.walk(step(a, i)); - self.path.pop(); - s - }).collect()) - } - None => { - self.const_paths.push(self.path.clone()); - self.const_vals.push(a.clone()); - Skeleton::Blank - } - } - } - } -} - -pub fn analyze(a: &Assertion) -> AnalysisResults { - let mut z = Analyzer { - const_paths: Vec::new(), - const_vals: Vec::new(), - capture_paths: Vec::new(), - path: Vec::new(), - }; - let skeleton = z.walk(a); - AnalysisResults { - skeleton, - const_paths: z.const_paths, - const_vals: Arc::new(z.const_vals), - capture_paths: z.capture_paths, - assertion: a.clone(), - } -} - -// pub fn instantiate_assertion(a: &Assertion, cs: Captures) -> CachedAssertion { -// let mut capture_paths = Vec::new(); -// let mut path = Vec::new(); -// let mut vs: Vec = (*cs).clone(); -// vs.reverse(); -// let instantiated = instantiate_assertion_walk(&mut capture_paths, &mut path, &mut vs, a); -// CachedAssertion::VisibilityRestricted(capture_paths, instantiated) -// } - -// fn instantiate_assertion_walk(capture_paths: &mut Paths, -// path: &mut Path, -// vs: &mut Vec, -// a: &Assertion) -> Assertion { -// if let Some(fields) = a.value().as_simple_record("capture", Some(1)) { -// capture_paths.push(path.clone()); -// let v = vs.pop().unwrap(); -// instantiate_assertion_walk(capture_paths, path, vs, &fields[0]); -// v -// } else if a.value().is_simple_record("discard", Some(0)) { -// Value::Domain(Syndicate::new_placeholder()).wrap() -// } else { -// let f = |(i, aa)| { -// path.push(i); -// let vv = instantiate_assertion_walk(capture_paths, -// path, -// vs, -// aa); -// path.pop(); -// vv -// }; -// match class_of(a) { -// Some(Guard::Seq(_)) => -// Value::from(Vec::from_iter(a.value().as_sequence().unwrap() -// .iter().enumerate().map(f))) -// .wrap(), -// Some(Guard::Rec(l, fieldcount)) => -// Value::record(l, a.value().as_record(Some(fieldcount)).unwrap().1 -// .iter().enumerate().map(f).collect()) -// .wrap(), -// None => -// a.clone(), -// } -// } -// } diff --git a/src/spaces.rs b/src/spaces.rs deleted file mode 100644 index 527aca3..0000000 --- a/src/spaces.rs +++ /dev/null @@ -1,54 +0,0 @@ -use super::V; -use super::dataspace; - -use std::sync::Arc; - -use tracing::{info, debug}; - -use preserves::value::Map; - -pub struct Spaces { - index: Map, -} - -impl Spaces { - pub fn new() -> Self { - Self { index: Map::new() } - } - - pub fn lookup(&mut self, name: &V) -> dataspace::DataspaceRef { - let (is_new, space) = match self.index.get(name) { - Some(s) => (false, s.clone()), - None => { - let s = dataspace::Dataspace::new_ref(name); - self.index.insert(name.clone(), s.clone()); - (true, s) - } - }; - - debug!(name = debug(name), - action = display(if is_new { "created" } else { "accessed" })); - - space - } - - pub fn cleanup(&mut self) { - self.index = self.index.iter() - .filter(|s| s.1.read().unwrap().peer_count() > 0) - .map(|(k,v)| (k.clone(), Arc::clone(v))) - .collect(); - } - - pub fn dump_stats(&self, delta: core::time::Duration) { - for (dsname, dsref) in &self.index { - let mut ds = dsref.write().unwrap(); - info!(name = debug(dsname), - connections = display(format!("{} (+{}/-{})", ds.peer_count(), ds.churn.peers_added, ds.churn.peers_removed)), - assertions = display(format!("{} (+{}/-{})", ds.assertion_count(), ds.churn.assertions_added, ds.churn.assertions_removed)), - endpoints = display(format!("{} (+{}/-{})", ds.endpoint_count(), ds.churn.endpoints_added, ds.churn.endpoints_removed)), - msg_in_rate = display(ds.churn.messages_injected as f32 / delta.as_secs() as f32), - msg_out_rate = display(ds.churn.messages_delivered as f32 / delta.as_secs() as f32)); - ds.churn.reset(); - } - } -}