Multiplex regular HTTP on existing TCP/WebSocket connections

This commit is contained in:
Tony Garnock-Jones 2023-11-13 21:52:27 +01:00
parent 090ac8780f
commit 65dae05890
10 changed files with 607 additions and 202 deletions

286
Cargo.lock generated
View File

@ -198,15 +198,6 @@ dependencies = [
"digest 0.10.7",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]]
name = "block-buffer"
version = "0.10.4"
@ -385,16 +376,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "core-foundation"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.4"
@ -532,6 +513,12 @@ dependencies = [
"zeroize",
]
[[package]]
name = "data-encoding"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "digest"
version = "0.9.0"
@ -547,7 +534,7 @@ version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer 0.10.4",
"block-buffer",
"crypto-common",
"subtle",
]
@ -564,22 +551,6 @@ version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
[[package]]
name = "errno"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "fastrand"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "filetime"
version = "0.2.22"
@ -856,12 +827,64 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
[[package]]
name = "httpdate"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
version = "0.14.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper-tungstenite"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc7dcb1ab67cd336f468a12491765672e61a3b6b148634dbfe2fe8acd3fe7d9"
dependencies = [
"hyper",
"pin-project-lite",
"tokio",
"tokio-tungstenite",
"tungstenite",
]
[[package]]
name = "iana-time-zone"
version = "0.1.58"
@ -915,15 +938,6 @@ dependencies = [
"libc",
]
[[package]]
name = "input_buffer"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413"
dependencies = [
"bytes",
]
[[package]]
name = "instant"
version = "0.1.12"
@ -994,12 +1008,6 @@ version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
[[package]]
name = "linux-raw-sys"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829"
[[package]]
name = "lock_api"
version = "0.4.11"
@ -1103,24 +1111,6 @@ dependencies = [
"ws2_32-sys",
]
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "net2"
version = "0.2.39"
@ -1314,12 +1304,6 @@ dependencies = [
"syn 2.0.39",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-src"
version = "300.1.6+3.1.4"
@ -1350,7 +1334,17 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.9",
]
[[package]]
@ -1367,6 +1361,19 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "parking_lot_core"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall 0.4.1",
"smallvec",
"windows-targets",
]
[[package]]
name = "percent-encoding"
version = "2.3.0"
@ -1676,19 +1683,6 @@ version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.38.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3"
dependencies = [
"bitflags 2.4.1",
"errno",
"libc",
"linux-raw-sys",
"windows-sys",
]
[[package]]
name = "ryu"
version = "1.0.15"
@ -1704,44 +1698,12 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88"
dependencies = [
"windows-sys",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "security-framework"
version = "2.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de"
dependencies = [
"bitflags 1.3.2",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "serde"
version = "1.0.192"
@ -1793,16 +1755,14 @@ dependencies = [
]
[[package]]
name = "sha-1"
version = "0.9.8"
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"block-buffer 0.9.0",
"cfg-if 1.0.0",
"cpufeatures",
"digest 0.9.0",
"opaque-debug",
"digest 0.10.7",
]
[[package]]
@ -1935,7 +1895,7 @@ dependencies = [
"hmac",
"lazy_static",
"openssl",
"parking_lot",
"parking_lot 0.11.2",
"preserves",
"preserves-schema",
"tokio",
@ -1971,10 +1931,13 @@ version = "0.34.1"
dependencies = [
"chrono",
"futures",
"hyper",
"hyper-tungstenite",
"lazy_static",
"noise-protocol",
"noise-rust-crypto",
"notify",
"parking_lot 0.12.1",
"preserves-schema",
"structopt",
"syndicate",
@ -1982,12 +1945,11 @@ dependencies = [
"syndicate-schema-plugin",
"tikv-jemallocator",
"tokio",
"tokio-tungstenite",
"tokio-stream",
"tokio-util",
"tracing",
"tracing-futures",
"tracing-subscriber",
"tungstenite",
]
[[package]]
@ -2002,19 +1964,6 @@ dependencies = [
"syndicate",
]
[[package]]
name = "tempfile"
version = "3.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5"
dependencies = [
"cfg-if 1.0.0",
"fastrand",
"redox_syscall 0.4.1",
"rustix",
"windows-sys",
]
[[package]]
name = "textwrap"
version = "0.11.0"
@ -2129,14 +2078,24 @@ dependencies = [
]
[[package]]
name = "tokio-tungstenite"
version = "0.14.0"
name = "tokio-stream"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e96bb520beab540ab664bd5a9cfeaa1fcd846fa68c830b42e2c8963071251d2"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c"
dependencies = [
"futures-util",
"log",
"pin-project",
"tokio",
"tungstenite",
]
@ -2155,6 +2114,12 @@ dependencies = [
"tokio",
]
[[package]]
name = "tower-service"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]]
name = "tracing"
version = "0.1.40"
@ -2241,21 +2206,25 @@ dependencies = [
]
[[package]]
name = "tungstenite"
version = "0.13.0"
name = "try-lock"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "tungstenite"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9"
dependencies = [
"base64",
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"input_buffer",
"log",
"native-tls",
"rand",
"sha-1",
"sha1",
"thiserror",
"url",
"utf-8",
@ -2367,6 +2336,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "want"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e"
dependencies = [
"try-lock",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"

View File

@ -32,15 +32,16 @@ structopt = "0.3"
tikv-jemallocator = { version = "0.5.0", optional = true }
tungstenite = "0.13"
tokio-tungstenite = "0.14"
tokio = { version = "1.10", features = ["io-std", "time", "process"] }
tokio-util = "0.6"
tokio-stream = "0.1"
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-futures = "0.2"
hyper = { version = "0.14.27", features = ["server", "http1", "stream"] }
hyper-tungstenite = "0.11.1"
parking_lot = "0.12.1"
[package.metadata.workspaces]
independent = true

View File

@ -3,8 +3,11 @@ ByteString
ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³
ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³ EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³
ProcessEnv„„´³named³dir´³refµ„³
ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ ReadyOnStart´³orµµ±present´³dict·³ readyOnStart´³named³ readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³ readyOnStart´³named³ readyOnStart³any„„„„µ±absent´³dict·„„„„„³ RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³ RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³ ProtocolField´³orµµ±present´³dict·³protocol´³named³protocol´³refµ„³Protocol„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³ RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„µ±never´³lit³never„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±oneShot´³rec´³lit³one-shot„´³tupleµ´³named³setup´³refµ„³ CommandLine„„„„„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³ FullProcess„„´³named³ readyOnStart´³refµ„³ ReadyOnStart„„´³named³restart´³refµ„³ RestartField„„´³named³protocol´³refµ„³ ProtocolField„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version°³ definitions·³ ConfigEnv´³dictof´³atom³Symbol„³any„³ DebtReporter´³rec´³lit³ debt-reporter„´³tupleµ´³named³intervalSeconds´³atom³Double„„„„„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„´³named³env´³refµ„³ ConfigEnv„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„´³named³
ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ ReadyOnStart´³orµµ±present´³dict·³ readyOnStart´³named³ readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³ readyOnStart´³named³ readyOnStart³any„„„„µ±absent´³dict·„„„„„³ RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³ RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³ ProtocolField´³orµµ±present´³dict·³protocol´³named³protocol´³refµ„³Protocol„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³ RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„µ±never´³lit³never„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±oneShot´³rec´³lit³one-shot„´³tupleµ´³named³setup´³refµ„³ CommandLine„„„„„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³ FullProcess„„´³named³ readyOnStart´³refµ„³ ReadyOnStart„„´³named³restart´³refµ„³ RestartField„„´³named³protocol´³refµ„³ ProtocolField„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version°³ definitions·³ ConfigEnv´³dictof´³atom³Symbol„³any„³
HttpRouter´³rec´³lit³ http-router„´³tupleµ´³named³httpd´³embedded³any„„„„„³ TcpWithHttp´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„´³named³
gatekeeper´³embedded´³refµ³
gatekeeper„³Resolve„„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„´³named³
gatekeeper„³Resolve„„„´³named³httpd´³embedded´³refµ³http„³ HttpContext„„„„„„³ DebtReporter´³rec´³lit³ debt-reporter„´³tupleµ´³named³intervalSeconds´³atom³Double„„„„„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„´³named³env´³refµ„³ ConfigEnv„„„„„³TcpWithoutHttp´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„´³named³
gatekeeper´³embedded´³refµ³
gatekeeper„³Resolve„„„„„„³TcpRelayListener´³orµµ±TcpWithoutHttp´³refµ„³TcpWithoutHttp„„µ± TcpWithHttp´³refµ„³ TcpWithHttp„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„´³named³
gatekeeper´³embedded´³refµ³
gatekeeper„³Resolve„„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„„„

View File

@ -3,8 +3,13 @@ embeddedType EntityRef.Cap .
DebtReporter = <debt-reporter @intervalSeconds double>.
TcpRelayListener = <relay-listener @addr TransportAddress.Tcp @gatekeeper #!gatekeeper.Resolve> .
TcpRelayListener = TcpWithoutHttp / TcpWithHttp .
TcpWithoutHttp = <relay-listener @addr TransportAddress.Tcp @gatekeeper #!gatekeeper.Resolve> .
TcpWithHttp = <relay-listener @addr TransportAddress.Tcp @gatekeeper #!gatekeeper.Resolve @httpd #!http.HttpContext> .
UnixRelayListener = <relay-listener @addr TransportAddress.Unix @gatekeeper #!gatekeeper.Resolve> .
ConfigWatcher = <config-watcher @path string @env ConfigEnv>.
ConfigEnv = { symbol: any ...:... }.
HttpRouter = <http-router @httpd #!any> .

View File

@ -0,0 +1,185 @@
use std::convert::TryInto;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use hyper::{Request, Response, Body, StatusCode};
use hyper::body;
use hyper::header::HeaderName;
use hyper::header::HeaderValue;
use syndicate::actor::*;
use syndicate::error::Error;
use syndicate::trace;
use syndicate::value::Map;
use syndicate::value::NestedValue;
use syndicate::schemas::http;
use tokio::sync::oneshot;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::language;
static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);
pub fn empty_response(code: StatusCode) -> Response<Body> {
let mut r = Response::new(Body::empty());
*r.status_mut() = code;
r
}
type ChunkItem = Result<body::Bytes, Box<dyn std::error::Error + Send + Sync>>;
enum ResponseCollector {
Pending {
tx: oneshot::Sender<Response<Body>>,
body_tx: UnboundedSender<ChunkItem>,
res: Response<Body>,
},
Done
}
impl ResponseCollector {
fn new(tx: oneshot::Sender<Response<Body>>) -> Self {
let (body_tx, body_rx) = unbounded_channel();
let body_stream: Box<dyn futures::Stream<Item = ChunkItem> + Send> =
Box::new(UnboundedReceiverStream::new(body_rx));
ResponseCollector::Pending {
tx,
body_tx,
res: Response::new(body_stream.into()),
}
}
fn with_res<F: FnOnce(&mut Response<Body>) -> ActorResult>(&mut self, f: F) -> ActorResult {
if let ResponseCollector::Pending { res, .. } = self {
f(res)?;
}
Ok(())
}
fn add_chunk(&mut self, value: http::Chunk) -> ActorResult {
if let ResponseCollector::Pending { body_tx, .. } = self {
body_tx.send(Ok(match value {
http::Chunk::Bytes(bs) => bs.into(),
http::Chunk::String(s) => s.as_bytes().to_vec().into(),
}))?;
}
Ok(())
}
fn finish(&mut self) -> ActorResult {
match std::mem::replace(self, ResponseCollector::Done) {
ResponseCollector::Pending { tx, res, .. } => {
let _ = tx.send(res);
}
ResponseCollector::Done => (),
}
Ok(())
}
}
impl Entity<http::HttpResponse> for ResponseCollector {
fn message(&mut self, _turn: &mut Activation, message: http::HttpResponse) -> ActorResult {
match message {
http::HttpResponse::Status { code, .. } => self.with_res(|r| {
*r.status_mut() = StatusCode::from_u16(
(&code).try_into().map_err(|_| "bad status code")?)?;
Ok(())
}),
http::HttpResponse::Header { name, value } => self.with_res(|r| {
r.headers_mut().insert(HeaderName::from_bytes(name.as_bytes())?,
HeaderValue::from_str(value.as_str())?);
Ok(())
}),
http::HttpResponse::Chunk { chunk } => self.add_chunk(*chunk),
http::HttpResponse::Done { chunk } => {
self.add_chunk(*chunk)?;
self.finish()
}
}
}
}
pub async fn serve(
trace_collector: Option<trace::TraceCollector>,
facet: FacetRef,
httpd: Arc<Cap>,
mut req: Request<Body>,
port: u16,
) -> Result<Response<Body>, Error> {
let host = match req.headers().get("host").and_then(|v| v.to_str().ok()) {
None => return Ok(empty_response(StatusCode::BAD_REQUEST)),
Some(h) => match h.rsplit_once(':') {
None => h.to_string(),
Some((h, _port)) => h.to_string(),
}
};
let uri = req.uri();
let mut path: Vec<String> = uri.path().split('/').map(|s| s.to_string()).collect();
path.remove(0);
let mut query: Map<String, Vec<http::QueryValue>> = Map::new();
for piece in uri.query().unwrap_or("").split('&').into_iter() {
match piece.split_once('=') {
Some((k, v)) => {
let key = k.to_string();
let value = v.to_string();
match query.get_mut(&key) {
None => { query.insert(key, vec![http::QueryValue::String(value)]); },
Some(vs) => { vs.push(http::QueryValue::String(value)); },
}
}
None => {
if piece.len() > 0 {
let key = piece.to_string();
if !query.contains_key(&key) {
query.insert(key, vec![]);
}
}
}
}
}
let mut headers: Map<String, String> = Map::new();
for h in req.headers().into_iter() {
match h.1.to_str() {
Ok(v) => { headers.insert(h.0.as_str().to_string().to_lowercase(), v.to_string()); },
Err(_) => return Ok(empty_response(StatusCode::BAD_REQUEST)),
}
}
let body = match body::to_bytes(req.body_mut()).await {
Ok(b) => http::RequestBody::Present(b.to_vec()),
Err(_) => return Ok(empty_response(StatusCode::BAD_REQUEST)),
};
let account = Account::new(Some(AnyValue::symbol("http")), trace_collector);
let (tx, rx) = oneshot::channel();
facet.activate(&account, Some(trace::TurnCause::external("http")), move |t| {
let sreq = http::HttpRequest {
sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(),
host,
port: port.into(),
method: req.method().to_string().to_lowercase(),
path,
headers: http::Headers(headers),
query,
body,
};
tracing::info!(?sreq);
let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx)));
httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep });
Ok(())
});
match rx.await {
Ok(response) => Ok(response),
Err(_ /* sender dropped */) => Ok(empty_response(StatusCode::INTERNAL_SERVER_ERROR)),
}
}

View File

@ -20,6 +20,7 @@ use syndicate::value::NestedValue;
mod counter;
mod dependencies;
mod gatekeeper;
mod http;
mod language;
mod lifecycle;
mod protocol;
@ -126,6 +127,7 @@ async fn main() -> ActorResult {
services::config_watcher::on_demand(t, Arc::clone(&server_config_ds));
services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&log_ds));
services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds));
services::http_router::on_demand(t, Arc::clone(&server_config_ds));
services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds));
services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds));
@ -139,12 +141,13 @@ async fn main() -> ActorResult {
for port in config.ports.clone() {
server_config_ds.assert(t, language(), &service::RunService {
service_name: language().unparse(&internal_services::TcpRelayListener {
service_name: language().unparse(&internal_services::TcpWithHttp {
addr: transport_address::Tcp {
host: "0.0.0.0".to_owned(),
port: (port as i32).into(),
},
gatekeeper: gatekeeper.clone(),
httpd: server_config_ds.clone(),
}),
});
}

View File

@ -1,11 +1,14 @@
use futures::SinkExt;
use futures::StreamExt;
use hyper::service::service_fn;
use std::future::ready;
use std::io;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use syndicate::actor::*;
use syndicate::enclose;
use syndicate::error::Error;
use syndicate::error::error;
use syndicate::relay;
@ -14,7 +17,7 @@ use syndicate::value::NestedValue;
use tokio::net::TcpStream;
use tungstenite::Message;
use hyper_tungstenite::tungstenite::Message;
struct ExitListener;
@ -53,34 +56,69 @@ pub async fn detect_protocol(
facet: FacetRef,
stream: TcpStream,
gateway: Arc<Cap>,
httpd: Option<Arc<Cap>>,
addr: std::net::SocketAddr,
server_port: u16,
) -> ActorResult {
let (i, o) = {
let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect
match stream.peek(&mut buf).await? {
1 => match buf[0] {
b'G' /* ASCII 'G' for "GET" */ => {
tracing::info!(protocol = %"websocket", peer = ?addr);
let s = tokio_tungstenite::accept_async(stream).await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let (o, i) = s.split();
let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose()));
let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs))));
(relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o)))
},
_ => {
tracing::info!(protocol = %"raw", peer = ?addr);
let (i, o) = stream.into_split();
(relay::Input::Bytes(Box::pin(i)),
relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */)))
}
let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect
match stream.peek(&mut buf).await? {
1 => match buf[0] {
v if v == b'[' /* Turn */ || v == b'<' /* Error and Extension */ || v >= 128 => {
tracing::info!(protocol = %"raw", peer = ?addr);
let (i, o) = stream.into_split();
let i = relay::Input::Bytes(Box::pin(i));
let o = relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */));
run_connection(trace_collector, facet, i, o, gateway);
Ok(())
}
0 => Err(error("closed before starting", AnyValue::new(false)))?,
_ => unreachable!()
_ => {
let upgraded = Arc::new(AtomicBool::new(false));
let keepalive = facet.actor.keep_alive();
let mut http = hyper::server::conn::Http::new();
http.http1_keep_alive(true);
http.http1_only(true);
let service = service_fn(|mut req| enclose!(
(upgraded, keepalive, trace_collector, facet, gateway, httpd) async move {
if hyper_tungstenite::is_upgrade_request(&req) {
tracing::info!(protocol = %"websocket", ?req);
let (response, websocket) = hyper_tungstenite::upgrade(&mut req, None)
.map_err(|e| message_error(e))?;
upgraded.store(true, Ordering::SeqCst);
tokio::spawn(enclose!(() async move {
let (o, i) = websocket.await?.split();
let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose()));
let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs))));
let i = relay::Input::Packets(Box::pin(i));
let o = relay::Output::Packets(Box::pin(o));
run_connection(trace_collector, facet, i, o, gateway);
drop(keepalive);
Ok(()) as ActorResult
}));
Ok(response)
} else {
match httpd {
None => Ok(crate::http::empty_response(
hyper::StatusCode::SERVICE_UNAVAILABLE)),
Some(httpd) => {
tracing::info!(protocol = %"http", ?req);
crate::http::serve(trace_collector, facet, httpd, req, server_port).await
}
}
}
}));
http.serve_connection(stream, service).with_upgrades().await?;
if upgraded.load(Ordering::SeqCst) {
tracing::info!("serve_connection completed after upgrade to websocket");
} else {
tracing::info!("serve_connection completed after regular HTTP session");
facet.activate(&Account::new(None, None), None, |t| Ok(t.stop()));
}
Ok(())
},
}
};
run_connection(trace_collector, facet, i, o, gateway);
Ok(())
0 => Err(error("closed before starting", AnyValue::new(false)))?,
_ => unreachable!()
}
}
fn message_error<E: std::fmt::Display>(e: E) -> Error {
@ -88,7 +126,7 @@ fn message_error<E: std::fmt::Display>(e: E) -> Error {
}
fn extract_binary_packets(
r: Result<Message, tungstenite::Error>,
r: Result<Message, hyper_tungstenite::tungstenite::Error>,
) -> Result<Option<Vec<u8>>, Error> {
match r {
Ok(m) => match m {
@ -102,6 +140,8 @@ fn extract_binary_packets(
Ok(None), // unsolicited pongs are to be ignored
Message::Close(_) =>
Ok(None), // we're about to see the end of the stream, so ignore this
Message::Frame(_) =>
Err("Raw frames are not accepted")?,
},
Err(e) => Err(message_error(e)),
}

View File

@ -0,0 +1,164 @@
use preserves_schema::Codec;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::enclose;
use syndicate::error::Error;
use syndicate::preserves::rec;
use syndicate::preserves::value::Map;
use syndicate::preserves::value::NestedValue;
use syndicate::preserves::value::Set;
use syndicate::schemas::http;
use crate::language::language;
use crate::lifecycle;
use crate::schemas::internal_services::HttpRouter;
use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(Some(AnyValue::symbol("http_router_listener")), move |t| {
Ok(during!(t, ds, language(), <run-service $spec: HttpRouter::<AnyValue>>, |t: &mut Activation| {
t.spawn_link(Some(rec![AnyValue::symbol("http_router"), language().unparse(&spec)]),
enclose!((ds) |t| run(t, ds, spec)));
Ok(())
}))
});
}
type MethodTable = Map<http::MethodPattern, Set<Arc<Cap>>>;
type RoutingTable = Map<http::HostPattern, Map<http::PathPattern, MethodTable>>;
fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
ds.assert(t, language(), &lifecycle::started(&spec));
ds.assert(t, language(), &lifecycle::ready(&spec));
let httpd = spec.httpd;
during!(t, httpd, language(), <http-bind _ $port _ _ _>, |t: &mut Activation| {
let routes: Arc<Field<RoutingTable>> = t.named_field("routes", Map::new());
let port1 = port.clone();
enclose!((httpd, routes) during!(t, httpd, language(), <http-listener #(&port1)>, enclose!((routes) |t: &mut Activation| {
during!(t, httpd, language(), <http-bind $host #(&port) $method $path $handler>, |t: &mut Activation| {
let host = language().parse::<http::HostPattern>(&host)?;
let path = language().parse::<http::PathPattern>(&path)?;
let method = language().parse::<http::MethodPattern>(&method)?;
let handler = handler.value().to_embedded()?;
t.get_mut(&routes)
.entry(host.clone()).or_default()
.entry(path.clone()).or_default()
.entry(method.clone()).or_default()
.insert(handler.clone());
t.on_stop(enclose!((routes, handler, method, path, host) move |t| {
let host_map = t.get_mut(&routes);
let path_map = host_map.entry(host.clone()).or_default();
let method_map = path_map.entry(path.clone()).or_default();
let handler_set = method_map.entry(method.clone()).or_default();
handler_set.remove(&handler);
if handler_set.is_empty() {
method_map.remove(&method);
}
if method_map.is_empty() {
path_map.remove(&path);
}
if path_map.is_empty() {
host_map.remove(&host);
}
Ok(())
}));
Ok(())
});
Ok(())
})));
during!(t, httpd, language(), <request $req $res>, |t: &mut Activation| {
let req = match language().parse::<http::HttpRequest>(&req) { Ok(v) => v, Err(_) => return Ok(()) };
let res = match res.value().to_embedded() { Ok(v) => v, Err(_) => return Ok(()) };
let methods = match try_hostname(t, &routes, http::HostPattern::Host(req.host.clone()), &req.path)? {
Some(methods) => methods,
None => match try_hostname(t, &routes, http::HostPattern::Any, &req.path)? {
Some(methods) => methods,
None => {
res.message(t, language(), &http::HttpResponse::Status {
code: 404.into(), message: "Not found".into() });
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
return Ok(())
}
}
};
let handlers = match methods.get(&http::MethodPattern::Specific(req.method.clone())) {
Some(handlers) => handlers,
None => match methods.get(&http::MethodPattern::Any) {
Some(handlers) => handlers,
None => {
let allowed = methods.keys().map(|k| match k {
http::MethodPattern::Specific(m) => m.to_uppercase(),
http::MethodPattern::Any => unreachable!(),
}).collect::<Vec<String>>().join(", ");
res.message(t, language(), &http::HttpResponse::Status {
code: 405.into(), message: "Method Not Allowed".into() });
res.message(t, language(), &http::HttpResponse::Header {
name: "allow".into(), value: allowed });
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
return Ok(())
}
}
};
if handlers.len() > 1 {
tracing::warn!(?req, "Too many handlers available");
}
let handler = handlers.first().expect("Nonempty handler set").clone();
handler.assert(t, language(), &http::HttpContext { req, res: res.clone() });
Ok(())
});
Ok(())
});
Ok(())
}
fn path_pattern_matches(path_pat: &http::PathPattern, path: &Vec<String>) -> bool {
let mut path_iter = path.iter();
for pat_elem in path_pat.0.iter() {
match pat_elem {
http::PathPatternElement::Label(v) => match path_iter.next() {
Some(path_elem) => {
if v != path_elem {
return false;
}
}
None => return false,
},
http::PathPatternElement::Wildcard => match path_iter.next() {
Some(_) => (),
None => return false,
},
http::PathPatternElement::Rest => return true,
}
}
true
}
fn try_hostname<'turn, 'routes>(
t: &'routes mut Activation<'turn>,
routes: &'routes Arc<Field<RoutingTable>>,
host_pat: http::HostPattern,
path: &Vec<String>,
) -> Result<Option<&'routes MethodTable>, Error> {
match t.get(routes).get(&host_pat) {
None => Ok(None),
Some(path_table) => {
for (path_pat, method_table) in path_table.iter() {
if path_pattern_matches(path_pat, path) {
return Ok(Some(method_table));
}
}
Ok(None)
}
}
}

View File

@ -1,5 +1,6 @@
pub mod config_watcher;
pub mod daemon;
pub mod debt_reporter;
pub mod http_router;
pub mod tcp_relay_listener;
pub mod unix_relay_listener;

View File

@ -15,27 +15,46 @@ use tokio::net::TcpListener;
use crate::language::language;
use crate::lifecycle;
use crate::protocol::detect_protocol;
use crate::schemas::internal_services::TcpRelayListener;
use crate::schemas::internal_services::{TcpWithHttp, TcpWithoutHttp, TcpRelayListener};
use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(Some(AnyValue::symbol("tcp_relay_listener")), move |t| {
Ok(during!(t, ds, language(), <run-service $spec: TcpRelayListener::<AnyValue>>, |t| {
Supervisor::start(
t,
Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]),
SupervisorConfiguration::default(),
enclose!((ds, spec) lifecycle::updater(ds, spec)),
enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec))))
}))
enclose!((ds) during!(t, ds, language(), <run-service $spec: TcpWithHttp::<AnyValue>>, |t: &mut Activation| {
spec.httpd.assert(t, language(), &syndicate::schemas::http::HttpListener { port: spec.addr.port.clone() });
run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithHttp(Box::new(spec)))
}));
enclose!((ds) during!(t, ds, language(), <run-service $spec: TcpWithoutHttp::<AnyValue>>, |t| {
run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithoutHttp(Box::new(spec)))
}));
Ok(())
});
}
fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult {
Supervisor::start(
t,
Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]),
SupervisorConfiguration::default(),
enclose!((ds, spec) lifecycle::updater(ds, spec)),
enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec))))
}
fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult {
lifecycle::terminate_on_service_restart(t, &ds, &spec);
let host = spec.addr.host.clone();
let port = u16::try_from(&spec.addr.port).map_err(|_| "Invalid TCP port number")?;
let (addr, gatekeeper, httpd) = match spec.clone() {
TcpRelayListener::TcpWithHttp(b) => {
let TcpWithHttp { addr, gatekeeper, httpd } = *b;
(addr, gatekeeper, Some(httpd))
}
TcpRelayListener::TcpWithoutHttp(b) => {
let TcpWithoutHttp { addr, gatekeeper } = *b;
(addr, gatekeeper, None)
}
};
let host = addr.host.clone();
let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?;
let facet = t.facet.clone();
let trace_collector = t.trace_collector();
t.linked_task(Some(AnyValue::symbol("listener")), async move {
@ -58,17 +77,23 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
loop {
let (stream, addr) = listener.accept().await?;
let gatekeeper = spec.gatekeeper.clone();
let gatekeeper = gatekeeper.clone();
let name = Some(rec![AnyValue::symbol("tcp"), AnyValue::new(format!("{}", &addr))]);
let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("connect"));
let account = Account::new(name.clone(), trace_collector.clone());
if !facet.activate(
&account, cause, enclose!((trace_collector) move |t| {
&account, cause, enclose!((trace_collector, httpd) move |t| {
t.spawn(name, move |t| {
Ok(t.linked_task(None, {
let facet = t.facet.clone();
async move {
detect_protocol(trace_collector, facet, stream, gatekeeper, addr).await?;
detect_protocol(trace_collector,
facet,
stream,
gatekeeper,
httpd.map(|r| r.clone()),
addr,
port).await?;
Ok(LinkedTaskTermination::KeepFacet)
}
}))