Binary and text support

This commit is contained in:
Tony Garnock-Jones 2021-10-05 21:11:16 +02:00
parent f74bc2e069
commit 7117215963
10 changed files with 40 additions and 31 deletions

View File

@ -27,7 +27,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("consumer"), |t| {
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| {
let consumer = syndicate::entity(0)
.on_message(|message_count, _t, m: AnyValue| {
if m.value().is_boolean() {

View File

@ -94,7 +94,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("pingpong"), |t| {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) =
match config.mode {

View File

@ -35,7 +35,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("producer"), |t| {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap();
let action_count = config.action_count;
let account = Account::new(syndicate::name!("account"));

View File

@ -27,7 +27,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("state-consumer"), |t| {
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| {
let consumer = {
#[derive(Default)]
struct State {

View File

@ -23,7 +23,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("state-producer"), |t| {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let account = Account::new(syndicate::name!("account"));
t.linked_task(syndicate::name!("sender"), async move {
let presence: AnyValue = Value::simple_record1(

View File

@ -1,4 +1,4 @@
´³bundle·µ³externalServices„´³schema·³version³ definitions·³Process´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³ FullProcess„„„„³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ± syndicate´³lit³ syndicate„„„„³
´³bundle·µ³externalServices„´³schema·³version³ definitions·³Process´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³ FullProcess„„„„³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ±binarySyndicate´³lit³application/syndicate„„µ± textSyndicate´³lit³text/syndicate„„„„³
ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³
ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³ EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³
ProcessEnv„„´³named³dir´³refµ„³

View File

@ -45,7 +45,8 @@ RestartPolicy =
Protocol =
/ ; stdin is /dev/null, output and error are logged
=none
/ ; stdin and stdout are Syndicate-protocol channels
=syndicate
/ ; stdin and stdout are *binary* Syndicate-protocol channels
@binarySyndicate =application/syndicate
/ ; stdin and stdout are *text* Syndicate-protocol channels
@textSyndicate =text/syndicate
.

View File

@ -32,7 +32,7 @@ pub fn run_io_relay(
) -> ActorResult {
let exit_listener = t.create(ExitListener);
t.state.add_exit_hook(&exit_listener);
relay::TunnelRelay::run(t, i, o, Some(initial_ref), None);
relay::TunnelRelay::run(t, i, o, Some(initial_ref), None, false);
Ok(())
}

View File

@ -257,28 +257,13 @@ impl DaemonInstance {
}
match self.protocol {
Protocol::TextSyndicate => relay_facet(t, &mut child, true)?,
Protocol::BinarySyndicate => relay_facet(t, &mut child, false)?,
Protocol::None => {
if let Some(r) = child.stdout.take() {
self.log(t, facet.clone(), pid, r, "stdout");
}
}
Protocol::Syndicate => {
t.facet(|t| {
use syndicate::relay;
use syndicate::schemas::sturdy;
let to_child = child.stdin.take().expect("pipe to child");
let from_child = child.stdout.take().expect("pipe from child");
let i = relay::Input::Bytes(Box::pin(from_child));
let o = relay::Output::Bytes(Box::pin(to_child));
let cap = relay::TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into())))
.ok_or("initial capability reference unavailable")?;
tracing::info!(?cap);
Ok(())
})?;
}
}
if self.announce_presumed_readiness {
@ -303,6 +288,25 @@ impl DaemonInstance {
}
}
fn relay_facet(t: &mut Activation, child: &mut process::Child, output_text: bool) -> ActorResult {
use syndicate::relay;
use syndicate::schemas::sturdy;
let to_child = child.stdin.take().expect("pipe to child");
let from_child = child.stdout.take().expect("pipe from child");
let i = relay::Input::Bytes(Box::pin(from_child));
let o = relay::Output::Bytes(Box::pin(to_child));
t.facet(|t| {
let cap = relay::TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into())), output_text)
.ok_or("initial capability reference unavailable")?;
tracing::info!(?cap);
Ok(())
})?;
Ok(())
}
fn run(
t: &mut Activation,
config_ds: Arc<Cap>,
@ -376,8 +380,10 @@ fn run(
};
cmd.stdin(match &protocol {
Protocol::None => std::process::Stdio::null(),
Protocol::Syndicate => std::process::Stdio::piped(),
Protocol::None =>
std::process::Stdio::null(),
Protocol::TextSyndicate | Protocol::BinarySyndicate =>
std::process::Stdio::piped(),
});
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());

View File

@ -178,6 +178,7 @@ pub fn connect_stream<I, O, E, F>(
t: &mut Activation,
i: I,
o: O,
output_text: bool,
sturdyref: sturdy::SturdyRef,
initial_state: E,
mut f: F,
@ -189,7 +190,7 @@ pub fn connect_stream<I, O, E, F>(
{
let i = Input::Bytes(Box::pin(i));
let o = Output::Bytes(Box::pin(o));
let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap();
let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into())), output_text).unwrap();
let main_entity = t.create(during::entity(initial_state).on_asserted(move |state, t, a: AnyValue| {
let denotation = a.value().to_embedded()?;
f(state, t, Arc::clone(denotation))
@ -219,6 +220,7 @@ impl TunnelRelay {
o: Output,
initial_ref: Option<Arc<Cap>>,
initial_oid: Option<sturdy::Oid>,
output_text: bool,
) -> Option<Arc<Cap>> {
let (output_tx, output_rx) = unbounded_channel();
let tr_ref = Arc::new(Mutex::new(None));
@ -228,7 +230,7 @@ impl TunnelRelay {
let mut tr = TunnelRelay {
self_ref: Arc::clone(&tr_ref),
output: output_tx,
output_text: false,
output_text,
inbound_assertions: Map::new(),
outbound_assertions: Map::new(),
membranes: Membranes {