diff --git a/Cargo.lock b/Cargo.lock index 0698291..2d72e85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -198,15 +198,6 @@ dependencies = [ "digest 0.10.7", ] -[[package]] -name = "block-buffer" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" -dependencies = [ - "generic-array", -] - [[package]] name = "block-buffer" version = "0.10.4" @@ -385,16 +376,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" -[[package]] -name = "core-foundation" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation-sys" version = "0.8.4" @@ -532,6 +513,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "data-encoding" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" + [[package]] name = "digest" version = "0.9.0" @@ -547,7 +534,7 @@ version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer 0.10.4", + "block-buffer", "crypto-common", "subtle", ] @@ -564,22 +551,6 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" -[[package]] -name = "errno" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e" -dependencies = [ - "libc", - "windows-sys", -] - -[[package]] -name = "fastrand" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" - [[package]] name = "filetime" version = "0.2.22" @@ -856,12 +827,64 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "0.14.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-tungstenite" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cc7dcb1ab67cd336f468a12491765672e61a3b6b148634dbfe2fe8acd3fe7d9" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-tungstenite", + "tungstenite", +] + [[package]] name = "iana-time-zone" version = "0.1.58" @@ -915,15 +938,6 @@ dependencies = [ "libc", ] -[[package]] -name = "input_buffer" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" -dependencies = [ - "bytes", -] - [[package]] name = "instant" version = "0.1.12" @@ -994,12 +1008,6 @@ version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" -[[package]] -name = "linux-raw-sys" -version = "0.4.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829" - [[package]] name = "lock_api" version = "0.4.11" @@ -1103,24 +1111,6 @@ dependencies = [ "ws2_32-sys", ] -[[package]] -name = "native-tls" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "net2" version = "0.2.39" @@ -1314,12 +1304,6 @@ dependencies = [ "syn 2.0.39", ] -[[package]] -name = "openssl-probe" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" - [[package]] name = "openssl-src" version = "300.1.6+3.1.4" @@ -1350,7 +1334,17 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", - "parking_lot_core", + "parking_lot_core 0.8.6", +] + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core 0.9.9", ] [[package]] @@ -1367,6 +1361,19 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "parking_lot_core" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall 0.4.1", + "smallvec", + "windows-targets", +] + [[package]] name = "percent-encoding" version = "2.3.0" @@ -1676,19 +1683,6 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" -[[package]] -name = "rustix" -version = "0.38.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3" -dependencies = [ - "bitflags 2.4.1", - "errno", - "libc", - "linux-raw-sys", - "windows-sys", -] - [[package]] name = "ryu" version = "1.0.15" @@ -1704,44 +1698,12 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "schannel" -version = "0.1.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" -dependencies = [ - "windows-sys", -] - [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "security-framework" -version = "2.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" -dependencies = [ - "bitflags 1.3.2", - "core-foundation", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - -[[package]] -name = "security-framework-sys" -version = "2.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "serde" version = "1.0.192" @@ -1793,16 +1755,14 @@ dependencies = [ ] [[package]] -name = "sha-1" -version = "0.9.8" +name = "sha1" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ - "block-buffer 0.9.0", "cfg-if 1.0.0", "cpufeatures", - "digest 0.9.0", - "opaque-debug", + "digest 0.10.7", ] [[package]] @@ -1935,7 +1895,7 @@ dependencies = [ "hmac", "lazy_static", "openssl", - "parking_lot", + "parking_lot 0.11.2", "preserves", "preserves-schema", "tokio", @@ -1971,10 +1931,13 @@ version = "0.34.1" dependencies = [ "chrono", "futures", + "hyper", + "hyper-tungstenite", "lazy_static", "noise-protocol", "noise-rust-crypto", "notify", + "parking_lot 0.12.1", "preserves-schema", "structopt", "syndicate", @@ -1982,12 +1945,11 @@ dependencies = [ "syndicate-schema-plugin", "tikv-jemallocator", "tokio", - "tokio-tungstenite", + "tokio-stream", "tokio-util", "tracing", "tracing-futures", "tracing-subscriber", - "tungstenite", ] [[package]] @@ -2002,19 +1964,6 @@ dependencies = [ "syndicate", ] -[[package]] -name = "tempfile" -version = "3.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" -dependencies = [ - "cfg-if 1.0.0", - "fastrand", - "redox_syscall 0.4.1", - "rustix", - "windows-sys", -] - [[package]] name = "textwrap" version = "0.11.0" @@ -2129,14 +2078,24 @@ dependencies = [ ] [[package]] -name = "tokio-tungstenite" -version = "0.14.0" +name = "tokio-stream" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e96bb520beab540ab664bd5a9cfeaa1fcd846fa68c830b42e2c8963071251d2" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log", - "pin-project", "tokio", "tungstenite", ] @@ -2155,6 +2114,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.40" @@ -2241,21 +2206,25 @@ dependencies = [ ] [[package]] -name = "tungstenite" -version = "0.13.0" +name = "try-lock" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093" +checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" + +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" dependencies = [ - "base64", "byteorder", "bytes", + "data-encoding", "http", "httparse", - "input_buffer", "log", - "native-tls", "rand", - "sha-1", + "sha1", "thiserror", "url", "utf-8", @@ -2367,6 +2336,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/syndicate-server/Cargo.toml b/syndicate-server/Cargo.toml index a763ae9..8d83d30 100644 --- a/syndicate-server/Cargo.toml +++ b/syndicate-server/Cargo.toml @@ -32,15 +32,16 @@ structopt = "0.3" tikv-jemallocator = { version = "0.5.0", optional = true } -tungstenite = "0.13" -tokio-tungstenite = "0.14" - tokio = { version = "1.10", features = ["io-std", "time", "process"] } tokio-util = "0.6" +tokio-stream = "0.1" tracing = "0.1" tracing-subscriber = "0.2" tracing-futures = "0.2" +hyper = { version = "0.14.27", features = ["server", "http1", "stream"] } +hyper-tungstenite = "0.11.1" +parking_lot = "0.12.1" [package.metadata.workspaces] independent = true diff --git a/syndicate-server/protocols/schema-bundle.bin b/syndicate-server/protocols/schema-bundle.bin index 81d8226..640b836 100644 --- a/syndicate-server/protocols/schema-bundle.bin +++ b/syndicate-server/protocols/schema-bundle.bin @@ -3,8 +3,11 @@ ByteString ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³ ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³ EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³ ProcessEnv„„´³named³dir´³refµ„³ -ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ ReadyOnStart´³orµµ±present´³dict·³ readyOnStart´³named³ readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³ readyOnStart´³named³ readyOnStart³any„„„„µ±absent´³dict·„„„„„³ RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³ RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³ ProtocolField´³orµµ±present´³dict·³protocol´³named³protocol´³refµ„³Protocol„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³ RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„µ±never´³lit³never„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±oneShot´³rec´³lit³one-shot„´³tupleµ´³named³setup´³refµ„³ CommandLine„„„„„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³ FullProcess„„´³named³ readyOnStart´³refµ„³ ReadyOnStart„„´³named³restart´³refµ„³ RestartField„„´³named³protocol´³refµ„³ ProtocolField„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version°³ definitions·³ ConfigEnv´³dictof´³atom³Symbol„³any„³ DebtReporter´³rec´³lit³ debt-reporter„´³tupleµ´³named³intervalSeconds´³atom³Double„„„„„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„´³named³env´³refµ„³ ConfigEnv„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„´³named³ +ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ ReadyOnStart´³orµµ±present´³dict·³ readyOnStart´³named³ readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³ readyOnStart´³named³ readyOnStart³any„„„„µ±absent´³dict·„„„„„³ RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³ RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³ ProtocolField´³orµµ±present´³dict·³protocol´³named³protocol´³refµ„³Protocol„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³ RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„µ±never´³lit³never„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±oneShot´³rec´³lit³one-shot„´³tupleµ´³named³setup´³refµ„³ CommandLine„„„„„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³ FullProcess„„´³named³ readyOnStart´³refµ„³ ReadyOnStart„„´³named³restart´³refµ„³ RestartField„„´³named³protocol´³refµ„³ ProtocolField„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version°³ definitions·³ ConfigEnv´³dictof´³atom³Symbol„³any„³ +HttpRouter´³rec´³lit³ http-router„´³tupleµ´³named³httpd´³embedded³any„„„„„³ TcpWithHttp´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„´³named³ gatekeeper´³embedded´³refµ³ -gatekeeper„³Resolve„„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„´³named³ +gatekeeper„³Resolve„„„´³named³httpd´³embedded´³refµ³http„³ HttpContext„„„„„„³ DebtReporter´³rec´³lit³ debt-reporter„´³tupleµ´³named³intervalSeconds´³atom³Double„„„„„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„´³named³env´³refµ„³ ConfigEnv„„„„„³TcpWithoutHttp´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„´³named³ +gatekeeper´³embedded´³refµ³ +gatekeeper„³Resolve„„„„„„³TcpRelayListener´³orµµ±TcpWithoutHttp´³refµ„³TcpWithoutHttp„„µ± TcpWithHttp´³refµ„³ TcpWithHttp„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„´³named³ gatekeeper´³embedded´³refµ³ gatekeeper„³Resolve„„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„„„ \ No newline at end of file diff --git a/syndicate-server/protocols/schemas/internalServices.prs b/syndicate-server/protocols/schemas/internalServices.prs index d98cca8..189c23f 100644 --- a/syndicate-server/protocols/schemas/internalServices.prs +++ b/syndicate-server/protocols/schemas/internalServices.prs @@ -3,8 +3,13 @@ embeddedType EntityRef.Cap . DebtReporter = . -TcpRelayListener = . +TcpRelayListener = TcpWithoutHttp / TcpWithHttp . +TcpWithoutHttp = . +TcpWithHttp = . + UnixRelayListener = . ConfigWatcher = . ConfigEnv = { symbol: any ...:... }. + +HttpRouter = . diff --git a/syndicate-server/src/http.rs b/syndicate-server/src/http.rs new file mode 100644 index 0000000..940d4f3 --- /dev/null +++ b/syndicate-server/src/http.rs @@ -0,0 +1,185 @@ +use std::convert::TryInto; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use hyper::{Request, Response, Body, StatusCode}; +use hyper::body; +use hyper::header::HeaderName; +use hyper::header::HeaderValue; + +use syndicate::actor::*; +use syndicate::error::Error; +use syndicate::trace; +use syndicate::value::Map; +use syndicate::value::NestedValue; + +use syndicate::schemas::http; + +use tokio::sync::oneshot; +use tokio::sync::mpsc::{UnboundedSender, unbounded_channel}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use crate::language; + +static NEXT_SEQ: AtomicU64 = AtomicU64::new(0); + +pub fn empty_response(code: StatusCode) -> Response { + let mut r = Response::new(Body::empty()); + *r.status_mut() = code; + r +} + +type ChunkItem = Result>; + +enum ResponseCollector { + Pending { + tx: oneshot::Sender>, + body_tx: UnboundedSender, + res: Response, + }, + Done +} + +impl ResponseCollector { + fn new(tx: oneshot::Sender>) -> Self { + let (body_tx, body_rx) = unbounded_channel(); + let body_stream: Box + Send> = + Box::new(UnboundedReceiverStream::new(body_rx)); + ResponseCollector::Pending { + tx, + body_tx, + res: Response::new(body_stream.into()), + } + } + + fn with_res) -> ActorResult>(&mut self, f: F) -> ActorResult { + if let ResponseCollector::Pending { res, .. } = self { + f(res)?; + } + Ok(()) + } + + fn add_chunk(&mut self, value: http::Chunk) -> ActorResult { + if let ResponseCollector::Pending { body_tx, .. } = self { + body_tx.send(Ok(match value { + http::Chunk::Bytes(bs) => bs.into(), + http::Chunk::String(s) => s.as_bytes().to_vec().into(), + }))?; + } + Ok(()) + } + + fn finish(&mut self) -> ActorResult { + match std::mem::replace(self, ResponseCollector::Done) { + ResponseCollector::Pending { tx, res, .. } => { + let _ = tx.send(res); + } + ResponseCollector::Done => (), + } + Ok(()) + } +} + +impl Entity for ResponseCollector { + fn message(&mut self, _turn: &mut Activation, message: http::HttpResponse) -> ActorResult { + match message { + http::HttpResponse::Status { code, .. } => self.with_res(|r| { + *r.status_mut() = StatusCode::from_u16( + (&code).try_into().map_err(|_| "bad status code")?)?; + Ok(()) + }), + http::HttpResponse::Header { name, value } => self.with_res(|r| { + r.headers_mut().insert(HeaderName::from_bytes(name.as_bytes())?, + HeaderValue::from_str(value.as_str())?); + Ok(()) + }), + http::HttpResponse::Chunk { chunk } => self.add_chunk(*chunk), + http::HttpResponse::Done { chunk } => { + self.add_chunk(*chunk)?; + self.finish() + } + } + } +} + +pub async fn serve( + trace_collector: Option, + facet: FacetRef, + httpd: Arc, + mut req: Request, + port: u16, +) -> Result, Error> { + let host = match req.headers().get("host").and_then(|v| v.to_str().ok()) { + None => return Ok(empty_response(StatusCode::BAD_REQUEST)), + Some(h) => match h.rsplit_once(':') { + None => h.to_string(), + Some((h, _port)) => h.to_string(), + } + }; + + let uri = req.uri(); + let mut path: Vec = uri.path().split('/').map(|s| s.to_string()).collect(); + path.remove(0); + + let mut query: Map> = Map::new(); + for piece in uri.query().unwrap_or("").split('&').into_iter() { + match piece.split_once('=') { + Some((k, v)) => { + let key = k.to_string(); + let value = v.to_string(); + match query.get_mut(&key) { + None => { query.insert(key, vec![http::QueryValue::String(value)]); }, + Some(vs) => { vs.push(http::QueryValue::String(value)); }, + } + } + None => { + if piece.len() > 0 { + let key = piece.to_string(); + if !query.contains_key(&key) { + query.insert(key, vec![]); + } + } + } + } + } + + let mut headers: Map = Map::new(); + for h in req.headers().into_iter() { + match h.1.to_str() { + Ok(v) => { headers.insert(h.0.as_str().to_string().to_lowercase(), v.to_string()); }, + Err(_) => return Ok(empty_response(StatusCode::BAD_REQUEST)), + } + } + + let body = match body::to_bytes(req.body_mut()).await { + Ok(b) => http::RequestBody::Present(b.to_vec()), + Err(_) => return Ok(empty_response(StatusCode::BAD_REQUEST)), + }; + + let account = Account::new(Some(AnyValue::symbol("http")), trace_collector); + + let (tx, rx) = oneshot::channel(); + + facet.activate(&account, Some(trace::TurnCause::external("http")), move |t| { + let sreq = http::HttpRequest { + sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(), + host, + port: port.into(), + method: req.method().to_string().to_lowercase(), + path, + headers: http::Headers(headers), + query, + body, + }; + tracing::info!(?sreq); + let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx))); + httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep }); + Ok(()) + }); + + match rx.await { + Ok(response) => Ok(response), + Err(_ /* sender dropped */) => Ok(empty_response(StatusCode::INTERNAL_SERVER_ERROR)), + } +} diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index d8a514b..e1dfbf4 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -20,6 +20,7 @@ use syndicate::value::NestedValue; mod counter; mod dependencies; mod gatekeeper; +mod http; mod language; mod lifecycle; mod protocol; @@ -126,6 +127,7 @@ async fn main() -> ActorResult { services::config_watcher::on_demand(t, Arc::clone(&server_config_ds)); services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&log_ds)); services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds)); + services::http_router::on_demand(t, Arc::clone(&server_config_ds)); services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds)); services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds)); @@ -139,12 +141,13 @@ async fn main() -> ActorResult { for port in config.ports.clone() { server_config_ds.assert(t, language(), &service::RunService { - service_name: language().unparse(&internal_services::TcpRelayListener { + service_name: language().unparse(&internal_services::TcpWithHttp { addr: transport_address::Tcp { host: "0.0.0.0".to_owned(), port: (port as i32).into(), }, gatekeeper: gatekeeper.clone(), + httpd: server_config_ds.clone(), }), }); } diff --git a/syndicate-server/src/protocol.rs b/syndicate-server/src/protocol.rs index 71add82..064fdc9 100644 --- a/syndicate-server/src/protocol.rs +++ b/syndicate-server/src/protocol.rs @@ -1,11 +1,14 @@ use futures::SinkExt; use futures::StreamExt; +use hyper::service::service_fn; use std::future::ready; -use std::io; use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use syndicate::actor::*; +use syndicate::enclose; use syndicate::error::Error; use syndicate::error::error; use syndicate::relay; @@ -14,7 +17,7 @@ use syndicate::value::NestedValue; use tokio::net::TcpStream; -use tungstenite::Message; +use hyper_tungstenite::tungstenite::Message; struct ExitListener; @@ -53,34 +56,69 @@ pub async fn detect_protocol( facet: FacetRef, stream: TcpStream, gateway: Arc, + httpd: Option>, addr: std::net::SocketAddr, + server_port: u16, ) -> ActorResult { - let (i, o) = { - let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect - match stream.peek(&mut buf).await? { - 1 => match buf[0] { - b'G' /* ASCII 'G' for "GET" */ => { - tracing::info!(protocol = %"websocket", peer = ?addr); - let s = tokio_tungstenite::accept_async(stream).await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let (o, i) = s.split(); - let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose())); - let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs)))); - (relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o))) - }, - _ => { - tracing::info!(protocol = %"raw", peer = ?addr); - let (i, o) = stream.into_split(); - (relay::Input::Bytes(Box::pin(i)), - relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */))) - } + let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect + match stream.peek(&mut buf).await? { + 1 => match buf[0] { + v if v == b'[' /* Turn */ || v == b'<' /* Error and Extension */ || v >= 128 => { + tracing::info!(protocol = %"raw", peer = ?addr); + let (i, o) = stream.into_split(); + let i = relay::Input::Bytes(Box::pin(i)); + let o = relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */)); + run_connection(trace_collector, facet, i, o, gateway); + Ok(()) } - 0 => Err(error("closed before starting", AnyValue::new(false)))?, - _ => unreachable!() + _ => { + let upgraded = Arc::new(AtomicBool::new(false)); + let keepalive = facet.actor.keep_alive(); + let mut http = hyper::server::conn::Http::new(); + http.http1_keep_alive(true); + http.http1_only(true); + let service = service_fn(|mut req| enclose!( + (upgraded, keepalive, trace_collector, facet, gateway, httpd) async move { + if hyper_tungstenite::is_upgrade_request(&req) { + tracing::info!(protocol = %"websocket", ?req); + let (response, websocket) = hyper_tungstenite::upgrade(&mut req, None) + .map_err(|e| message_error(e))?; + upgraded.store(true, Ordering::SeqCst); + tokio::spawn(enclose!(() async move { + let (o, i) = websocket.await?.split(); + let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose())); + let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs)))); + let i = relay::Input::Packets(Box::pin(i)); + let o = relay::Output::Packets(Box::pin(o)); + run_connection(trace_collector, facet, i, o, gateway); + drop(keepalive); + Ok(()) as ActorResult + })); + Ok(response) + } else { + match httpd { + None => Ok(crate::http::empty_response( + hyper::StatusCode::SERVICE_UNAVAILABLE)), + Some(httpd) => { + tracing::info!(protocol = %"http", ?req); + crate::http::serve(trace_collector, facet, httpd, req, server_port).await + } + } + } + })); + http.serve_connection(stream, service).with_upgrades().await?; + if upgraded.load(Ordering::SeqCst) { + tracing::info!("serve_connection completed after upgrade to websocket"); + } else { + tracing::info!("serve_connection completed after regular HTTP session"); + facet.activate(&Account::new(None, None), None, |t| Ok(t.stop())); + } + Ok(()) + }, } - }; - run_connection(trace_collector, facet, i, o, gateway); - Ok(()) + 0 => Err(error("closed before starting", AnyValue::new(false)))?, + _ => unreachable!() + } } fn message_error(e: E) -> Error { @@ -88,7 +126,7 @@ fn message_error(e: E) -> Error { } fn extract_binary_packets( - r: Result, + r: Result, ) -> Result>, Error> { match r { Ok(m) => match m { @@ -102,6 +140,8 @@ fn extract_binary_packets( Ok(None), // unsolicited pongs are to be ignored Message::Close(_) => Ok(None), // we're about to see the end of the stream, so ignore this + Message::Frame(_) => + Err("Raw frames are not accepted")?, }, Err(e) => Err(message_error(e)), } diff --git a/syndicate-server/src/services/http_router.rs b/syndicate-server/src/services/http_router.rs new file mode 100644 index 0000000..0f72ad9 --- /dev/null +++ b/syndicate-server/src/services/http_router.rs @@ -0,0 +1,164 @@ +use preserves_schema::Codec; + +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::enclose; +use syndicate::error::Error; +use syndicate::preserves::rec; +use syndicate::preserves::value::Map; +use syndicate::preserves::value::NestedValue; +use syndicate::preserves::value::Set; +use syndicate::schemas::http; + +use crate::language::language; +use crate::lifecycle; +use crate::schemas::internal_services::HttpRouter; + +use syndicate_macros::during; + +pub fn on_demand(t: &mut Activation, ds: Arc) { + t.spawn(Some(AnyValue::symbol("http_router_listener")), move |t| { + Ok(during!(t, ds, language(), >, |t: &mut Activation| { + t.spawn_link(Some(rec![AnyValue::symbol("http_router"), language().unparse(&spec)]), + enclose!((ds) |t| run(t, ds, spec))); + Ok(()) + })) + }); +} + +type MethodTable = Map>>; +type RoutingTable = Map>; + +fn run(t: &mut Activation, ds: Arc, spec: HttpRouter) -> ActorResult { + ds.assert(t, language(), &lifecycle::started(&spec)); + ds.assert(t, language(), &lifecycle::ready(&spec)); + let httpd = spec.httpd; + + during!(t, httpd, language(), , |t: &mut Activation| { + let routes: Arc> = t.named_field("routes", Map::new()); + let port1 = port.clone(); + enclose!((httpd, routes) during!(t, httpd, language(), , enclose!((routes) |t: &mut Activation| { + during!(t, httpd, language(), , |t: &mut Activation| { + let host = language().parse::(&host)?; + let path = language().parse::(&path)?; + let method = language().parse::(&method)?; + let handler = handler.value().to_embedded()?; + t.get_mut(&routes) + .entry(host.clone()).or_default() + .entry(path.clone()).or_default() + .entry(method.clone()).or_default() + .insert(handler.clone()); + t.on_stop(enclose!((routes, handler, method, path, host) move |t| { + let host_map = t.get_mut(&routes); + let path_map = host_map.entry(host.clone()).or_default(); + let method_map = path_map.entry(path.clone()).or_default(); + let handler_set = method_map.entry(method.clone()).or_default(); + handler_set.remove(&handler); + if handler_set.is_empty() { + method_map.remove(&method); + } + if method_map.is_empty() { + path_map.remove(&path); + } + if path_map.is_empty() { + host_map.remove(&host); + } + Ok(()) + })); + Ok(()) + }); + Ok(()) + }))); + during!(t, httpd, language(), , |t: &mut Activation| { + let req = match language().parse::(&req) { Ok(v) => v, Err(_) => return Ok(()) }; + let res = match res.value().to_embedded() { Ok(v) => v, Err(_) => return Ok(()) }; + + let methods = match try_hostname(t, &routes, http::HostPattern::Host(req.host.clone()), &req.path)? { + Some(methods) => methods, + None => match try_hostname(t, &routes, http::HostPattern::Any, &req.path)? { + Some(methods) => methods, + None => { + res.message(t, language(), &http::HttpResponse::Status { + code: 404.into(), message: "Not found".into() }); + res.message(t, language(), &http::HttpResponse::Done { + chunk: Box::new(http::Chunk::Bytes(vec![])) }); + return Ok(()) + } + } + }; + + let handlers = match methods.get(&http::MethodPattern::Specific(req.method.clone())) { + Some(handlers) => handlers, + None => match methods.get(&http::MethodPattern::Any) { + Some(handlers) => handlers, + None => { + let allowed = methods.keys().map(|k| match k { + http::MethodPattern::Specific(m) => m.to_uppercase(), + http::MethodPattern::Any => unreachable!(), + }).collect::>().join(", "); + res.message(t, language(), &http::HttpResponse::Status { + code: 405.into(), message: "Method Not Allowed".into() }); + res.message(t, language(), &http::HttpResponse::Header { + name: "allow".into(), value: allowed }); + res.message(t, language(), &http::HttpResponse::Done { + chunk: Box::new(http::Chunk::Bytes(vec![])) }); + return Ok(()) + } + } + }; + + if handlers.len() > 1 { + tracing::warn!(?req, "Too many handlers available"); + } + let handler = handlers.first().expect("Nonempty handler set").clone(); + handler.assert(t, language(), &http::HttpContext { req, res: res.clone() }); + + Ok(()) + }); + Ok(()) + }); + + Ok(()) +} + +fn path_pattern_matches(path_pat: &http::PathPattern, path: &Vec) -> bool { + let mut path_iter = path.iter(); + for pat_elem in path_pat.0.iter() { + match pat_elem { + http::PathPatternElement::Label(v) => match path_iter.next() { + Some(path_elem) => { + if v != path_elem { + return false; + } + } + None => return false, + }, + http::PathPatternElement::Wildcard => match path_iter.next() { + Some(_) => (), + None => return false, + }, + http::PathPatternElement::Rest => return true, + } + } + true +} + +fn try_hostname<'turn, 'routes>( + t: &'routes mut Activation<'turn>, + routes: &'routes Arc>, + host_pat: http::HostPattern, + path: &Vec, +) -> Result, Error> { + match t.get(routes).get(&host_pat) { + None => Ok(None), + Some(path_table) => { + for (path_pat, method_table) in path_table.iter() { + if path_pattern_matches(path_pat, path) { + return Ok(Some(method_table)); + } + } + Ok(None) + } + } +} diff --git a/syndicate-server/src/services/mod.rs b/syndicate-server/src/services/mod.rs index a134df5..00b0801 100644 --- a/syndicate-server/src/services/mod.rs +++ b/syndicate-server/src/services/mod.rs @@ -1,5 +1,6 @@ pub mod config_watcher; pub mod daemon; pub mod debt_reporter; +pub mod http_router; pub mod tcp_relay_listener; pub mod unix_relay_listener; diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index f8f12e0..a39828b 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -15,27 +15,46 @@ use tokio::net::TcpListener; use crate::language::language; use crate::lifecycle; use crate::protocol::detect_protocol; -use crate::schemas::internal_services::TcpRelayListener; +use crate::schemas::internal_services::{TcpWithHttp, TcpWithoutHttp, TcpRelayListener}; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(Some(AnyValue::symbol("tcp_relay_listener")), move |t| { - Ok(during!(t, ds, language(), >, |t| { - Supervisor::start( - t, - Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]), - SupervisorConfiguration::default(), - enclose!((ds, spec) lifecycle::updater(ds, spec)), - enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) - })) + enclose!((ds) during!(t, ds, language(), >, |t: &mut Activation| { + spec.httpd.assert(t, language(), &syndicate::schemas::http::HttpListener { port: spec.addr.port.clone() }); + run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithHttp(Box::new(spec))) + })); + enclose!((ds) during!(t, ds, language(), >, |t| { + run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithoutHttp(Box::new(spec))) + })); + Ok(()) }); } +fn run_supervisor(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult { + Supervisor::start( + t, + Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]), + SupervisorConfiguration::default(), + enclose!((ds, spec) lifecycle::updater(ds, spec)), + enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) +} + fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult { lifecycle::terminate_on_service_restart(t, &ds, &spec); - let host = spec.addr.host.clone(); - let port = u16::try_from(&spec.addr.port).map_err(|_| "Invalid TCP port number")?; + let (addr, gatekeeper, httpd) = match spec.clone() { + TcpRelayListener::TcpWithHttp(b) => { + let TcpWithHttp { addr, gatekeeper, httpd } = *b; + (addr, gatekeeper, Some(httpd)) + } + TcpRelayListener::TcpWithoutHttp(b) => { + let TcpWithoutHttp { addr, gatekeeper } = *b; + (addr, gatekeeper, None) + } + }; + let host = addr.host.clone(); + let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?; let facet = t.facet.clone(); let trace_collector = t.trace_collector(); t.linked_task(Some(AnyValue::symbol("listener")), async move { @@ -58,17 +77,23 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult loop { let (stream, addr) = listener.accept().await?; - let gatekeeper = spec.gatekeeper.clone(); + let gatekeeper = gatekeeper.clone(); let name = Some(rec![AnyValue::symbol("tcp"), AnyValue::new(format!("{}", &addr))]); let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("connect")); let account = Account::new(name.clone(), trace_collector.clone()); if !facet.activate( - &account, cause, enclose!((trace_collector) move |t| { + &account, cause, enclose!((trace_collector, httpd) move |t| { t.spawn(name, move |t| { Ok(t.linked_task(None, { let facet = t.facet.clone(); async move { - detect_protocol(trace_collector, facet, stream, gatekeeper, addr).await?; + detect_protocol(trace_collector, + facet, + stream, + gatekeeper, + httpd.map(|r| r.clone()), + addr, + port).await?; Ok(LinkedTaskTermination::KeepFacet) } }))