2023-03-16 23:07:27 +00:00
|
|
|
"""The [preserves.text][] module implements the [Preserves human-readable text
|
|
|
|
syntax](https://preserves.dev/preserves-text.html).
|
|
|
|
|
|
|
|
The main entry points are functions [stringify][preserves.text.stringify],
|
|
|
|
[parse][preserves.text.parse], and
|
|
|
|
[parse_with_annotations][preserves.text.parse_with_annotations].
|
|
|
|
|
|
|
|
```python
|
|
|
|
>>> stringify(Record(Symbol('hi'), [1, [2, 3]]))
|
|
|
|
'<hi 1 [2 3]>'
|
|
|
|
>>> parse('<hi 1 [2 3]>')
|
|
|
|
#hi(1, (2, 3))
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
"""
|
2023-03-16 16:51:19 +00:00
|
|
|
|
2021-08-17 12:04:38 +00:00
|
|
|
import base64
import math
import numbers
import re
import struct

from .values import *
from .error import *
from .compat import basestring_, unichr_
from .binary import Decoder
|
|
|
|
|
2023-03-16 16:51:19 +00:00
|
|
|
class TextCodec(object):
    """Common (empty) base class shared by the text-syntax `Parser` and `Formatter`."""
|
2021-08-17 12:04:38 +00:00
|
|
|
|
2022-11-06 21:27:01 +00:00
|
|
|
# Recognises a complete numeric token. Capture groups relied on by
# `Parser.read_raw_symbol_or_number`:
#   1 - optionally-signed run of decimal digits (the integer part)
#   2 - everything after the integer part; `None` when the token is a plain integer
#   3 - the fraction (with optional exponent) or a bare exponent
#   7 - the optional `f`/`F` suffix; the empty string when no suffix is present
NUMBER_RE = re.compile(r'^([-+]?\d+)(((\.\d+([eE][-+]?\d+)?)|([eE][-+]?\d+))([fF]?))?$')
|
|
|
|
|
2021-08-17 12:04:38 +00:00
|
|
|
class Parser(TextCodec):
    """Parser for the human-readable Preserves text syntax.

    Args:
        input_buffer (str):
            initial contents of the input buffer; may subsequently be extended by calling
            [extend][preserves.text.Parser.extend].

        include_annotations (bool):
            if `True`, wrap each value and subvalue in an
            [Annotated][preserves.values.Annotated] object.

        parse_embedded:
            function accepting a `Value` and returning a possibly-decoded form of that value
            suitable for placing into an [Embedded][preserves.values.Embedded] object.

    Normal usage is to supply input text, and keep calling [next][preserves.text.Parser.next]
    until a [ShortPacket][preserves.error.ShortPacket] exception is raised:

    ```python
    >>> d = Parser('123 "hello" @x []')
    >>> d.next()
    123
    >>> d.next()
    'hello'
    >>> d.next()
    ()
    >>> d.next()
    Traceback (most recent call last):
      ...
    preserves.error.ShortPacket: Short input buffer

    ```

    Alternatively, keep calling [try_next][preserves.text.Parser.try_next] until it yields
    `None`, which is not in the domain of Preserves `Value`s:

    ```python
    >>> d = Parser('123 "hello" @x []')
    >>> d.try_next()
    123
    >>> d.try_next()
    'hello'
    >>> d.try_next()
    ()
    >>> d.try_next()

    ```

    For convenience, [Parser][preserves.text.Parser] implements the iterator interface,
    backing it with [try_next][preserves.text.Parser.try_next], so you can simply iterate
    over all complete values in an input:

    ```python
    >>> d = Parser('123 "hello" @x []')
    >>> list(d)
    [123, 'hello', ()]

    ```

    ```python
    >>> for v in Parser('123 "hello" @x []'):
    ...     print(repr(v))
    123
    'hello'
    ()

    ```

    Supply `include_annotations=True` to read annotations alongside the annotated values:

    ```python
    >>> d = Parser('123 "hello" @x []', include_annotations=True)
    >>> list(d)
    [123, 'hello', @#x ()]

    ```

    If you are incrementally reading from, say, a socket, you can use
    [extend][preserves.text.Parser.extend] to add new input as it becomes available:

    ```python
    >>> d = Parser('123 "he')
    >>> d.try_next()
    123
    >>> d.try_next() # returns None because the input is incomplete
    >>> d.extend('llo"')
    >>> d.try_next()
    'hello'
    >>> d.try_next()

    ```

    Attributes:
        input_buffer (str): buffered input waiting to be processed
        index (int): read position within `input_buffer`

    """

    def __init__(self, input_buffer=u'', include_annotations=False, parse_embedded=lambda x: x):
        super(Parser, self).__init__()
        self.input_buffer = input_buffer
        self.index = 0
        self.include_annotations = include_annotations
        self.parse_embedded = parse_embedded

    def extend(self, text):
        """Appends `text` to the remaining contents of `self.input_buffer`, trimming already-processed
        text from the front of `self.input_buffer` and resetting `self.index` to zero."""
        self.input_buffer = self.input_buffer[self.index:] + text
        self.index = 0

    def _atend(self):
        """Returns `True` iff the read position has reached the end of the buffered input."""
        return self.index >= len(self.input_buffer)

    def peek(self):
        """Returns the character at the read position without consuming it, raising
        [ShortPacket][preserves.error.ShortPacket] if no buffered input remains."""
        if self._atend():
            raise ShortPacket('Short input buffer')
        return self.input_buffer[self.index]

    def skip(self):
        """Advances the read position by one character; performs no bounds check."""
        self.index = self.index + 1

    def nextchar(self):
        """Consumes and returns the next character, raising
        [ShortPacket][preserves.error.ShortPacket] at the end of the buffer."""
        c = self.peek()
        self.skip()
        return c

    def skip_whitespace(self, skip_commas=False):
        """Advances past whitespace (and, if `skip_commas` is true, commas); stops quietly
        at the end of the buffer."""
        while not self._atend():
            c = self.peek()
            if not (c.isspace() or (skip_commas and c == ',')):
                break
            self.skip()

    def comment_line(self):
        """Consumes characters up to and including the next carriage return or line feed,
        returning the text before it, passed through [wrap][preserves.text.Parser.wrap]."""
        s = []
        while True:
            c = self.nextchar()
            if c == '\r' or c == '\n':
                return self.wrap(u''.join(s))
            s.append(c)

    def read_stringlike(self, terminator, hexescape, hexescaper):
        """Reads characters up to (and consuming) `terminator`, decoding backslash escapes.

        Args:
            terminator (str): the character that ends the string-like token.
            hexescape (str): the escape letter that introduces a numeric escape.
            hexescaper: callable invoked with the accumulator list when `hexescape` is
                seen; it is expected to read the digits and append the decoded character.

        Returns:
            The decoded text as a `str`.

        Raises:
            DecodeError: if an unrecognised escape code is encountered.
        """
        acc = []
        while True:
            c = self.nextchar()
            if c == terminator:
                return u''.join(acc)
            if c == '\\':
                c = self.nextchar()
                if c == hexescape: hexescaper(acc)
                elif c == terminator or c == '\\' or c == '/': acc.append(c)
                elif c == 'b': acc.append(u'\x08')
                elif c == 'f': acc.append(u'\x0c')
                elif c == 'n': acc.append(u'\x0a')
                elif c == 'r': acc.append(u'\x0d')
                elif c == 't': acc.append(u'\x09')
                else: raise DecodeError('Invalid escape code')
            else:
                acc.append(c)

    def hexnum(self, count):
        """Reads exactly `count` hexadecimal digits (either case) and returns their integer
        value, raising [DecodeError][preserves.error.DecodeError] on any other character."""
        v = 0
        for i in range(count):
            c = self.nextchar().lower()
            if c >= '0' and c <= '9':
                v = v << 4 | (ord(c) - ord('0'))
            elif c >= 'a' and c <= 'f':
                v = v << 4 | (ord(c) - ord('a') + 10)
            else:
                raise DecodeError('Bad hex escape')
        return v

    def read_string(self, delimiter):
        """Reads a string or quoted-symbol body terminated by `delimiter`, decoding `\\uXXXX`
        escapes and combining UTF-16 surrogate pairs into a single code point."""
        def u16_escape(acc):
            n1 = self.hexnum(4)
            if n1 >= 0xd800 and n1 <= 0xdfff:
                # A low surrogate may not appear first.
                if n1 >= 0xdc00:
                    raise DecodeError('Bad first half of surrogate pair')
                # A high surrogate must be followed immediately by `\u` and a low surrogate.
                ok = True
                ok = ok and self.nextchar() == '\\'
                ok = ok and self.nextchar() == 'u'
                if not ok:
                    raise DecodeError('Missing second half of surrogate pair')
                n2 = self.hexnum(4)
                if n2 >= 0xdc00 and n2 <= 0xdfff:
                    n = ((n1 - 0xd800) << 10) + (n2 - 0xdc00) + 0x10000
                    acc.append(unichr_(n))
                else:
                    raise DecodeError('Bad second half of surrogate pair')
            else:
                acc.append(unichr_(n1))
        return self.read_stringlike(delimiter, 'u', u16_escape)

    def read_literal_binary(self):
        """Reads the body of a `#"..."` binary literal (with `\\xHH` escapes) and returns it
        as `bytes`.

        Raises:
            DecodeError: if the literal contains a character that cannot be represented as
                a single byte.
        """
        s = self.read_stringlike('"', 'x', lambda acc: acc.append(unichr_(self.hexnum(2))))
        try:
            return s.encode('latin-1')
        except UnicodeError:
            # Characters above U+00FF have no single-byte encoding; report them as invalid
            # input rather than leaking a codec exception to the caller.
            raise DecodeError('Invalid character in binary string literal')

    def read_hex_binary(self):
        """Reads whitespace-separated pairs of hex digits up to (and consuming) a closing
        double-quote, returning the bytes they denote."""
        acc = bytearray()
        while True:
            self.skip_whitespace()
            if self.peek() == '"':
                self.skip()
                return bytes(acc)
            acc.append(self.hexnum(2))

    def read_base64_binary(self):
        """Reads base64 text up to (and consuming) a closing `]`, returning the decoded bytes.

        Whitespace and `=` padding in the input are ignored, and the URL-safe characters
        `-` and `_` are accepted as synonyms for `+` and `/`.

        Raises:
            DecodeError: if the accumulated text cannot be decoded as base64.
        """
        acc = []
        while True:
            self.skip_whitespace()
            c = self.nextchar()
            if c == ']':
                # Input padding was discarded above, so supply enough for any length.
                acc.append(u'====')
                try:
                    return base64.b64decode(u''.join(acc))
                except ValueError:
                    # binascii.Error is a ValueError subclass; surface it as invalid input.
                    raise DecodeError('Invalid base64-encoded binary')
            if c == '-': c = '+'
            if c == '_': c = '/'
            if c == '=': continue
            acc.append(c)

    def read_hex_float(self, bytecount):
        """Reads a hex-encoded floating-point number of `bytecount` bytes.

        Expects an opening double-quote next in the input. Four bytes are handed to
        `Float.from_bytes`; eight bytes are unpacked as a big-endian double.

        Raises:
            DecodeError: if the opening quote is missing, the byte count does not match,
                or `bytecount` is neither 4 nor 8.
        """
        if self.nextchar() != '"':
            raise DecodeError('Missing open-double-quote in hex-encoded floating-point number')
        bs = self.read_hex_binary()
        if len(bs) != bytecount:
            raise DecodeError('Incorrect number of bytes in hex-encoded floating-point number')
        if bytecount == 4: return Float.from_bytes(bs)
        if bytecount == 8: return struct.unpack('>d', bs)[0]
        raise DecodeError('Unsupported byte count in hex-encoded floating-point number')

    def upto(self, delimiter, skip_commas):
        """Reads values until `delimiter` is the next non-whitespace character, consumes the
        delimiter, and returns the values as a tuple. Commas between values are skipped
        only if `skip_commas` is true."""
        vs = []
        while True:
            self.skip_whitespace(skip_commas)
            if self.peek() == delimiter:
                self.skip()
                return tuple(vs)
            vs.append(self.next())

    def read_set(self):
        """Reads set members up to the closing `}` and returns them as a `frozenset`,
        raising [DecodeError][preserves.error.DecodeError] on a duplicate member."""
        items = self.upto('}', True)
        s = set()
        for i in items:
            if i in s: raise DecodeError('Duplicate value in set: ' + repr(i))
            s.add(i)
        return frozenset(s)

    def read_dictionary(self):
        """Reads `key: value` entries up to the closing `}` and returns the result of
        `ImmutableDict.from_kvs` applied to the flat list of alternating keys and values.

        Raises:
            DecodeError: if a key is not followed by `:`.
        """
        acc = []
        while True:
            self.skip_whitespace(True)
            if self.peek() == '}':
                self.skip()
                return ImmutableDict.from_kvs(acc)
            acc.append(self.next())
            # Commas are not permitted between a key and its `:` separator.
            self.skip_whitespace()
            if self.nextchar() != ':':
                raise DecodeError('Missing expected key/value separator')
            acc.append(self.next())

    def require_delimiter(self, prefix):
        """Raises [DecodeError][preserves.error.DecodeError] unless the token `prefix` just
        read is followed by a delimiter or the end of the buffer."""
        if not self.delimiter_follows():
            raise DecodeError('Delimiter must follow ' + prefix)

    def delimiter_follows(self):
        """Returns `True` iff the buffer is exhausted or the next character is whitespace
        or one of the punctuation characters that end a bare token."""
        if self._atend(): return True
        c = self.peek()
        return c.isspace() or c in '(){}[]<>";,@#:|'

    def read_raw_symbol_or_number(self, acc):
        """Extends the list of characters `acc` up to the next delimiter, then interprets
        the token: an `int` or `float` (or `Float`, when suffixed with `f`/`F`) if it
        matches `NUMBER_RE`, and a `Symbol` otherwise."""
        while not self.delimiter_follows():
            acc.append(self.nextchar())
        acc = u''.join(acc)
        m = NUMBER_RE.match(acc)
        if m:
            if m[2] is None:
                # No fraction or exponent: plain integer.
                return int(m[1])
            elif m[7] == '':
                # No `f` suffix: double-precision.
                return float(m[1] + m[3])
            else:
                return Float(float(m[1] + m[3]))
        else:
            return Symbol(acc)

    def wrap(self, v):
        """Returns `v` wrapped in an [Annotated][preserves.values.Annotated] object if
        `self.include_annotations` is set, and `v` itself otherwise."""
        return Annotated(v) if self.include_annotations else v

    def unshift_annotation(self, a, v):
        """Prepends annotation `a` to the annotations of `v` when annotations are being
        retained, and returns `v`."""
        if self.include_annotations:
            # TODO: this will end up O(n^2) for multiple annotations in a row
            v.annotations.insert(0, a)
        return v

    def next(self):
        """Reads the next complete `Value` from the internal buffer, raising
        [ShortPacket][preserves.error.ShortPacket] if too few bytes are available, or
        [DecodeError][preserves.error.DecodeError] if the input is invalid somehow.

        """
        self.skip_whitespace()
        c = self.peek()
        if c == '"':
            self.skip()
            return self.wrap(self.read_string('"'))
        if c == '|':
            self.skip()
            return self.wrap(Symbol(self.read_string('|')))
        if c == '@':
            self.skip()
            # Arguments are evaluated left to right: first the annotation, then the value.
            return self.unshift_annotation(self.next(), self.next())
        if c == ';':
            raise DecodeError('Semicolon is reserved syntax')
        if c == ':':
            raise DecodeError('Unexpected key/value separator between items')
        if c == '#':
            self.skip()
            c = self.nextchar()
            if c in ' \t':
                return self.unshift_annotation(self.comment_line(), self.next())
            if c in '\n\r':
                # An empty comment is wrapped the same way as every other annotation.
                return self.unshift_annotation(self.wrap(u''), self.next())
            if c == 'f':
                self.require_delimiter('#f')
                return self.wrap(False)
            if c == 't':
                self.require_delimiter('#t')
                return self.wrap(True)
            if c == '{':
                return self.wrap(self.read_set())
            if c == '"':
                return self.wrap(self.read_literal_binary())
            if c == 'x':
                c = self.nextchar()
                if c == '"': return self.wrap(self.read_hex_binary())
                if c == 'f': return self.wrap(self.read_hex_float(4))
                if c == 'd': return self.wrap(self.read_hex_float(8))
                raise DecodeError('Invalid #x syntax')
            if c == '[':
                return self.wrap(self.read_base64_binary())
            if c == '!':
                if self.parse_embedded is None:
                    raise DecodeError('No parse_embedded function supplied')
                return self.wrap(Embedded(self.parse_embedded(self.next())))
            raise DecodeError('Invalid # syntax')
        if c == '<':
            self.skip()
            vs = self.upto('>', False)
            if len(vs) == 0:
                raise DecodeError('Missing record label')
            return self.wrap(Record(vs[0], vs[1:]))
        if c == '[':
            self.skip()
            return self.wrap(self.upto(']', True))
        if c == '{':
            self.skip()
            return self.wrap(self.read_dictionary())
        if c in '>]},':
            raise DecodeError('Unexpected ' + c)
        self.skip()
        return self.wrap(self.read_raw_symbol_or_number([c]))

    def try_next(self):
        """Like [next][preserves.text.Parser.next], but returns `None` instead of raising
        [ShortPacket][preserves.error.ShortPacket]."""
        start = self.index
        try:
            return self.next()
        except ShortPacket:
            # Rewind so the incomplete value can be re-read once more input arrives.
            self.index = start
            return None

    def __iter__(self):
        return self

    def __next__(self):
        v = self.try_next()
        if v is None:
            raise StopIteration
        return v
|
|
|
|
|
2023-03-16 23:07:27 +00:00
|
|
|
def parse(text, **kwargs):
    """Decode and return the first complete value found in `text`.

    Any `kwargs` are forwarded to the [Parser][preserves.text.Parser] constructor.
    Exceptions are those raised by [next][preserves.text.Parser.next].

    Args:
        text (str): encoded data to decode

    """
    parser = Parser(input_buffer=text, **kwargs)
    return parser.next()
|
2021-08-17 12:04:38 +00:00
|
|
|
|
|
|
|
def parse_with_annotations(bs, **kwargs):
    """Variant of [parse][preserves.text.parse] that constructs the
    [Parser][preserves.text.Parser] with `include_annotations=True`.

    Args:
        bs: encoded input handed to the parser as its `input_buffer`
    """
    parser = Parser(input_buffer=bs, include_annotations=True, **kwargs)
    return parser.next()
|
|
|
|
|
|
|
|
class Formatter(TextCodec):
    """Writer for the human-readable Preserves text syntax, with an optional indenting
    pretty-printing mode.

    ```python
    >>> f = Formatter()
    >>> f.append({'a': 1, 'b': 2})
    >>> f.append(Record(Symbol('label'), ['field1', ['field2item1', 'field2item2']]))
    >>> print(f.contents())
    {"a": 1 "b": 2} <label "field1" ["field2item1" "field2item2"]>

    >>> f = Formatter(indent=4)
    >>> f.append({'a': 1, 'b': 2})
    >>> f.append(Record(Symbol('label'), ['field1', ['field2item1', 'field2item2']]))
    >>> print(f.contents())
    {
        "a": 1
        "b": 2
    }
    <label "field1" [
        "field2item1"
        "field2item2"
    ]>

    ```

    Args:
        format_embedded:
            callable that takes the `embeddedValue` of an
            [Embedded][preserves.values.Embedded] and yields a `Value` to serialize.

        indent (int | None):
            number of spaces to indent per nesting level; `None` switches off indented
            pretty-printing.

        with_commas (bool):
            when `True`, a comma is written between consecutive items of sequences and
            sets and between dictionary entries.

        trailing_comma (bool):
            when `True`, a comma is also written *after* the last item or entry of a
            sequence, set or dictionary.

        include_annotations (bool):
            when `True`, annotations are written to the output; when `False` they are
            left out.

    Attributes:
        indent_delta (int): indentation per nesting-level
        chunks (list[str]): fragments of output
    """

    # Characters that are always written as a two-character backslash escape.
    _STRINGLIKE_ESCAPES = {
        '\\': '\\\\',
        '\x08': '\\b',
        '\x0c': '\\f',
        '\x0a': '\\n',
        '\x0d': '\\r',
        '\x09': '\\t',
    }

    def __init__(self,
                 format_embedded=lambda x: x,
                 indent=None,
                 with_commas=False,
                 trailing_comma=False,
                 include_annotations=True):
        super(Formatter, self).__init__()
        if indent is None:
            self.indent_delta = 0
        else:
            self.indent_delta = indent
        self.indent_distance = 0
        self.nesting = 0
        self.with_commas = with_commas
        self.trailing_comma = trailing_comma
        self.chunks = []
        self._format_embedded = format_embedded
        self.include_annotations = include_annotations

    def format_embedded(self, v):
        """Apply the configured embedded-value converter to `v`; an `EncodeError` is raised
        when no converter was supplied."""
        converter = self._format_embedded
        if converter is None:
            raise EncodeError('No format_embedded function supplied')
        return converter(v)

    def contents(self):
        """Concatenates every fragment in `self.chunks` into one `str` and returns it."""
        return u''.join(self.chunks)

    def is_indenting(self):
        """Reports whether this [Formatter][preserves.text.Formatter] pretty-prints with
        indentation, i.e. whether `indent_delta` is positive."""
        return self.indent_delta > 0

    def write_indent(self):
        """In indenting mode, emit a newline followed by the current indentation; in
        non-indenting mode, emit nothing."""
        if not self.is_indenting():
            return
        self.chunks.append('\n' + ' ' * self.indent_distance)

    def write_indent_space(self):
        """Emit the separator between adjacent items: a newline plus indentation in
        indenting mode, a single space otherwise."""
        if not self.is_indenting():
            self.chunks.append(' ')
        else:
            self.write_indent()

    def write_stringlike_char(self, c):
        """Emit `c`, substituting its backslash escape if it has one."""
        self.chunks.append(self._STRINGLIKE_ESCAPES.get(c, c))

    def write_seq(self, opener, closer, vs, appender):
        """Emit `opener`, each element of `vs` via `appender`, then `closer`.

        Empty and single-element collections are written inline. Larger ones are
        indented one level further and have their items separated by
        `write_indent_space`, with commas as configured.
        """
        items = list(vs)
        self.chunks.append(opener)
        if len(items) == 1:
            appender(items[0])
        elif items:
            # Two or more items: indent the body one level further.
            self.indent_distance += self.indent_delta
            self.write_indent()
            for position, item in enumerate(items):
                if position > 0:
                    if self.with_commas:
                        self.chunks.append(',')
                    self.write_indent_space()
                appender(item)
            self.indent_distance -= self.indent_delta
            if self.trailing_comma:
                self.chunks.append(',')
            self.write_indent()
        self.chunks.append(closer)

    def append(self, v):
        """Add the text representation of `v` to `self.chunks`, as one or more chunks."""
        # Separate successive top-level values; nested calls must not add a separator.
        if self.nesting == 0 and self.chunks:
            self.write_indent_space()
        try:
            self.nesting += 1
            self._append(v)
        finally:
            self.nesting -= 1

    def _append(self, v):
        """Write `v` without top-level separator handling, dispatching on its type."""
        v = preserve(v)

        # Values that know how to write themselves take precedence over everything else.
        if hasattr(v, '__preserve_write_text__'):
            v.__preserve_write_text__(self)
            return

        if v is False:
            self.chunks.append('#f')
            return

        if v is True:
            self.chunks.append('#t')
            return

        if isinstance(v, float):
            if math.isfinite(v):
                text = repr(v)
            else:
                # NaN and the infinities are written as a hex-encoded double.
                text = '#xd"' + struct.pack('>d', v).hex() + '"'
            self.chunks.append(text)
            return

        if isinstance(v, numbers.Number):
            self.chunks.append('%d' % (v,))
            return

        if isinstance(v, bytes):
            self.chunks.append('#[%s]' % (base64.b64encode(v).decode('ascii'),))
            return

        if isinstance(v, basestring_):
            self.chunks.append('"')
            for ch in v:
                if ch == '"':
                    self.chunks.append('\\"')
                else:
                    self.write_stringlike_char(ch)
            self.chunks.append('"')
            return

        if isinstance(v, (list, tuple)):
            self.write_seq('[', ']', v, self._append)
            return

        if isinstance(v, (set, frozenset)):
            self.write_seq('#{', '}', v, self._append)
            return

        if isinstance(v, dict):
            def write_entry(entry):
                self._append(entry[0])
                self.chunks.append(': ')
                self._append(entry[1])
            self.write_seq('{', '}', v.items(), write_entry)
            return

        # Anything else is written as a sequence if it can be iterated.
        try:
            iterator = iter(v)
        except TypeError:
            iterator = None
        if iterator is None:
            self.cannot_format(v)
        else:
            self.write_seq('[', ']', iterator, self._append)

    def cannot_format(self, v):
        """Raise `TypeError` for a value that has no text representation."""
        raise TypeError('Cannot preserves-format: ' + repr(v))
|
2021-08-17 12:04:38 +00:00
|
|
|
|
|
|
|
def stringify(v, **kwargs):
    """Return the text form of the single `Value` `v`.

    Any `kwargs` given are handed to the [Formatter][preserves.text.Formatter]
    constructor that does the work."""
    formatter = Formatter(**kwargs)
    formatter.append(v)
    return formatter.contents()
|