Better handling of activation after termination, which repairs a scary-looking-but-harmless panic in config_watcher's private thread

This commit is contained in:
Tony Garnock-Jones 2022-01-16 00:02:33 +01:00
parent a37a2739a0
commit 3d3c1ebf70
9 changed files with 157 additions and 115 deletions

View File

@ -9,7 +9,7 @@ use syndicate::value::NestedValue;
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
Actor::new(None).boot(tracing::Span::current(), |t| { Actor::new(None).boot(tracing::Span::current(), |t| {
let ds = Cap::new(&t.create(Dataspace::new())); let ds = Cap::new(&t.create(Dataspace::new(None)));
let _ = t.prevent_inert_check(); let _ = t.prevent_inert_check();
t.spawn(syndicate::name!("box"), enclose!((ds) move |t| { t.spawn(syndicate::name!("box"), enclose!((ds) move |t| {

View File

@ -41,9 +41,9 @@ pub fn run_connection(
i: relay::Input, i: relay::Input,
o: relay::Output, o: relay::Output,
initial_ref: Arc<Cap>, initial_ref: Arc<Cap>,
) -> ActorResult { ) {
facet.activate(Account::new(syndicate::name!("start-session")), facet.activate(Account::new(syndicate::name!("start-session")),
|t| run_io_relay(t, i, o, initial_ref)) |t| run_io_relay(t, i, o, initial_ref));
} }
pub async fn detect_protocol( pub async fn detect_protocol(
@ -76,7 +76,8 @@ pub async fn detect_protocol(
_ => unreachable!() _ => unreachable!()
} }
}; };
run_connection(facet, i, o, gateway) run_connection(facet, i, o, gateway);
Ok(())
} }
fn message_error<E: std::fmt::Display>(e: E) -> Error { fn message_error<E: std::fmt::Display>(e: E) -> Error {

View File

@ -181,14 +181,17 @@ fn run(
let mut path_state: Map<PathBuf, FacetId> = Map::new(); let mut path_state: Map<PathBuf, FacetId> = Map::new();
{ if !facet.activate(
facet.activate(Account::new(syndicate::name!("initial_scan")), |t| { Account::new(syndicate::name!("initial_scan")),
|t| {
initial_scan(t, &mut path_state, &config_ds, env.clone()); initial_scan(t, &mut path_state, &config_ds, env.clone());
config_ds.assert(t, language(), &lifecycle::ready(&spec)); config_ds.assert(t, language(), &lifecycle::ready(&spec));
Ok(()) Ok(())
}).unwrap(); })
tracing::trace!("initial_scan complete"); {
return;
} }
tracing::trace!("initial_scan complete");
let mut rescan = |paths: Vec<PathBuf>| { let mut rescan = |paths: Vec<PathBuf>| {
facet.activate(Account::new(syndicate::name!("rescan")), |t| { facet.activate(Account::new(syndicate::name!("rescan")), |t| {
@ -209,15 +212,15 @@ fn run(
t.stop_facet(facet_id); t.stop_facet(facet_id);
} }
Ok(()) Ok(())
}).unwrap() })
}; };
while let Ok(event) = rx.recv() { while let Ok(event) = rx.recv() {
tracing::trace!("notification: {:?}", &event); tracing::trace!("notification: {:?}", &event);
match event { let keep_running = match event {
DebouncedEvent::NoticeWrite(_p) | DebouncedEvent::NoticeWrite(_p) |
DebouncedEvent::NoticeRemove(_p) => DebouncedEvent::NoticeRemove(_p) =>
(), true,
DebouncedEvent::Create(p) | DebouncedEvent::Create(p) |
DebouncedEvent::Write(p) | DebouncedEvent::Write(p) |
DebouncedEvent::Chmod(p) | DebouncedEvent::Chmod(p) |
@ -225,12 +228,15 @@ fn run(
rescan(vec![p]), rescan(vec![p]),
DebouncedEvent::Rename(p, q) => DebouncedEvent::Rename(p, q) =>
rescan(vec![p, q]), rescan(vec![p, q]),
_ => _ => {
tracing::info!("{:?}", event), tracing::info!("{:?}", event);
} true
}
};
if !keep_running { break; }
} }
let _ = facet.activate(Account::new(syndicate::name!("termination")), |t| { facet.activate(Account::new(syndicate::name!("termination")), |t| {
tracing::trace!("linked thread terminating associated facet"); tracing::trace!("linked thread terminating associated facet");
Ok(t.stop()) Ok(t.stop())
}); });

View File

@ -249,7 +249,7 @@ impl DaemonInstance {
Err(_) => AnyValue::bytestring(buf), Err(_) => AnyValue::bytestring(buf),
}; };
let now = AnyValue::new(chrono::Utc::now().to_rfc3339()); let now = AnyValue::new(chrono::Utc::now().to_rfc3339());
if facet.activate( if !facet.activate(
Account::new(tracing::Span::current()), Account::new(tracing::Span::current()),
enclose!((pid, service, kind) |t| { enclose!((pid, service, kind) |t| {
log_ds.message(t, &(), &syndicate_macros::template!( log_ds.message(t, &(), &syndicate_macros::template!(
@ -260,7 +260,7 @@ impl DaemonInstance {
line: =buf, line: =buf,
}>")); }>"));
Ok(()) Ok(())
})).is_err() }))
{ {
break; break;
} }
@ -313,7 +313,7 @@ impl DaemonInstance {
facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| { facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| {
let m = if status.success() { None } else { Some(format!("{}", status)) }; let m = if status.success() { None } else { Some(format!("{}", status)) };
self.handle_exit(t, m) self.handle_exit(t, m)
})?; });
Ok(LinkedTaskTermination::Normal) Ok(LinkedTaskTermination::Normal)
})); }));
Ok(()) Ok(())
@ -443,7 +443,7 @@ fn run(
facet.activate(Account::new(syndicate::name!("instance-startup")), |t| { facet.activate(Account::new(syndicate::name!("instance-startup")), |t| {
daemon_instance.start(t) daemon_instance.start(t)
})?; });
Ok(LinkedTaskTermination::KeepFacet) Ok(LinkedTaskTermination::KeepFacet)
}); });
Ok(()) Ok(())

View File

@ -36,27 +36,38 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
t.linked_task(syndicate::name!("listener"), async move { t.linked_task(syndicate::name!("listener"), async move {
let listen_addr = format!("{}:{}", host, port); let listen_addr = format!("{}:{}", host, port);
let listener = TcpListener::bind(listen_addr).await?; let listener = TcpListener::bind(listen_addr).await?;
facet.activate(Account::new(syndicate::name!("readiness")), |t| {
tracing::info!("listening"); if !facet.activate(
ds.assert(t, language(), &lifecycle::ready(&spec)); Account::new(syndicate::name!("readiness")), |t| {
Ok(()) tracing::info!("listening");
})?; ds.assert(t, language(), &lifecycle::ready(&spec));
Ok(())
})
{
return Ok(LinkedTaskTermination::Normal);
}
loop { loop {
let (stream, addr) = listener.accept().await?; let (stream, addr) = listener.accept().await?;
let gatekeeper = spec.gatekeeper.clone(); let gatekeeper = spec.gatekeeper.clone();
let name = syndicate::name!(parent: parent_span.clone(), "conn"); let name = syndicate::name!(parent: parent_span.clone(), "conn");
facet.activate(Account::new(name.clone()), move |t| { if !facet.activate(
t.spawn(name, move |t| { Account::new(name.clone()),
Ok(t.linked_task(tracing::Span::current(), { move |t| {
let facet = t.facet.clone(); t.spawn(name, move |t| {
async move { Ok(t.linked_task(tracing::Span::current(), {
detect_protocol(facet, stream, gatekeeper, addr).await?; let facet = t.facet.clone();
Ok(LinkedTaskTermination::KeepFacet) async move {
} detect_protocol(facet, stream, gatekeeper, addr).await?;
})) Ok(LinkedTaskTermination::KeepFacet)
}); }
Ok(()) }))
})?; });
Ok(())
})
{
return Ok(LinkedTaskTermination::Normal);
}
} }
}); });
Ok(()) Ok(())

View File

@ -38,11 +38,17 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: UnixRelayListener) -> ActorResult
let facet = t.facet.clone(); let facet = t.facet.clone();
t.linked_task(syndicate::name!("listener"), async move { t.linked_task(syndicate::name!("listener"), async move {
let listener = bind_unix_listener(&PathBuf::from(path_str)).await?; let listener = bind_unix_listener(&PathBuf::from(path_str)).await?;
facet.activate(Account::new(syndicate::name!("readiness")), |t| {
tracing::info!("listening"); if !facet.activate(
ds.assert(t, language(), &lifecycle::ready(&spec)); Account::new(syndicate::name!("readiness")), |t| {
Ok(()) tracing::info!("listening");
})?; ds.assert(t, language(), &lifecycle::ready(&spec));
Ok(())
})
{
return Ok(LinkedTaskTermination::Normal);
}
loop { loop {
let (stream, _addr) = listener.accept().await?; let (stream, _addr) = listener.accept().await?;
let peer = stream.peer_cred()?; let peer = stream.peer_cred()?;
@ -50,23 +56,28 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: UnixRelayListener) -> ActorResult
let name = syndicate::name!(parent: parent_span.clone(), "conn", let name = syndicate::name!(parent: parent_span.clone(), "conn",
pid = ?peer.pid().unwrap_or(-1), pid = ?peer.pid().unwrap_or(-1),
uid = peer.uid()); uid = peer.uid());
facet.activate(Account::new(name.clone()), move |t| { if !facet.activate(
t.spawn(name, |t| { Account::new(name.clone()),
Ok(t.linked_task(tracing::Span::current(), { move |t| {
let facet = t.facet.clone(); t.spawn(name, |t| {
async move { Ok(t.linked_task(tracing::Span::current(), {
tracing::info!(protocol = %"unix"); let facet = t.facet.clone();
let (i, o) = stream.into_split(); async move {
run_connection(facet, tracing::info!(protocol = %"unix");
relay::Input::Bytes(Box::pin(i)), let (i, o) = stream.into_split();
relay::Output::Bytes(Box::pin(o)), run_connection(facet,
gatekeeper)?; relay::Input::Bytes(Box::pin(i)),
Ok(LinkedTaskTermination::KeepFacet) relay::Output::Bytes(Box::pin(o)),
} gatekeeper);
})) Ok(LinkedTaskTermination::KeepFacet)
}); }
Ok(()) }))
})?; });
Ok(())
})
{
return Ok(LinkedTaskTermination::Normal);
}
} }
}); });
Ok(()) Ok(())

View File

@ -53,7 +53,7 @@ pub fn bench_pub(c: &mut Criterion) {
let start = Instant::now(); let start = Instant::now();
rt.block_on(async move { rt.block_on(async move {
Actor::new(None).boot(syndicate::name!("dataspace"), move |t| { Actor::new(None).boot(syndicate::name!("dataspace"), move |t| {
let ds = t.create(Dataspace::new()); let ds = t.create(Dataspace::new(None));
let shutdown = t.create(ShutdownEntity); let shutdown = t.create(ShutdownEntity);
let account = Account::new(syndicate::name!("sender-account")); let account = Account::new(syndicate::name!("sender-account"));
t.linked_task(syndicate::name!("sender"), async move { t.linked_task(syndicate::name!("sender"), async move {
@ -88,7 +88,7 @@ pub fn bench_pub(c: &mut Criterion) {
let turn_count = Arc::clone(&turn_count); let turn_count = Arc::clone(&turn_count);
move |t| { move |t| {
let ds = Cap::new(&t.create(Dataspace::new())); let ds = Cap::new(&t.create(Dataspace::new(None)));
let shutdown = entity(()) let shutdown = entity(())
.on_asserted(|_, _, _| Ok(Some(Box::new(|_, t| Ok(t.stop()))))) .on_asserted(|_, _, _| Ok(Some(Box::new(|_, t| Ok(t.stop())))))

View File

@ -7,7 +7,6 @@
use super::dataflow::Graph; use super::dataflow::Graph;
use super::error::Error; use super::error::Error;
use super::error::encode_error;
use super::error::error; use super::error::error;
use super::rewrite::CaveatError; use super::rewrite::CaveatError;
use super::rewrite::CheckedCaveat; use super::rewrite::CheckedCaveat;
@ -556,12 +555,14 @@ impl FacetRef {
/// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise, /// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise,
/// [abandons the turn][Activation::clear] and discards the buffered actions. /// [abandons the turn][Activation::clear] and discards the buffered actions.
/// ///
/// Returns `true` if, at the end of the activation, `actor` had not yet terminated.
///
/// Bills any activity to `account`. /// Bills any activity to `account`.
pub fn activate<F>( pub fn activate<F>(
&self, &self,
account: Arc<Account>, account: Arc<Account>,
f: F, f: F,
) -> ActorResult where ) -> bool where
F: FnOnce(&mut Activation) -> ActorResult, F: FnOnce(&mut Activation) -> ActorResult,
{ {
self.activate_exit(account, |t| f(t).into()) self.activate_exit(account, |t| f(t).into())
@ -572,25 +573,26 @@ impl FacetRef {
/// returns `None`, leaves `actor` in runnable state. [Commits buffered /// returns `None`, leaves `actor` in runnable state. [Commits buffered
/// actions][Activation::deliver] unless `actor` terminates with an `Err` status. /// actions][Activation::deliver] unless `actor` terminates with an `Err` status.
/// ///
/// Returns `true` if, at the end of the activation, `actor` had not yet terminated.
///
/// Bills any activity to `account`. /// Bills any activity to `account`.
pub fn activate_exit<F>( pub fn activate_exit<F>(
&self, &self,
account: Arc<Account>, account: Arc<Account>,
f: F, f: F,
) -> ActorResult where ) -> bool where
F: FnOnce(&mut Activation) -> RunDisposition, F: FnOnce(&mut Activation) -> RunDisposition,
{ {
let mut g = self.actor.state.lock(); let mut g = self.actor.state.lock();
match &mut *g { match &mut *g {
ActorState::Terminated { exit_status } => ActorState::Terminated { .. } =>
Err(error("Could not activate terminated actor", false,
encode_error((**exit_status).clone()))),
ActorState::Running(state) => { ActorState::Running(state) => {
tracing::trace!(actor_id=?self.actor.actor_id, "activate"); tracing::trace!(actor_id=?self.actor.actor_id, "activate");
let mut activation = Activation::make(self, account, state); let mut activation = Activation::make(self, account, state);
let f_result = f(&mut activation); let f_result = f(&mut activation);
let result = match activation.restore_invariants(f_result) { let is_alive = match activation.restore_invariants(f_result) {
RunDisposition::Continue => Ok(()), RunDisposition::Continue => true,
RunDisposition::Terminate(exit_status) => { RunDisposition::Terminate(exit_status) => {
if exit_status.is_err() { if exit_status.is_err() {
activation.clear(); activation.clear();
@ -598,14 +600,12 @@ impl FacetRef {
drop(activation); drop(activation);
let exit_status = Arc::new(exit_status); let exit_status = Arc::new(exit_status);
state.cleanup(&self.actor, &exit_status); state.cleanup(&self.actor, &exit_status);
*g = ActorState::Terminated { *g = ActorState::Terminated { exit_status };
exit_status: Arc::clone(&exit_status), false
};
(*exit_status).clone()
} }
}; };
tracing::trace!(actor_id=?self.actor.actor_id, "deactivate"); tracing::trace!(actor_id=?self.actor.actor_id, "deactivate");
result is_alive
} }
} }
} }
@ -933,17 +933,16 @@ impl<'activation> Activation<'activation> {
} }
} }
}; };
let _ = facet.activate( facet.activate(Account::new(crate::name!("release_linked_task")), |t| {
Account::new(crate::name!("release_linked_task")), |t| { if let Some(f) = t.active_facet() {
if let Some(f) = t.active_facet() { tracing::trace!(task_id, "cancellation token removed");
tracing::trace!(task_id, "cancellation token removed"); f.linked_tasks.remove(&task_id);
f.linked_tasks.remove(&task_id); }
} if let LinkedTaskTermination::Normal = result {
if let LinkedTaskTermination::Normal = result { t.stop();
t.stop(); }
} Ok(())
Ok(()) });
});
Ok::<(), Error>(()) Ok::<(), Error>(())
}.instrument(name)); }.instrument(name));
} }
@ -976,8 +975,9 @@ impl<'activation> Activation<'activation> {
loop { loop {
timer.tick().await; timer.tick().await;
let _entry = span.enter(); let _entry = span.enter();
facet.activate(Arc::clone(&account), |t| a(t))?; if !facet.activate(Arc::clone(&account), |t| a(t)) { break; }
} }
Ok(LinkedTaskTermination::Normal)
}); });
Ok(()) Ok(())
} }
@ -996,7 +996,7 @@ impl<'activation> Activation<'activation> {
self.linked_task(crate::name!(parent: None, "Activation::at"), async move { self.linked_task(crate::name!(parent: None, "Activation::at"), async move {
tokio::time::sleep_until(instant.into()).await; tokio::time::sleep_until(instant.into()).await;
let _entry = span.enter(); let _entry = span.enter();
facet.activate(account, a)?; facet.activate(account, a);
Ok(LinkedTaskTermination::KeepFacet) Ok(LinkedTaskTermination::KeepFacet)
}); });
} }
@ -1031,7 +1031,7 @@ impl<'activation> Activation<'activation> {
let facet_id = self.facet.facet_id; let facet_id = self.facet.facet_id;
self.pending.for_myself.push(Box::new(move |t| { self.pending.for_myself.push(Box::new(move |t| {
t.with_facet(true, facet_id, move |t| { t.with_facet(true, facet_id, move |t| {
ac.link(t).boot(name, boot); ac.link(t)?.boot(name, boot);
Ok(()) Ok(())
}) })
})); }));
@ -1526,16 +1526,21 @@ impl Actor {
Actor { rx, ac_ref } Actor { rx, ac_ref }
} }
fn link(self, t_parent: &mut Activation) -> Self { fn link(self, t_parent: &mut Activation) -> Result<Self, Error> {
if t_parent.active_facet().is_none() { if t_parent.active_facet().is_none() {
panic!("No active facet when calling spawn_link"); panic!("No active facet when calling spawn_link");
} }
self.ac_ref.root_facet_ref().activate(Account::new(crate::name!("link")), |t_child| { let is_alive = self.ac_ref.root_facet_ref().activate(
t_parent.half_link(t_child); Account::new(crate::name!("link")), |t_child| {
t_child.half_link(t_parent); t_parent.half_link(t_child);
Ok(()) t_child.half_link(t_parent);
}).expect("Failed during link"); Ok(())
self });
if is_alive {
Ok(self)
} else {
Err(error("spawn_link'd actor terminated before link could happen", AnyValue::new(false)))
}
} }
/// Start the actor's mainloop. Takes ownership of `self`. The /// Start the actor's mainloop. Takes ownership of `self`. The
@ -1571,11 +1576,11 @@ impl Actor {
let root_facet_ref = self.ac_ref.root_facet_ref(); let root_facet_ref = self.ac_ref.root_facet_ref();
let terminate = |result: ActorResult| { let terminate = |result: ActorResult| {
let _ = root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")), root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")),
|_| RunDisposition::Terminate(result)); |_| RunDisposition::Terminate(result));
}; };
if root_facet_ref.activate(Account::new(crate::name!("boot")), boot).is_err() { if !root_facet_ref.activate(Account::new(crate::name!("boot")), boot) {
return; return;
} }
@ -1600,11 +1605,12 @@ impl Actor {
SystemMessage::Turn(mut loaned_item) => { SystemMessage::Turn(mut loaned_item) => {
tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Turn"); tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Turn");
let actions = std::mem::take(&mut loaned_item.item); let actions = std::mem::take(&mut loaned_item.item);
let r = root_facet_ref.activate(Arc::clone(&loaned_item.account), |t| { let is_alive = root_facet_ref.activate(
for action in actions.into_iter() { action(t)? } Arc::clone(&loaned_item.account), |t| {
Ok(()) for action in actions.into_iter() { action(t)? }
}); Ok(())
if r.is_err() { return; } });
if !is_alive { return; }
} }
SystemMessage::Crash(e) => { SystemMessage::Crash(e) => {
tracing::trace!(actor_id = ?self.ac_ref.actor_id, tracing::trace!(actor_id = ?self.ac_ref.actor_id,

View File

@ -624,12 +624,15 @@ async fn input_loop(
loop { loop {
account.ensure_clear_funds().await; account.ensure_clear_funds().await;
match src.next().await { match src.next().await {
None => return Ok(LinkedTaskTermination::Normal), None => break,
Some(bs) => facet.activate(Arc::clone(&account), |t| { Some(bs) => {
let mut g = relay.lock(); let is_alive = facet.activate(Arc::clone(&account), |t| {
let tr = g.as_mut().expect("initialized"); let mut g = relay.lock();
tr.handle_inbound_datagram(t, &bs?) let tr = g.as_mut().expect("initialized");
})?, tr.handle_inbound_datagram(t, &bs?)
});
if !is_alive { break; }
}
} }
} }
} }
@ -643,22 +646,26 @@ async fn input_loop(
Ok(n) => n, Ok(n) => n,
Err(e) => Err(e) =>
if e.kind() == io::ErrorKind::ConnectionReset { if e.kind() == io::ErrorKind::ConnectionReset {
return Ok(LinkedTaskTermination::Normal); break;
} else { } else {
return Err(e)?; return Err(e)?;
}, },
}; };
match n { match n {
0 => return Ok(LinkedTaskTermination::Normal), 0 => break,
_ => facet.activate(Arc::clone(&account), |t| { _ => {
let mut g = relay.lock(); let is_alive = facet.activate(Arc::clone(&account), |t| {
let tr = g.as_mut().expect("initialized"); let mut g = relay.lock();
tr.handle_inbound_stream(t, &mut buf) let tr = g.as_mut().expect("initialized");
})?, tr.handle_inbound_stream(t, &mut buf)
});
if !is_alive { break; }
}
} }
} }
} }
} }
Ok(LinkedTaskTermination::Normal)
} }
async fn output_loop( async fn output_loop(