Track enough information to allow piecing-together of parent/child relationships among actors

This commit is contained in:
Tony Garnock-Jones 2022-01-10 12:50:55 +01:00
parent 58bde1e29d
commit c3a9525ef1
12 changed files with 56 additions and 45 deletions

View File

@ -8,11 +8,11 @@ use syndicate::value::NestedValue;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?;
Actor::new().boot(tracing::Span::current(), |t| {
Actor::new(None).boot(tracing::Span::current(), |t| {
let ds = Cap::new(&t.create(Dataspace::new()));
let _ = t.prevent_inert_check();
Actor::new().boot(syndicate::name!("box"), enclose!((ds) move |t| {
t.spawn(syndicate::name!("box"), enclose!((ds) move |t| {
let current_value = t.named_field("current_value", 0u64);
t.dataflow({
@ -49,7 +49,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}));
Actor::new().boot(syndicate::name!("client"), enclose!((ds) move |t| {
t.spawn(syndicate::name!("client"), enclose!((ds) move |t| {
let box_state_handler = syndicate::entity(0u32)
.on_asserted(enclose!((ds) move |count, t, captures: AnyValue| {
*count = *count + 1;

View File

@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("consumer"), |t| {
Actor::new(None).boot(syndicate::name!("consumer"), |t| {
relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| {
let consumer = syndicate::entity(0)
.on_message(|message_count, _t, m: AnyValue| {

View File

@ -93,7 +93,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("pingpong"), |t| {
Actor::new(None).boot(syndicate::name!("pingpong"), |t| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) =

View File

@ -34,7 +34,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("producer"), |t| {
Actor::new(None).boot(syndicate::name!("producer"), |t| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap();
let action_count = config.action_count;

View File

@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("state-consumer"), |t| {
Actor::new(None).boot(syndicate::name!("state-consumer"), |t| {
relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| {
let consumer = {
#[derive(Default)]

View File

@ -22,7 +22,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("state-producer"), |t| {
Actor::new(None).boot(syndicate::name!("state-producer"), |t| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let account = Account::new(syndicate::name!("account"));
t.linked_task(syndicate::name!("sender"), async move {

View File

@ -83,7 +83,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::trace!("startup");
Actor::new().boot(tracing::Span::current(), move |t| {
Actor::new(None).boot(tracing::Span::current(), move |t| {
let server_config_ds = Cap::new(&t.create(Dataspace::new()));
let log_ds = Cap::new(&t.create(Dataspace::new()));

View File

@ -44,15 +44,19 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
loop {
let (stream, addr) = listener.accept().await?;
let gatekeeper = spec.gatekeeper.clone();
Actor::new().boot(
syndicate::name!(parent: parent_span.clone(), "conn"),
move |t| Ok(t.linked_task(tracing::Span::current(), {
let facet = t.facet.clone();
async move {
detect_protocol(facet, stream, gatekeeper, addr).await?;
Ok(LinkedTaskTermination::KeepFacet)
}
})));
let name = syndicate::name!(parent: parent_span.clone(), "conn");
facet.activate(Account::new(name.clone()), move |t| {
t.spawn(name, move |t| {
Ok(t.linked_task(tracing::Span::current(), {
let facet = t.facet.clone();
async move {
detect_protocol(facet, stream, gatekeeper, addr).await?;
Ok(LinkedTaskTermination::KeepFacet)
}
}))
});
Ok(())
})?;
}
});
Ok(())

View File

@ -47,22 +47,26 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: UnixRelayListener) -> ActorResult
let (stream, _addr) = listener.accept().await?;
let peer = stream.peer_cred()?;
let gatekeeper = spec.gatekeeper.clone();
Actor::new().boot(
syndicate::name!(parent: parent_span.clone(), "conn",
pid = ?peer.pid().unwrap_or(-1),
uid = peer.uid()),
|t| Ok(t.linked_task(tracing::Span::current(), {
let facet = t.facet.clone();
async move {
tracing::info!(protocol = %"unix");
let (i, o) = stream.into_split();
run_connection(facet,
relay::Input::Bytes(Box::pin(i)),
relay::Output::Bytes(Box::pin(o)),
gatekeeper)?;
Ok(LinkedTaskTermination::KeepFacet)
}
})));
let name = syndicate::name!(parent: parent_span.clone(), "conn",
pid = ?peer.pid().unwrap_or(-1),
uid = peer.uid());
facet.activate(Account::new(name.clone()), move |t| {
t.spawn(name, |t| {
Ok(t.linked_task(tracing::Span::current(), {
let facet = t.facet.clone();
async move {
tracing::info!(protocol = %"unix");
let (i, o) = stream.into_split();
run_connection(facet,
relay::Input::Bytes(Box::pin(i)),
relay::Output::Bytes(Box::pin(o)),
gatekeeper)?;
Ok(LinkedTaskTermination::KeepFacet)
}
}))
});
Ok(())
})?;
}
});
Ok(())

View File

@ -52,7 +52,7 @@ pub fn bench_pub(c: &mut Criterion) {
b.iter_custom(|iters| {
let start = Instant::now();
rt.block_on(async move {
Actor::new().boot(syndicate::name!("dataspace"), move |t| {
Actor::new(None).boot(syndicate::name!("dataspace"), move |t| {
let ds = t.create(Dataspace::new());
let shutdown = t.create(ShutdownEntity);
let account = Account::new(syndicate::name!("sender-account"));
@ -83,7 +83,7 @@ pub fn bench_pub(c: &mut Criterion) {
rt.block_on(async move {
let turn_count = Arc::new(AtomicU64::new(0));
Actor::new().boot(syndicate::name!("dataspace"), {
Actor::new(None).boot(syndicate::name!("dataspace"), {
let iters = iters.clone();
let turn_count = Arc::clone(&turn_count);
@ -103,7 +103,7 @@ pub fn bench_pub(c: &mut Criterion) {
observer: shutdown,
});
Actor::new().boot(syndicate::name!("consumer"), move |t| {
t.spawn(syndicate::name!("consumer"), move |t| {
struct Receiver(Arc<AtomicU64>);
impl Entity<AnyValue> for Receiver {
fn message(&mut self, _t: &mut Activation, _m: AnyValue) -> ActorResult {

View File

@ -88,7 +88,7 @@ pub fn bench_ring(c: &mut Criterion) {
self.i += 1;
let spawner_ref = Arc::clone(&self.self_ref);
ACTORS_CREATED.fetch_add(1, Ordering::Relaxed);
Actor::new().boot(syndicate::name!("forwarder", ?i), move |t| {
t.spawn(syndicate::name!("forwarder", ?i), move |t| {
let _ = t.prevent_inert_check();
let f = t.create(Forwarder {
next,
@ -118,7 +118,7 @@ pub fn bench_ring(c: &mut Criterion) {
}
ACTORS_CREATED.fetch_add(1, Ordering::Relaxed);
Actor::new().boot(syndicate::name!("counter"), move |t| {
Actor::new(None).boot(syndicate::name!("counter"), move |t| {
let _ = t.prevent_inert_check();
let mut s = Spawner {
self_ref: t.create_inert(),

View File

@ -1012,7 +1012,7 @@ impl<'activation> Activation<'activation> {
name: tracing::Span,
boot: F,
) -> ActorRef {
let ac = Actor::new();
let ac = Actor::new(Some(self.state.actor_id));
let ac_ref = ac.ac_ref.clone();
self.enqueue_for_myself_at_commit(Box::new(move |_| {
ac.boot(name, boot);
@ -1031,7 +1031,7 @@ impl<'activation> Activation<'activation> {
name: tracing::Span,
boot: F,
) -> ActorRef {
let ac = Actor::new();
let ac = Actor::new(Some(self.state.actor_id));
let ac_ref = ac.ac_ref.clone();
let facet_id = self.facet.facet_id;
self.enqueue_for_myself_at_commit(Box::new(move |t| {
@ -1498,11 +1498,11 @@ impl Drop for Mailbox {
impl Actor {
/// Create a new actor. It still needs to be [`boot`ed][Self::boot].
pub fn new() -> Self {
pub fn new(parent_actor_id: Option<ActorId>) -> Self {
let (tx, rx) = unbounded_channel();
let actor_id = next_actor_id();
let root = Facet::new(None);
tracing::trace!(?actor_id, root_facet_id = ?root.facet_id, "Actor::new");
tracing::debug!(?actor_id, ?parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new");
let mut st = RunningActor {
actor_id,
tx,
@ -1806,7 +1806,10 @@ impl<T: Any + Send> Drop for Field<T> {
impl Drop for Actor {
fn drop(&mut self) {
self.rx.close();
ACTORS.write().remove(&self.ac_ref.actor_id);
let _name = ACTORS.write().remove(&self.ac_ref.actor_id)
.map_or_else(|| crate::name!(parent: None, "DROPPED", actor_id=?self.ac_ref.actor_id),
|(span, _ac_ref)| span);
let _scope = _name.enter();
let mut g = self.ac_ref.state.lock();
if let ActorState::Running(ref mut state) = *g {
tracing::warn!(actor_id = ?self.ac_ref.actor_id, "Force-terminated by Actor::drop");
@ -1815,7 +1818,7 @@ impl Drop for Actor {
state.cleanup(&self.ac_ref, &exit_status);
*g = ActorState::Terminated { exit_status };
}
tracing::trace!(actor_id = ?self.ac_ref.actor_id, "Actor::drop");
tracing::debug!(actor_id = ?self.ac_ref.actor_id, "Actor::drop");
}
}