Step toward inferior syndicate processes
This commit is contained in:
parent
9af31cfaad
commit
d87ff4f62f
|
@ -1,5 +1,5 @@
|
|||
´³bundle·µ³externalServices„´³schema·³version‘³definitions·³Process´³orµµ±simple´³refµ„³CommandLine„„µ±full´³refµ„³FullProcess„„„„³Service´³refµ„³
DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³
|
||||
´³bundle·µ³externalServices„´³schema·³version‘³definitions·³Process´³orµµ±simple´³refµ„³CommandLine„„µ±full´³refµ„³FullProcess„„„„³Service´³refµ„³
DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ± syndicate´³lit³ syndicate„„„„³
|
||||
ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³
|
||||
ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³CommandLine„„„„´³named³env´³refµ„³
|
||||
ProcessEnv„„´³named³dir´³refµ„³
|
||||
ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ReadyOnStart´³orµµ±present´³dict·³readyOnStart´³named³readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³readyOnStart´³named³readyOnStart³any„„„„µ±absent´³dict·„„„„„³RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³
RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³
DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³
DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³
RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„„„³FullCommandLine´³tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³CommandLine„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³FullProcess„„´³named³readyOnStart´³refµ„³ReadyOnStart„„´³named³restart´³refµ„³RestartField„„„„„³embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version‘³definitions·³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³DebtReporter´³lit³
debt-reporter„³
ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³embeddedType´³refµ³ EntityRef„³Cap„„„„„
|
||||
ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ReadyOnStart´³orµµ±present´³dict·³readyOnStart´³named³readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³readyOnStart´³named³readyOnStart³any„„„„µ±absent´³dict·„„„„„³RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³
RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³
DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³
DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³
ProtocolField´³orµµ±present´³dict·³protocol´³named³protocol´³refµ„³Protocol„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³
RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„„„³FullCommandLine´³tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³CommandLine„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³FullProcess„„´³named³readyOnStart´³refµ„³ReadyOnStart„„´³named³restart´³refµ„³RestartField„„´³named³protocol´³refµ„³
ProtocolField„„„„„³embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version‘³definitions·³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³DebtReporter´³lit³
debt-reporter„³
ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³embeddedType´³refµ³ EntityRef„³Cap„„„„„
|
|
@ -7,9 +7,10 @@ DaemonService = <daemon @id any> .
|
|||
DaemonProcess = <daemon @id any @config DaemonProcessSpec>.
|
||||
|
||||
DaemonProcessSpec = @simple CommandLine / @full FullDaemonProcess .
|
||||
FullDaemonProcess = @process FullProcess & @readyOnStart ReadyOnStart & @restart RestartField .
|
||||
FullDaemonProcess = @process FullProcess & @readyOnStart ReadyOnStart & @restart RestartField & @protocol ProtocolField .
|
||||
ReadyOnStart = @present { readyOnStart: bool } / @invalid { readyOnStart: any } / @absent {} .
|
||||
RestartField = @present { restart: RestartPolicy } / @invalid { restart: any } / @absent {} .
|
||||
ProtocolField = @present { protocol: Protocol } / @invalid { protocol: any } / @absent {} .
|
||||
|
||||
Process = @simple CommandLine / @full FullProcess .
|
||||
FullProcess =
|
||||
|
@ -40,3 +41,11 @@ RestartPolicy =
|
|||
; within the daemon).
|
||||
=all
|
||||
.
|
||||
|
||||
Protocol =
|
||||
/ ; stdin is /dev/null, output and error are logged
|
||||
=none
|
||||
/ ; stdin and stdout are Syndicate-protocol channels
|
||||
=syndicate
|
||||
.
|
||||
|
||||
|
|
|
@ -99,12 +99,7 @@ impl FullProcess {
|
|||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
cmd.stdin(std::process::Stdio::null());
|
||||
cmd.stdout(std::process::Stdio::piped());
|
||||
cmd.stderr(std::process::Stdio::piped());
|
||||
cmd.kill_on_drop(true);
|
||||
|
||||
Some(cmd)
|
||||
}
|
||||
}
|
||||
|
@ -116,6 +111,7 @@ impl DaemonProcessSpec {
|
|||
process: Process::Simple(command_line).elaborate(),
|
||||
ready_on_start: ReadyOnStart::Absent,
|
||||
restart: RestartField::Absent,
|
||||
protocol: ProtocolField::Absent,
|
||||
},
|
||||
DaemonProcessSpec::Full(spec) => *spec,
|
||||
}
|
||||
|
@ -143,6 +139,7 @@ struct DaemonInstance {
|
|||
unready_configs: Arc<Field<isize>>,
|
||||
completed_processes: Arc<Field<isize>>,
|
||||
restart_policy: RestartPolicy,
|
||||
protocol: Protocol,
|
||||
}
|
||||
|
||||
impl DaemonInstance {
|
||||
|
@ -255,8 +252,34 @@ impl DaemonInstance {
|
|||
|
||||
let facet = t.facet.clone();
|
||||
|
||||
if let Some(r) = child.stdout.take() { self.log(t, facet.clone(), pid, r, "stdout"); }
|
||||
if let Some(r) = child.stderr.take() { self.log(t, facet.clone(), pid, r, "stderr"); }
|
||||
if let Some(r) = child.stderr.take() {
|
||||
self.log(t, facet.clone(), pid, r, "stderr");
|
||||
}
|
||||
|
||||
match self.protocol {
|
||||
Protocol::None => {
|
||||
if let Some(r) = child.stdout.take() {
|
||||
self.log(t, facet.clone(), pid, r, "stdout");
|
||||
}
|
||||
}
|
||||
Protocol::Syndicate => {
|
||||
t.facet(|t| {
|
||||
use syndicate::relay;
|
||||
use syndicate::schemas::sturdy;
|
||||
|
||||
let to_child = child.stdin.take().expect("pipe to child");
|
||||
let from_child = child.stdout.take().expect("pipe from child");
|
||||
let i = relay::Input::Bytes(Box::pin(from_child));
|
||||
let o = relay::Output::Bytes(Box::pin(to_child));
|
||||
|
||||
let cap = relay::TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into())))
|
||||
.ok_or("initial capability reference unavailable")?;
|
||||
|
||||
tracing::info!(?cap);
|
||||
Ok(())
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
if self.announce_presumed_readiness {
|
||||
counter::adjust(t, &self.unready_configs, -1);
|
||||
|
@ -325,7 +348,7 @@ fn run(
|
|||
let config = config.elaborate();
|
||||
let facet = t.facet.clone();
|
||||
t.linked_task(syndicate::name!("subprocess"), async move {
|
||||
let cmd = config.process.build_command().ok_or("Cannot start daemon process")?;
|
||||
let mut cmd = config.process.build_command().ok_or("Cannot start daemon process")?;
|
||||
|
||||
let announce_presumed_readiness = match config.ready_on_start {
|
||||
ReadyOnStart::Present { ready_on_start } => ready_on_start,
|
||||
|
@ -343,6 +366,21 @@ fn run(
|
|||
Err("Invalid restart value")?
|
||||
}
|
||||
};
|
||||
let protocol = match config.protocol {
|
||||
ProtocolField::Present { protocol } => *protocol,
|
||||
ProtocolField::Absent => Protocol::None,
|
||||
ProtocolField::Invalid { protocol } => {
|
||||
tracing::error!(?protocol, "Invalid protocol value");
|
||||
Err("Invalid protocol value")?
|
||||
}
|
||||
};
|
||||
|
||||
cmd.stdin(match &protocol {
|
||||
Protocol::None => std::process::Stdio::null(),
|
||||
Protocol::Syndicate => std::process::Stdio::piped(),
|
||||
});
|
||||
cmd.stdout(std::process::Stdio::piped());
|
||||
cmd.stderr(std::process::Stdio::piped());
|
||||
|
||||
let daemon_instance = DaemonInstance {
|
||||
log_ds: root_ds,
|
||||
|
@ -353,6 +391,7 @@ fn run(
|
|||
unready_configs,
|
||||
completed_processes,
|
||||
restart_policy,
|
||||
protocol,
|
||||
};
|
||||
|
||||
facet.activate(Account::new(syndicate::name!("instance-startup")), |t| {
|
||||
|
|
Loading…
Reference in New Issue