More fine-grained state and new ownership relations, to potentially permit avoiding scheduling overhead by directly entering an actor's runtime context

This commit is contained in:
Tony Garnock-Jones 2021-07-24 23:22:01 +02:00
parent 90bb32e38c
commit 35f510aa0b
12 changed files with 689 additions and 511 deletions

View File

@ -30,7 +30,7 @@ struct ShutdownEntity;
impl Entity<_Any> for ShutdownEntity { impl Entity<_Any> for ShutdownEntity {
fn message(&mut self, t: &mut Activation, _m: _Any) -> ActorResult { fn message(&mut self, t: &mut Activation, _m: _Any) -> ActorResult {
t.actor.shutdown(); t.state.shutdown();
Ok(()) Ok(())
} }
} }
@ -52,24 +52,25 @@ pub fn bench_pub(c: &mut Criterion) {
b.iter_custom(|iters| { b.iter_custom(|iters| {
let start = Instant::now(); let start = Instant::now();
rt.block_on(async move { rt.block_on(async move {
let mut ac = Actor::new(); Actor::new().boot(syndicate::name!("dataspace"), move |t| {
let ds = ac.create(Dataspace::new()); let ds = t.state.create(Dataspace::new());
let shutdown = ac.create(ShutdownEntity); let shutdown = t.state.create(ShutdownEntity);
let debtor = Debtor::new(syndicate::name!("sender-debtor")); let debtor = Debtor::new(syndicate::name!("sender-debtor"));
ac.linked_task(syndicate::name!("sender"), async move { t.state.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters { for _ in 0..iters {
let ds = Arc::clone(&ds); let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.mailbox), &debtor, Box::new( external_event(&Arc::clone(&ds.mailbox), &debtor, Box::new(
move |t| ds.with_entity( move |t| ds.with_entity(
|e| e.message(t, says(_Any::new("bench_pub"), |e| e.message(t, says(_Any::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))? Value::ByteString(vec![]).wrap())))))?
} }
external_event(&Arc::clone(&shutdown.mailbox), &debtor, Box::new( external_event(&Arc::clone(&shutdown.mailbox), &debtor, Box::new(
move |t| shutdown.with_entity( move |t| shutdown.with_entity(
|e| e.message(t, _Any::new(true)))))?; |e| e.message(t, _Any::new(true)))))?;
Ok(())
});
Ok(()) Ok(())
}); }).await.unwrap().unwrap();
ac.start(syndicate::name!("dataspace")).await.unwrap().unwrap();
}); });
start.elapsed() start.elapsed()
}) })
@ -79,24 +80,25 @@ pub fn bench_pub(c: &mut Criterion) {
b.iter_custom(|iters| { b.iter_custom(|iters| {
let start = Instant::now(); let start = Instant::now();
rt.block_on(async move { rt.block_on(async move {
let ds = Cap::new(&Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new())); let ds = Cap::new(
&Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new()));
let turn_count = Arc::new(AtomicU64::new(0)); let turn_count = Arc::new(AtomicU64::new(0));
struct Receiver(Arc<AtomicU64>);
impl Entity<_Any> for Receiver {
fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult {
self.0.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
let mut ac = Actor::new();
let shutdown = Cap::new(&ac.create(ShutdownEntity));
let receiver = Cap::new(&ac.create(Receiver(Arc::clone(&turn_count))));
{ {
let iters = iters.clone(); let iters = iters.clone();
ac.boot(syndicate::name!("dataspace"), move |t| Box::pin(async move { let turn_count = Arc::clone(&turn_count);
Actor::new().boot(syndicate::name!("consumer"), move |t| {
struct Receiver(Arc<AtomicU64>);
impl Entity<_Any> for Receiver {
fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult {
self.0.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
let shutdown = Cap::new(&t.state.create(ShutdownEntity));
let receiver = Cap::new(&t.state.create(Receiver(Arc::clone(&turn_count))));
ds.assert(t, &Observe { ds.assert(t, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec { ctor: Box::new(p::CRec {
@ -124,8 +126,8 @@ pub fn bench_pub(c: &mut Criterion) {
})), })),
observer: shutdown, observer: shutdown,
}); });
let debtor = t.debtor.clone(); let debtor = Arc::clone(t.debtor());
t.actor.linked_task(syndicate::name!("sender"), async move { t.state.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters { for _ in 0..iters {
let ds = Arc::clone(&ds); let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new(
@ -142,8 +144,9 @@ pub fn bench_pub(c: &mut Criterion) {
Ok(()) Ok(())
}); });
Ok(()) Ok(())
})).await.unwrap().unwrap(); }).await.unwrap().unwrap();
} }
let actual_turns = turn_count.load(Ordering::SeqCst); let actual_turns = turn_count.load(Ordering::SeqCst);
if actual_turns != iters { if actual_turns != iters {
panic!("Expected {}, got {} messages", iters, actual_turns); panic!("Expected {}, got {} messages", iters, actual_turns);

View File

@ -26,57 +26,62 @@ pub struct Config {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("consumer"), |t| Box::pin(async move { Actor::new().boot(syndicate::name!("consumer"), |t| {
let config = Config::from_args(); let ac = t.actor.clone();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let boot_debtor = Arc::clone(t.debtor());
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Ok(t.state.linked_task(tracing::Span::current(), async move {
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { let config = Config::from_args();
let consumer = syndicate::entity(0) let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
.on_message(|message_count, _t, m: _Any| { let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
if m.value().is_boolean() { Activation::for_actor(&ac, boot_debtor, |t| {
tracing::info!("{:?} messages in the last second", message_count); relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
*message_count = 0; let consumer = syndicate::entity(0)
} else { .on_message(|message_count, _t, m: _Any| {
*message_count += 1; if m.value().is_boolean() {
} tracing::info!("{:?} messages in the last second", message_count);
Ok(()) *message_count = 0;
}) } else {
.create_cap(t.actor); *message_count += 1;
}
Ok(())
})
.create_cap(t.state);
ds.assert(t, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol("Says").wrap(),
arity: 2.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "who".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "what".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
].into_iter()),
})),
observer: Arc::clone(&consumer),
});
ds.assert(t, &Observe { t.state.linked_task(syndicate::name!("tick"), async move {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { let mut stats_timer = interval(Duration::from_secs(1));
ctor: Box::new(p::CRec { loop {
label: Value::symbol("Says").wrap(), stats_timer.tick().await;
arity: 2.into(), let consumer = Arc::clone(&consumer);
}), external_event(&Arc::clone(&consumer.underlying.mailbox),
members: Map::from_iter(vec![ &Debtor::new(syndicate::name!("debtor")),
(0.into(), p::Pattern::DBind(Box::new(p::DBind { Box::new(move |t| consumer.underlying.with_entity(
name: "who".to_owned(), |e| e.message(t, _Any::new(true)))))?;
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), }
}))), });
(1.into(), p::Pattern::DBind(Box::new(p::DBind { Ok(None)
name: "what".to_owned(), });
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), Ok(())
}))), })
].into_iter()), }))
})), }).await??;
observer: Arc::clone(&consumer),
});
t.actor.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
&Debtor::new(syndicate::name!("debtor")),
Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, _Any::new(true)))))?;
}
});
Ok(None)
});
Ok(())
})).await??;
Ok(()) Ok(())
} }

View File

@ -93,138 +93,144 @@ fn report_latencies(rtt_ns_samples: &Vec<u64>) {
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("pingpong"), move |t| Box::pin(async move { Actor::new().boot(syndicate::name!("pingpong"), |t| {
let config = Config::from_args(); let ac = t.actor.clone();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let boot_debtor = Arc::clone(t.debtor());
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Ok(t.state.linked_task(tracing::Span::current(), async move {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Activation::for_actor(&ac, boot_debtor, |t| {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) =
match config.mode { match config.mode {
PingPongMode::Ping(ref c) => PingPongMode::Ping(ref c) =>
("Ping", "Pong", c.report_latency_every, false, c.bytes_padding), ("Ping", "Pong", c.report_latency_every, false, c.bytes_padding),
PingPongMode::Pong => PingPongMode::Pong =>
("Pong", "Ping", 0, true, 0), ("Pong", "Ping", 0, true, 0),
}; };
let consumer = { let consumer = {
let ds = Arc::clone(&ds); let ds = Arc::clone(&ds);
let mut turn_counter: u64 = 0; let mut turn_counter: u64 = 0;
let mut event_counter: u64 = 0; let mut event_counter: u64 = 0;
let mut rtt_ns_samples: Vec<u64> = vec![0; report_latency_every]; let mut rtt_ns_samples: Vec<u64> = vec![0; report_latency_every];
let mut rtt_batch_count: usize = 0; let mut rtt_batch_count: usize = 0;
let mut current_reply = None; let mut current_reply = None;
let self_ref = t.actor.create_inert(); let self_ref = t.state.create_inert();
self_ref.become_entity( self_ref.become_entity(
syndicate::entity(Arc::clone(&self_ref)) syndicate::entity(Arc::clone(&self_ref))
.on_message(move |self_ref, t, m: _Any| { .on_message(move |self_ref, t, m: _Any| {
match m.value().as_boolean() { match m.value().as_boolean() {
Some(true) => { Some(true) => {
tracing::info!("{:?} turns, {:?} events in the last second", tracing::info!("{:?} turns, {:?} events in the last second",
turn_counter, turn_counter,
event_counter); event_counter);
turn_counter = 0; turn_counter = 0;
event_counter = 0; event_counter = 0;
} }
Some(false) => { Some(false) => {
current_reply = None; current_reply = None;
} }
None => { None => {
event_counter += 1; event_counter += 1;
let bindings = m.value().to_sequence()?; let bindings = m.value().to_sequence()?;
let timestamp = &bindings[0]; let timestamp = &bindings[0];
let padding = &bindings[1]; let padding = &bindings[1];
if should_echo || (report_latency_every == 0) { if should_echo || (report_latency_every == 0) {
ds.message(t, simple_record2(&send_label, ds.message(t, simple_record2(&send_label,
timestamp.clone(), timestamp.clone(),
padding.clone())); padding.clone()));
} else { } else {
if let None = current_reply { if let None = current_reply {
turn_counter += 1; turn_counter += 1;
t.message_immediate_self(&self_ref, _Any::new(false)); t.message_for_myself(&self_ref, _Any::new(false));
let rtt_ns = now() - timestamp.value().to_u64()?; let rtt_ns = now() - timestamp.value().to_u64()?;
rtt_ns_samples[rtt_batch_count] = rtt_ns; rtt_ns_samples[rtt_batch_count] = rtt_ns;
rtt_batch_count += 1; rtt_batch_count += 1;
if rtt_batch_count == report_latency_every { if rtt_batch_count == report_latency_every {
rtt_ns_samples.sort(); rtt_ns_samples.sort();
report_latencies(&rtt_ns_samples); report_latencies(&rtt_ns_samples);
rtt_batch_count = 0; rtt_batch_count = 0;
} }
current_reply = Some( current_reply = Some(
simple_record2(&send_label, simple_record2(&send_label,
Value::from(now()).wrap(), Value::from(now()).wrap(),
padding.clone())); padding.clone()));
}
ds.message(t, current_reply.as_ref().expect("some reply").clone());
}
} }
ds.message(t, current_reply.as_ref().expect("some reply").clone());
} }
Ok(())
}));
Cap::new(&self_ref)
};
ds.assert(t, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol(recv_label).wrap(),
arity: 2.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "timestamp".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "padding".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
].into_iter()),
})),
observer: Arc::clone(&consumer),
});
t.state.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
&Debtor::new(syndicate::name!("debtor")),
Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, _Any::new(true)))))?;
}
});
if let PingPongMode::Ping(c) = &config.mode {
let turn_count = c.turn_count;
let action_count = c.action_count;
let debtor = Arc::clone(t.debtor());
t.state.linked_task(syndicate::name!("boot-ping"), async move {
let padding: _Any = Value::ByteString(vec![0; bytes_padding]).wrap();
for _ in 0..turn_count {
let mut events: PendingEventQueue = vec![];
let current_rec = simple_record2(send_label,
Value::from(now()).wrap(),
padding.clone());
for _ in 0..action_count {
let ds = Arc::clone(&ds);
let current_rec = current_rec.clone();
events.push(Box::new(move |t| ds.underlying.with_entity(
|e| e.message(t, current_rec))));
} }
external_events(&ds.underlying.mailbox, &debtor, events)?
} }
Ok(()) Ok(())
})); });
Cap::new(&self_ref)
};
ds.assert(t, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol(recv_label).wrap(),
arity: 2.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "timestamp".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "padding".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
].into_iter()),
})),
observer: Arc::clone(&consumer),
});
t.actor.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
&Debtor::new(syndicate::name!("debtor")),
Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, _Any::new(true)))))?;
}
});
if let PingPongMode::Ping(c) = &config.mode {
let turn_count = c.turn_count;
let action_count = c.action_count;
let debtor = t.debtor.clone();
t.actor.linked_task(syndicate::name!("boot-ping"), async move {
let padding: _Any = Value::ByteString(vec![0; bytes_padding]).wrap();
for _ in 0..turn_count {
let mut events: PendingEventQueue = vec![];
let current_rec = simple_record2(send_label,
Value::from(now()).wrap(),
padding.clone());
for _ in 0..action_count {
let ds = Arc::clone(&ds);
let current_rec = current_rec.clone();
events.push(Box::new(move |t| ds.underlying.with_entity(
|e| e.message(t, current_rec))));
}
external_events(&ds.underlying.mailbox, &debtor, events)?
} }
Ok(())
});
}
Ok(None) Ok(None)
}); });
Ok(()) Ok(())
})).await??; })
}))
}).await??;
Ok(()) Ok(())
} }

View File

@ -33,30 +33,36 @@ fn says(who: _Any, what: _Any) -> _Any {
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
syndicate::actor::start_debt_reporter(); syndicate::actor::start_debt_reporter();
Actor::new().boot(syndicate::name!("producer"), |t| Box::pin(async move { Actor::new().boot(syndicate::name!("producer"), |t| {
let config = Config::from_args(); let ac = t.actor.clone();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let boot_debtor = Arc::clone(t.debtor());
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Ok(t.state.linked_task(tracing::Span::current(), async move {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let config = Config::from_args();
let padding: _Any = Value::ByteString(vec![0; config.bytes_padding]).wrap(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let action_count = config.action_count; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
let debtor = Debtor::new(syndicate::name!("debtor")); Activation::for_actor(&ac, boot_debtor, |t| {
t.actor.linked_task(syndicate::name!("sender"), async move { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
loop { let padding: _Any = Value::ByteString(vec![0; config.bytes_padding]).wrap();
debtor.ensure_clear_funds().await; let action_count = config.action_count;
let mut events: PendingEventQueue = Vec::new(); let debtor = Debtor::new(syndicate::name!("debtor"));
for _ in 0..action_count { t.state.linked_task(syndicate::name!("sender"), async move {
let ds = Arc::clone(&ds); loop {
let padding = padding.clone(); debtor.ensure_clear_funds().await;
events.push(Box::new(move |t| ds.underlying.with_entity( let mut events: PendingEventQueue = Vec::new();
|e| e.message(t, says(Value::from("producer").wrap(), padding))))); for _ in 0..action_count {
} let ds = Arc::clone(&ds);
external_events(&ds.underlying.mailbox, &debtor, events)?; let padding = padding.clone();
} events.push(Box::new(move |t| ds.underlying.with_entity(
}); |e| e.message(t, says(Value::from("producer").wrap(), padding)))));
Ok(None) }
}); external_events(&ds.underlying.mailbox, &debtor, events)?;
Ok(()) }
})).await??; });
Ok(None)
});
Ok(())
})
}))
}).await??;
Ok(()) Ok(())
} }

View File

@ -26,73 +26,79 @@ pub struct Config {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("consumer"), |t| Box::pin(async move { Actor::new().boot(syndicate::name!("state-consumer"), |t| {
let config = Config::from_args(); let ac = t.actor.clone();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let boot_debtor = Arc::clone(t.debtor());
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Ok(t.state.linked_task(tracing::Span::current(), async move {
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { let config = Config::from_args();
let consumer = { let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
#[derive(Default)] let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
struct State { Activation::for_actor(&ac, boot_debtor, |t| {
event_counter: u64, relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
arrival_counter: u64, let consumer = {
departure_counter: u64, #[derive(Default)]
occupancy: u64, struct State {
} event_counter: u64,
syndicate::entity(State::default()).on_asserted(move |s, _, _| { arrival_counter: u64,
s.event_counter += 1; departure_counter: u64,
s.arrival_counter += 1; occupancy: u64,
s.occupancy += 1; }
Ok(Some(Box::new(|s, _| { syndicate::entity(State::default()).on_asserted(move |s, _, _| {
s.event_counter += 1; s.event_counter += 1;
s.departure_counter += 1; s.arrival_counter += 1;
s.occupancy -= 1; s.occupancy += 1;
Ok(()) Ok(Some(Box::new(|s, _| {
}))) s.event_counter += 1;
}).on_message(move |s, _, _| { s.departure_counter += 1;
tracing::info!( s.occupancy -= 1;
"{:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second", Ok(())
s.event_counter, })))
s.arrival_counter, }).on_message(move |s, _, _| {
s.departure_counter, tracing::info!(
s.occupancy); "{:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second",
s.event_counter = 0; s.event_counter,
s.arrival_counter = 0; s.arrival_counter,
s.departure_counter = 0; s.departure_counter,
Ok(()) s.occupancy);
}).create_cap(t.actor) s.event_counter = 0;
}; s.arrival_counter = 0;
s.departure_counter = 0;
Ok(())
}).create_cap(t.state)
};
ds.assert(t, &Observe { ds.assert(t, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec { ctor: Box::new(p::CRec {
label: Value::symbol("Present").wrap(), label: Value::symbol("Present").wrap(),
arity: 1.into(), arity: 1.into(),
}), }),
members: Map::from_iter(vec![ members: Map::from_iter(vec![
(0.into(), p::Pattern::DBind(Box::new(p::DBind { (0.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "who".to_owned(), name: "who".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))), }))),
].into_iter()), ].into_iter()),
})), })),
observer: Arc::clone(&consumer), observer: Arc::clone(&consumer),
}); });
t.actor.linked_task(syndicate::name!("tick"), async move { t.state.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1)); let mut stats_timer = interval(Duration::from_secs(1));
loop { loop {
stats_timer.tick().await; stats_timer.tick().await;
let consumer = Arc::clone(&consumer); let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox), external_event(&Arc::clone(&consumer.underlying.mailbox),
&Debtor::new(syndicate::name!("debtor")), &Debtor::new(syndicate::name!("debtor")),
Box::new(move |t| consumer.underlying.with_entity( Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, _Any::new(true)))))?; |e| e.message(t, _Any::new(true)))))?;
} }
}); });
Ok(None) Ok(None)
}); });
Ok(()) Ok(())
})).await??; })
}))
}).await??;
Ok(()) Ok(())
} }

View File

@ -18,40 +18,46 @@ pub struct Config {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("producer"), |t| Box::pin(async move { Actor::new().boot(syndicate::name!("state-producer"), |t| {
let config = Config::from_args(); let ac = t.actor.clone();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let boot_debtor = Arc::clone(t.debtor());
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Ok(t.state.linked_task(tracing::Span::current(), async move {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let config = Config::from_args();
let debtor = Debtor::new(syndicate::name!("debtor")); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
t.actor.linked_task(syndicate::name!("sender"), async move { let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
let presence: _Any = Value::simple_record1( Activation::for_actor(&ac, boot_debtor, |t| {
"Present", relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
Value::from(std::process::id()).wrap()).wrap(); let debtor = Debtor::new(syndicate::name!("debtor"));
let handle = syndicate::next_handle(); t.state.linked_task(syndicate::name!("sender"), async move {
let assert_e = || { let presence: _Any = Value::simple_record1(
let ds = Arc::clone(&ds); "Present",
let presence = presence.clone(); Value::from(std::process::id()).wrap()).wrap();
let handle = handle.clone(); let handle = syndicate::next_handle();
external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( let assert_e = || {
move |t| ds.underlying.with_entity(|e| e.assert(t, presence, handle)))) let ds = Arc::clone(&ds);
}; let presence = presence.clone();
let retract_e = || { let handle = handle.clone();
let ds = Arc::clone(&ds); external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new(
let handle = handle.clone(); move |t| ds.underlying.with_entity(|e| e.assert(t, presence, handle))))
external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( };
move |t| ds.underlying.with_entity(|e| e.retract(t, handle)))) let retract_e = || {
}; let ds = Arc::clone(&ds);
assert_e()?; let handle = handle.clone();
loop { external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new(
debtor.ensure_clear_funds().await; move |t| ds.underlying.with_entity(|e| e.retract(t, handle))))
retract_e()?; };
assert_e()?; assert_e()?;
} loop {
}); debtor.ensure_clear_funds().await;
Ok(None) retract_e()?;
}); assert_e()?;
Ok(()) }
})).await??; });
Ok(None)
});
Ok(())
})
}))
}).await??;
Ok(()) Ok(())
} }

View File

@ -5,6 +5,7 @@ pub use std::future::ready;
use super::ActorId; use super::ActorId;
use super::schemas::sturdy; use super::schemas::sturdy;
use super::error::Error; use super::error::Error;
use super::error::encode_error;
use super::error::error; use super::error::error;
use super::rewrite::CaveatError; use super::rewrite::CaveatError;
use super::rewrite::CheckedCaveat; use super::rewrite::CheckedCaveat;
@ -60,9 +61,12 @@ pub trait Entity<M>: Send + Sync {
} }
} }
pub struct InertEntity;
impl<M> Entity<M> for InertEntity {}
enum CleanupAction { enum CleanupAction {
ImmediateSelf(Action), ForMyself(Action),
Remote(Arc<Mailbox>, Action), ForAnother(Arc<Mailbox>, Action),
} }
type CleanupActions = Map<Handle, CleanupAction>; type CleanupActions = Map<Handle, CleanupAction>;
@ -72,10 +76,15 @@ pub type PendingEventQueue = Vec<Action>;
// This is what other implementations call a "Turn", renamed here to // This is what other implementations call a "Turn", renamed here to
// avoid conflicts with schemas::internal_protocol::Turn. // avoid conflicts with schemas::internal_protocol::Turn.
pub struct Activation<'activation> { pub struct Activation<'activation> {
pub actor: &'activation mut Actor, pub actor: ActorRef,
pub state: &'activation mut RunningActor,
pending: EventBuffer,
}
struct EventBuffer {
pub debtor: Arc<Debtor>, pub debtor: Arc<Debtor>,
queues: HashMap<ActorId, (UnboundedSender<SystemMessage>, PendingEventQueue)>, queues: HashMap<ActorId, (UnboundedSender<SystemMessage>, PendingEventQueue)>,
immediate_self: PendingEventQueue, for_myself: PendingEventQueue,
} }
#[derive(Debug)] #[derive(Debug)]
@ -104,15 +113,31 @@ pub struct Mailbox {
} }
pub struct Actor { pub struct Actor {
actor_id: ActorId,
tx: UnboundedSender<SystemMessage>,
rx: UnboundedReceiver<SystemMessage>, rx: UnboundedReceiver<SystemMessage>,
ac_ref: ActorRef,
}
#[derive(Clone)]
pub struct ActorRef {
pub actor_id: ActorId,
state: Arc<RwLock<ActorState>>,
}
pub enum ActorState {
Running(RunningActor),
Terminated {
exit_status: Arc<ActorResult>,
},
}
pub struct RunningActor {
pub actor_id: ActorId,
tx: UnboundedSender<SystemMessage>,
mailbox: Weak<Mailbox>, mailbox: Weak<Mailbox>,
cleanup_actions: CleanupActions, cleanup_actions: CleanupActions,
next_task_id: u64, next_task_id: u64,
linked_tasks: Map<u64, CancellationToken>, linked_tasks: Map<u64, CancellationToken>,
exit_hooks: Vec<Action>, exit_hooks: Vec<Box<dyn Send + Sync + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
exit_status: Option<Arc<ActorResult>>,
} }
pub struct Ref<M> { pub struct Ref<M> {
@ -152,8 +177,8 @@ preserves_schema::support::lazy_static! {
} }
pub fn start_debt_reporter() { pub fn start_debt_reporter() {
Actor::new().boot(crate::name!("debt-reporter"), |t| Box::pin(async move { Actor::new().boot(crate::name!("debt-reporter"), |t| {
t.actor.linked_task(crate::name!("tick"), async move { t.state.linked_task(crate::name!("tick"), async {
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
loop { loop {
timer.tick().await; timer.tick().await;
@ -164,7 +189,7 @@ pub fn start_debt_reporter() {
} }
}); });
Ok(()) Ok(())
})); });
} }
impl TryFrom<&_Any> for Synced { impl TryFrom<&_Any> for Synced {
@ -185,18 +210,66 @@ impl From<&Synced> for _Any {
} }
impl<'activation> Activation<'activation> { impl<'activation> Activation<'activation> {
pub fn new(actor: &'activation mut Actor, debtor: Arc<Debtor>) -> Self { fn make(actor: &ActorRef, debtor: Arc<Debtor>, state: &'activation mut RunningActor) -> Self {
Activation { Activation {
actor, actor: actor.clone(),
debtor, state,
queues: HashMap::new(), pending: EventBuffer::new(debtor),
immediate_self: Vec::new(), }
}
pub fn for_actor<F>(
actor: &ActorRef,
debtor: Arc<Debtor>,
f: F,
) -> ActorResult where
F: FnOnce(&mut Activation) -> ActorResult,
{
match Self::for_actor_exit(actor, debtor, |t| match f(t) {
Ok(()) => None,
Err(e) => Some(Err(e)),
}) {
None => Ok(()),
Some(e) => Err(error("Could not activate terminated actor", encode_error(e))),
}
}
pub fn for_actor_exit<F>(
actor: &ActorRef,
debtor: Arc<Debtor>,
f: F,
) -> Option<ActorResult> where
F: FnOnce(&mut Activation) -> Option<ActorResult>,
{
match actor.state.write() {
Err(_) => panicked_err(),
Ok(mut g) => match &mut *g {
ActorState::Terminated { exit_status } =>
Some((**exit_status).clone()),
ActorState::Running(state) =>
match f(&mut Activation::make(actor, debtor, state)) {
None => None,
Some(exit_status) => {
let exit_status = Arc::new(exit_status);
let mut t = Activation::make(actor, Debtor::new(crate::name!("shutdown")), state);
for action in std::mem::take(&mut t.state.exit_hooks) {
if let Err(err) = action(&mut t, &exit_status) {
tracing::error!(err = debug(err), "error in exit hook");
}
}
*g = ActorState::Terminated {
exit_status: Arc::clone(&exit_status),
};
Some((*exit_status).clone())
}
},
}
} }
} }
fn immediate_oid<M>(&self, r: &Arc<Ref<M>>) { fn immediate_oid<M>(&self, r: &Arc<Ref<M>>) {
if r.mailbox.actor_id != self.actor.actor_id { if r.mailbox.actor_id != self.actor.actor_id {
panic!("Cannot use immediate_self to send to remote peers"); panic!("Cannot use for_myself to send to remote peers");
} }
} }
@ -204,71 +277,89 @@ impl<'activation> Activation<'activation> {
let handle = crate::next_handle(); let handle = crate::next_handle();
{ {
let r = Arc::clone(r); let r = Arc::clone(r);
self.queue_for(&r).push(Box::new( self.pending.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.assert(t, a, handle)))); move |t| r.with_entity(|e| e.assert(t, a, handle))));
} }
{ {
let r = Arc::clone(r); let r = Arc::clone(r);
self.actor.cleanup_actions.insert( self.state.cleanup_actions.insert(
handle, handle,
CleanupAction::Remote(Arc::clone(&r.mailbox), Box::new( CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
move |t| r.with_entity(|e| e.retract(t, handle))))); move |t| r.with_entity(|e| e.retract(t, handle)))));
} }
handle handle
} }
pub fn assert_immediate_self<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle { pub fn assert_for_myself<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
self.immediate_oid(r); self.immediate_oid(r);
let handle = crate::next_handle(); let handle = crate::next_handle();
{ {
let r = Arc::clone(r); let r = Arc::clone(r);
self.immediate_self.push(Box::new( self.pending.for_myself.push(Box::new(
move |t| r.with_entity(|e| e.assert(t, a, handle)))); move |t| r.with_entity(|e| e.assert(t, a, handle))));
} }
{ {
let r = Arc::clone(r); let r = Arc::clone(r);
self.actor.cleanup_actions.insert( self.state.cleanup_actions.insert(
handle, handle,
CleanupAction::ImmediateSelf(Box::new( CleanupAction::ForMyself(Box::new(
move |t| r.with_entity(|e| e.retract(t, handle))))); move |t| r.with_entity(|e| e.retract(t, handle)))));
} }
handle handle
} }
pub fn retract(&mut self, handle: Handle) { pub fn retract(&mut self, handle: Handle) {
if let Some(d) = self.actor.cleanup_actions.remove(&handle) { if let Some(d) = self.state.cleanup_actions.remove(&handle) {
self.retract_known_ref(d) self.pending.execute_cleanup_action(d)
}
}
fn retract_known_ref(&mut self, d: CleanupAction) {
match d {
CleanupAction::Remote(mailbox, action) =>
self.queue_for_mailbox(&mailbox).push(action),
CleanupAction::ImmediateSelf(action) =>
self.immediate_self.push(action),
} }
} }
pub fn message<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>, m: M) { pub fn message<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>, m: M) {
let r = Arc::clone(r); let r = Arc::clone(r);
self.queue_for(&r).push(Box::new( self.pending.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.message(t, m)))) move |t| r.with_entity(|e| e.message(t, m))))
} }
pub fn message_immediate_self<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>, m: M) { pub fn message_for_myself<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>, m: M) {
self.immediate_oid(r); self.immediate_oid(r);
let r = Arc::clone(r); let r = Arc::clone(r);
self.immediate_self.push(Box::new( self.pending.for_myself.push(Box::new(
move |t| r.with_entity(|e| e.message(t, m)))) move |t| r.with_entity(|e| e.message(t, m))))
} }
pub fn sync<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>, peer: Arc<Ref<Synced>>) { pub fn sync<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>, peer: Arc<Ref<Synced>>) {
let r = Arc::clone(r); let r = Arc::clone(r);
self.queue_for(&r).push(Box::new( self.pending.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.sync(t, peer)))) move |t| r.with_entity(|e| e.sync(t, peer))))
} }
pub fn debtor(&self) -> &Arc<Debtor> {
&self.pending.debtor
}
pub fn deliver(&mut self) {
self.pending.deliver();
}
}
impl EventBuffer {
fn new(debtor: Arc<Debtor>) -> Self {
EventBuffer {
debtor,
queues: HashMap::new(),
for_myself: Vec::new(),
}
}
fn execute_cleanup_action(&mut self, d: CleanupAction) {
match d {
CleanupAction::ForAnother(mailbox, action) =>
self.queue_for_mailbox(&mailbox).push(action),
CleanupAction::ForMyself(action) =>
self.for_myself.push(action),
}
}
fn queue_for<M>(&mut self, r: &Arc<Ref<M>>) -> &mut PendingEventQueue { fn queue_for<M>(&mut self, r: &Arc<Ref<M>>) -> &mut PendingEventQueue {
self.queue_for_mailbox(&r.mailbox) self.queue_for_mailbox(&r.mailbox)
} }
@ -279,8 +370,8 @@ impl<'activation> Activation<'activation> {
} }
fn deliver(&mut self) { fn deliver(&mut self) {
if !self.immediate_self.is_empty() { if !self.for_myself.is_empty() {
panic!("Unprocessed immediate_self events remain at deliver() time"); panic!("Unprocessed for_myself events remain at deliver() time");
} }
for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() { for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() {
let _ = send_actions(&tx, &self.debtor, turn); let _ = send_actions(&tx, &self.debtor, turn);
@ -288,7 +379,7 @@ impl<'activation> Activation<'activation> {
} }
} }
impl<'activation> Drop for Activation<'activation> { impl Drop for EventBuffer {
fn drop(&mut self) { fn drop(&mut self) {
self.deliver() self.deliver()
} }
@ -407,15 +498,19 @@ impl Actor {
let actor_id = crate::next_actor_id(); let actor_id = crate::next_actor_id();
// tracing::trace!(id = actor_id, "Actor::new"); // tracing::trace!(id = actor_id, "Actor::new");
Actor { Actor {
actor_id,
tx,
rx, rx,
mailbox: Weak::new(), ac_ref: ActorRef {
cleanup_actions: Map::new(), actor_id,
next_task_id: 0, state: Arc::new(RwLock::new(ActorState::Running(RunningActor {
linked_tasks: Map::new(), actor_id,
exit_hooks: Vec::new(), tx,
exit_status: None, mailbox: Weak::new(),
cleanup_actions: Map::new(),
next_task_id: 0,
linked_tasks: Map::new(),
exit_hooks: Vec::new(),
}))),
},
} }
} }
@ -429,14 +524,121 @@ impl Actor {
} }
pub fn create_and_start_inert<M>(name: tracing::Span) -> Arc<Ref<M>> { pub fn create_and_start_inert<M>(name: tracing::Span) -> Arc<Ref<M>> {
let mut ac = Self::new(); let ac = Self::new();
let r = ac.create_inert(); let r = ac.ac_ref.write(|s| s.unwrap().expect_running().create_inert());
ac.start(name); ac.start(name);
r r
} }
pub fn id(&self) -> ActorId { pub fn boot<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
self.actor_id mut self,
name: tracing::Span,
boot: F,
) -> ActorHandle {
name.record("actor_id", &self.ac_ref.actor_id);
tokio::spawn(async move {
tracing::trace!("start");
self.run(boot).await;
let result = self.ac_ref.exit_status().expect("terminated");
match &result {
Ok(()) => tracing::trace!("normal stop"),
Err(e) => tracing::error!("error stop: {}", e),
}
result
}.instrument(name))
}
pub fn start(self, name: tracing::Span) -> ActorHandle {
self.boot(name, |_ac| Ok(()))
}
fn terminate(&mut self, result: ActorResult) {
let _ = Activation::for_actor_exit(
&self.ac_ref, Debtor::new(crate::name!("shutdown")), |_| Some(result));
}
async fn run<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
&mut self,
boot: F,
) -> () {
if Activation::for_actor(&self.ac_ref, Debtor::new(crate::name!("boot")), boot).is_err() {
return;
}
loop {
match self.rx.recv().await {
None => {
return self.terminate(Err(error("Unexpected channel close", _Any::new(false))));
}
Some(m) => match m {
SystemMessage::Release => {
tracing::trace!("SystemMessage::Release");
return self.terminate(Ok(()));
}
SystemMessage::Turn(mut loaned_item) => {
let mut actions = std::mem::take(&mut loaned_item.item);
let r = Activation::for_actor(
&self.ac_ref, Arc::clone(&loaned_item.debtor), |t| {
loop {
for action in actions.into_iter() { action(t)? }
actions = std::mem::take(&mut t.pending.for_myself);
if actions.is_empty() { break; }
}
Ok(())
});
if r.is_err() { return; }
}
SystemMessage::Crash(e) => {
tracing::trace!("SystemMessage::Crash({:?})", &e);
return self.terminate(Err(e));
}
}
}
}
}
}
fn panicked_err() -> Option<ActorResult> {
Some(Err(error("Actor panicked", _Any::new(false))))
}
impl ActorRef {
pub fn read<R, F: FnOnce(Option<&ActorState>) -> R>(&self, f: F) -> R {
match self.state.read() {
Err(_) => f(None),
Ok(g) => f(Some(&*g)),
}
}
pub fn write<R, F: FnOnce(Option<&mut ActorState>) -> R>(&self, f: F) -> R {
match self.state.write() {
Err(_) => f(None),
Ok(mut g) => f(Some(&mut *g)),
}
}
pub fn exit_status(&self) -> Option<ActorResult> {
self.read(|s| s.map_or_else(
panicked_err,
|state| match state {
ActorState::Running(_) => None,
ActorState::Terminated { exit_status } => Some((**exit_status).clone()),
}))
}
}
impl ActorState {
fn expect_running(&mut self) -> &mut RunningActor {
match self {
ActorState::Terminated { .. } => panic!("Expected a running actor"),
ActorState::Running(r) => r,
}
}
}
impl RunningActor {
pub fn shutdown(&self) {
let _ = self.tx.send(SystemMessage::Release);
} }
fn mailbox(&mut self) -> Arc<Mailbox> { fn mailbox(&mut self) -> Arc<Mailbox> {
@ -453,9 +655,8 @@ impl Actor {
} }
} }
pub fn shutdown(&mut self) { pub fn inert_entity<M>(&mut self) -> Arc<Ref<M>> {
let _ = self.tx.send(SystemMessage::Release); self.create(InertEntity)
()
} }
pub fn create<M, E: Entity<M> + Send + Sync + 'static>(&mut self, e: E) -> Arc<Ref<M>> { pub fn create<M, E: Entity<M> + Send + Sync + 'static>(&mut self, e: E) -> Arc<Ref<M>> {
@ -471,93 +672,14 @@ impl Actor {
}) })
} }
pub fn boot<F: 'static + Send + for<'a> FnOnce(&'a mut Activation) -> BoxFuture<'a, ActorResult>>(
mut self,
name: tracing::Span,
boot: F,
) -> ActorHandle {
name.record("actor_id", &self.id());
tokio::spawn(async move {
tracing::trace!("start");
let result = self.run(boot).await;
self.exit_status = Some(Arc::new(result.clone()));
{
let mut t = Activation::new(&mut self, Debtor::new(crate::name!("shutdown")));
for action in std::mem::take(&mut t.actor.exit_hooks) {
if let Err(err) = action(&mut t) {
tracing::error!(err = debug(err), "error in exit hook");
}
}
}
match &result {
Ok(()) => {
tracing::trace!("normal stop");
()
}
Err(e) => tracing::error!("error stop: {}", e),
}
result
}.instrument(name))
}
pub fn start(self, name: tracing::Span) -> ActorHandle {
self.boot(name, |_ac| Box::pin(ready(Ok(()))))
}
async fn run<F: 'static + Send + for<'a> FnOnce(&'a mut Activation) -> BoxFuture<'a, ActorResult>>(
&mut self,
boot: F,
) -> ActorResult {
let _id = self.id();
// tracing::trace!(_id, "boot");
boot(&mut Activation::new(self, Debtor::new(crate::name!("boot")))).await?;
// tracing::trace!(_id, "run");
loop {
match self.rx.recv().await {
None =>
Err(error("Unexpected channel close", _Any::new(false)))?,
Some(m) => {
let should_stop = self.handle(m).await?;
if should_stop {
return Ok(());
}
}
}
}
}
pub fn add_exit_hook<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>) { pub fn add_exit_hook<M: 'static + Send + Sync>(&mut self, r: &Arc<Ref<M>>) {
let r = Arc::clone(r); let r = Arc::clone(r);
self.exit_hooks.push(Box::new(move |t| { self.exit_hooks.push(Box::new(move |t, exit_status| {
let exit_status = Arc::clone(t.actor.exit_status.as_ref().expect("exited"));
r.with_entity(|e| e.exit_hook(t, &exit_status)) r.with_entity(|e| e.exit_hook(t, &exit_status))
})) }))
} }
async fn handle(&mut self, m: SystemMessage) -> Result<bool, Error> { pub fn linked_task<F: 'static + Send + futures::Future<Output = ActorResult>>(
match m {
SystemMessage::Release => {
tracing::trace!("SystemMessage::Release");
Ok(true)
}
SystemMessage::Turn(mut loaned_item) => {
let mut actions = std::mem::take(&mut loaned_item.item);
let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor));
loop {
for action in actions.into_iter() { action(&mut t)? }
actions = std::mem::take(&mut t.immediate_self);
if actions.is_empty() { break; }
}
Ok(false)
}
SystemMessage::Crash(e) => {
tracing::trace!("SystemMessage::Crash({:?})", &e);
Err(e)?
}
}
}
pub fn linked_task<F: futures::Future<Output = ActorResult> + Send + 'static>(
&mut self, &mut self,
name: tracing::Span, name: tracing::Span,
boot: F, boot: F,
@ -600,17 +722,21 @@ impl Actor {
impl Drop for Actor { impl Drop for Actor {
fn drop(&mut self) { fn drop(&mut self) {
self.rx.close(); self.rx.close();
}
}
impl Drop for RunningActor {
fn drop(&mut self) {
for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() { for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() {
token.cancel(); token.cancel();
} }
let to_clear = std::mem::take(&mut self.cleanup_actions); let to_clear = std::mem::take(&mut self.cleanup_actions);
{ {
let mut t = Activation::new(self, Debtor::new(crate::name!("drop"))); let mut b = EventBuffer::new(Debtor::new(crate::name!("drop")));
for (_handle, r) in to_clear.into_iter() { for (_handle, r) in to_clear.into_iter() {
tracing::trace!(h = debug(&_handle), "retract on termination"); tracing::trace!(h = debug(&_handle), "retract on termination");
t.retract_known_ref(r); b.execute_cleanup_action(r);
} }
} }

View File

@ -77,7 +77,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve))); syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve)));
{ {
let ds = Arc::clone(&ds); let ds = Arc::clone(&ds);
Actor::new().boot(syndicate::name!("rootcap"), |t| Box::pin(async move { Actor::new().boot(syndicate::name!("rootcap"), move |t| {
use syndicate::schemas::gatekeeper; use syndicate::schemas::gatekeeper;
let key = vec![0; 16]; let key = vec![0; 16];
let sr = sturdy::SturdyRef::mint(_Any::new("syndicate"), &key); let sr = sturdy::SturdyRef::mint(_Any::new("syndicate"), &key);
@ -85,7 +85,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::info!(rootcap = display(sr.to_hex())); tracing::info!(rootcap = display(sr.to_hex()));
ds.assert(t, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() }); ds.assert(t, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() });
Ok(()) Ok(())
})); });
} }
for port in config.ports.clone() { for port in config.ports.clone() {
@ -93,8 +93,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Arc::clone(&config); let config = Arc::clone(&config);
daemons.push(Actor::new().boot( daemons.push(Actor::new().boot(
syndicate::name!("tcp", port), syndicate::name!("tcp", port),
move |t| Box::pin(ready(Ok(t.actor.linked_task(syndicate::name!("listener"), move |t| Ok(t.state.linked_task(syndicate::name!("listener"),
run_listener(gateway, port, config))))))); run_listener(gateway, port, config)))));
} }
futures::future::join_all(daemons).await; futures::future::join_all(daemons).await;
@ -128,7 +128,7 @@ fn extract_binary_packets(
} }
async fn run_connection( async fn run_connection(
t: &mut Activation<'_>, ac: ActorRef,
stream: TcpStream, stream: TcpStream,
gateway: Arc<Cap>, gateway: Arc<Cap>,
addr: std::net::SocketAddr, addr: std::net::SocketAddr,
@ -156,17 +156,19 @@ async fn run_connection(
0 => Err(error("closed before starting", _Any::new(false)))?, 0 => Err(error("closed before starting", _Any::new(false)))?,
_ => unreachable!() _ => unreachable!()
}; };
struct ExitListener; Activation::for_actor(&ac, Debtor::new(syndicate::name!("start-session")), |t| {
impl Entity<()> for ExitListener { struct ExitListener;
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult { impl Entity<()> for ExitListener {
tracing::info!(exit_status = debug(exit_status), "disconnect"); fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
Ok(()) tracing::info!(exit_status = debug(exit_status), "disconnect");
Ok(())
}
} }
} let exit_listener = t.state.create(ExitListener);
let exit_listener = t.actor.create(ExitListener); t.state.add_exit_hook(&exit_listener);
t.actor.add_exit_hook(&exit_listener); relay::TunnelRelay::run(t, i, o, Some(gateway), None);
relay::TunnelRelay::run(t, i, o, Some(gateway), None); Ok(())
Ok(()) })
} }
async fn run_listener( async fn run_listener(
@ -183,7 +185,9 @@ async fn run_listener(
let config = Arc::clone(&config); let config = Arc::clone(&config);
let ac = Actor::new(); let ac = Actor::new();
ac.boot(syndicate::name!(parent: None, "connection"), ac.boot(syndicate::name!(parent: None, "connection"),
move |t| Box::pin(run_connection(t, stream, gateway, addr, config))); move |t| Ok(t.state.linked_task(
tracing::Span::current(),
run_connection(t.actor.clone(), stream, gateway, addr, config))));
} }
} }
@ -222,7 +226,7 @@ fn handle_resolve(
} }
} }
}) })
.create_cap(t.actor); .create_cap(t.state);
if let Some(oh) = ds.assert(t, &dataspace::Observe { if let Some(oh) = ds.assert(t, &dataspace::Observe {
// TODO: codegen plugin to generate pattern constructors // TODO: codegen plugin to generate pattern constructors
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {

View File

@ -98,7 +98,7 @@ where
} }
} }
pub fn create(self, ac: &mut Actor) -> Arc<Ref<M>> { pub fn create(self, ac: &mut RunningActor) -> Arc<Ref<M>> {
ac.create(self) ac.create(self)
} }
} }
@ -109,7 +109,7 @@ where
Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>, Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
{ {
pub fn create_cap(self, ac: &mut Actor) -> Arc<Cap> pub fn create_cap(self, ac: &mut RunningActor) -> Arc<Cap>
{ {
Cap::new(&self.create(ac)) Cap::new(&self.create(ac))
} }

View File

@ -3,6 +3,7 @@ pub use super::schemas::internal_protocol::_Ptr;
pub use super::schemas::internal_protocol::Error; pub use super::schemas::internal_protocol::Error;
use preserves::value::NestedValue; use preserves::value::NestedValue;
use preserves::value::Value;
use preserves_schema::support::ParseError; use preserves_schema::support::ParseError;
impl std::error::Error for Error {} impl std::error::Error for Error {}
@ -20,6 +21,21 @@ pub fn error<Detail>(message: &str, detail: Detail) -> Error where _Any: From<De
} }
} }
pub fn encode_error(result: Result<(), Error>) -> _Any {
match result {
Ok(()) => {
let mut r = Value::record(Value::symbol("Ok").wrap(), 1);
r.fields_vec_mut().push(Value::record(Value::symbol("tuple").wrap(), 0).finish().wrap());
r.finish().wrap()
}
Err(e) => {
let mut r = Value::record(Value::symbol("Err").wrap(), 1);
r.fields_vec_mut().push((&e).into());
r.finish().wrap()
}
}
}
impl From<&str> for Error { impl From<&str> for Error {
fn from(v: &str) -> Self { fn from(v: &str) -> Self {
error(v, _Any::new(false)) error(v, _Any::new(false))

View File

@ -167,7 +167,7 @@ pub fn connect_stream<I, O, E, F>(
let i = Input::Bytes(Box::pin(i)); let i = Input::Bytes(Box::pin(i));
let o = Output::Bytes(Box::pin(o)); let o = Output::Bytes(Box::pin(o));
let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap(); let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap();
let main_entity = t.actor.create(during::entity(initial_state).on_asserted(move |state, t, a: _Any| { let main_entity = t.state.create(during::entity(initial_state).on_asserted(move |state, t, a: _Any| {
let denotation = a.value().to_embedded()?; let denotation = a.value().to_embedded()?;
f(state, t, Arc::clone(denotation)) f(state, t, Arc::clone(denotation))
})); }));
@ -187,7 +187,7 @@ impl TunnelRelay {
) -> Option<Arc<Cap>> { ) -> Option<Arc<Cap>> {
let (output_tx, output_rx) = unbounded_channel(); let (output_tx, output_rx) = unbounded_channel();
let mut tr = TunnelRelay { let mut tr = TunnelRelay {
self_ref: t.actor.create_inert(), self_ref: t.state.create_inert(),
input_buffer: BytesMut::with_capacity(1024), input_buffer: BytesMut::with_capacity(1024),
output: output_tx, output: output_tx,
inbound_assertions: Map::new(), inbound_assertions: Map::new(),
@ -203,12 +203,12 @@ impl TunnelRelay {
tr.membranes.export_ref(ir, true); tr.membranes.export_ref(ir, true);
} }
let result = initial_oid.map( let result = initial_oid.map(
|io| Arc::clone(&tr.membranes.import_oid(t.actor, &tr.self_ref, io).obj)); |io| Arc::clone(&tr.membranes.import_oid(t.state, &tr.self_ref, io).obj));
let tr_ref = Arc::clone(&tr.self_ref); let tr_ref = Arc::clone(&tr.self_ref);
tr_ref.become_entity(tr); tr_ref.become_entity(tr);
t.actor.add_exit_hook(&tr_ref); t.state.add_exit_hook(&tr_ref);
t.actor.linked_task(crate::name!("writer"), output_loop(o, output_rx)); t.state.linked_task(crate::name!("writer"), output_loop(o, output_rx));
t.actor.linked_task(crate::name!("reader"), input_loop(i, tr_ref)); t.state.linked_task(crate::name!("reader"), input_loop(i, tr_ref));
result result
} }
@ -222,7 +222,6 @@ impl TunnelRelay {
Err(*b) Err(*b)
}, },
P::Packet::Turn(b) => { P::Packet::Turn(b) => {
let t = &mut Activation::new(t.actor, Arc::clone(&t.debtor));
let P::Turn(events) = *b; let P::Turn(events) = *b;
for P::TurnEvent { oid, event } in events { for P::TurnEvent { oid, event } in events {
let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) { let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) {
@ -282,7 +281,7 @@ impl TunnelRelay {
Ok(()) Ok(())
} }
} }
let k = t.actor.create(SyncPeer { let k = t.state.create(SyncPeer {
tr: Arc::clone(&self.self_ref), tr: Arc::clone(&self.self_ref),
peer: Arc::clone(&peer), peer: Arc::clone(&peer),
}); });
@ -290,6 +289,7 @@ impl TunnelRelay {
} }
} }
} }
t.deliver();
Ok(()) Ok(())
} }
} }
@ -358,7 +358,7 @@ impl Membranes {
fn import_oid( fn import_oid(
&mut self, &mut self,
ac: &mut Actor, ac: &mut RunningActor,
relay_ref: &TunnelRelayRef, relay_ref: &TunnelRelayRef,
oid: sturdy::Oid, oid: sturdy::Oid,
) -> Arc<WireSymbol> { ) -> Arc<WireSymbol> {
@ -379,7 +379,7 @@ impl Membranes {
let oid = *b; let oid = *b;
match self.imported.oid_map.get(&oid) { match self.imported.oid_map.get(&oid) {
Some(ws) => Ok(Arc::clone(&ws.obj)), Some(ws) => Ok(Arc::clone(&ws.obj)),
None => Ok(Arc::clone(&self.import_oid(t.actor, relay_ref, oid).obj)), None => Ok(Arc::clone(&self.import_oid(t.state, relay_ref, oid).obj)),
} }
} }
sturdy::WireRef::Yours { oid: b, attenuation } => { sturdy::WireRef::Yours { oid: b, attenuation } => {
@ -397,7 +397,7 @@ impl Membranes {
})?) })?)
} }
} }
None => Ok(Cap::new(&t.actor.create_inert())), None => Ok(Cap::new(&t.state.inert_entity())),
} }
} }
} }
@ -544,7 +544,7 @@ impl Entity<RelayProtocol> for TunnelRelay {
fn message(&mut self, t: &mut Activation, m: RelayProtocol) -> ActorResult { fn message(&mut self, t: &mut Activation, m: RelayProtocol) -> ActorResult {
match m { match m {
RelayProtocol::Input(RelayInput::Eof) => { RelayProtocol::Input(RelayInput::Eof) => {
t.actor.shutdown(); t.state.shutdown();
} }
RelayProtocol::Input(RelayInput::Packet(bs)) => { RelayProtocol::Input(RelayInput::Packet(bs)) => {
let mut src = BytesBinarySource::new(&bs); let mut src = BytesBinarySource::new(&bs);
@ -579,7 +579,7 @@ impl Entity<RelayProtocol> for TunnelRelay {
} }
RelayProtocol::Output(oid, event) => { RelayProtocol::Output(oid, event) => {
if self.pending_outbound.is_empty() { if self.pending_outbound.is_empty() {
t.message_immediate_self(&self.self_ref, RelayProtocol::Flush); t.message_for_myself(&self.self_ref, RelayProtocol::Flush);
} }
let turn_event = P::TurnEvent { let turn_event = P::TurnEvent {
oid: P::Oid(oid.0), oid: P::Oid(oid.0),
@ -595,7 +595,7 @@ impl Entity<RelayProtocol> for TunnelRelay {
} }
RelayProtocol::Flush => { RelayProtocol::Flush => {
let events = std::mem::take(&mut self.pending_outbound); let events = std::mem::take(&mut self.pending_outbound);
self.send_packet(&t.debtor, events.len(), P::Packet::Turn(Box::new(P::Turn(events))))? self.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events))))?
} }
} }
Ok(()) Ok(())
@ -604,7 +604,7 @@ impl Entity<RelayProtocol> for TunnelRelay {
fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult { fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
if let Err(e) = &**exit_status { if let Err(e) = &**exit_status {
let e = e.clone(); let e = e.clone();
self.send_packet(&t.debtor, 1, P::Packet::Error(Box::new(e)))?; self.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?;
} }
Ok(()) Ok(())
} }

View File

@ -9,7 +9,7 @@ fn set_name_oid<M>(t: &mut Tracer, r: &Arc<Ref<M>>) {
t.0.record("oid", &tracing::field::display(&r.oid())); t.0.record("oid", &tracing::field::display(&r.oid()));
} }
pub fn tracer<M: Debug>(ac: &mut Actor, name: tracing::Span) -> Arc<Ref<M>> { pub fn tracer<M: Debug>(ac: &mut RunningActor, name: tracing::Span) -> Arc<Ref<M>> {
let mut e = Tracer(name); let mut e = Tracer(name);
let r = ac.create_inert(); let r = ac.create_inert();
set_name_oid(&mut e, &r); set_name_oid(&mut e, &r);