use std::io::Write; use std::os::unix::io::AsRawFd; use std::os::unix::io::FromRawFd; use std::os::unix::prelude::CommandExt; use std::sync::Arc; use syndicate::actor::Account; use syndicate::actor::Activation; use syndicate::actor::Actor; use syndicate::actor::ActorError; use syndicate::actor::ActorResult; use syndicate::actor::AnyValue; use syndicate::actor::Cap; use syndicate::actor::Field; use syndicate::actor::Handle; use syndicate::actor::LinkedTaskTermination; use syndicate::dataspace::Dataspace; use syndicate::enclose; use syndicate::relay; use syndicate::value::NestedValue; use syndicate::value::NoEmbeddedDomainCodec; use syndicate::value::Set; use syndicate::value::TextWriter; use syndicate::value::Value; use syndicate_macros::during; use syndicate_macros::on_message; use tokio::io::AsyncReadExt; mod schemas { include!(concat!(env!("OUT_DIR"), "/src/schemas/mod.rs")); } preserves_schema::define_language!(language(): Language { syndicate: syndicate::schemas::Language, main: crate::schemas::Language, }); use schemas::pty::{PtySession, PtySessionRunning, PtyOutput, PtySize}; fn stringify(v: &AnyValue) -> Result { match v.value() { Value::String(s) => Ok(s.clone()), Value::Symbol(s) => Ok(s.clone()), _ => Ok(TextWriter::encode(&mut NoEmbeddedDomainCodec, v)?), } } fn terminate_pid(t: &mut Activation, child_pid: &Field>) -> ActorResult { match std::mem::replace(t.get_mut(child_pid), None) { None => Ok(()), Some(pid) => { tracing::trace!(?pid, "SIGHUP"); unsafe { libc::kill(pid, libc::SIGHUP); } tokio::spawn(async move { let mut status: i32 = 0; tokio::time::sleep(std::time::Duration::from_secs(1)).await; if unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG) } != 0 { tracing::trace!(?pid, ?status, "SIGHUP was effective"); return; } tracing::trace!(?pid, "SIGTERM"); unsafe { libc::kill(pid, libc::SIGTERM); } tokio::time::sleep(std::time::Duration::from_secs(2)).await; if unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG) } != 0 { tracing::trace!(?pid, ?status, "SIGTERM was effective"); return; } tracing::trace!(?pid, "SIGKILL"); unsafe { libc::kill(pid, libc::SIGKILL); } unsafe { libc::waitpid(pid, &mut status, 0); } tracing::trace!(?pid, ?status, "exited"); }); Ok(()) } } } #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] struct TerminalSize { columns: u16, rows: u16, } fn pty_session(t: &mut Activation, local_ds: Arc, s: PtySession) -> ActorResult { let session_id = s.id.clone(); let mut pieces: Vec = Vec::new(); for piece_value in s.command_line.args.into_iter() { pieces.push(stringify(&piece_value)?); } let mut cmd = std::process::Command::new(stringify(&s.command_line.command)?); cmd.args(pieces); tracing::info!(?cmd); let fork = pty::fork::Fork::from_ptmx()?; match &fork { pty::prelude::Fork::Parent(child_pid, parent) => { tracing::info!(?child_pid); let parent_fd = parent.as_raw_fd(); let child = pty::fork::Slave::new(parent.ptsname()?)?; let child_fd = unsafe { libc::dup(child.as_raw_fd()) }; let mut parent_w: std::fs::File = unsafe { FromRawFd::from_raw_fd(libc::dup(parent_fd)) }; let mut parent_r: tokio::fs::File = unsafe { FromRawFd::from_raw_fd(libc::dup(parent_fd)) }; let child_pid = t.named_field("child_pid", Some(*child_pid)); let facet = t.facet_ref(); local_ds.assert(t, language(), &PtySessionRunning { id: session_id.clone(), }); t.on_stop(enclose!((child_pid) move |t| { tracing::info!("terminating child on facet stop"); terminate_pid(t, &child_pid) })); on_message!(t, local_ds, language(), >, |_t| { match body.value().as_bytestring() { Some(bytes) => { tracing::trace!(?bytes, "sending to child"); // TODO: use async here parent_w.write_all(&bytes[..])?; } None => tracing::info!(?body, "received unexpected item as pty-input"), } Ok(()) }); let sizes = t.named_field("sizes", Set::::new()); let size = t.named_field("size", TerminalSize { columns: 80, rows: 24 }); enclose!((sizes) during!( t, local_ds, language(), >, enclose!((sizes) move |t: &mut Activation| { let ts = TerminalSize { columns: columns.value().as_u16().unwrap_or(80), rows: rows.value().as_u16().unwrap_or(24), }; tracing::trace!(?ts, "size added"); t.get_mut(&sizes).insert(ts.clone()); t.on_stop(enclose!((sizes) move |t| { tracing::trace!(?ts, "size removed"); t.get_mut(&sizes).remove(&ts); Ok(()) })); Ok(()) }))); t.dataflow(enclose!((sizes, size) move |t| { let mut ts: Option = None; tracing::trace!(sizes=?t.get(&sizes), "candidate sizes"); for s in t.get(&sizes).iter() { ts = Some(ts.unwrap_or(s.clone()).min(s.clone())); } if let Some(s) = ts { if t.get(&size) != &s { *t.get_mut(&size) = s; } } Ok(()) }))?; let mut size_handle: Option = None; t.dataflow(enclose!((local_ds, session_id, size) move |t| { let TerminalSize { columns, rows } = t.get(&size).clone(); tracing::trace!(?columns, ?rows, "size change"); let result = unsafe { libc::ioctl(child_fd, libc::TIOCSWINSZ, &libc::winsize { ws_row: rows, ws_col: columns, ws_xpixel: 0, ws_ypixel: 0, }) }; if result == 0 { tracing::trace!(?columns, ?rows, "size changed"); local_ds.update(t, &mut size_handle, language(), Some(&PtySize { id: session_id.clone(), columns: columns.into(), rows: rows.into(), })); } else { tracing::error!(err=?std::io::Error::last_os_error(), ?columns, ?rows, ?child_fd, "size change failed"); } Ok(()) }))?; t.every(tokio::time::Duration::from_secs(1), enclose!((child_pid) move |t| { match t.get(&child_pid) { None => Ok(()), Some(pid) => { let mut status: i32 = 0; match unsafe { libc::waitpid(*pid, &mut status, libc::WNOHANG) } { 0 => Ok(()), // hasn't exited yet -1 => Err(std::io::Error::last_os_error())?, _ => { tracing::info!(?status, "child exited"); *t.get_mut(&child_pid) = None; t.stop(); Ok(()) } } } } }))?; t.linked_task(Some(AnyValue::symbol("reader")), async move { let read_account = Account::new(Some(AnyValue::symbol("reader_account")), None); let mut buf = [0; 1024]; loop { let n = parent_r.read(&mut buf[..]).await?; tracing::trace!(?n, buf=?&buf[0..n], "receiving from child"); if n == 0 { facet.activate(&read_account, None, |t| terminate_pid(t, &child_pid)); break; } else { if !facet.activate(&read_account, None, |t| { local_ds.message(t, language(), &PtyOutput { id: session_id.clone(), data: buf[0..n].to_vec(), }); Ok(()) }) { break; } } } return Ok(LinkedTaskTermination::Normal); }); Ok(()) } pty::prelude::Fork::Child(_child) => { Err(cmd.exec())? } } } #[tokio::main] async fn main() -> ActorResult { syndicate::convenient_logging()?; Actor::top(None, move |t| { let local_ds = Cap::new(&t.create(Dataspace::new(Some(AnyValue::symbol("pty"))))); relay::TunnelRelay::run(t, relay::Input::Bytes(Box::pin(tokio::io::stdin())), relay::Output::Bytes(Box::pin(tokio::io::stdout())), Some(local_ds.clone()), None, false); during!(t, local_ds, language(), $s: PtySession::, |t: &mut Activation| { t.spawn_link(Some(s.id.clone()), enclose!((local_ds) |t| { pty_session(t, local_ds, s) })); Ok(()) }); Ok(()) }).await??; Ok(()) }