Dataspace links into skeleton; seems to be working

This commit is contained in:
Tony Garnock-Jones 2019-10-24 20:05:39 +01:00
parent 6098c62604
commit 482f903149
5 changed files with 166 additions and 44 deletions

View File

@ -1,27 +1,31 @@
use super::V;
use super::ConnId;
use super::packets;
use super::packets::{self, Assertion, EndpointName, Captures};
use super::skeleton;
use preserves::value::Map;
use preserves::value::{self, Map, NestedValue};
use std::sync::{Arc, RwLock};
use tokio::sync::mpsc::UnboundedSender;
pub type DataspaceRef = Arc<RwLock<Dataspace>>;
pub type DataspaceError = (String, V);
pub enum DataspaceError {
#[derive(Debug)]
struct Actor {
tx: UnboundedSender<packets::Out>,
endpoints: Map<EndpointName, ActorEndpoint>,
}
impl From<DataspaceError> for std::io::Error {
fn from(v: DataspaceError) -> Self {
panic!("No DataspaceErrors to convert!")
}
#[derive(Debug)]
struct ActorEndpoint {
analysis_results: Option<skeleton::AnalysisResults>,
assertion: Assertion,
}
#[derive(Debug)]
pub struct Dataspace {
name: V,
peers: Map<ConnId, UnboundedSender<packets::Out>>,
peers: Map<ConnId, Actor>,
index: skeleton::Index,
}
@ -36,17 +40,117 @@ impl Dataspace {
pub fn register(&mut self, id: ConnId, tx: UnboundedSender<packets::Out>) {
assert!(!self.peers.contains_key(&id));
self.peers.insert(id, tx);
self.peers.insert(id, Actor {
tx,
endpoints: Map::new(),
});
}
pub fn deregister(&mut self, id: ConnId) {
self.peers.remove(&id);
let ac = self.peers.remove(&id).unwrap();
let mut outbound_turns: Map<ConnId, Vec<packets::Event>> = Map::new();
for (epname, ep) in ac.endpoints {
self.remove_endpoint(&mut outbound_turns, id, &epname, ep);
}
outbound_turns.remove(&id);
self.deliver_outbound_turns(outbound_turns);
}
pub fn turn(&mut self, actions: Vec<packets::Action>) -> Result<(), DataspaceError> {
fn remove_endpoint(&mut self,
mut outbound_turns: &mut Map<ConnId, Vec<packets::Event>>,
id: ConnId,
epname: &EndpointName,
ep: ActorEndpoint)
{
let ActorEndpoint{ analysis_results, assertion } = ep;
if let Some(ar) = analysis_results {
self.index.remove_endpoint(&ar, skeleton::Endpoint {
connection: id,
name: epname.clone(),
});
}
schedule_events(&mut outbound_turns,
self.index.remove((&assertion).into()),
|epname, cs| packets::Event::Del(epname, cs));
}
pub fn turn(&mut self, id: ConnId, actions: Vec<packets::Action>) ->
Result<(), DataspaceError>
{
let mut outbound_turns: Map<ConnId, Vec<packets::Event>> = Map::new();
for a in actions {
println!("Turn action: {:?}", &a);
match a {
packets::Action::Assert(ref epname, ref assertion) => {
let ac = self.peers.get_mut(&id).unwrap();
if ac.endpoints.contains_key(&epname) {
return Err(("Duplicate endpoint name".to_string(),
value::to_value(a).unwrap()));
}
let ar =
if let Some(fs) = assertion.value().as_simple_record("Observe", Some(1)) {
let ar = skeleton::analyze(&fs[0]);
let events = self.index.add_endpoint(&ar, skeleton::Endpoint {
connection: id,
name: epname.clone(),
});
outbound_turns.entry(id).or_insert_with(Vec::new).extend(events);
Some(ar)
} else {
None
};
schedule_events(&mut outbound_turns,
self.index.insert(assertion.into()),
|epname, cs| packets::Event::Add(epname, cs));
ac.endpoints.insert(epname.clone(), ActorEndpoint {
analysis_results: ar,
assertion: assertion.clone()
});
}
packets::Action::Clear(ref epname) => {
let ac = self.peers.get_mut(&id).unwrap();
match ac.endpoints.remove(epname) {
None => {
return Err(("Nonexistent endpoint name".to_string(),
value::to_value(a).unwrap()));
}
Some(ep) => {
self.remove_endpoint(&mut outbound_turns, id, epname, ep);
outbound_turns.entry(id).or_insert_with(Vec::new)
.push(packets::Event::End(epname.clone()));
}
}
}
packets::Action::Message(ref assertion) => {
schedule_events(&mut outbound_turns,
self.index.send(assertion.into()),
|epname, cs| packets::Event::Msg(epname, cs));
}
}
}
self.deliver_outbound_turns(outbound_turns);
Ok(())
}
fn deliver_outbound_turns(&mut self, outbound_turns: Map<ConnId, Vec<packets::Event>>) {
for (target, events) in outbound_turns {
let _ = self.peers.get_mut(&target).unwrap().tx.try_send(packets::Out::Turn(events));
}
}
}
fn schedule_events<C>(outbound_turns: &mut Map<ConnId, Vec<packets::Event>>,
events: skeleton::Events,
ctor: C)
where C: Fn(EndpointName, Captures) -> packets::Event
{
for (eps, cs) in events {
for ep in eps {
outbound_turns.entry(ep.connection).or_insert_with(Vec::new)
.push(ctor(ep.name, cs.clone()));
}
}
}

View File

@ -45,8 +45,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
id += 1;
tokio::spawn(async move {
match peer::Peer::new(connid, stream).await.run(spaces).await {
Ok(_) => (),
Err(e) => println!("Connection {:?} died with {:?}", addr, e),
Ok(_) => println!("Connection {} ({:?}) terminated", connid, addr),
Err(e) => println!("Connection {} ({:?}) died with {:?}", connid, addr, e),
}
});
}

View File

@ -35,7 +35,7 @@ pub enum In {
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum Out {
Err(String),
Err(String, V),
Turn(Vec<Event>),
Ping(),
Pong(),

View File

@ -1,3 +1,4 @@
use super::V;
use super::ConnId;
use super::dataspace;
use super::packets;
@ -21,8 +22,8 @@ pub struct Peer {
space: Option<dataspace::DataspaceRef>,
}
fn err(s: &str) -> packets::Out {
packets::Out::Err(s.into())
fn err(s: &str, ctx: V) -> packets::Out {
packets::Out::Err(s.into(), ctx)
}
impl Peer {
@ -47,7 +48,7 @@ impl Peer {
} else {
let e: String = format!("Expected initial Connect, got {:?}", firstpacket);
println!("{:?}: {}", self.id, e);
self.frames.send(err(&e)).await?;
self.frames.send(err(&e, value::Value::from(false).wrap())).await?;
return Ok(())
};
@ -66,14 +67,23 @@ impl Peer {
Ok(p) => {
println!("{:?}: input {:?}", self.id, &p);
match p {
packets::In::Turn(actions) =>
self.space.as_ref().unwrap().write().unwrap().turn(actions)?,
packets::In::Turn(actions) => {
match self.space.as_ref().unwrap().write().unwrap()
.turn(self.id, actions)
{
Ok(()) => (),
Err((msg, ctx)) => {
to_send.push(err(&msg, ctx));
running = false;
}
}
}
packets::In::Ping() =>
to_send.push(packets::Out::Pong()),
packets::In::Pong() =>
(),
packets::In::Connect(dsname) => {
to_send.push(err("Unexpected Connect"));
packets::In::Connect(_) => {
to_send.push(err("Unexpected Connect", value::to_value(p).unwrap()));
running = false;
}
}
@ -81,12 +91,11 @@ impl Peer {
Err(packets::DecodeError::Read(value::decoder::Error::Eof)) => running = false,
Err(packets::DecodeError::Read(value::decoder::Error::Io(e))) => return Err(e),
Err(packets::DecodeError::Read(value::decoder::Error::Syntax(s))) => {
to_send.push(err(s));
to_send.push(err(s, value::Value::from(false).wrap()));
running = false;
}
Err(packets::DecodeError::Parse(e, v)) => {
to_send.push(err(&format!(
"Packet deserialization error ({}) decoding {:?}", e, v)));
to_send.push(err(&format!("Packet deserialization error: {}", e), v));
running = false;
}
}
@ -97,15 +106,16 @@ impl Peer {
Some(msg) => to_send.push(msg),
None => {
/* weird. */
to_send.push(err("Outbound channel closed unexpectedly"));
to_send.push(err("Outbound channel closed unexpectedly",
value::Value::from(false).wrap()));
running = false;
}
}
},
}
for v in to_send {
if let packets::Out::Err(ref msg) = v {
println!("{:?}: connection crashed: {}", self.id, msg);
if let packets::Out::Err(ref msg, ref ctx) = v {
println!("{:?}: connection crashed: {}; context {:?}", self.id, msg, ctx);
} else {
println!("{:?}: output {:?}", self.id, &v);
}

View File

@ -14,21 +14,23 @@ use std::sync::Arc;
type Bag<A> = bag::BTreeBag<A>;
type Path = Vec<usize>;
type Paths = Vec<Path>;
type Events = Vec<(Vec<Endpoint>, Captures)>;
pub type Path = Vec<usize>;
pub type Paths = Vec<Path>;
pub type Events = Vec<(Vec<Endpoint>, Captures)>;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct Endpoint {
connection: ConnId,
name: EndpointName,
pub connection: ConnId,
pub name: EndpointName,
}
#[derive(Debug)]
pub enum Skeleton {
Blank,
Guarded(Guard, Vec<Skeleton>)
}
#[derive(Debug)]
pub struct AnalysisResults {
pub skeleton: Skeleton,
pub const_paths: Paths,
@ -48,7 +50,7 @@ impl Index {
Index{ all_assertions: Bag::new(), root: Node::new(Continuation::new(Set::new())) }
}
pub fn add_endpoint(&mut self, analysis_results: AnalysisResults, endpoint: Endpoint)
pub fn add_endpoint(&mut self, analysis_results: &AnalysisResults, endpoint: Endpoint)
-> Vec<Event>
{
let continuation = self.root.extend(&analysis_results.skeleton);
@ -62,8 +64,8 @@ impl Index {
}
cvm
});
let capture_paths = analysis_results.capture_paths;
let leaf = const_val_map.entry(analysis_results.const_vals).or_insert_with(Leaf::new);
let capture_paths = &analysis_results.capture_paths;
let leaf = const_val_map.entry(analysis_results.const_vals.clone()).or_insert_with(Leaf::new);
let leaf_cached_assertions = &leaf.cached_assertions;
let endpoints = leaf.endpoints_map.entry(capture_paths.clone()).or_insert_with(|| {
let mut b = Bag::new();
@ -83,18 +85,18 @@ impl Index {
.collect()
}
pub fn remove_endpoint(&mut self, analysis_results: AnalysisResults, endpoint: Endpoint) {
pub fn remove_endpoint(&mut self, analysis_results: &AnalysisResults, endpoint: Endpoint) {
let continuation = self.root.extend(&analysis_results.skeleton);
if let Entry::Occupied(mut const_val_map_entry)
= continuation.leaf_map.entry(analysis_results.const_paths)
= continuation.leaf_map.entry(analysis_results.const_paths.clone())
{
let const_val_map = const_val_map_entry.get_mut();
if let Entry::Occupied(mut leaf_entry)
= const_val_map.entry(analysis_results.const_vals)
= const_val_map.entry(analysis_results.const_vals.clone())
{
let leaf = leaf_entry.get_mut();
if let Entry::Occupied(mut endpoints_entry)
= leaf.endpoints_map.entry(analysis_results.capture_paths)
= leaf.endpoints_map.entry(analysis_results.capture_paths.clone())
{
let endpoints = endpoints_entry.get_mut();
endpoints.endpoints.remove(&endpoint);
@ -112,15 +114,15 @@ impl Index {
}
}
fn insert(&mut self, v: CachedAssertion) -> Events {
pub fn insert(&mut self, v: CachedAssertion) -> Events {
self.adjust(v, 1)
}
fn remove(&mut self, v: CachedAssertion) -> Events {
pub fn remove(&mut self, v: CachedAssertion) -> Events {
self.adjust(v, -1)
}
fn adjust(&mut self, outer_value: CachedAssertion, delta: bag::Count) -> Events {
pub fn adjust(&mut self, outer_value: CachedAssertion, delta: bag::Count) -> Events {
let mut outputs = Vec::new();
let net = self.all_assertions.change(outer_value.clone(), delta);
match net {
@ -155,7 +157,7 @@ impl Index {
outputs
}
fn send(&mut self, outer_value: CachedAssertion) -> Events {
pub fn send(&mut self, outer_value: CachedAssertion) -> Events {
let mut outputs = Vec::new();
Modification::new(
false,
@ -273,7 +275,7 @@ where FCont: FnMut(&mut Continuation, &CachedAssertion) -> (),
}
fn perform(&mut self, n: &mut Node) {
self.node(n, &Stack::Item(self.outer_value_term, &Stack::Empty))
self.node(n, &Stack::Item(&Value::from(vec![self.outer_value_term.clone()]).wrap(), &Stack::Empty))
}
fn node(&mut self, n: &mut Node, term_stack: &Stack<&Assertion>) {
@ -417,6 +419,12 @@ pub enum CachedAssertion {
Unrestricted(Assertion),
}
impl From<&Assertion> for CachedAssertion {
fn from(a: &Assertion) -> Self {
CachedAssertion::Unrestricted(a.clone())
}
}
impl CachedAssertion {
fn unscope(&self) -> &Assertion {
match self {