From c7c0173c8b3f39633785ea1873ce4997d6485ada Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 20 Nov 2023 20:43:11 +0100 Subject: [PATCH] Fix termination; add pty resizing --- protocols/schema-bundle.bin | 2 +- protocols/schemas/pty.prs | 3 + src/main.rs | 125 ++++++++++++++++++++++++++++-------- 3 files changed, 104 insertions(+), 26 deletions(-) diff --git a/protocols/schema-bundle.bin b/protocols/schema-bundle.bin index 55e7dcc..68a1bd2 100644 --- a/protocols/schema-bundle.bin +++ b/protocols/schema-bundle.bin @@ -1,4 +1,4 @@ ´³bundle·µ³pty„´³schema·³version°³ definitions·³Chunk´³atom³ -ByteString„³PtyInput´³rec´³lit³ pty-input„´³tupleµ´³named³id³any„´³named³data´³refµ„³Chunk„„„„„³ PtyOutput´³rec´³lit³ +ByteString„³PtySize´³rec´³lit³pty-size„´³tupleµ´³named³id³any„´³named³columns´³atom³ SignedInteger„„´³named³rows´³atom³ SignedInteger„„„„„³PtyInput´³rec´³lit³ pty-input„´³tupleµ´³named³id³any„´³named³data´³refµ„³Chunk„„„„„³ PtyOutput´³rec´³lit³ pty-output„´³tupleµ´³named³id³any„´³named³data´³refµ„³Chunk„„„„„³ PtySession´³rec´³lit³ pty-session„´³tupleµ´³named³id³any„´³named³ commandLine´³refµ„³ CommandLine„„„„„³ CommandLine´³ tuplePrefixµ´³named³command³any„„´³named³args´³seqof³any„„„³PtySessionRunning´³rec´³lit³pty-session-running„´³tupleµ´³named³id³any„„„„„³ embeddedType€„„„„ \ No newline at end of file diff --git a/protocols/schemas/pty.prs b/protocols/schemas/pty.prs index 510dbca..85edf8a 100644 --- a/protocols/schemas/pty.prs +++ b/protocols/schemas/pty.prs @@ -5,6 +5,9 @@ CommandLine = [@command any @args any ...] . PtySessionRunning = . +# Message, driver interprets it as a request to execute TIOCSWINSZ +PtySize = . + # To the subprocess PtyInput = . diff --git a/src/main.rs b/src/main.rs index eea1507..403f5ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,14 +11,15 @@ use syndicate::actor::ActorError; use syndicate::actor::ActorResult; use syndicate::actor::AnyValue; use syndicate::actor::Cap; +use syndicate::actor::Field; use syndicate::actor::LinkedTaskTermination; use syndicate::dataspace::Dataspace; use syndicate::enclose; use syndicate::relay; use syndicate::value::NestedValue; - use syndicate::value::NoEmbeddedDomainCodec; use syndicate::value::TextWriter; +use syndicate::value::Value; use syndicate_macros::during; use syndicate_macros::on_message; @@ -36,7 +37,46 @@ preserves_schema::define_language!(language(): Language { use schemas::pty::{PtySession, PtySessionRunning, Chunk, PtyOutput}; fn stringify(v: &AnyValue) -> Result { - Ok(TextWriter::encode(&mut NoEmbeddedDomainCodec, v)?) + match v.value() { + Value::String(s) => Ok(s.clone()), + Value::Symbol(s) => Ok(s.clone()), + _ => Ok(TextWriter::encode(&mut NoEmbeddedDomainCodec, v)?), + } +} + +fn terminate_pid(t: &mut Activation, child_pid: &Field>) -> ActorResult { + match std::mem::replace(t.get_mut(child_pid), None) { + None => Ok(()), + Some(pid) => { + tracing::trace!(?pid, "SIGHUP"); + unsafe { libc::kill(pid, libc::SIGHUP); } + tokio::spawn(async move { + let mut status: i32 = 0; + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + if unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG) } != 0 { + tracing::trace!(?pid, ?status, "SIGHUP was effective"); + return; + } + + tracing::trace!(?pid, "SIGTERM"); + unsafe { libc::kill(pid, libc::SIGTERM); } + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + if unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG) } != 0 { + tracing::trace!(?pid, ?status, "SIGTERM was effective"); + return; + } + + tracing::trace!(?pid, "SIGKILL"); + unsafe { libc::kill(pid, libc::SIGKILL); } + + unsafe { libc::waitpid(pid, &mut status, 0); } + tracing::trace!(?pid, ?status, "exited"); + }); + Ok(()) + } + } } fn pty_session(t: &mut Activation, local_ds: Arc, s: PtySession) -> ActorResult { @@ -52,15 +92,23 @@ fn pty_session(t: &mut Activation, local_ds: Arc, s: PtySession) match &fork { pty::prelude::Fork::Parent(child_pid, parent) => { tracing::info!(?child_pid); - let mut parent_w: std::fs::File = unsafe { FromRawFd::from_raw_fd(libc::dup(parent.as_raw_fd())) }; - let mut parent_r: tokio::fs::File = unsafe { FromRawFd::from_raw_fd(libc::dup(parent.as_raw_fd())) }; - let child_pid = *child_pid; + let parent_fd = parent.as_raw_fd(); + let child = pty::fork::Slave::new(parent.ptsname()?)?; + let child_fd = unsafe { libc::dup(child.as_raw_fd()) }; + let mut parent_w: std::fs::File = unsafe { FromRawFd::from_raw_fd(libc::dup(parent_fd)) }; + let mut parent_r: tokio::fs::File = unsafe { FromRawFd::from_raw_fd(libc::dup(parent_fd)) }; + let child_pid = t.named_field("child_pid", Some(*child_pid)); let facet = t.facet.clone(); local_ds.assert(t, language(), &PtySessionRunning { id: s.id.clone(), }); + t.on_stop(enclose!((child_pid) move |t| { + tracing::info!("terminating child on facet stop"); + terminate_pid(t, &child_pid) + })); + on_message!(t, local_ds, language(), , |_t| { tracing::trace!(?body, "sending to child"); // TODO: use async here @@ -68,18 +116,42 @@ fn pty_session(t: &mut Activation, local_ds: Arc, s: PtySession) Ok(()) }); - t.linked_task(Some(AnyValue::symbol("waiter")), async move { - let mut status: i32 = 0; - loop { - match unsafe { libc::waitpid(child_pid, &mut status, libc::WNOHANG) } { - 0 => tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await, - -1 => Err(std::io::Error::last_os_error())?, - _ => break, + on_message!(t, local_ds, language(), , |_t| { + tracing::trace!(?columns, ?rows, "size change"); + let result = unsafe { + libc::ioctl(child_fd, libc::TIOCSWINSZ, &libc::winsize { + ws_row: rows.value().as_u16().unwrap_or(24), + ws_col: columns.value().as_u16().unwrap_or(80), + ws_xpixel: 0, + ws_ypixel: 0, + }) + }; + if result == 0 { + tracing::trace!(?columns, ?rows, "size changed"); + } else { + tracing::error!(err=?std::io::Error::last_os_error(), ?columns, ?rows, ?child_fd, "size change failed"); + } + Ok(()) + }); + + t.every(tokio::time::Duration::from_secs(1), enclose!((child_pid) move |t| { + match t.get(&child_pid) { + None => Ok(()), + Some(pid) => { + let mut status: i32 = 0; + match unsafe { libc::waitpid(*pid, &mut status, libc::WNOHANG) } { + 0 => Ok(()), // hasn't exited yet + -1 => Err(std::io::Error::last_os_error())?, + _ => { + tracing::info!(?status, "child exited"); + *t.get_mut(&child_pid) = None; + t.stop(); + Ok(()) + } + } } } - tracing::info!(?status, "child exited"); - Ok(LinkedTaskTermination::Normal) - }); + }))?; t.linked_task(Some(AnyValue::symbol("reader")), async move { let read_account = Account::new(Some(AnyValue::symbol("reader_account")), None); @@ -87,19 +159,22 @@ fn pty_session(t: &mut Activation, local_ds: Arc, s: PtySession) loop { let n = parent_r.read(&mut buf[..]).await?; tracing::trace!(?n, buf=?&buf[0..n], "receiving from child"); - if n == 0 || !facet.activate(&read_account, None, |t| { - local_ds.message(t, language(), &PtyOutput { - id: s.id.clone(), - data: Chunk(buf[0..n].to_vec()), - }); - Ok(()) - }) { - unsafe { - libc::kill(child_pid, libc::SIGTERM); + if n == 0 { + facet.activate(&read_account, None, |t| terminate_pid(t, &child_pid)); + break; + } else { + if !facet.activate(&read_account, None, |t| { + local_ds.message(t, language(), &PtyOutput { + id: s.id.clone(), + data: Chunk(buf[0..n].to_vec()), + }); + Ok(()) + }) { + break; } - return Ok(LinkedTaskTermination::Normal); } } + return Ok(LinkedTaskTermination::Normal); }); Ok(()) }