Remove placeholders from spec and implementations 4/5

Update Python implementation: remove placeholders; reject zero-length
streamed binary chunks.
This commit is contained in:
Tony Garnock-Jones 2020-05-28 23:21:18 +02:00
parent 0b0709b615
commit 83dce41092
3 changed files with 32 additions and 61 deletions

View File

@ -223,7 +223,7 @@ class BinaryStream(Stream):
minor = 2
def _emit(self, encoder):
for chunk in self._iterator:
if not isinstance(chunk, bytes):
if not isinstance(chunk, bytes) or len(chunk) == 0:
raise EncodeError('Illegal chunk in BinaryStream %r' % (chunk,))
encoder.append(chunk)
@ -302,11 +302,10 @@ def annotate(v, *anns):
return v
class Decoder(Codec):
def __init__(self, packet=b'', placeholders={}, include_annotations=False):
def __init__(self, packet=b'', include_annotations=False):
super(Decoder, self).__init__()
self.packet = packet
self.index = 0
self.placeholders = placeholders
self.include_annotations = include_annotations
def extend(self, data):
@ -361,11 +360,14 @@ class Decoder(Codec):
def binarystream(self, minor):
result = []
while not self.peekend():
chunk = self.next()
chunk = strip_annotations(self.next())
if isinstance(chunk, bytes):
result.append(chunk)
if len(chunk) > 0:
result.append(chunk)
else:
raise DecodeError('Empty binary chunks are forbidden')
else:
raise DecodeError('Unexpected non-binary chunk')
raise DecodeError('Unexpected non-binary chunk', chunk, isinstance(chunk, bytes), type(chunk))
return self.decodebinary(minor, b''.join(result))
def valuestream(self, minor):
@ -420,11 +422,7 @@ class Decoder(Codec):
return self.unshift_annotation(a, v)
raise DecodeError('Invalid format A encoding')
elif minor == 1:
n = self.wirelength(arg)
v = self.placeholders.get(n, None)
if v is None:
raise DecodeError('Invalid Preserves placeholder')
return self.wrap(v)
raise DecodeError('Invalid format A encoding')
elif minor == 2:
t = arg >> 2
n = arg & 3
@ -452,17 +450,16 @@ class Decoder(Codec):
self.index = start
return None
def decode(bs, placeholders={}):
return Decoder(packet=bs, placeholders=placeholders).next()
def decode(bs):
return Decoder(packet=bs).next()
def decode_with_annotations(bs, placeholders={}):
return Decoder(packet=bs, placeholders=placeholders, include_annotations=True).next()
def decode_with_annotations(bs):
return Decoder(packet=bs, include_annotations=True).next()
class Encoder(Codec):
def __init__(self, placeholders={}):
def __init__(self):
super(Encoder, self).__init__()
self.buffer = bytearray()
self.placeholders = placeholders
def contents(self):
return bytes(self.buffer)
@ -508,13 +505,7 @@ class Encoder(Codec):
self.leadbyte(3, 3, 15)
def append(self, v):
try:
placeholder = self.placeholders.get(v, None)
except TypeError: ## some types (e.g. list) yield 'unhashable type'
placeholder = None
if placeholder is not None:
self.header(0, 1, placeholder)
elif hasattr(v, '__preserve_on__'):
if hasattr(v, '__preserve_on__'):
v.__preserve_on__(self)
elif v is False:
self.leadbyte(0, 0, 0)
@ -552,7 +543,7 @@ class Encoder(Codec):
raise EncodeError('Cannot encode %r' % (v,))
self.encodestream(2, 1, i)
def encode(v, placeholders={}):
e = Encoder(placeholders=placeholders)
def encode(v):
e = Encoder()
e.append(v)
return e.contents()

View File

@ -33,18 +33,10 @@ def _varint(v):
return e.contents()
def _d(bs):
return decode(bs, placeholders={
0: Symbol('discard'),
1: Symbol('capture'),
2: Symbol('observe'),
})
return decode(bs)
def _e(v):
return encode(v, placeholders={
Symbol('discard'): 0,
Symbol('capture'): 1,
Symbol('observe'): 2,
})
return encode(v)
def _R(k, *args):
return Record(Symbol(k), args)
@ -77,11 +69,6 @@ class CodecTests(unittest.TestCase):
self.assertEqual(_varint(300), _buf(0b10101100, 0b00000010))
self.assertEqual(_varint(1000000000), _buf(128, 148, 235, 220, 3))
def test_shorts(self):
self._roundtrip(_R('capture', _R('discard')), _buf(0x82, 0x11, 0x81, 0x10))
self._roundtrip(_R('observe', _R('speak', _R('discard'), _R('capture', _R('discard')))),
_buf(0x82, 0x12, 0x83, 0x75, "speak", 0x81, 0x10, 0x82, 0x11, 0x81, 0x10))
def test_simple_seq(self):
self._roundtrip([1,2,3,4], _buf(0x94, 0x31, 0x32, 0x33, 0x34), back=(1,2,3,4))
self._roundtrip(SequenceStream([1,2,3,4]), _buf(0x29, 0x31, 0x32, 0x33, 0x34, 0x04),
@ -92,15 +79,14 @@ class CodecTests(unittest.TestCase):
self._roundtrip(u'hello', _buf(0x55, 'hello'))
self._roundtrip(StringStream([b'he', b'llo']), _buf(0x25, 0x62, 'he', 0x63, 'llo', 0x04),
back=u'hello')
## TODO: error with zero-size chunks
self._roundtrip(StringStream([b'he', b'll', b'', b'', b'o']),
_buf(0x25, 0x62, 'he', 0x62, 'll', 0x60, 0x60, 0x61, 'o', 0x04),
self._roundtrip(StringStream([b'he', b'll', b'o']),
_buf(0x25, 0x62, 'he', 0x62, 'll', 0x61, 'o', 0x04),
back=u'hello')
self._roundtrip(BinaryStream([b'he', b'll', b'', b'', b'o']),
_buf(0x26, 0x62, 'he', 0x62, 'll', 0x60, 0x60, 0x61, 'o', 0x04),
self._roundtrip(BinaryStream([b'he', b'll', b'o']),
_buf(0x26, 0x62, 'he', 0x62, 'll', 0x61, 'o', 0x04),
back=b'hello')
self._roundtrip(SymbolStream([b'he', b'll', b'', b'', b'o']),
_buf(0x27, 0x62, 'he', 0x62, 'll', 0x60, 0x60, 0x61, 'o', 0x04),
self._roundtrip(SymbolStream([b'he', b'll', b'o']),
_buf(0x27, 0x62, 'he', 0x62, 'll', 0x61, 'o', 0x04),
back=Symbol(u'hello'))
def test_mixed1(self):
@ -270,12 +256,7 @@ class CommonTestSuite(unittest.TestCase):
'../../../tests/samples.bin'), 'rb') as f:
samples = Decoder(f.read(), include_annotations=True).next()
TestCases = Record.makeConstructor('TestCases', 'mapping cases')
ExpectedPlaceholderMapping = Record.makeConstructor('ExpectedPlaceholderMapping', 'table')
m = TestCases._mapping(samples.peel()).strip()
placeholders_decode = ExpectedPlaceholderMapping._table(m)
placeholders_encode = dict((v,k) for (k,v) in placeholders_decode.items())
TestCases = Record.makeConstructor('TestCases', 'cases')
tests = TestCases._cases(samples.peel()).peel()
for (tName0, t0) in tests.items():
@ -294,25 +275,24 @@ class CommonTestSuite(unittest.TestCase):
self.assertIsInstance(e, DecodeError)
self.assertNotIsInstance(e, ShortPacket)
install_exn_test(locals(), tName, t[0].strip(), expected_err)
elif t.key == Symbol('DecodeShort'):
elif t.key in [Symbol('DecodeShort'), Symbol('DecodeEOF')]:
def expected_short(self, e):
self.assertIsInstance(e, ShortPacket)
install_exn_test(locals(), tName, t[0].strip(), expected_short)
elif t.key == Symbol('ParseError') or \
t.key == Symbol('ParseShort'):
elif t.key in [Symbol('ParseError'), Symbol('ParseShort'), Symbol('ParseEOF')]:
# Skipped for now, until we have an implementation of text syntax
pass
else:
raise Exception('Unsupported test kind', t.key)
def DS(self, bs):
return decode(bs, placeholders=self.placeholders_decode)
return decode(bs)
def D(self, bs):
return decode_with_annotations(bs, placeholders=self.placeholders_decode)
return decode_with_annotations(bs)
def E(self, v):
return encode(v, placeholders=self.placeholders_encode)
return encode(v)
class RecordTests(unittest.TestCase):
def test_getters(self):

View File

@ -5,7 +5,7 @@ except ImportError:
setup(
name="preserves",
version="0.2.0",
version="0.3.0",
author="Tony Garnock-Jones",
author_email="tonyg@leastfixedpoint.com",
license="Apache Software License",