syndicate-pty-driver/src/main.rs

137 lines
4.8 KiB
Rust

use std::io::Write;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::os::unix::prelude::CommandExt;
use std::sync::Arc;
use syndicate::actor::Account;
use syndicate::actor::Activation;
use syndicate::actor::Actor;
use syndicate::actor::ActorError;
use syndicate::actor::ActorResult;
use syndicate::actor::AnyValue;
use syndicate::actor::Cap;
use syndicate::actor::LinkedTaskTermination;
use syndicate::dataspace::Dataspace;
use syndicate::enclose;
use syndicate::relay;
use syndicate::value::NestedValue;
use syndicate::value::NoEmbeddedDomainCodec;
use syndicate::value::TextWriter;
use syndicate_macros::during;
use syndicate_macros::on_message;
use tokio::io::AsyncReadExt;
/// Schema bindings for this crate.
///
/// The module body is not written by hand: it is textually included from
/// `$OUT_DIR/src/schemas/mod.rs`, i.e. a file produced at build time
/// (presumably by a build script running the Preserves schema compiler —
/// confirm against `build.rs`).
mod schemas {
include!(concat!(env!("OUT_DIR"), "/src/schemas/mod.rs"));
}
// Declares the `language()` accessor used throughout this file wherever a
// schema language is required (assertions, messages, pattern macros).
// The resulting `Language<AnyValue>` is composed of two parts:
//   * `syndicate` — the schema language shipped with the syndicate crate;
//   * `main`      — the schema language generated into `crate::schemas`.
preserves_schema::define_language!(language(): Language<AnyValue> {
syndicate: syndicate::schemas::Language,
main: crate::schemas::Language,
});
use schemas::pty::{PtySession, PtySessionRunning, Chunk, PtyOutput};
/// Renders `v` as text by running it through `TextWriter::encode` with the
/// `NoEmbeddedDomainCodec`.
///
/// # Errors
///
/// Any failure reported by the encoder is converted into an `ActorError`
/// and returned to the caller.
fn stringify(v: &AnyValue) -> Result<String, ActorError> {
    let rendered = TextWriter::encode(&mut NoEmbeddedDomainCodec, v)?;
    Ok(rendered)
}
fn pty_session(t: &mut Activation, local_ds: Arc<Cap>, s: PtySession<AnyValue>) -> ActorResult {
let mut pieces: Vec<String> = Vec::new();
for piece_value in s.command_line.args.into_iter() {
pieces.push(stringify(&piece_value)?);
}
let mut cmd = std::process::Command::new(stringify(&s.command_line.command)?);
cmd.args(pieces);
tracing::info!(?cmd);
let fork = pty::fork::Fork::from_ptmx()?;
match &fork {
pty::prelude::Fork::Parent(child_pid, parent) => {
tracing::info!(?child_pid);
let mut parent_w: std::fs::File = unsafe { FromRawFd::from_raw_fd(libc::dup(parent.as_raw_fd())) };
let mut parent_r: tokio::fs::File = unsafe { FromRawFd::from_raw_fd(libc::dup(parent.as_raw_fd())) };
let child_pid = *child_pid;
let facet = t.facet.clone();
local_ds.assert(t, language(), &PtySessionRunning {
id: s.id.clone(),
});
on_message!(t, local_ds, language(), <pty-input #(&s.id) $body: Chunk>, |_t| {
tracing::trace!(?body, "sending to child");
parent_w.write_all(&body.0[..])?;
Ok(())
});
t.linked_task(Some(AnyValue::symbol("waiter")), async move {
let mut status: i32 = 0;
loop {
match unsafe { libc::waitpid(child_pid, &mut status, libc::WNOHANG) } {
0 => tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await,
-1 => Err(std::io::Error::last_os_error())?,
_ => break,
}
}
tracing::info!(?status, "child exited");
Ok(LinkedTaskTermination::Normal)
});
t.linked_task(Some(AnyValue::symbol("reader")), async move {
let read_account = Account::new(Some(AnyValue::symbol("reader_account")), None);
let mut buf = [0; 1024];
loop {
let n = parent_r.read(&mut buf[..]).await?;
tracing::trace!(?n, buf=?&buf[0..n], "receiving from child");
if n == 0 || !facet.activate(&read_account, None, |t| {
local_ds.message(t, language(), &PtyOutput {
id: s.id.clone(),
data: Chunk(buf[0..n].to_vec()),
});
Ok(())
}) {
unsafe {
libc::kill(child_pid, libc::SIGTERM);
}
return Ok(LinkedTaskTermination::Normal);
}
}
});
Ok(())
}
pty::prelude::Fork::Child(_child) => {
Err(cmd.exec())?
}
}
}
/// Program entry point.
///
/// Sets up logging, then starts a top-level actor that:
///
/// 1. creates a dataspace named `pty` and wraps it in a capability;
/// 2. runs a tunnel relay over stdin/stdout, handing it that capability;
/// 3. spawns one linked actor running `pty_session` for every `PtySession`
///    assertion observed on the dataspace.
///
/// The function returns once the top-level actor has finished, propagating
/// either a task-join failure or the actor's own error.
#[tokio::main]
async fn main() -> ActorResult {
    syndicate::convenient_logging()?;

    let top_actor = Actor::top(None, move |t| {
        let local_ds = Cap::new(&t.create(Dataspace::new(Some(AnyValue::symbol("pty")))));

        // The relay speaks to the peer over this process's standard streams.
        let relay_input = relay::Input::Bytes(Box::pin(tokio::io::stdin()));
        let relay_output = relay::Output::Bytes(Box::pin(tokio::io::stdout()));
        relay::TunnelRelay::run(t, relay_input, relay_output, Some(local_ds.clone()), None, false);

        // One linked session actor per asserted `PtySession`.
        during!(t, local_ds, language(), $s: PtySession::<AnyValue>, |t: &mut Activation| {
            t.spawn_link(Some(s.id.clone()), enclose!((local_ds) |t| {
                pty_session(t, local_ds, s)
            }));
            Ok(())
        });

        Ok(())
    });

    // First `?`: failure to join the actor's task; second `?`: the actor's result.
    let actor_outcome = top_actor.await?;
    actor_outcome?;
    Ok(())
}