diff --git a/Cargo.toml b/Cargo.toml
index 1c0f24f..2253c86 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,9 +5,9 @@ authors = ["Tony Garnock-Jones "]
 edition = "2018"
 
 [dependencies]
-preserves = "0.2.3"
+preserves = "0.3.0"
 
-serde = { version = "1.0", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_bytes = "0.11"
 tokio = "0.2.0-alpha"
 
diff --git a/src/bag.rs b/src/bag.rs
index 6b0ab11..0573f3c 100644
--- a/src/bag.rs
+++ b/src/bag.rs
@@ -2,8 +2,9 @@ use std::collections::BTreeMap;
 use std::collections::btree_map::{Iter, Keys, Entry};
 use std::iter::{FromIterator, IntoIterator};
 
-type Count = i32;
+pub type Count = i32;
 
+#[derive(Debug, PartialEq, Eq)]
 pub enum Net {
     PresentToAbsent,
     AbsentToAbsent,
diff --git a/src/main.rs b/src/main.rs
index 916fd4e..055ae63 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,8 +11,24 @@ use preserves::value;
 use std::sync::{Mutex, Arc};
 use tokio::net::TcpListener;
 
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum Syndicate {
+    Placeholder(usize),
+}
+
+impl value::Domain for Syndicate {}
+
+static NEXT_PLACEHOLDER: AtomicUsize = AtomicUsize::new(0);
+impl Syndicate {
+    pub fn new_placeholder() -> Self {
+        Self::Placeholder(NEXT_PLACEHOLDER.fetch_add(1, Ordering::SeqCst))
+    }
+}
+
 pub type ConnId = u64;
-pub type V = value::ArcValue;
+pub type V = value::ArcValue<Syndicate>;
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
diff --git a/src/packets.rs b/src/packets.rs
index 48e73e5..8a6440d 100644
--- a/src/packets.rs
+++ b/src/packets.rs
@@ -1,12 +1,14 @@
 use super::V;
+use super::Syndicate;
 
 use bytes::BytesMut;
 use preserves::value;
 use std::io;
+use std::sync::Arc;
 
 pub type EndpointName = V;
 pub type Assertion = V;
-pub type Captures = Vec<Assertion>;
+pub type Captures = Arc<Vec<Assertion>>;
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub enum Action {
@@ -44,7 +46,7 @@ pub enum Out {
 #[derive(Debug)]
 pub enum DecodeError {
     Read(value::decoder::Error),
-    Parse(value::error::Error, V),
+    Parse(value::error::Error<Syndicate>, V),
 }
 
 impl From<value::decoder::Error> for DecodeError {
@@ -58,7 +60,7 @@ impl From<value::decoder::Error> for DecodeError {
 #[derive(Debug)]
 pub enum EncodeError {
     Write(value::encoder::Error),
-    Unparse(value::error::Error),
+    Unparse(value::error::Error<Syndicate>),
 }
 
 impl From<value::encoder::Error> for EncodeError {
@@ -67,8 +69,8 @@ impl From<value::encoder::Error> for EncodeError {
     }
 }
 
-impl From<value::error::Error> for EncodeError {
-    fn from(v: value::error::Error) -> Self {
+impl From<value::error::Error<Syndicate>> for EncodeError {
+    fn from(v: value::error::Error<Syndicate>) -> Self {
         EncodeError::Unparse(v)
     }
 }
@@ -86,11 +88,11 @@ impl From<EncodeError> for io::Error {
 //---------------------------------------------------------------------------
 
 pub struct Codec {
-    codec: value::Codec,
+    codec: value::Codec<Syndicate>,
 }
 
 impl Codec {
-    pub fn new(codec: value::Codec) -> Self {
+    pub fn new(codec: value::Codec<Syndicate>) -> Self {
         Codec { codec }
     }
 }
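
Note on the dependency and Captures changes above: serde only provides Serialize/Deserialize impls for Arc<T> when its "rc" feature is enabled, which is presumably why the feature list grows in Cargo.toml at the same time as Captures becomes Arc<Vec<Assertion>> in the serde-derived packet types. A minimal, self-contained sketch of that requirement (the struct and field names here are illustrative only, not part of the patch):

    use std::sync::Arc;

    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    struct ExampleEvent {
        // Without serde's "rc" feature, Arc<T> has no Serialize/Deserialize
        // impls and this derive fails to compile.
        captures: Arc<Vec<String>>,
    }
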
diff --git a/src/skeleton.rs b/src/skeleton.rs
index 2435a7f..ef26c7f 100644
--- a/src/skeleton.rs
+++ b/src/skeleton.rs
@@ -1,4 +1,6 @@
 use super::ConnId;
+use super::Syndicate;
+use super::bag;
 use super::packets::Assertion;
 use super::packets::Captures;
 use super::packets::EndpointName;
@@ -6,13 +8,16 @@ use super::packets::Event;
 
 use preserves::value::{Map, Set, Value, NestedValue};
 use std::collections::btree_map::Entry;
+use std::iter::FromIterator;
+use std::sync::Arc;
 
-type Bag = super::bag::BTreeBag<CachedAssertion>;
+type Bag<A> = bag::BTreeBag<A>;
 
 type Path = Vec<usize>;
 type Paths = Vec<Path>;
+type Events = Vec<(Vec<Endpoint>, Captures)>;
 
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
 pub struct Endpoint {
     connection: ConnId,
     name: EndpointName,
@@ -24,16 +29,16 @@ pub enum Skeleton {
 }
 
 pub struct AnalysisResults {
-    skeleton: Skeleton,
-    const_paths: Paths,
-    const_vals: Vec<Assertion>,
-    capture_paths: Paths,
-    assertion: Assertion,
+    pub skeleton: Skeleton,
+    pub const_paths: Paths,
+    pub const_vals: Captures,
+    pub capture_paths: Paths,
+    pub assertion: Assertion,
 }
 
 #[derive(Debug)]
 pub struct Index {
-    all_assertions: Bag,
+    all_assertions: Bag<CachedAssertion>,
     root: Node,
 }
 
@@ -105,6 +110,61 @@ impl Index {
             }
         }
     }
+
+    fn insert(&mut self, v: CachedAssertion) -> Events {
+        self.adjust(v, 1)
+    }
+
+    fn remove(&mut self, v: CachedAssertion) -> Events {
+        self.adjust(v, -1)
+    }
+
+    fn adjust(&mut self, outer_value: CachedAssertion, delta: bag::Count) -> Events {
+        let mut outputs = Vec::new();
+        let net = self.all_assertions.change(outer_value.clone(), delta);
+        match net {
+            bag::Net::AbsentToPresent => {
+                Modification::new(
+                    true,
+                    &outer_value,
+                    |c, v| { c.cached_assertions.insert(v.clone()); },
+                    |l, v| { l.cached_assertions.insert(v.clone()); },
+                    |es, cs| {
+                        if es.cached_captures.change(cs.clone(), 1) == bag::Net::AbsentToPresent {
+                            outputs.push((es.endpoints.iter().cloned().collect(), cs.clone()))
+                        }
+                    })
+                    .perform(&mut self.root);
+            }
+            bag::Net::PresentToAbsent => {
+                Modification::new(
+                    false,
+                    &outer_value,
+                    |c, v| { c.cached_assertions.remove(v); },
+                    |l, v| { l.cached_assertions.remove(v); },
+                    |es, cs| {
+                        if es.cached_captures.change(cs.clone(), -1) == bag::Net::PresentToAbsent {
+                            outputs.push((es.endpoints.iter().cloned().collect(), cs.clone()))
+                        }
+                    })
+                    .perform(&mut self.root);
+            }
+            _ => ()
+        }
+        outputs
+    }
+
+    fn send(&mut self, outer_value: CachedAssertion) -> Events {
+        let mut outputs = Vec::new();
+        Modification::new(
+            false,
+            &outer_value,
+            |_c, _v| (),
+            |_l, _v| (),
+            |es, cs| outputs.push((es.endpoints.iter().cloned().collect(), cs.clone())))
+            .perform(&mut self.root);
+        outputs
+    }
 }
 
 #[derive(Debug)]
@@ -153,6 +213,114 @@ impl Node {
     }
 }
 
+#[derive(Debug)]
+pub enum Stack<'a, T> {
+    Empty,
+    Item(T, &'a Stack<'a, T>)
+}
+
+impl<'a, T> Stack<'a, T> {
+    fn pop(&self) -> &Self {
+        match self {
+            Stack::Empty => panic!("Internal error: pop: Incorrect pop_count computation"),
+            Stack::Item(_, tail) => tail
+        }
+    }
+
+    fn top(&self) -> &T {
+        match self {
+            Stack::Empty => panic!("Internal error: top: Incorrect pop_count computation"),
+            Stack::Item(item, _) => item
+        }
+    }
+}
+
+struct Modification<'op, FCont, FLeaf, FEndpoints>
+where FCont: FnMut(&mut Continuation, &CachedAssertion) -> (),
+      FLeaf: FnMut(&mut Leaf, &CachedAssertion) -> (),
+      FEndpoints: FnMut(&mut Endpoints, Captures) -> ()
+{
+    create_leaf_if_absent: bool,
+    outer_value: &'op CachedAssertion,
+    restriction_paths: Option<&'op Paths>,
+    outer_value_term: &'op Assertion,
+    m_cont: FCont,
+    m_leaf: FLeaf,
+    m_endpoints: FEndpoints,
+}
+
+impl<'op, FCont, FLeaf, FEndpoints> Modification<'op, FCont, FLeaf, FEndpoints>
+where FCont: FnMut(&mut Continuation, &CachedAssertion) -> (),
+      FLeaf: FnMut(&mut Leaf, &CachedAssertion) -> (),
+      FEndpoints: FnMut(&mut Endpoints, Captures) -> ()
+{
+    fn new(create_leaf_if_absent: bool,
+           outer_value: &'op CachedAssertion,
+           m_cont: FCont,
+           m_leaf: FLeaf,
+           m_endpoints: FEndpoints) -> Self {
+        let (restriction_paths, outer_value_term) = outer_value.unpack();
+        Modification {
+            create_leaf_if_absent,
+            outer_value,
+            restriction_paths,
+            outer_value_term,
+            m_cont,
+            m_leaf,
+            m_endpoints,
+        }
+    }
+
+    fn perform(&mut self, n: &mut Node) {
+        self.node(n, &Stack::Item(self.outer_value_term, &Stack::Empty))
+    }
+
+    fn node(&mut self, n: &mut Node, term_stack: &Stack<&Assertion>) {
+        self.continuation(&mut n.continuation);
+        for (selector, table) in &mut n.edges {
+            let mut next_stack = term_stack;
+            for _ in 0..selector.pop_count { next_stack = next_stack.pop() }
+            let next_value = step(next_stack.top(), selector.index);
+            if let Some(next_class) = class_of(next_value) {
+                if let Some(next_node) = table.get_mut(&next_class) {
+                    self.node(next_node, &Stack::Item(next_value, next_stack))
+                }
+            }
+        }
+    }
+
+    fn continuation(&mut self, c: &mut Continuation) {
+        (self.m_cont)(c, self.outer_value);
+        let mut empty_const_paths = Vec::new();
+        for (const_paths, const_val_map) in &mut c.leaf_map {
+            let const_vals = project_paths(self.outer_value_term, const_paths);
+            let leaf_opt = if self.create_leaf_if_absent {
+                Some(const_val_map.entry(const_vals.clone()).or_insert_with(Leaf::new))
+            } else {
+                const_val_map.get_mut(&const_vals)
+            };
+            if let Some(leaf) = leaf_opt {
+                (self.m_leaf)(leaf, self.outer_value);
+                for (capture_paths, endpoints) in &mut leaf.endpoints_map {
+                    if is_unrestricted(&capture_paths, self.restriction_paths) {
+                        (self.m_endpoints)(endpoints,
+                                           project_paths(self.outer_value_term, &capture_paths));
+                    }
+                }
+                if leaf.is_empty() {
+                    const_val_map.remove(&const_vals);
+                    if const_val_map.is_empty() {
+                        empty_const_paths.push(const_paths.clone());
+                    }
+                }
+            }
+        }
+        for const_paths in empty_const_paths {
+            c.leaf_map.remove(&const_paths);
+        }
+    }
+}
+
 fn class_of(v: &Assertion) -> Option<Guard> {
     match v.value() {
         Value::Sequence(ref vs) => Some(Guard::Seq(vs.len())),
@@ -169,8 +337,8 @@ fn project_path<'a>(v: &'a Assertion, p: &Path) -> &'a Assertion {
     v
 }
 
-fn project_paths<'a>(v: &'a Assertion, ps: &Paths) -> Vec<Assertion> {
-    ps.iter().map(|p| project_path(v, p)).cloned().collect()
+fn project_paths<'a>(v: &'a Assertion, ps: &Paths) -> Captures {
+    Arc::new(ps.iter().map(|p| project_path(v, p)).cloned().collect())
 }
 
 fn step(v: &Assertion, i: usize) -> &Assertion {
@@ -184,7 +352,7 @@ fn step(v: &Assertion, i: usize) -> &Assertion {
 #[derive(Debug)]
 struct Continuation {
     cached_assertions: Set<CachedAssertion>,
-    leaf_map: Map<Paths, Map<Vec<Assertion>, Leaf>>,
+    leaf_map: Map<Paths, Map<Captures, Leaf>>,
 }
 
 impl Continuation {
@@ -205,6 +373,15 @@ pub enum Guard {
     Seq(usize),
 }
 
+impl Guard {
+    fn arity(&self) -> usize {
+        match self {
+            Guard::Rec(_, s) => *s,
+            Guard::Seq(s) => *s
+        }
+    }
+}
+
 #[derive(Debug)]
 struct Leaf { // aka Topic
     cached_assertions: Set<CachedAssertion>,
@@ -234,7 +411,7 @@ impl Endpoints {
 }
 
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
-enum CachedAssertion {
+pub enum CachedAssertion {
     VisibilityRestricted(Paths, Assertion),
     Unrestricted(Assertion),
 }
@@ -262,3 +439,105 @@ fn is_unrestricted(capture_paths: &Paths, restriction_paths: Option<&Paths>) ->
 
     panic!("Not yet implemented");
 }
+
+pub struct Analyzer {
+    const_paths: Paths,
+    const_vals: Vec<Assertion>,
+    capture_paths: Paths,
+    path: Path,
+}
+
+impl Analyzer {
+    pub fn analyze(a: &Assertion) -> AnalysisResults {
+        let mut z = Analyzer {
+            const_paths: Vec::new(),
+            const_vals: Vec::new(),
+            capture_paths: Vec::new(),
+            path: Vec::new(),
+        };
+        let skeleton = z.walk(a);
+        AnalysisResults {
+            skeleton,
+            const_paths: z.const_paths,
+            const_vals: Arc::new(z.const_vals),
+            capture_paths: z.capture_paths,
+            assertion: a.clone(),
+        }
+    }
+
+    fn walk(&mut self, mut a: &Assertion) -> Skeleton {
+        while let Some(fields) = a.value().as_simple_record("Capture", Some(1)) {
+            self.capture_paths.push(self.path.clone());
+            a = &fields[0];
+        }
+
+        if a.value().is_simple_record("Discard", Some(0)) {
+            return Skeleton::Blank;
+        } else {
+            match class_of(a) {
+                Some(cls) => {
+                    let arity = cls.arity();
+                    Skeleton::Guarded(cls,
+                                      (0..arity).map(|i| {
+                                          self.path.push(i);
+                                          let s = self.walk(step(a, i));
+                                          self.path.pop();
+                                          s
+                                      }).collect())
+                }
+                None => {
+                    self.const_paths.push(self.path.clone());
+                    self.const_vals.push(a.clone());
+                    Skeleton::Blank
+                }
+            }
+        }
+    }
+}
+
+pub fn instantiate_assertion(a: &Assertion, cs: Captures) -> CachedAssertion {
+    let mut capture_paths = Vec::new();
+    let mut path = Vec::new();
+    let mut vs: Vec<Assertion> = (*cs).clone();
+    vs.reverse();
+    let instantiated = instantiate_assertion_walk(&mut capture_paths, &mut path, &mut vs, a);
+    CachedAssertion::VisibilityRestricted(capture_paths, instantiated)
+}
+
+fn instantiate_assertion_walk(capture_paths: &mut Paths,
+                              path: &mut Path,
+                              vs: &mut Vec<Assertion>,
+                              a: &Assertion) -> Assertion {
+    if let Some(fields) = a.value().as_simple_record("Capture", Some(1)) {
+        capture_paths.push(path.clone());
+        let v = vs.pop().unwrap();
+        instantiate_assertion_walk(capture_paths, path, vs, &fields[0]);
+        return v;
+    }
+
+    if a.value().is_simple_record("Discard", Some(0)) {
+        return Value::Domain(Syndicate::new_placeholder()).wrap();
+    } else {
+        let f = |(i, aa)| {
+            path.push(i);
+            let vv = instantiate_assertion_walk(capture_paths,
+                                                path,
+                                                vs,
+                                                aa);
+            path.pop();
+            vv
+        };
+        match class_of(a) {
+            Some(Guard::Seq(_)) =>
+                Value::from(Vec::from_iter(a.value().as_sequence().unwrap()
+                                           .iter().enumerate().map(f)))
+                .wrap(),
+            Some(Guard::Rec(l, _)) =>
+                Value::record(l, a.value().as_record().unwrap().1
+                              .iter().enumerate().map(f).collect())
+                .wrap(),
+            None =>
+                a.clone(),
+        }
+    }
+}
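
The Stack<'a, T> type introduced in src/skeleton.rs above is a small borrow-based linked list: Modification::node pushes one frame per recursion step, and each frame only borrows the current term and the parent frame, so walking an assertion needs no cloning or heap allocation for the traversal state. A standalone sketch of the same idiom (independent of the patch; it only mirrors the shape of the type above):

    #[derive(Debug)]
    enum Stack<'a, T> {
        Empty,
        Item(T, &'a Stack<'a, T>),
    }

    // Walk the chain of borrowed frames and count them.
    fn depth<T>(s: &Stack<'_, T>) -> usize {
        match s {
            Stack::Empty => 0,
            Stack::Item(_, rest) => 1 + depth(rest),
        }
    }

    fn main() {
        let root = Stack::Empty;
        let outer = Stack::Item("outer term", &root);
        let inner = Stack::Item("inner term", &outer);
        assert_eq!(depth(&inner), 2);
    }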