First steps toward schema support for Python

This commit is contained in:
Tony Garnock-Jones 2021-08-15 22:55:25 -04:00
parent 5c8bacd759
commit abe60b3506
2 changed files with 359 additions and 1 deletions

View File

@ -35,7 +35,10 @@ class Float(object):
class Symbol(object):
def __init__(self, name):
self.name = name
if isinstance(name, Symbol):
self.name = name.name
else:
self.name = name
def __eq__(self, other):
return isinstance(other, Symbol) and self.name == other.name

View File

@ -0,0 +1,355 @@
from .preserves import *
import pathlib
# Symbols that occur inside encoded schemas. The code in this module compares
# them against record labels (e.g. `p.key == NAMED`), against the kind carried
# by an `atom` pattern (e.g. `k == BOOLEAN`), and uses them as dictionary keys
# when reading a schema record (`VERSION`, `DEFINITIONS`).
AND = Symbol('and')
ANY = Symbol('any')
ATOM = Symbol('atom')
BOOLEAN = Symbol('Boolean')
BUNDLE = Symbol('bundle')
BYTE_STRING = Symbol('ByteString')
DEFINITIONS = Symbol('definitions')
DICT = Symbol('dict')
DICTOF = Symbol('dictof')
DOUBLE = Symbol('Double')
EMBEDDED = Symbol('embedded')
FLOAT = Symbol('Float')
LIT = Symbol('lit')
NAMED = Symbol('named')
OR = Symbol('or')
REC = Symbol('rec')
REF = Symbol('ref')
SCHEMA = Symbol('schema')
SEQOF = Symbol('seqof')
SETOF = Symbol('setof')
SIGNED_INTEGER = Symbol('SignedInteger')
STRING = Symbol('String')
SYMBOL = Symbol('Symbol')
TUPLE = Symbol('tuple')
TUPLE_PREFIX = Symbol('tuplePrefix')
VERSION = Symbol('version')
class SchemaEntity:
    """Base class for classes built from schema definitions.

    Provides `decode` (a raising wrapper around `try_decode`) and `parse`,
    the interpreter that matches a value against a schema pattern.
    Subclasses supply `try_decode`, `_encode` and `_as_dict`.
    """

    # Populated per subclass by its `_set_schema` classmethod.
    ROOTNS = None       # root namespace handed to `lookup` for `ref` patterns
    SCHEMA = None       # the pattern this class decodes
    MODULE_PATH = None  # used for `ref` patterns whose own module path is empty
    NAME = None         # definition name; `__repr__` reads its `.name`

    @classmethod
    def decode(cls, v):
        """Decode `v` with `try_decode`, raising `Exception` if it returns None."""
        i = cls.try_decode(v)
        if i is None:
            raise Exception('Could not decode ' + str(cls))
        return i

    @classmethod
    def try_decode(cls, v):
        """Return a decoded instance for `v`, or None; subclasses must override."""
        raise Exception('Subclass responsibility')

    @classmethod
    def parse(cls, p, v, args):
        """Match value `v` against schema pattern `p`.

        Returns None when `v` does not match. On success, simple patterns
        (`any`, `atom`, `embedded`, `seqof`, `setof`, `dictof`, `ref`) return
        the matched or converted value, while `lit` and the compound patterns
        (`rec`, `tuple`, `tuplePrefix`, `dict`, `and`) return `()`.

        Every successful `named` sub-pattern appends its result to `args`, in
        match order. `args` is not rolled back on failure, so it may have been
        partially extended when None is returned.

        Raises `Exception('Bad schema')` for an unrecognised pattern label.
        """
        if p == ANY:
            return v
        key = p.key

        if key == NAMED:
            i = cls.parse(p[1], v, args)
            if i is not None:
                args.append(i)
            return i

        if key == ATOM:
            k = p[0]
            if k == BOOLEAN and isinstance(v, bool):
                return v
            if k == FLOAT and isinstance(v, Float):
                return v
            if k == DOUBLE and isinstance(v, float):
                return v
            # bool is a subclass of int in Python; without the extra check
            # True/False would be accepted as SignedInteger values.
            if k == SIGNED_INTEGER and isinstance(v, int) and not isinstance(v, bool):
                return v
            if k == STRING and isinstance(v, str):
                return v
            if k == BYTE_STRING and isinstance(v, bytes):
                return v
            if k == SYMBOL and isinstance(v, Symbol):
                return v
            return None

        if key == EMBEDDED:
            return v  ## TODO: reconsider representation of embedded values?

        if key == LIT:
            if v == p[0]:
                return ()
            return None

        if key == SEQOF:
            if not isinstance(v, tuple):
                return None
            vv = []
            for w in v:
                ww = cls.parse(p[0], w, args)
                if ww is None:
                    return None
                vv.append(ww)
            return vv

        if key == SETOF:
            if not isinstance(v, set):
                return None
            vv = set()
            for w in v:
                ww = cls.parse(p[0], w, args)
                if ww is None:
                    return None
                vv.add(ww)
            return vv

        if key == DICTOF:
            if not isinstance(v, dict):
                return None
            dd = {}
            for (k, w) in v.items():
                kk = cls.parse(p[0], k, args)
                if kk is None:
                    return None
                ww = cls.parse(p[1], w, args)
                if ww is None:
                    return None
                dd[kk] = ww
            return dd

        if key == REF:
            # An empty module path in the reference means "this module".
            c = lookup(cls.ROOTNS, cls.MODULE_PATH if len(p[0]) == 0 else p[0], p[1])
            return c.try_decode(v)

        if key == REC:
            if not isinstance(v, Record):
                return None
            if cls.parse(p[0], v.key, args) is None:
                return None
            if cls.parse(p[1], v.fields, args) is None:
                return None
            return ()

        if key == TUPLE:
            if not isinstance(v, tuple):
                return None
            if len(v) != len(p[0]):
                return None
            for pp, w in zip(p[0], v):
                if cls.parse(pp, w, args) is None:
                    return None
            return ()

        if key == TUPLE_PREFIX:
            if not isinstance(v, tuple):
                return None
            fixed = p[0]
            if len(v) < len(fixed):
                return None
            # zip stops after the fixed patterns; the remainder of `v` is
            # matched against the variable-length tail pattern.
            for pp, w in zip(fixed, v):
                if cls.parse(pp, w, args) is None:
                    return None
            if cls.parse(p[1], v[len(fixed):], args) is None:
                return None
            return ()

        if key == DICT:
            if not isinstance(v, dict):
                return None
            if len(v) < len(p[0]):
                return None
            for (k, pp) in p[0].items():
                if k not in v:
                    return None
                if cls.parse(pp, v[k], args) is None:
                    return None
            return ()

        if key == AND:
            # Every conjunct must match the same value. This mirrors
            # gather_defined_field_names, which collects names from p[0].
            for pp in p[0]:
                if cls.parse(pp, v, args) is None:
                    return None
            return ()

        raise Exception('Bad schema')

    def _encode(self):
        """Encode this instance; subclasses must override."""
        raise Exception('Subclass responsibility')

    def __repr__(self):
        # NOTE: reads VARIANT, SIMPLE and value, none of which this base class
        # defines; the Definition subclass in this file provides them.
        n = self.NAME.name
        if self.VARIANT:
            n = n + '.' + self.VARIANT.name
        if self.SIMPLE:
            return n + '(' + repr(self.value) + ')'
        else:
            return n + ' ' + repr(self._as_dict())

    def _as_dict(self):
        """Return the instance's fields as a dict; subclasses must override."""
        raise Exception('Subclass responsibility')
class Enumeration(SchemaEntity):
    """A definition made of alternatives, each represented by its own class.

    The class itself is never instantiated: decoding yields an instance of
    whichever variant class accepts the value first.
    """

    # List of (variant label Symbol, variant class) pairs, in schema order.
    VARIANTS = None

    def __init__(self):
        raise Exception('Cannot create instance of Enumeration')

    @classmethod
    def _set_schema(cls, rootns, module_path, name, schema, _variant, _enumeration):
        """Record the schema and build one Definition subclass per alternative."""
        cls.ROOTNS = rootns
        cls.SCHEMA = schema
        cls.MODULE_PATH = module_path
        cls.NAME = name
        variants = cls.VARIANTS = []
        for (label, pattern) in schema[0]:
            label = Symbol(label)
            variant_cls = pretty_subclass(
                Definition,
                module_path_str(module_path + (name,)),
                label.name)
            variant_cls._set_schema(rootns, module_path, name, pattern, label, cls)
            variants.append((label, variant_cls))

    @classmethod
    def try_decode(cls, v):
        """Return the first variant's successful decoding of `v`, else None."""
        for (_label, variant_cls) in cls.VARIANTS:
            decoded = variant_cls.try_decode(v)
            if decoded is not None:
                return decoded
        return None

    def _encode(self):
        raise Exception('Cannot encode instance of Enumeration')
class Definition(SchemaEntity):
    """A single schema definition (or one variant of an Enumeration).

    When the schema is a simple pattern the instance carries one `value`;
    otherwise each named field of the schema becomes an attribute.
    """

    SIMPLE = False       # True when SCHEMA is a simple pattern
    FIELD_NAMES = []     # replaced with a fresh list by _set_schema
    VARIANT = None       # variant label Symbol, when part of an Enumeration
    ENUMERATION = None   # owning Enumeration class, if any

    def __init__(self, *args):
        self._fields = args
        if self.SIMPLE:
            self.value = args[0]
            return
        # Positional arguments line up with FIELD_NAMES.
        for position, field_name in enumerate(self.FIELD_NAMES):
            setattr(self, field_name, args[position])

    def _accept(self, visitor):
        """Call `visitor` (or its entry for this variant) with the fields."""
        if self.VARIANT is not None:
            return visitor[self.VARIANT.name](*self._fields)
        return visitor(*self._fields)

    @classmethod
    def _set_schema(cls, rootns, module_path, name, schema, variant, enumeration):
        """Attach schema metadata to this class and compute its field names."""
        cls.ROOTNS = rootns
        cls.SCHEMA = schema
        cls.MODULE_PATH = module_path
        cls.NAME = name
        cls.SIMPLE = is_simple_pattern(schema)
        names = cls.FIELD_NAMES = []
        cls.VARIANT = variant
        cls.ENUMERATION = enumeration
        gather_defined_field_names(schema, names)

    @classmethod
    def try_decode(cls, v):
        """Return an instance built from `v`, or None if `v` does not match."""
        if cls.SIMPLE:
            parsed = cls.parse(cls.SCHEMA, v, [])
            if parsed is None:
                return None
            return cls(parsed)
        collected = []
        if cls.parse(cls.SCHEMA, v, collected) is None:
            return None
        return cls(*collected)

    def _encode(self):
        raise Exception('Not yet implemented')

    def _as_dict(self):
        """Map each field name to its current attribute value."""
        return {field_name: getattr(self, field_name)
                for field_name in self.FIELD_NAMES}

    def __getitem__(self, name):
        return getattr(self, name)

    def __setitem__(self, name, value):
        return setattr(self, name, value)
def module_path_str(mp):
    """Join the `.name` of every element of `mp` with dots."""
    names = (element.name for element in mp)
    return '.'.join(names)
# Record labels of the patterns that count as "simple".
SIMPLE_PATTERN_KEYS = [
    ATOM,
    EMBEDDED,
    LIT,
    SEQOF,
    SETOF,
    DICTOF,
    REF,
]


def is_simple_pattern(p):
    """Tell whether `p` is `any` or a Record labelled with a simple-pattern key."""
    matches_any = p == ANY
    if matches_any:
        return matches_any
    return isinstance(p, Record) and p.key in SIMPLE_PATTERN_KEYS
def gather_defined_field_names(s, acc):
    """Append to `acc` the names bound by `named` patterns inside `s`.

    `s` may be a pattern or a tuple of patterns. Simple patterns contribute
    nothing. Raises `Exception('Bad schema')` for an unknown pattern label.
    """
    if is_simple_pattern(s):
        return
    if isinstance(s, tuple):
        for member in s:
            gather_defined_field_names(member, acc)
        return

    label = s.key
    if label == NAMED:
        acc.append(s[0].name)
        children = (s[1],)
    elif label == AND:
        children = (s[0],)
    elif label == REC:
        children = (s[0], s[1])
    elif label == TUPLE:
        children = (s[0],)
    elif label == TUPLE_PREFIX:
        children = (s[0], s[1])
    elif label == DICT:
        children = (tuple(s[0].values()),)
    else:
        raise Exception('Bad schema')

    for child in children:
        gather_defined_field_names(child, acc)
def pretty_subclass(C, module_name, class_name):
    """Create an empty subclass of `C` presenting the given module and name."""
    class Derived(C):
        pass

    overrides = (
        ('__module__', module_name),
        ('__name__', class_name),
        ('__qualname__', class_name),
    )
    for attribute, value in overrides:
        setattr(Derived, attribute, value)
    return Derived
def lookup(ns, module_path, name):
    """Walk `module_path` down from `ns` and return the entry called `name`.

    Any missing step is reported through `definition_not_found`.
    """
    scope = ns
    for segment in module_path:
        if segment not in scope:
            definition_not_found(module_path, name)
        scope = scope[segment]

    if name not in scope:
        definition_not_found(module_path, name)
    return scope[name]
def definition_not_found(module_path, name):
    """Raise an Exception naming the dotted path of the missing definition."""
    qualified_name = module_path_str(module_path + (name,))
    raise Exception('Definition not found: ' + qualified_name)
class Namespace:
    """A write-once mapping from Symbols to values, also usable via attributes.

    Keys are normalised with `Symbol(name)`. Rebinding an existing name
    raises an Exception instead of overwriting it. `prefix` is the tuple of
    path elements leading to this namespace and is used in error messages.
    """

    def __init__(self, prefix):
        # Go through the base implementation: our own __setattr__ would store
        # these two as namespace entries instead of instance attributes.
        super(Namespace, self).__setattr__('_items', {})
        super(Namespace, self).__setattr__('_prefix', prefix)

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails. If the internal
        # slots themselves are missing (instance created without __init__),
        # fail immediately rather than recursing through __getitem__.
        if name in ('_items', '_prefix'):
            raise AttributeError(name)
        try:
            return self[name]
        except KeyError:
            # The attribute protocol expects AttributeError, so that hasattr()
            # and getattr(obj, name, default) behave correctly.
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        self[name] = value

    def __getitem__(self, name):
        return self._items[Symbol(name)]

    def __setitem__(self, name, value):
        name = Symbol(name)
        if name in self._items:
            # module_path_str reads `.name` from every element, so the Symbol
            # itself must be appended here, not its string name.
            raise Exception('Name conflict: ' + module_path_str(self._prefix + (name,)))
        self._items[name] = value

    def __contains__(self, name):
        return Symbol(name) in self._items

    def __repr__(self):
        return repr(self._items)
class Compiler:
    """Loads binary schema files and builds classes for their definitions.

    Generated classes are registered under `self.root`, nested by module path.
    """

    def __init__(self):
        self.root = Namespace(())

    def load(self, filename):
        """Read `filename` and load the schema or bundle of schemas it holds.

        A `schema` record is loaded under a module named after the file's
        stem; a `bundle` record loads each (module path, schema) entry.
        """
        path = pathlib.Path(filename)
        with open(path, 'rb') as stream:
            document = Decoder(stream.read()).next()

        if document.key == SCHEMA:
            self.load_schema((Symbol(path.stem),), document)
        elif document.key == BUNDLE:
            for (bundled_path, bundled_schema) in document[0].items():
                self.load_schema(bundled_path, bundled_schema)

    def load_schema(self, module_path, schema):
        """Create one class per definition in `schema` under `module_path`.

        Raises `Exception('Unsupported Schema version')` unless the schema's
        version is 1.
        """
        if schema[0][VERSION] != 1:
            raise Exception('Unsupported Schema version')

        # Descend to the target namespace, creating levels as needed.
        target = self.root
        for step in module_path:
            if step not in target:
                target[step] = Namespace(target._prefix + (step,))
            target = target[step]

        module_name = module_path_str(module_path)
        for (def_name, def_body) in schema[0][DEFINITIONS].items():
            is_alternation = isinstance(def_body, Record) and def_body.key == OR
            superclass = Enumeration if is_alternation else Definition
            generated = pretty_subclass(superclass, module_name, def_name.name)
            generated._set_schema(self.root, module_path, def_name, def_body, None, None)
            target[def_name] = generated
if __name__ == '__main__':
    # Smoke test: load two compiled schemas and decode each file with the
    # Schema class generated from schema.bin.
    def _read_value(path):
        """Decode and return the first value stored in the file at `path`."""
        with open(path, 'rb') as f:
            return Decoder(f.read()).next()

    here = pathlib.Path(__file__).parent

    c = Compiler()
    schema_bin_filename = here / '../../../schema/schema.bin'
    c.load(schema_bin_filename)
    x = _read_value(schema_bin_filename)
    print(c.root.schema.Schema.decode(x))
    print()

    d = Compiler()
    path_bin_filename = here / '../../../path/path.bin'
    d.load(path_bin_filename)
    x = _read_value(path_bin_filename)
    print(c.root.schema.Schema.decode(x))
    print()
    print(d.root)