Reuse a single Activation per actor: this merges RunningActor with Activation

This commit is contained in:
Tony Garnock-Jones 2024-03-03 18:28:51 +01:00
parent b7d4bd4b58
commit f4a4b4d595
13 changed files with 534 additions and 427 deletions

View File

@ -177,7 +177,7 @@ async fn main() -> ActorResult {
})?;
if let PingPongMode::Ping(c) = &config.mode {
let facet = t.facet.clone();
let facet = t.facet_ref();
let turn_count = c.turn_count;
let action_count = c.action_count;
let account = Arc::clone(t.account());

View File

@ -28,7 +28,7 @@ async fn main() -> ActorResult {
let (i, o) = TcpStream::connect("127.0.0.1:9001").await?.into_split();
Actor::top(None, |t| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let facet = t.facet.clone();
let facet = t.facet_ref();
let padding = AnyValue::new(&vec![0u8; config.bytes_padding][..]);
let action_count = config.action_count;
let account = Account::new(None, None);

View File

@ -22,7 +22,7 @@ async fn main() -> ActorResult {
let (i, o) = TcpStream::connect("127.0.0.1:9001").await?.into_split();
Actor::top(None, |t| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let facet = t.facet.clone();
let facet = t.facet_ref();
let account = Account::new(None, None);
t.linked_task(Some(AnyValue::symbol("sender")), async move {
let presence = rec![AnyValue::symbol("Present"), AnyValue::new(std::process::id())];

View File

@ -297,7 +297,7 @@ impl Entity<noise::Packet> for ResponderState {
let transport = ResponderTransport { relay_input, c_recv };
let initiator_session = Arc::clone(&details.initiator_session);
let relay_output_name = Some(AnyValue::symbol("relay_output"));
let transport_facet = t.facet.clone();
let transport_facet = t.facet_ref();
t.linked_task(relay_output_name.clone(), async move {
let account = Account::new(relay_output_name, trace_collector);
let cause = TurnCause::external("relay_output");

View File

@ -35,7 +35,7 @@ pub fn run_io_relay(
initial_ref: Arc<Cap>,
) -> ActorResult {
let exit_listener = t.create(ExitListener);
t.state.add_exit_hook(&exit_listener);
t.add_exit_hook(&exit_listener);
relay::TunnelRelay::run(t, i, o, Some(initial_ref), None, false);
Ok(())
}

View File

@ -184,7 +184,7 @@ fn run(
let mut watcher = watcher(tx, Duration::from_millis(100)).map_err(convert_notify_error)?;
watcher.watch(&env.path, RecursiveMode::Recursive).map_err(convert_notify_error)?;
let facet = t.facet.clone();
let facet = t.facet_ref();
let trace_collector = t.trace_collector();
let span = tracing::Span::current();
thread::spawn(move || {

View File

@ -41,7 +41,7 @@ fn supervise_daemon(
lifecycle::on_service_restart(t, &config_ds, &spec, enclose!(
(config_ds, root_ds, spec) move |t| {
tracing::info!(id = ?spec.id, "Terminating to restart");
t.stop_facet_and_continue(t.facet.facet_id, Some(
t.stop_facet_and_continue(t.facet_id(), Some(
enclose!((config_ds, root_ds, spec) move |t: &mut Activation| {
supervise_daemon(t, config_ds, root_ds, spec)
})))
@ -176,7 +176,7 @@ impl DaemonInstance {
fn handle_exit(self, t: &mut Activation, error_message: Option<String>) -> ActorResult {
let delay =
std::time::Duration::from_millis(if let None = error_message { 200 } else { 1000 });
t.stop_facet_and_continue(t.facet.facet_id, Some(move |t: &mut Activation| {
t.stop_facet_and_continue(t.facet_id(), Some(move |t: &mut Activation| {
#[derive(Debug)]
enum NextStep {
SleepAndRestart,
@ -230,7 +230,7 @@ impl DaemonInstance {
kind: &str
) -> ActorResult {
t.facet(|t| {
let facet = t.facet.clone();
let facet = t.facet_ref();
let log_ds = self.log_ds.clone();
let service = self.service.clone();
let kind = AnyValue::symbol(kind);
@ -290,7 +290,7 @@ impl DaemonInstance {
let pid = child.id();
tracing::debug!(?pid, cmd = ?self.cmd, "started");
let facet = t.facet.clone();
let facet = t.facet_ref();
if let Some(r) = child.stderr.take() {
self.log(t, pid, r, "stderr")?;
@ -401,7 +401,7 @@ fn run(
Ok(config) => {
tracing::info!(?config);
let config = config.elaborate();
let facet = t.facet.clone();
let facet = t.facet_ref();
t.linked_task(Some(AnyValue::symbol("subprocess")), async move {
let mut cmd = config.process.build_command().ok_or("Cannot start daemon process")?;

View File

@ -55,7 +55,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
};
let host = addr.host.clone();
let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?;
let facet = t.facet.clone();
let facet = t.facet_ref();
let trace_collector = t.trace_collector();
t.linked_task(Some(AnyValue::symbol("listener")), async move {
let listen_addr = format!("{}:{}", host, port);
@ -85,7 +85,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
&account, cause, enclose!((trace_collector, httpd) move |t| {
t.spawn(name, move |t| {
Ok(t.linked_task(None, {
let facet = t.facet.clone();
let facet = t.facet_ref();
async move {
detect_protocol(trace_collector,
facet,

View File

@ -39,7 +39,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
fn run(t: &mut Activation, ds: Arc<Cap>, spec: UnixRelayListener) -> ActorResult {
lifecycle::terminate_on_service_restart(t, &ds, &spec);
let path_str = spec.addr.path.clone();
let facet = t.facet.clone();
let facet = t.facet_ref();
let trace_collector = t.trace_collector();
t.linked_task(Some(AnyValue::symbol("listener")), async move {
let listener = bind_unix_listener(&PathBuf::from(path_str)).await?;
@ -71,7 +71,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: UnixRelayListener) -> ActorResult
&account, cause, enclose!((trace_collector) move |t| {
t.spawn(name, |t| {
Ok(t.linked_task(None, {
let facet = t.facet.clone();
let facet = t.facet_ref();
async move {
tracing::info!(protocol = %"unix");
let (i, o) = stream.into_split();

File diff suppressed because it is too large Load Diff

View File

@ -175,7 +175,7 @@ where
t.on_stop_notify(&r);
}
if should_register_exit_hook {
t.state.add_exit_hook(&r);
t.add_exit_hook(&r);
}
r
}

View File

@ -112,8 +112,8 @@ struct TunnelRefEntity {
relay_ref: TunnelRelayRef,
}
struct ActivatedMembranes<'a, 'activation, 'm> {
turn: &'a mut Activation<'activation>,
struct ActivatedMembranes<'a, 'm> {
turn: &'a mut Activation,
tr_ref: &'m TunnelRelayRef,
membranes: &'m mut Membranes,
}
@ -234,7 +234,7 @@ impl TunnelRelay {
t.linked_task(Some(AnyValue::symbol("writer")),
output_loop(o, output_rx));
t.linked_task(Some(AnyValue::symbol("reader")),
input_loop(t.trace_collector(), t.facet.clone(), i, tr_ref));
input_loop(t.trace_collector(), t.facet_ref(), i, tr_ref));
result
}
@ -269,7 +269,7 @@ impl TunnelRelay {
|io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj));
dump_membranes!(tr.membranes);
*tr_ref.lock() = Some(tr);
t.state.add_exit_hook(&self_entity);
t.add_exit_hook(&self_entity);
(result, tr_ref, output_rx)
}
@ -604,7 +604,7 @@ impl Membranes {
}
}
impl<'a, 'activation, 'm> DomainDecode<Arc<Cap>> for ActivatedMembranes<'a, 'activation, 'm> {
impl<'a, 'm> DomainDecode<Arc<Cap>> for ActivatedMembranes<'a, 'm> {
fn decode_embedded<'de, 'src, S: BinarySource<'de>>(
&mut self,
src: &'src mut S,

View File

@ -13,6 +13,7 @@ use preserves_schema::Codec;
use super::actor::{self, AnyValue, Ref, Cap};
use super::language;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::SystemTime;
@ -27,8 +28,8 @@ pub struct TraceCollector {
impl<M> From<&Ref<M>> for Target {
fn from(v: &Ref<M>) -> Target {
Target {
actor: ActorId(AnyValue::new(v.mailbox.actor_id)),
facet: FacetId(AnyValue::new(u64::from(v.facet_id))),
actor: v.mailbox.actor_id.into(),
facet: v.facet_id.into(),
oid: Oid(AnyValue::new(v.oid())),
}
}
@ -51,7 +52,7 @@ impl TraceCollector {
let _ = self.tx.send(TraceEntry {
timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
.expect("Time after Unix epoch").as_secs_f64().into(),
actor: ActorId(AnyValue::new(id)),
actor: id.into(),
item: a,
});
}
@ -159,3 +160,15 @@ impl From<actor::Name> for Name {
}
}
}
impl From<NonZeroU64> for ActorId {
fn from(v: NonZeroU64) -> Self {
ActorId(AnyValue::new(u64::from(v)))
}
}
impl From<NonZeroU64> for FacetId {
fn from(v: NonZeroU64) -> Self {
FacetId(AnyValue::new(u64::from(v)))
}
}