From 3c44768a7265613efec7c7637c629e4404f29cf8 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 30 Mar 2024 11:00:22 +0100 Subject: [PATCH] Convenience syndicate::relay::stdio_service --- syndicate/Cargo.toml | 2 +- syndicate/src/relay.rs | 50 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/syndicate/Cargo.toml b/syndicate/Cargo.toml index 59f6d4f..c218e21 100644 --- a/syndicate/Cargo.toml +++ b/syndicate/Cargo.toml @@ -19,7 +19,7 @@ preserves-schema = "5.995" preserves = "4.995" preserves-schema = "5.995" -tokio = { version = "1.10", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] } +tokio = { version = "1.10", features = ["io-std", "io-util", "macros", "rt", "rt-multi-thread", "time"] } tokio-util = "0.6" bytes = "1.0" diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index 749cde0..d3d1c99 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -228,7 +228,57 @@ impl std::fmt::Debug for Membrane { macro_rules! dump_membranes { ($e:expr) => { tracing::trace!("membranes: {:#?}", $e); } } // macro_rules! dump_membranes { ($e:expr) => { (); } } +/// Main entry point for stdio-based Syndicate services. +pub async fn stdio_service(f: F) -> ! +where + F: 'static + Send + FnOnce(&mut Activation) -> Result, ActorError> +{ + let result = Actor::top(None, move |t| { + let service = f(t)?; + Ok(TunnelRelay::stdio_service(t, service)) + }).await; + + // Because we're currently using tokio::io::stdin(), which can prevent shutdown of the + // runtime, this routine uses std::process::exit directly as a special case. It's a + // stopgap: eventually, we'd like to do things Properly, as indicated in the comment + // attached (at the time of writing) to tokio::io::stdin(), which reads in part: + // + // This handle is best used for non-interactive uses, such as when a file + // is piped into the application. For technical reasons, `stdin` is + // implemented by using an ordinary blocking read on a separate thread, and + // it is impossible to cancel that read. This can make shutdown of the + // runtime hang until the user presses enter. + // + // For interactive uses, it is recommended to spawn a thread dedicated to + // user input and use blocking IO directly in that thread. + // + // TODO: Revisit this. + + match result { + Ok(Ok(())) => { + std::process::exit(0); + } + Ok(Err(e)) => { + tracing::error!("Main stdio_service actor failed: {}", e); + std::process::exit(1); + }, + Err(e) => { + tracing::error!("Join of main stdio_service actor failed: {}", e); + std::process::exit(2); + } + } +} + impl TunnelRelay { + pub fn stdio_service(t: &mut Activation, service: Arc) -> () { + TunnelRelay::run(t, + Input::Bytes(Box::pin(tokio::io::stdin())), + Output::Bytes(Box::pin(tokio::io::stdout())), + Some(service), + None, + false); + } + pub fn run( t: &mut Activation, i: Input,