From 35f510aa0b45db7ee4af68adfe4b211d40a38bc2 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 24 Jul 2021 23:22:01 +0200 Subject: [PATCH] More fine-grained state and new ownership relations, to potentially permit avoiding scheduling overhead by directly entering an actor's runtime context --- benches/bench_dataspace.rs | 73 +++---- examples/consumer.rs | 107 +++++----- examples/pingpong.rs | 254 +++++++++++----------- examples/producer.rs | 56 ++--- examples/state-consumer.rs | 138 ++++++------ examples/state-producer.rs | 76 +++---- src/actor.rs | 406 +++++++++++++++++++++++------------- src/bin/syndicate-server.rs | 38 ++-- src/during.rs | 4 +- src/error.rs | 16 ++ src/relay.rs | 30 +-- src/tracer.rs | 2 +- 12 files changed, 689 insertions(+), 511 deletions(-) diff --git a/benches/bench_dataspace.rs b/benches/bench_dataspace.rs index 201e054..1ac9bbb 100644 --- a/benches/bench_dataspace.rs +++ b/benches/bench_dataspace.rs @@ -30,7 +30,7 @@ struct ShutdownEntity; impl Entity<_Any> for ShutdownEntity { fn message(&mut self, t: &mut Activation, _m: _Any) -> ActorResult { - t.actor.shutdown(); + t.state.shutdown(); Ok(()) } } @@ -52,24 +52,25 @@ pub fn bench_pub(c: &mut Criterion) { b.iter_custom(|iters| { let start = Instant::now(); rt.block_on(async move { - let mut ac = Actor::new(); - let ds = ac.create(Dataspace::new()); - let shutdown = ac.create(ShutdownEntity); - let debtor = Debtor::new(syndicate::name!("sender-debtor")); - ac.linked_task(syndicate::name!("sender"), async move { - for _ in 0..iters { - let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds.mailbox), &debtor, Box::new( - move |t| ds.with_entity( - |e| e.message(t, says(_Any::new("bench_pub"), - Value::ByteString(vec![]).wrap())))))? - } - external_event(&Arc::clone(&shutdown.mailbox), &debtor, Box::new( - move |t| shutdown.with_entity( - |e| e.message(t, _Any::new(true)))))?; + Actor::new().boot(syndicate::name!("dataspace"), move |t| { + let ds = t.state.create(Dataspace::new()); + let shutdown = t.state.create(ShutdownEntity); + let debtor = Debtor::new(syndicate::name!("sender-debtor")); + t.state.linked_task(syndicate::name!("sender"), async move { + for _ in 0..iters { + let ds = Arc::clone(&ds); + external_event(&Arc::clone(&ds.mailbox), &debtor, Box::new( + move |t| ds.with_entity( + |e| e.message(t, says(_Any::new("bench_pub"), + Value::ByteString(vec![]).wrap())))))? + } + external_event(&Arc::clone(&shutdown.mailbox), &debtor, Box::new( + move |t| shutdown.with_entity( + |e| e.message(t, _Any::new(true)))))?; + Ok(()) + }); Ok(()) - }); - ac.start(syndicate::name!("dataspace")).await.unwrap().unwrap(); + }).await.unwrap().unwrap(); }); start.elapsed() }) @@ -79,24 +80,25 @@ pub fn bench_pub(c: &mut Criterion) { b.iter_custom(|iters| { let start = Instant::now(); rt.block_on(async move { - let ds = Cap::new(&Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new())); + let ds = Cap::new( + &Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new())); let turn_count = Arc::new(AtomicU64::new(0)); - struct Receiver(Arc); - impl Entity<_Any> for Receiver { - fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult { - self.0.fetch_add(1, Ordering::Relaxed); - Ok(()) - } - } - - let mut ac = Actor::new(); - let shutdown = Cap::new(&ac.create(ShutdownEntity)); - let receiver = Cap::new(&ac.create(Receiver(Arc::clone(&turn_count)))); - { let iters = iters.clone(); - ac.boot(syndicate::name!("dataspace"), move |t| Box::pin(async move { + let turn_count = Arc::clone(&turn_count); + Actor::new().boot(syndicate::name!("consumer"), move |t| { + struct Receiver(Arc); + impl Entity<_Any> for Receiver { + fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult { + self.0.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + } + + let shutdown = Cap::new(&t.state.create(ShutdownEntity)); + let receiver = Cap::new(&t.state.create(Receiver(Arc::clone(&turn_count)))); + ds.assert(t, &Observe { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { ctor: Box::new(p::CRec { @@ -124,8 +126,8 @@ pub fn bench_pub(c: &mut Criterion) { })), observer: shutdown, }); - let debtor = t.debtor.clone(); - t.actor.linked_task(syndicate::name!("sender"), async move { + let debtor = Arc::clone(t.debtor()); + t.state.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { let ds = Arc::clone(&ds); external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( @@ -142,8 +144,9 @@ pub fn bench_pub(c: &mut Criterion) { Ok(()) }); Ok(()) - })).await.unwrap().unwrap(); + }).await.unwrap().unwrap(); } + let actual_turns = turn_count.load(Ordering::SeqCst); if actual_turns != iters { panic!("Expected {}, got {} messages", iters, actual_turns); diff --git a/examples/consumer.rs b/examples/consumer.rs index b1eb352..49f49e5 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -26,57 +26,62 @@ pub struct Config { #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; - Actor::new().boot(syndicate::name!("consumer"), |t| Box::pin(async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { - let consumer = syndicate::entity(0) - .on_message(|message_count, _t, m: _Any| { - if m.value().is_boolean() { - tracing::info!("{:?} messages in the last second", message_count); - *message_count = 0; - } else { - *message_count += 1; - } - Ok(()) - }) - .create_cap(t.actor); + Actor::new().boot(syndicate::name!("consumer"), |t| { + let ac = t.actor.clone(); + let boot_debtor = Arc::clone(t.debtor()); + Ok(t.state.linked_task(tracing::Span::current(), async move { + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); + Activation::for_actor(&ac, boot_debtor, |t| { + relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { + let consumer = syndicate::entity(0) + .on_message(|message_count, _t, m: _Any| { + if m.value().is_boolean() { + tracing::info!("{:?} messages in the last second", message_count); + *message_count = 0; + } else { + *message_count += 1; + } + Ok(()) + }) + .create_cap(t.state); + ds.assert(t, &Observe { + pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { + ctor: Box::new(p::CRec { + label: Value::symbol("Says").wrap(), + arity: 2.into(), + }), + members: Map::from_iter(vec![ + (0.into(), p::Pattern::DBind(Box::new(p::DBind { + name: "who".to_owned(), + pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), + }))), + (1.into(), p::Pattern::DBind(Box::new(p::DBind { + name: "what".to_owned(), + pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), + }))), + ].into_iter()), + })), + observer: Arc::clone(&consumer), + }); - ds.assert(t, &Observe { - pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { - ctor: Box::new(p::CRec { - label: Value::symbol("Says").wrap(), - arity: 2.into(), - }), - members: Map::from_iter(vec![ - (0.into(), p::Pattern::DBind(Box::new(p::DBind { - name: "who".to_owned(), - pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), - }))), - (1.into(), p::Pattern::DBind(Box::new(p::DBind { - name: "what".to_owned(), - pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), - }))), - ].into_iter()), - })), - observer: Arc::clone(&consumer), - }); - - t.actor.linked_task(syndicate::name!("tick"), async move { - let mut stats_timer = interval(Duration::from_secs(1)); - loop { - stats_timer.tick().await; - let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer.underlying.mailbox), - &Debtor::new(syndicate::name!("debtor")), - Box::new(move |t| consumer.underlying.with_entity( - |e| e.message(t, _Any::new(true)))))?; - } - }); - Ok(None) - }); - Ok(()) - })).await??; + t.state.linked_task(syndicate::name!("tick"), async move { + let mut stats_timer = interval(Duration::from_secs(1)); + loop { + stats_timer.tick().await; + let consumer = Arc::clone(&consumer); + external_event(&Arc::clone(&consumer.underlying.mailbox), + &Debtor::new(syndicate::name!("debtor")), + Box::new(move |t| consumer.underlying.with_entity( + |e| e.message(t, _Any::new(true)))))?; + } + }); + Ok(None) + }); + Ok(()) + }) + })) + }).await??; Ok(()) } diff --git a/examples/pingpong.rs b/examples/pingpong.rs index 1aa8035..8ab50d1 100644 --- a/examples/pingpong.rs +++ b/examples/pingpong.rs @@ -93,138 +93,144 @@ fn report_latencies(rtt_ns_samples: &Vec) { async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; - Actor::new().boot(syndicate::name!("pingpong"), move |t| Box::pin(async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { + Actor::new().boot(syndicate::name!("pingpong"), |t| { + let ac = t.actor.clone(); + let boot_debtor = Arc::clone(t.debtor()); + Ok(t.state.linked_task(tracing::Span::current(), async move { + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); + Activation::for_actor(&ac, boot_debtor, |t| { + relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { - let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = - match config.mode { - PingPongMode::Ping(ref c) => - ("Ping", "Pong", c.report_latency_every, false, c.bytes_padding), - PingPongMode::Pong => - ("Pong", "Ping", 0, true, 0), - }; + let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = + match config.mode { + PingPongMode::Ping(ref c) => + ("Ping", "Pong", c.report_latency_every, false, c.bytes_padding), + PingPongMode::Pong => + ("Pong", "Ping", 0, true, 0), + }; - let consumer = { - let ds = Arc::clone(&ds); - let mut turn_counter: u64 = 0; - let mut event_counter: u64 = 0; - let mut rtt_ns_samples: Vec = vec![0; report_latency_every]; - let mut rtt_batch_count: usize = 0; - let mut current_reply = None; - let self_ref = t.actor.create_inert(); - self_ref.become_entity( - syndicate::entity(Arc::clone(&self_ref)) - .on_message(move |self_ref, t, m: _Any| { - match m.value().as_boolean() { - Some(true) => { - tracing::info!("{:?} turns, {:?} events in the last second", - turn_counter, - event_counter); - turn_counter = 0; - event_counter = 0; - } - Some(false) => { - current_reply = None; - } - None => { - event_counter += 1; - let bindings = m.value().to_sequence()?; - let timestamp = &bindings[0]; - let padding = &bindings[1]; - - if should_echo || (report_latency_every == 0) { - ds.message(t, simple_record2(&send_label, - timestamp.clone(), - padding.clone())); - } else { - if let None = current_reply { - turn_counter += 1; - t.message_immediate_self(&self_ref, _Any::new(false)); - let rtt_ns = now() - timestamp.value().to_u64()?; - rtt_ns_samples[rtt_batch_count] = rtt_ns; - rtt_batch_count += 1; - - if rtt_batch_count == report_latency_every { - rtt_ns_samples.sort(); - report_latencies(&rtt_ns_samples); - rtt_batch_count = 0; - } - - current_reply = Some( - simple_record2(&send_label, - Value::from(now()).wrap(), - padding.clone())); + let consumer = { + let ds = Arc::clone(&ds); + let mut turn_counter: u64 = 0; + let mut event_counter: u64 = 0; + let mut rtt_ns_samples: Vec = vec![0; report_latency_every]; + let mut rtt_batch_count: usize = 0; + let mut current_reply = None; + let self_ref = t.state.create_inert(); + self_ref.become_entity( + syndicate::entity(Arc::clone(&self_ref)) + .on_message(move |self_ref, t, m: _Any| { + match m.value().as_boolean() { + Some(true) => { + tracing::info!("{:?} turns, {:?} events in the last second", + turn_counter, + event_counter); + turn_counter = 0; + event_counter = 0; + } + Some(false) => { + current_reply = None; + } + None => { + event_counter += 1; + let bindings = m.value().to_sequence()?; + let timestamp = &bindings[0]; + let padding = &bindings[1]; + + if should_echo || (report_latency_every == 0) { + ds.message(t, simple_record2(&send_label, + timestamp.clone(), + padding.clone())); + } else { + if let None = current_reply { + turn_counter += 1; + t.message_for_myself(&self_ref, _Any::new(false)); + let rtt_ns = now() - timestamp.value().to_u64()?; + rtt_ns_samples[rtt_batch_count] = rtt_ns; + rtt_batch_count += 1; + + if rtt_batch_count == report_latency_every { + rtt_ns_samples.sort(); + report_latencies(&rtt_ns_samples); + rtt_batch_count = 0; + } + + current_reply = Some( + simple_record2(&send_label, + Value::from(now()).wrap(), + padding.clone())); + } + ds.message(t, current_reply.as_ref().expect("some reply").clone()); + } } - ds.message(t, current_reply.as_ref().expect("some reply").clone()); } + Ok(()) + })); + Cap::new(&self_ref) + }; + + ds.assert(t, &Observe { + pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { + ctor: Box::new(p::CRec { + label: Value::symbol(recv_label).wrap(), + arity: 2.into(), + }), + members: Map::from_iter(vec![ + (0.into(), p::Pattern::DBind(Box::new(p::DBind { + name: "timestamp".to_owned(), + pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), + }))), + (1.into(), p::Pattern::DBind(Box::new(p::DBind { + name: "padding".to_owned(), + pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), + }))), + ].into_iter()), + })), + observer: Arc::clone(&consumer), + }); + + t.state.linked_task(syndicate::name!("tick"), async move { + let mut stats_timer = interval(Duration::from_secs(1)); + loop { + stats_timer.tick().await; + let consumer = Arc::clone(&consumer); + external_event(&Arc::clone(&consumer.underlying.mailbox), + &Debtor::new(syndicate::name!("debtor")), + Box::new(move |t| consumer.underlying.with_entity( + |e| e.message(t, _Any::new(true)))))?; + } + }); + + if let PingPongMode::Ping(c) = &config.mode { + let turn_count = c.turn_count; + let action_count = c.action_count; + let debtor = Arc::clone(t.debtor()); + t.state.linked_task(syndicate::name!("boot-ping"), async move { + let padding: _Any = Value::ByteString(vec![0; bytes_padding]).wrap(); + for _ in 0..turn_count { + let mut events: PendingEventQueue = vec![]; + let current_rec = simple_record2(send_label, + Value::from(now()).wrap(), + padding.clone()); + for _ in 0..action_count { + let ds = Arc::clone(&ds); + let current_rec = current_rec.clone(); + events.push(Box::new(move |t| ds.underlying.with_entity( + |e| e.message(t, current_rec)))); } + external_events(&ds.underlying.mailbox, &debtor, events)? } Ok(()) - })); - Cap::new(&self_ref) - }; - - ds.assert(t, &Observe { - pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { - ctor: Box::new(p::CRec { - label: Value::symbol(recv_label).wrap(), - arity: 2.into(), - }), - members: Map::from_iter(vec![ - (0.into(), p::Pattern::DBind(Box::new(p::DBind { - name: "timestamp".to_owned(), - pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), - }))), - (1.into(), p::Pattern::DBind(Box::new(p::DBind { - name: "padding".to_owned(), - pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), - }))), - ].into_iter()), - })), - observer: Arc::clone(&consumer), - }); - - t.actor.linked_task(syndicate::name!("tick"), async move { - let mut stats_timer = interval(Duration::from_secs(1)); - loop { - stats_timer.tick().await; - let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer.underlying.mailbox), - &Debtor::new(syndicate::name!("debtor")), - Box::new(move |t| consumer.underlying.with_entity( - |e| e.message(t, _Any::new(true)))))?; - } - }); - - if let PingPongMode::Ping(c) = &config.mode { - let turn_count = c.turn_count; - let action_count = c.action_count; - let debtor = t.debtor.clone(); - t.actor.linked_task(syndicate::name!("boot-ping"), async move { - let padding: _Any = Value::ByteString(vec![0; bytes_padding]).wrap(); - for _ in 0..turn_count { - let mut events: PendingEventQueue = vec![]; - let current_rec = simple_record2(send_label, - Value::from(now()).wrap(), - padding.clone()); - for _ in 0..action_count { - let ds = Arc::clone(&ds); - let current_rec = current_rec.clone(); - events.push(Box::new(move |t| ds.underlying.with_entity( - |e| e.message(t, current_rec)))); - } - external_events(&ds.underlying.mailbox, &debtor, events)? + }); } - Ok(()) - }); - } - Ok(None) - }); - Ok(()) - })).await??; + Ok(None) + }); + Ok(()) + }) + })) + }).await??; Ok(()) } diff --git a/examples/producer.rs b/examples/producer.rs index 2f6ffa5..5882f94 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -33,30 +33,36 @@ fn says(who: _Any, what: _Any) -> _Any { async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; syndicate::actor::start_debt_reporter(); - Actor::new().boot(syndicate::name!("producer"), |t| Box::pin(async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { - let padding: _Any = Value::ByteString(vec![0; config.bytes_padding]).wrap(); - let action_count = config.action_count; - let debtor = Debtor::new(syndicate::name!("debtor")); - t.actor.linked_task(syndicate::name!("sender"), async move { - loop { - debtor.ensure_clear_funds().await; - let mut events: PendingEventQueue = Vec::new(); - for _ in 0..action_count { - let ds = Arc::clone(&ds); - let padding = padding.clone(); - events.push(Box::new(move |t| ds.underlying.with_entity( - |e| e.message(t, says(Value::from("producer").wrap(), padding))))); - } - external_events(&ds.underlying.mailbox, &debtor, events)?; - } - }); - Ok(None) - }); - Ok(()) - })).await??; + Actor::new().boot(syndicate::name!("producer"), |t| { + let ac = t.actor.clone(); + let boot_debtor = Arc::clone(t.debtor()); + Ok(t.state.linked_task(tracing::Span::current(), async move { + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); + Activation::for_actor(&ac, boot_debtor, |t| { + relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { + let padding: _Any = Value::ByteString(vec![0; config.bytes_padding]).wrap(); + let action_count = config.action_count; + let debtor = Debtor::new(syndicate::name!("debtor")); + t.state.linked_task(syndicate::name!("sender"), async move { + loop { + debtor.ensure_clear_funds().await; + let mut events: PendingEventQueue = Vec::new(); + for _ in 0..action_count { + let ds = Arc::clone(&ds); + let padding = padding.clone(); + events.push(Box::new(move |t| ds.underlying.with_entity( + |e| e.message(t, says(Value::from("producer").wrap(), padding))))); + } + external_events(&ds.underlying.mailbox, &debtor, events)?; + } + }); + Ok(None) + }); + Ok(()) + }) + })) + }).await??; Ok(()) } diff --git a/examples/state-consumer.rs b/examples/state-consumer.rs index 5dd0787..fb7eb29 100644 --- a/examples/state-consumer.rs +++ b/examples/state-consumer.rs @@ -26,73 +26,79 @@ pub struct Config { #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; - Actor::new().boot(syndicate::name!("consumer"), |t| Box::pin(async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { - let consumer = { - #[derive(Default)] - struct State { - event_counter: u64, - arrival_counter: u64, - departure_counter: u64, - occupancy: u64, - } - syndicate::entity(State::default()).on_asserted(move |s, _, _| { - s.event_counter += 1; - s.arrival_counter += 1; - s.occupancy += 1; - Ok(Some(Box::new(|s, _| { - s.event_counter += 1; - s.departure_counter += 1; - s.occupancy -= 1; - Ok(()) - }))) - }).on_message(move |s, _, _| { - tracing::info!( - "{:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second", - s.event_counter, - s.arrival_counter, - s.departure_counter, - s.occupancy); - s.event_counter = 0; - s.arrival_counter = 0; - s.departure_counter = 0; - Ok(()) - }).create_cap(t.actor) - }; + Actor::new().boot(syndicate::name!("state-consumer"), |t| { + let ac = t.actor.clone(); + let boot_debtor = Arc::clone(t.debtor()); + Ok(t.state.linked_task(tracing::Span::current(), async move { + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); + Activation::for_actor(&ac, boot_debtor, |t| { + relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { + let consumer = { + #[derive(Default)] + struct State { + event_counter: u64, + arrival_counter: u64, + departure_counter: u64, + occupancy: u64, + } + syndicate::entity(State::default()).on_asserted(move |s, _, _| { + s.event_counter += 1; + s.arrival_counter += 1; + s.occupancy += 1; + Ok(Some(Box::new(|s, _| { + s.event_counter += 1; + s.departure_counter += 1; + s.occupancy -= 1; + Ok(()) + }))) + }).on_message(move |s, _, _| { + tracing::info!( + "{:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second", + s.event_counter, + s.arrival_counter, + s.departure_counter, + s.occupancy); + s.event_counter = 0; + s.arrival_counter = 0; + s.departure_counter = 0; + Ok(()) + }).create_cap(t.state) + }; - ds.assert(t, &Observe { - pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { - ctor: Box::new(p::CRec { - label: Value::symbol("Present").wrap(), - arity: 1.into(), - }), - members: Map::from_iter(vec![ - (0.into(), p::Pattern::DBind(Box::new(p::DBind { - name: "who".to_owned(), - pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), - }))), - ].into_iter()), - })), - observer: Arc::clone(&consumer), - }); + ds.assert(t, &Observe { + pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { + ctor: Box::new(p::CRec { + label: Value::symbol("Present").wrap(), + arity: 1.into(), + }), + members: Map::from_iter(vec![ + (0.into(), p::Pattern::DBind(Box::new(p::DBind { + name: "who".to_owned(), + pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), + }))), + ].into_iter()), + })), + observer: Arc::clone(&consumer), + }); - t.actor.linked_task(syndicate::name!("tick"), async move { - let mut stats_timer = interval(Duration::from_secs(1)); - loop { - stats_timer.tick().await; - let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer.underlying.mailbox), - &Debtor::new(syndicate::name!("debtor")), - Box::new(move |t| consumer.underlying.with_entity( - |e| e.message(t, _Any::new(true)))))?; - } - }); - Ok(None) - }); - Ok(()) - })).await??; + t.state.linked_task(syndicate::name!("tick"), async move { + let mut stats_timer = interval(Duration::from_secs(1)); + loop { + stats_timer.tick().await; + let consumer = Arc::clone(&consumer); + external_event(&Arc::clone(&consumer.underlying.mailbox), + &Debtor::new(syndicate::name!("debtor")), + Box::new(move |t| consumer.underlying.with_entity( + |e| e.message(t, _Any::new(true)))))?; + } + }); + Ok(None) + }); + Ok(()) + }) + })) + }).await??; Ok(()) } diff --git a/examples/state-producer.rs b/examples/state-producer.rs index 4f50f89..64f5e68 100644 --- a/examples/state-producer.rs +++ b/examples/state-producer.rs @@ -18,40 +18,46 @@ pub struct Config { #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; - Actor::new().boot(syndicate::name!("producer"), |t| Box::pin(async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { - let debtor = Debtor::new(syndicate::name!("debtor")); - t.actor.linked_task(syndicate::name!("sender"), async move { - let presence: _Any = Value::simple_record1( - "Present", - Value::from(std::process::id()).wrap()).wrap(); - let handle = syndicate::next_handle(); - let assert_e = || { - let ds = Arc::clone(&ds); - let presence = presence.clone(); - let handle = handle.clone(); - external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( - move |t| ds.underlying.with_entity(|e| e.assert(t, presence, handle)))) - }; - let retract_e = || { - let ds = Arc::clone(&ds); - let handle = handle.clone(); - external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( - move |t| ds.underlying.with_entity(|e| e.retract(t, handle)))) - }; - assert_e()?; - loop { - debtor.ensure_clear_funds().await; - retract_e()?; - assert_e()?; - } - }); - Ok(None) - }); - Ok(()) - })).await??; + Actor::new().boot(syndicate::name!("state-producer"), |t| { + let ac = t.actor.clone(); + let boot_debtor = Arc::clone(t.debtor()); + Ok(t.state.linked_task(tracing::Span::current(), async move { + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); + Activation::for_actor(&ac, boot_debtor, |t| { + relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { + let debtor = Debtor::new(syndicate::name!("debtor")); + t.state.linked_task(syndicate::name!("sender"), async move { + let presence: _Any = Value::simple_record1( + "Present", + Value::from(std::process::id()).wrap()).wrap(); + let handle = syndicate::next_handle(); + let assert_e = || { + let ds = Arc::clone(&ds); + let presence = presence.clone(); + let handle = handle.clone(); + external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + move |t| ds.underlying.with_entity(|e| e.assert(t, presence, handle)))) + }; + let retract_e = || { + let ds = Arc::clone(&ds); + let handle = handle.clone(); + external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + move |t| ds.underlying.with_entity(|e| e.retract(t, handle)))) + }; + assert_e()?; + loop { + debtor.ensure_clear_funds().await; + retract_e()?; + assert_e()?; + } + }); + Ok(None) + }); + Ok(()) + }) + })) + }).await??; Ok(()) } diff --git a/src/actor.rs b/src/actor.rs index 1611b62..902a1fc 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -5,6 +5,7 @@ pub use std::future::ready; use super::ActorId; use super::schemas::sturdy; use super::error::Error; +use super::error::encode_error; use super::error::error; use super::rewrite::CaveatError; use super::rewrite::CheckedCaveat; @@ -60,9 +61,12 @@ pub trait Entity: Send + Sync { } } +pub struct InertEntity; +impl Entity for InertEntity {} + enum CleanupAction { - ImmediateSelf(Action), - Remote(Arc, Action), + ForMyself(Action), + ForAnother(Arc, Action), } type CleanupActions = Map; @@ -72,10 +76,15 @@ pub type PendingEventQueue = Vec; // This is what other implementations call a "Turn", renamed here to // avoid conflicts with schemas::internal_protocol::Turn. pub struct Activation<'activation> { - pub actor: &'activation mut Actor, + pub actor: ActorRef, + pub state: &'activation mut RunningActor, + pending: EventBuffer, +} + +struct EventBuffer { pub debtor: Arc, queues: HashMap, PendingEventQueue)>, - immediate_self: PendingEventQueue, + for_myself: PendingEventQueue, } #[derive(Debug)] @@ -104,15 +113,31 @@ pub struct Mailbox { } pub struct Actor { - actor_id: ActorId, - tx: UnboundedSender, rx: UnboundedReceiver, + ac_ref: ActorRef, +} + +#[derive(Clone)] +pub struct ActorRef { + pub actor_id: ActorId, + state: Arc>, +} + +pub enum ActorState { + Running(RunningActor), + Terminated { + exit_status: Arc, + }, +} + +pub struct RunningActor { + pub actor_id: ActorId, + tx: UnboundedSender, mailbox: Weak, cleanup_actions: CleanupActions, next_task_id: u64, linked_tasks: Map, - exit_hooks: Vec, - exit_status: Option>, + exit_hooks: Vec) -> ActorResult>>, } pub struct Ref { @@ -152,8 +177,8 @@ preserves_schema::support::lazy_static! { } pub fn start_debt_reporter() { - Actor::new().boot(crate::name!("debt-reporter"), |t| Box::pin(async move { - t.actor.linked_task(crate::name!("tick"), async move { + Actor::new().boot(crate::name!("debt-reporter"), |t| { + t.state.linked_task(crate::name!("tick"), async { let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); loop { timer.tick().await; @@ -164,7 +189,7 @@ pub fn start_debt_reporter() { } }); Ok(()) - })); + }); } impl TryFrom<&_Any> for Synced { @@ -185,18 +210,66 @@ impl From<&Synced> for _Any { } impl<'activation> Activation<'activation> { - pub fn new(actor: &'activation mut Actor, debtor: Arc) -> Self { + fn make(actor: &ActorRef, debtor: Arc, state: &'activation mut RunningActor) -> Self { Activation { - actor, - debtor, - queues: HashMap::new(), - immediate_self: Vec::new(), + actor: actor.clone(), + state, + pending: EventBuffer::new(debtor), + } + } + + pub fn for_actor( + actor: &ActorRef, + debtor: Arc, + f: F, + ) -> ActorResult where + F: FnOnce(&mut Activation) -> ActorResult, + { + match Self::for_actor_exit(actor, debtor, |t| match f(t) { + Ok(()) => None, + Err(e) => Some(Err(e)), + }) { + None => Ok(()), + Some(e) => Err(error("Could not activate terminated actor", encode_error(e))), + } + } + + pub fn for_actor_exit( + actor: &ActorRef, + debtor: Arc, + f: F, + ) -> Option where + F: FnOnce(&mut Activation) -> Option, + { + match actor.state.write() { + Err(_) => panicked_err(), + Ok(mut g) => match &mut *g { + ActorState::Terminated { exit_status } => + Some((**exit_status).clone()), + ActorState::Running(state) => + match f(&mut Activation::make(actor, debtor, state)) { + None => None, + Some(exit_status) => { + let exit_status = Arc::new(exit_status); + let mut t = Activation::make(actor, Debtor::new(crate::name!("shutdown")), state); + for action in std::mem::take(&mut t.state.exit_hooks) { + if let Err(err) = action(&mut t, &exit_status) { + tracing::error!(err = debug(err), "error in exit hook"); + } + } + *g = ActorState::Terminated { + exit_status: Arc::clone(&exit_status), + }; + Some((*exit_status).clone()) + } + }, + } } } fn immediate_oid(&self, r: &Arc>) { if r.mailbox.actor_id != self.actor.actor_id { - panic!("Cannot use immediate_self to send to remote peers"); + panic!("Cannot use for_myself to send to remote peers"); } } @@ -204,71 +277,89 @@ impl<'activation> Activation<'activation> { let handle = crate::next_handle(); { let r = Arc::clone(r); - self.queue_for(&r).push(Box::new( + self.pending.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.assert(t, a, handle)))); } { let r = Arc::clone(r); - self.actor.cleanup_actions.insert( + self.state.cleanup_actions.insert( handle, - CleanupAction::Remote(Arc::clone(&r.mailbox), Box::new( + CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( move |t| r.with_entity(|e| e.retract(t, handle))))); } handle } - pub fn assert_immediate_self(&mut self, r: &Arc>, a: M) -> Handle { + pub fn assert_for_myself(&mut self, r: &Arc>, a: M) -> Handle { self.immediate_oid(r); let handle = crate::next_handle(); { let r = Arc::clone(r); - self.immediate_self.push(Box::new( + self.pending.for_myself.push(Box::new( move |t| r.with_entity(|e| e.assert(t, a, handle)))); } { let r = Arc::clone(r); - self.actor.cleanup_actions.insert( + self.state.cleanup_actions.insert( handle, - CleanupAction::ImmediateSelf(Box::new( + CleanupAction::ForMyself(Box::new( move |t| r.with_entity(|e| e.retract(t, handle))))); } handle } pub fn retract(&mut self, handle: Handle) { - if let Some(d) = self.actor.cleanup_actions.remove(&handle) { - self.retract_known_ref(d) - } - } - - fn retract_known_ref(&mut self, d: CleanupAction) { - match d { - CleanupAction::Remote(mailbox, action) => - self.queue_for_mailbox(&mailbox).push(action), - CleanupAction::ImmediateSelf(action) => - self.immediate_self.push(action), + if let Some(d) = self.state.cleanup_actions.remove(&handle) { + self.pending.execute_cleanup_action(d) } } pub fn message(&mut self, r: &Arc>, m: M) { let r = Arc::clone(r); - self.queue_for(&r).push(Box::new( + self.pending.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.message(t, m)))) } - pub fn message_immediate_self(&mut self, r: &Arc>, m: M) { + pub fn message_for_myself(&mut self, r: &Arc>, m: M) { self.immediate_oid(r); let r = Arc::clone(r); - self.immediate_self.push(Box::new( + self.pending.for_myself.push(Box::new( move |t| r.with_entity(|e| e.message(t, m)))) } pub fn sync(&mut self, r: &Arc>, peer: Arc>) { let r = Arc::clone(r); - self.queue_for(&r).push(Box::new( + self.pending.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.sync(t, peer)))) } + pub fn debtor(&self) -> &Arc { + &self.pending.debtor + } + + pub fn deliver(&mut self) { + self.pending.deliver(); + } +} + +impl EventBuffer { + fn new(debtor: Arc) -> Self { + EventBuffer { + debtor, + queues: HashMap::new(), + for_myself: Vec::new(), + } + } + + fn execute_cleanup_action(&mut self, d: CleanupAction) { + match d { + CleanupAction::ForAnother(mailbox, action) => + self.queue_for_mailbox(&mailbox).push(action), + CleanupAction::ForMyself(action) => + self.for_myself.push(action), + } + } + fn queue_for(&mut self, r: &Arc>) -> &mut PendingEventQueue { self.queue_for_mailbox(&r.mailbox) } @@ -279,8 +370,8 @@ impl<'activation> Activation<'activation> { } fn deliver(&mut self) { - if !self.immediate_self.is_empty() { - panic!("Unprocessed immediate_self events remain at deliver() time"); + if !self.for_myself.is_empty() { + panic!("Unprocessed for_myself events remain at deliver() time"); } for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() { let _ = send_actions(&tx, &self.debtor, turn); @@ -288,7 +379,7 @@ impl<'activation> Activation<'activation> { } } -impl<'activation> Drop for Activation<'activation> { +impl Drop for EventBuffer { fn drop(&mut self) { self.deliver() } @@ -407,15 +498,19 @@ impl Actor { let actor_id = crate::next_actor_id(); // tracing::trace!(id = actor_id, "Actor::new"); Actor { - actor_id, - tx, rx, - mailbox: Weak::new(), - cleanup_actions: Map::new(), - next_task_id: 0, - linked_tasks: Map::new(), - exit_hooks: Vec::new(), - exit_status: None, + ac_ref: ActorRef { + actor_id, + state: Arc::new(RwLock::new(ActorState::Running(RunningActor { + actor_id, + tx, + mailbox: Weak::new(), + cleanup_actions: Map::new(), + next_task_id: 0, + linked_tasks: Map::new(), + exit_hooks: Vec::new(), + }))), + }, } } @@ -429,14 +524,121 @@ impl Actor { } pub fn create_and_start_inert(name: tracing::Span) -> Arc> { - let mut ac = Self::new(); - let r = ac.create_inert(); + let ac = Self::new(); + let r = ac.ac_ref.write(|s| s.unwrap().expect_running().create_inert()); ac.start(name); r } - pub fn id(&self) -> ActorId { - self.actor_id + pub fn boot ActorResult>( + mut self, + name: tracing::Span, + boot: F, + ) -> ActorHandle { + name.record("actor_id", &self.ac_ref.actor_id); + tokio::spawn(async move { + tracing::trace!("start"); + self.run(boot).await; + let result = self.ac_ref.exit_status().expect("terminated"); + match &result { + Ok(()) => tracing::trace!("normal stop"), + Err(e) => tracing::error!("error stop: {}", e), + } + result + }.instrument(name)) + } + + pub fn start(self, name: tracing::Span) -> ActorHandle { + self.boot(name, |_ac| Ok(())) + } + + fn terminate(&mut self, result: ActorResult) { + let _ = Activation::for_actor_exit( + &self.ac_ref, Debtor::new(crate::name!("shutdown")), |_| Some(result)); + } + + async fn run ActorResult>( + &mut self, + boot: F, + ) -> () { + if Activation::for_actor(&self.ac_ref, Debtor::new(crate::name!("boot")), boot).is_err() { + return; + } + + loop { + match self.rx.recv().await { + None => { + return self.terminate(Err(error("Unexpected channel close", _Any::new(false)))); + } + Some(m) => match m { + SystemMessage::Release => { + tracing::trace!("SystemMessage::Release"); + return self.terminate(Ok(())); + } + SystemMessage::Turn(mut loaned_item) => { + let mut actions = std::mem::take(&mut loaned_item.item); + let r = Activation::for_actor( + &self.ac_ref, Arc::clone(&loaned_item.debtor), |t| { + loop { + for action in actions.into_iter() { action(t)? } + actions = std::mem::take(&mut t.pending.for_myself); + if actions.is_empty() { break; } + } + Ok(()) + }); + if r.is_err() { return; } + } + SystemMessage::Crash(e) => { + tracing::trace!("SystemMessage::Crash({:?})", &e); + return self.terminate(Err(e)); + } + } + } + } + } +} + +fn panicked_err() -> Option { + Some(Err(error("Actor panicked", _Any::new(false)))) +} + +impl ActorRef { + pub fn read) -> R>(&self, f: F) -> R { + match self.state.read() { + Err(_) => f(None), + Ok(g) => f(Some(&*g)), + } + } + + pub fn write) -> R>(&self, f: F) -> R { + match self.state.write() { + Err(_) => f(None), + Ok(mut g) => f(Some(&mut *g)), + } + } + + pub fn exit_status(&self) -> Option { + self.read(|s| s.map_or_else( + panicked_err, + |state| match state { + ActorState::Running(_) => None, + ActorState::Terminated { exit_status } => Some((**exit_status).clone()), + })) + } +} + +impl ActorState { + fn expect_running(&mut self) -> &mut RunningActor { + match self { + ActorState::Terminated { .. } => panic!("Expected a running actor"), + ActorState::Running(r) => r, + } + } +} + +impl RunningActor { + pub fn shutdown(&self) { + let _ = self.tx.send(SystemMessage::Release); } fn mailbox(&mut self) -> Arc { @@ -453,9 +655,8 @@ impl Actor { } } - pub fn shutdown(&mut self) { - let _ = self.tx.send(SystemMessage::Release); - () + pub fn inert_entity(&mut self) -> Arc> { + self.create(InertEntity) } pub fn create + Send + Sync + 'static>(&mut self, e: E) -> Arc> { @@ -471,93 +672,14 @@ impl Actor { }) } - pub fn boot FnOnce(&'a mut Activation) -> BoxFuture<'a, ActorResult>>( - mut self, - name: tracing::Span, - boot: F, - ) -> ActorHandle { - name.record("actor_id", &self.id()); - tokio::spawn(async move { - tracing::trace!("start"); - let result = self.run(boot).await; - self.exit_status = Some(Arc::new(result.clone())); - { - let mut t = Activation::new(&mut self, Debtor::new(crate::name!("shutdown"))); - for action in std::mem::take(&mut t.actor.exit_hooks) { - if let Err(err) = action(&mut t) { - tracing::error!(err = debug(err), "error in exit hook"); - } - } - } - match &result { - Ok(()) => { - tracing::trace!("normal stop"); - () - } - Err(e) => tracing::error!("error stop: {}", e), - } - result - }.instrument(name)) - } - - pub fn start(self, name: tracing::Span) -> ActorHandle { - self.boot(name, |_ac| Box::pin(ready(Ok(())))) - } - - async fn run FnOnce(&'a mut Activation) -> BoxFuture<'a, ActorResult>>( - &mut self, - boot: F, - ) -> ActorResult { - let _id = self.id(); - // tracing::trace!(_id, "boot"); - boot(&mut Activation::new(self, Debtor::new(crate::name!("boot")))).await?; - // tracing::trace!(_id, "run"); - loop { - match self.rx.recv().await { - None => - Err(error("Unexpected channel close", _Any::new(false)))?, - Some(m) => { - let should_stop = self.handle(m).await?; - if should_stop { - return Ok(()); - } - } - } - } - } - pub fn add_exit_hook(&mut self, r: &Arc>) { let r = Arc::clone(r); - self.exit_hooks.push(Box::new(move |t| { - let exit_status = Arc::clone(t.actor.exit_status.as_ref().expect("exited")); + self.exit_hooks.push(Box::new(move |t, exit_status| { r.with_entity(|e| e.exit_hook(t, &exit_status)) })) } - async fn handle(&mut self, m: SystemMessage) -> Result { - match m { - SystemMessage::Release => { - tracing::trace!("SystemMessage::Release"); - Ok(true) - } - SystemMessage::Turn(mut loaned_item) => { - let mut actions = std::mem::take(&mut loaned_item.item); - let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor)); - loop { - for action in actions.into_iter() { action(&mut t)? } - actions = std::mem::take(&mut t.immediate_self); - if actions.is_empty() { break; } - } - Ok(false) - } - SystemMessage::Crash(e) => { - tracing::trace!("SystemMessage::Crash({:?})", &e); - Err(e)? - } - } - } - - pub fn linked_task + Send + 'static>( + pub fn linked_task>( &mut self, name: tracing::Span, boot: F, @@ -600,17 +722,21 @@ impl Actor { impl Drop for Actor { fn drop(&mut self) { self.rx.close(); + } +} +impl Drop for RunningActor { + fn drop(&mut self) { for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() { token.cancel(); } let to_clear = std::mem::take(&mut self.cleanup_actions); { - let mut t = Activation::new(self, Debtor::new(crate::name!("drop"))); + let mut b = EventBuffer::new(Debtor::new(crate::name!("drop"))); for (_handle, r) in to_clear.into_iter() { tracing::trace!(h = debug(&_handle), "retract on termination"); - t.retract_known_ref(r); + b.execute_cleanup_action(r); } } diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index ee39720..e9cbd7a 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -77,7 +77,7 @@ async fn main() -> Result<(), Box> { syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve))); { let ds = Arc::clone(&ds); - Actor::new().boot(syndicate::name!("rootcap"), |t| Box::pin(async move { + Actor::new().boot(syndicate::name!("rootcap"), move |t| { use syndicate::schemas::gatekeeper; let key = vec![0; 16]; let sr = sturdy::SturdyRef::mint(_Any::new("syndicate"), &key); @@ -85,7 +85,7 @@ async fn main() -> Result<(), Box> { tracing::info!(rootcap = display(sr.to_hex())); ds.assert(t, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() }); Ok(()) - })); + }); } for port in config.ports.clone() { @@ -93,8 +93,8 @@ async fn main() -> Result<(), Box> { let config = Arc::clone(&config); daemons.push(Actor::new().boot( syndicate::name!("tcp", port), - move |t| Box::pin(ready(Ok(t.actor.linked_task(syndicate::name!("listener"), - run_listener(gateway, port, config))))))); + move |t| Ok(t.state.linked_task(syndicate::name!("listener"), + run_listener(gateway, port, config))))); } futures::future::join_all(daemons).await; @@ -128,7 +128,7 @@ fn extract_binary_packets( } async fn run_connection( - t: &mut Activation<'_>, + ac: ActorRef, stream: TcpStream, gateway: Arc, addr: std::net::SocketAddr, @@ -156,17 +156,19 @@ async fn run_connection( 0 => Err(error("closed before starting", _Any::new(false)))?, _ => unreachable!() }; - struct ExitListener; - impl Entity<()> for ExitListener { - fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc) -> ActorResult { - tracing::info!(exit_status = debug(exit_status), "disconnect"); - Ok(()) + Activation::for_actor(&ac, Debtor::new(syndicate::name!("start-session")), |t| { + struct ExitListener; + impl Entity<()> for ExitListener { + fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc) -> ActorResult { + tracing::info!(exit_status = debug(exit_status), "disconnect"); + Ok(()) + } } - } - let exit_listener = t.actor.create(ExitListener); - t.actor.add_exit_hook(&exit_listener); - relay::TunnelRelay::run(t, i, o, Some(gateway), None); - Ok(()) + let exit_listener = t.state.create(ExitListener); + t.state.add_exit_hook(&exit_listener); + relay::TunnelRelay::run(t, i, o, Some(gateway), None); + Ok(()) + }) } async fn run_listener( @@ -183,7 +185,9 @@ async fn run_listener( let config = Arc::clone(&config); let ac = Actor::new(); ac.boot(syndicate::name!(parent: None, "connection"), - move |t| Box::pin(run_connection(t, stream, gateway, addr, config))); + move |t| Ok(t.state.linked_task( + tracing::Span::current(), + run_connection(t.actor.clone(), stream, gateway, addr, config)))); } } @@ -222,7 +226,7 @@ fn handle_resolve( } } }) - .create_cap(t.actor); + .create_cap(t.state); if let Some(oh) = ds.assert(t, &dataspace::Observe { // TODO: codegen plugin to generate pattern constructors pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { diff --git a/src/during.rs b/src/during.rs index fa3d9b6..c63c79d 100644 --- a/src/during.rs +++ b/src/during.rs @@ -98,7 +98,7 @@ where } } - pub fn create(self, ac: &mut Actor) -> Arc> { + pub fn create(self, ac: &mut RunningActor) -> Arc> { ac.create(self) } } @@ -109,7 +109,7 @@ where Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, { - pub fn create_cap(self, ac: &mut Actor) -> Arc + pub fn create_cap(self, ac: &mut RunningActor) -> Arc { Cap::new(&self.create(ac)) } diff --git a/src/error.rs b/src/error.rs index b7c954d..28b51f9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,6 +3,7 @@ pub use super::schemas::internal_protocol::_Ptr; pub use super::schemas::internal_protocol::Error; use preserves::value::NestedValue; +use preserves::value::Value; use preserves_schema::support::ParseError; impl std::error::Error for Error {} @@ -20,6 +21,21 @@ pub fn error(message: &str, detail: Detail) -> Error where _Any: From) -> _Any { + match result { + Ok(()) => { + let mut r = Value::record(Value::symbol("Ok").wrap(), 1); + r.fields_vec_mut().push(Value::record(Value::symbol("tuple").wrap(), 0).finish().wrap()); + r.finish().wrap() + } + Err(e) => { + let mut r = Value::record(Value::symbol("Err").wrap(), 1); + r.fields_vec_mut().push((&e).into()); + r.finish().wrap() + } + } +} + impl From<&str> for Error { fn from(v: &str) -> Self { error(v, _Any::new(false)) diff --git a/src/relay.rs b/src/relay.rs index df7d4b7..2015b2a 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -167,7 +167,7 @@ pub fn connect_stream( let i = Input::Bytes(Box::pin(i)); let o = Output::Bytes(Box::pin(o)); let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap(); - let main_entity = t.actor.create(during::entity(initial_state).on_asserted(move |state, t, a: _Any| { + let main_entity = t.state.create(during::entity(initial_state).on_asserted(move |state, t, a: _Any| { let denotation = a.value().to_embedded()?; f(state, t, Arc::clone(denotation)) })); @@ -187,7 +187,7 @@ impl TunnelRelay { ) -> Option> { let (output_tx, output_rx) = unbounded_channel(); let mut tr = TunnelRelay { - self_ref: t.actor.create_inert(), + self_ref: t.state.create_inert(), input_buffer: BytesMut::with_capacity(1024), output: output_tx, inbound_assertions: Map::new(), @@ -203,12 +203,12 @@ impl TunnelRelay { tr.membranes.export_ref(ir, true); } let result = initial_oid.map( - |io| Arc::clone(&tr.membranes.import_oid(t.actor, &tr.self_ref, io).obj)); + |io| Arc::clone(&tr.membranes.import_oid(t.state, &tr.self_ref, io).obj)); let tr_ref = Arc::clone(&tr.self_ref); tr_ref.become_entity(tr); - t.actor.add_exit_hook(&tr_ref); - t.actor.linked_task(crate::name!("writer"), output_loop(o, output_rx)); - t.actor.linked_task(crate::name!("reader"), input_loop(i, tr_ref)); + t.state.add_exit_hook(&tr_ref); + t.state.linked_task(crate::name!("writer"), output_loop(o, output_rx)); + t.state.linked_task(crate::name!("reader"), input_loop(i, tr_ref)); result } @@ -222,7 +222,6 @@ impl TunnelRelay { Err(*b) }, P::Packet::Turn(b) => { - let t = &mut Activation::new(t.actor, Arc::clone(&t.debtor)); let P::Turn(events) = *b; for P::TurnEvent { oid, event } in events { let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) { @@ -282,7 +281,7 @@ impl TunnelRelay { Ok(()) } } - let k = t.actor.create(SyncPeer { + let k = t.state.create(SyncPeer { tr: Arc::clone(&self.self_ref), peer: Arc::clone(&peer), }); @@ -290,6 +289,7 @@ impl TunnelRelay { } } } + t.deliver(); Ok(()) } } @@ -358,7 +358,7 @@ impl Membranes { fn import_oid( &mut self, - ac: &mut Actor, + ac: &mut RunningActor, relay_ref: &TunnelRelayRef, oid: sturdy::Oid, ) -> Arc { @@ -379,7 +379,7 @@ impl Membranes { let oid = *b; match self.imported.oid_map.get(&oid) { Some(ws) => Ok(Arc::clone(&ws.obj)), - None => Ok(Arc::clone(&self.import_oid(t.actor, relay_ref, oid).obj)), + None => Ok(Arc::clone(&self.import_oid(t.state, relay_ref, oid).obj)), } } sturdy::WireRef::Yours { oid: b, attenuation } => { @@ -397,7 +397,7 @@ impl Membranes { })?) } } - None => Ok(Cap::new(&t.actor.create_inert())), + None => Ok(Cap::new(&t.state.inert_entity())), } } } @@ -544,7 +544,7 @@ impl Entity for TunnelRelay { fn message(&mut self, t: &mut Activation, m: RelayProtocol) -> ActorResult { match m { RelayProtocol::Input(RelayInput::Eof) => { - t.actor.shutdown(); + t.state.shutdown(); } RelayProtocol::Input(RelayInput::Packet(bs)) => { let mut src = BytesBinarySource::new(&bs); @@ -579,7 +579,7 @@ impl Entity for TunnelRelay { } RelayProtocol::Output(oid, event) => { if self.pending_outbound.is_empty() { - t.message_immediate_self(&self.self_ref, RelayProtocol::Flush); + t.message_for_myself(&self.self_ref, RelayProtocol::Flush); } let turn_event = P::TurnEvent { oid: P::Oid(oid.0), @@ -595,7 +595,7 @@ impl Entity for TunnelRelay { } RelayProtocol::Flush => { let events = std::mem::take(&mut self.pending_outbound); - self.send_packet(&t.debtor, events.len(), P::Packet::Turn(Box::new(P::Turn(events))))? + self.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events))))? } } Ok(()) @@ -604,7 +604,7 @@ impl Entity for TunnelRelay { fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { if let Err(e) = &**exit_status { let e = e.clone(); - self.send_packet(&t.debtor, 1, P::Packet::Error(Box::new(e)))?; + self.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?; } Ok(()) } diff --git a/src/tracer.rs b/src/tracer.rs index b0ae3f4..3495d53 100644 --- a/src/tracer.rs +++ b/src/tracer.rs @@ -9,7 +9,7 @@ fn set_name_oid(t: &mut Tracer, r: &Arc>) { t.0.record("oid", &tracing::field::display(&r.oid())); } -pub fn tracer(ac: &mut Actor, name: tracing::Span) -> Arc> { +pub fn tracer(ac: &mut RunningActor, name: tracing::Span) -> Arc> { let mut e = Tracer(name); let r = ac.create_inert(); set_name_oid(&mut e, &r);