diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index 703fecf..58a43ca 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -27,7 +27,7 @@ async fn main() -> Result<(), Box> { let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("consumer"), |t| { - relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { + relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| { let consumer = syndicate::entity(0) .on_message(|message_count, _t, m: AnyValue| { if m.value().is_boolean() { diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index 0b015ff..674dec9 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -94,7 +94,7 @@ async fn main() -> Result<(), Box> { let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("pingpong"), |t| { - relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { + relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = match config.mode { diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index dd74579..4caf289 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box> { let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("producer"), |t| { - relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { + relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap(); let action_count = config.action_count; let account = Account::new(syndicate::name!("account")); diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index f602bf1..e45741f 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -27,7 +27,7 @@ async fn main() -> Result<(), Box> { let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("state-consumer"), |t| { - relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { + relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| { let consumer = { #[derive(Default)] struct State { diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index 805e993..ee13a5e 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -23,7 +23,7 @@ async fn main() -> Result<(), Box> { let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("state-producer"), |t| { - relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { + relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { let account = Account::new(syndicate::name!("account")); t.linked_task(syndicate::name!("sender"), async move { let presence: AnyValue = Value::simple_record1( diff --git a/syndicate-server/protocols/schema-bundle.bin b/syndicate-server/protocols/schema-bundle.bin index e4157b2..2b85843 100644 --- a/syndicate-server/protocols/schema-bundle.bin +++ b/syndicate-server/protocols/schema-bundle.bin @@ -1,4 +1,4 @@ -´³bundle·µ³externalServices„´³schema·³version‘³ definitions·³Process´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³ FullProcess„„„„³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ± syndicate´³lit³ syndicate„„„„³ +´³bundle·µ³externalServices„´³schema·³version‘³ definitions·³Process´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³ FullProcess„„„„³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ±binarySyndicate´³lit³application/syndicate„„µ± textSyndicate´³lit³text/syndicate„„„„³ ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³ ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³ EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³ ProcessEnv„„´³named³dir´³refµ„³ diff --git a/syndicate-server/protocols/schemas/externalServices.prs b/syndicate-server/protocols/schemas/externalServices.prs index 82ee7cb..8ea6ade 100644 --- a/syndicate-server/protocols/schemas/externalServices.prs +++ b/syndicate-server/protocols/schemas/externalServices.prs @@ -45,7 +45,8 @@ RestartPolicy = Protocol = / ; stdin is /dev/null, output and error are logged =none -/ ; stdin and stdout are Syndicate-protocol channels - =syndicate +/ ; stdin and stdout are *binary* Syndicate-protocol channels + @binarySyndicate =application/syndicate +/ ; stdin and stdout are *text* Syndicate-protocol channels + @textSyndicate =text/syndicate . - diff --git a/syndicate-server/src/protocol.rs b/syndicate-server/src/protocol.rs index 267b6c1..d19606e 100644 --- a/syndicate-server/src/protocol.rs +++ b/syndicate-server/src/protocol.rs @@ -32,7 +32,7 @@ pub fn run_io_relay( ) -> ActorResult { let exit_listener = t.create(ExitListener); t.state.add_exit_hook(&exit_listener); - relay::TunnelRelay::run(t, i, o, Some(initial_ref), None); + relay::TunnelRelay::run(t, i, o, Some(initial_ref), None, false); Ok(()) } diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index b740f4e..e34cbbe 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -257,28 +257,13 @@ impl DaemonInstance { } match self.protocol { + Protocol::TextSyndicate => relay_facet(t, &mut child, true)?, + Protocol::BinarySyndicate => relay_facet(t, &mut child, false)?, Protocol::None => { if let Some(r) = child.stdout.take() { self.log(t, facet.clone(), pid, r, "stdout"); } } - Protocol::Syndicate => { - t.facet(|t| { - use syndicate::relay; - use syndicate::schemas::sturdy; - - let to_child = child.stdin.take().expect("pipe to child"); - let from_child = child.stdout.take().expect("pipe from child"); - let i = relay::Input::Bytes(Box::pin(from_child)); - let o = relay::Output::Bytes(Box::pin(to_child)); - - let cap = relay::TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))) - .ok_or("initial capability reference unavailable")?; - - tracing::info!(?cap); - Ok(()) - })?; - } } if self.announce_presumed_readiness { @@ -303,6 +288,25 @@ impl DaemonInstance { } } +fn relay_facet(t: &mut Activation, child: &mut process::Child, output_text: bool) -> ActorResult { + use syndicate::relay; + use syndicate::schemas::sturdy; + + let to_child = child.stdin.take().expect("pipe to child"); + let from_child = child.stdout.take().expect("pipe from child"); + + let i = relay::Input::Bytes(Box::pin(from_child)); + let o = relay::Output::Bytes(Box::pin(to_child)); + + t.facet(|t| { + let cap = relay::TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into())), output_text) + .ok_or("initial capability reference unavailable")?; + tracing::info!(?cap); + Ok(()) + })?; + Ok(()) +} + fn run( t: &mut Activation, config_ds: Arc, @@ -376,8 +380,10 @@ fn run( }; cmd.stdin(match &protocol { - Protocol::None => std::process::Stdio::null(), - Protocol::Syndicate => std::process::Stdio::piped(), + Protocol::None => + std::process::Stdio::null(), + Protocol::TextSyndicate | Protocol::BinarySyndicate => + std::process::Stdio::piped(), }); cmd.stdout(std::process::Stdio::piped()); cmd.stderr(std::process::Stdio::piped()); diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index 725757d..a0f3e88 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -178,6 +178,7 @@ pub fn connect_stream( t: &mut Activation, i: I, o: O, + output_text: bool, sturdyref: sturdy::SturdyRef, initial_state: E, mut f: F, @@ -189,7 +190,7 @@ pub fn connect_stream( { let i = Input::Bytes(Box::pin(i)); let o = Output::Bytes(Box::pin(o)); - let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap(); + let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into())), output_text).unwrap(); let main_entity = t.create(during::entity(initial_state).on_asserted(move |state, t, a: AnyValue| { let denotation = a.value().to_embedded()?; f(state, t, Arc::clone(denotation)) @@ -219,6 +220,7 @@ impl TunnelRelay { o: Output, initial_ref: Option>, initial_oid: Option, + output_text: bool, ) -> Option> { let (output_tx, output_rx) = unbounded_channel(); let tr_ref = Arc::new(Mutex::new(None)); @@ -228,7 +230,7 @@ impl TunnelRelay { let mut tr = TunnelRelay { self_ref: Arc::clone(&tr_ref), output: output_tx, - output_text: false, + output_text, inbound_assertions: Map::new(), outbound_assertions: Map::new(), membranes: Membranes {