Explain why parsing failed

This commit is contained in:
Tony Garnock-Jones 2022-06-08 16:22:10 +02:00
parent d5a8440a9e
commit 8b8da80f61
1 changed files with 95 additions and 61 deletions

View File

@ -32,6 +32,49 @@ VERSION = Symbol('version')
def sequenceish(x):
return isinstance(x, tuple) or isinstance(x, list)
class SchemaDecodeFailed(ValueError):
def __init__(self, cls, p, v, failures=None):
super().__init__()
self.cls = cls
self.pattern = p
self.value = v
self.failures = [] if failures is None else failures
def __str__(self):
b = ExplanationBuilder()
return f'Could not decode {b.truncated(stringify(self.value))} using {self.cls}' + \
b.explain(self)
class ExplanationBuilder:
INDENT = 2
def __init__(self):
self.indentLevel = self.INDENT
self.deepest_failure = (-1, None)
def truncated(self, s):
return s[:36] + ' ...' if len(s) > 40 else s
def explain(self, failure):
tree = self._tree(failure)
deepest = self.deepest_failure[1]
if deepest is None:
return tree
else:
return f'\nMost likely reason: {self._node(deepest)}\nFull explanation: {tree}'
def _node(self, failure):
pexp = ' matching' if failure.pattern is None else f' {stringify(failure.pattern)} didn\'t match'
c = failure.cls.__module__ + '.' + failure.cls.__qualname__
return f'in {c}:{pexp} {self.truncated(stringify(failure.value))}'
def _tree(self, failure):
if self.indentLevel >= self.deepest_failure[0]:
self.deepest_failure = (self.indentLevel, failure)
self.indentLevel += self.INDENT
nested = [self._tree(f) for f in failure.failures]
self.indentLevel -= self.INDENT
return '\n' + ' ' * self.indentLevel + self._node(failure) + ''.join(nested)
class SchemaObject:
ROOTNS = None
SCHEMA = None
@ -41,14 +84,14 @@ class SchemaObject:
@classmethod
def decode(cls, v):
i = cls.try_decode(v)
if i is None:
raise ValueError('Could not decode ' + str(cls))
return i
raise NotImplementedError('Subclass responsibility')
@classmethod
def try_decode(cls, v):
raise NotImplementedError('Subclass responsibility')
try:
return cls.decode(v)
except SchemaDecodeFailed:
return None
@classmethod
def parse(cls, p, v, args):
@ -56,7 +99,7 @@ class SchemaObject:
return v
if p.key == NAMED:
i = cls.parse(p[1], v, args)
if i is not None: args.append(i)
args.append(i)
return i
if p.key == ATOM:
k = p[0]
@ -67,74 +110,63 @@ class SchemaObject:
if k == STRING and isinstance(v, str): return v
if k == BYTE_STRING and isinstance(v, bytes): return v
if k == SYMBOL and isinstance(v, Symbol): return v
return None
raise SchemaDecodeFailed(cls, p, v)
if p.key == EMBEDDED:
if not isinstance(v, Embedded): return None
if not isinstance(v, Embedded): raise SchemaDecodeFailed(cls, p, v)
return v.embeddedValue
if p.key == LIT:
if v == p[0]: return ()
return None
raise SchemaDecodeFailed(cls, p, v)
if p.key == SEQOF:
if not sequenceish(v): return None
vv = []
for w in v:
ww = cls.parse(p[0], w, args)
if ww is None: return None
vv.append(ww)
return vv
if not sequenceish(v): raise SchemaDecodeFailed(cls, p, v)
return [cls.parse(p[0], w, args) for w in v]
if p.key == SETOF:
if not isinstance(v, set): return None
vv = set()
for w in v:
ww = cls.parse(p[0], w, args)
if ww is None: return None
vv.add(ww)
return vv
if not isinstance(v, set): raise SchemaDecodeFailed(cls, p, v)
return set(cls.parse(p[0], w, args) for w in v)
if p.key == DICTOF:
if not isinstance(v, dict): return None
dd = {}
for (k, w) in v.items():
kk = cls.parse(p[0], k, args)
if kk is None: return None
ww = cls.parse(p[1], w, args)
if ww is None: return None
dd[kk] = ww
return dd
if not isinstance(v, dict): raise SchemaDecodeFailed(cls, p, v)
return dict((cls.parse(p[0], k, args), cls.parse(p[1], w, args))
for (k, w) in v.items())
if p.key == REF:
c = lookup(cls.ROOTNS, cls.MODULE_PATH if len(p[0]) == 0 else p[0], p[1])
return c.try_decode(v)
failure = None
try:
return c.decode(v)
except SchemaDecodeFailed as exn:
failure = exn
raise SchemaDecodeFailed(cls, p, v, [failure])
if p.key == REC:
if not isinstance(v, Record): return None
if cls.parse(p[0], v.key, args) is None: return None
if cls.parse(p[1], v.fields, args) is None: return None
if not isinstance(v, Record): raise SchemaDecodeFailed(cls, p, v)
cls.parse(p[0], v.key, args)
cls.parse(p[1], v.fields, args)
return ()
if p.key == TUPLE:
if not sequenceish(v): return None
if len(v) != len(p[0]): return None
if not sequenceish(v): raise SchemaDecodeFailed(cls, p, v)
if len(v) != len(p[0]): raise SchemaDecodeFailed(cls, p, v)
i = 0
for pp in p[0]:
if cls.parse(pp, v[i], args) is None: return None
cls.parse(pp, v[i], args)
i = i + 1
return ()
if p.key == TUPLE_PREFIX:
if not sequenceish(v): return None
if len(v) < len(p[0]): return None
if not sequenceish(v): raise SchemaDecodeFailed(cls, p, v)
if len(v) < len(p[0]): raise SchemaDecodeFailed(cls, p, v)
i = 0
for pp in p[0]:
if cls.parse(pp, v[i], args) is None: return None
cls.parse(pp, v[i], args)
i = i + 1
if cls.parse(p[1], v[i:], args) is None: return None
cls.parse(p[1], v[i:], args)
return ()
if p.key == DICT:
if not isinstance(v, dict): return None
if len(v) < len(p[0]): return None
if not isinstance(v, dict): raise SchemaDecodeFailed(cls, p, v)
if len(v) < len(p[0]): raise SchemaDecodeFailed(cls, p, v)
for (k, pp) in compare.sorted_items(p[0]):
if k not in v: return None
if cls.parse(pp, v[k], args) is None: return None
if k not in v: raise SchemaDecodeFailed(cls, p, v)
cls.parse(pp, v[k], args)
return ()
if p.key == AND:
for pp in p[0]:
if cls.parse(pp, v, args) is None: return None
cls.parse(pp, v, args)
return ()
raise ValueError(f'Bad schema {p}')
@ -176,11 +208,15 @@ class Enumeration(SchemaObject):
safesetattr(cls, n.name, c)
@classmethod
def try_decode(cls, v):
def decode(cls, v):
failures = None
for (n, c) in cls.VARIANTS:
i = c.try_decode(v)
if i is not None: return i
return None
try:
return c.decode(v)
except SchemaDecodeFailed as failure:
if failures is None: failures = []
failures.append(failure)
raise SchemaDecodeFailed(cls, None, v, failures)
def __preserve__(self):
raise TypeError('Cannot encode instance of Enumeration')
@ -267,19 +303,17 @@ class Definition(SchemaObject):
cls.SAFE_FIELD_NAMES = [safeattrname(n) for n in cls.FIELD_NAMES]
@classmethod
def try_decode(cls, v):
def decode(cls, v):
if cls.SIMPLE:
i = cls.parse(cls.SCHEMA, v, [])
if i is not None:
if cls.EMPTY:
return cls()
else:
return cls(i)
if cls.EMPTY:
return cls()
else:
return cls(i)
else:
args = []
if cls.parse(cls.SCHEMA, v, args) is not None:
return cls(*args)
return None
cls.parse(cls.SCHEMA, v, args)
return cls(*args)
def __preserve__(self):
if self.SIMPLE: