Compare commits

..

No commits in common. "main" and "v0.2.0" have entirely different histories.
main ... v0.2.0

45 changed files with 368 additions and 1475 deletions

11
.envrc
View File

@ -1,8 +1,3 @@
if ! [ -d .venv ]
then
python3 -m venv .venv
. .venv/bin/activate
pip install -e '.[dev]'
else
. .venv/bin/activate
fi
[ -d pyenv ] || virtualenv -p python3 pyenv
. pyenv/bin/activate
pip install -r requirements.txt

5
.gitignore vendored
View File

@ -4,6 +4,5 @@ htmlcov/
build/
dist/
*.egg-info/
.eggs/
.venv/
/preserves
pyenv/
/preserves

View File

@ -1,5 +1,3 @@
PACKAGEVERSION := $(shell ./print-package-version)
all:
clean:
@ -8,28 +6,12 @@ clean:
rm -f .coverage
rm -rf *.egg-info build dist
tag:
git tag v$(PACKAGEVERSION)
publish: clean build
# sudo apt install python3-wheel twine
publish: build
twine upload dist/*
build: build-only
build-only: dist/syndicate-py-$(PACKAGEVERSION).tar.gz
dist/syndicate-py-$(PACKAGEVERSION).tar.gz:
python3 -m build
build: clean
python3 setup.py sdist bdist_wheel
veryclean: clean
rm -rf .venv
PROTOCOLS_BRANCH=main
pull-protocols:
git subtree pull -P syndicate/protocols \
-m 'Merge latest changes from the syndicate-protocols repository' \
git@git.syndicate-lang.org:syndicate-lang/syndicate-protocols \
$(PROTOCOLS_BRANCH)
chat.bin: chat.prs
preserves-schemac .:chat.prs > $@
rm -rf pyenv

View File

@ -8,13 +8,9 @@ or
git clone https://git.syndicate-lang.org/syndicate-lang/syndicate-py
cd syndicate-py
python -m venv .venv
. .venv/bin/activate
pip install -e '.[dev]'
See also
[syndicate-py-packaging](https://git.syndicate-lang.org/syndicate-lang/syndicate-py-packaging)
for Debian packaging scripts.
virtualenv -p python3 pyenv
. pyenv/bin/activate
pip install -r requirements.txt
## Running
@ -23,10 +19,10 @@ Start a Syndicate broker (such as
Find the line of broker output giving the root capability:
... rootcap=<ref {oid: "syndicate" sig: #x"69ca300c1dbfa08fba692102dd82311a"}> ...
... rootcap=<ref "syndicate" [] #x"a6480df5306611ddd0d3882b546e1977"> ...
Then, run [chat.py](chat.py) several times in several separate windows:
python chat.py \
--address '<tcp "localhost" 8001>' \
--cap '<ref {oid: "syndicate" sig: #x"69ca300c1dbfa08fba692102dd82311a"}>'
--cap '<ref "syndicate" [] #x"a6480df5306611ddd0d3882b546e1977">'

View File

@ -3,7 +3,7 @@ import argparse
import asyncio
import random
import syndicate
from syndicate import patterns as P, actor, dataspace, Record, Embedded, turn
from syndicate import patterns as P, actor, dataspace, Record, Embedded
from syndicate.during import Handler
from syndicate.schema import sturdy
@ -11,10 +11,10 @@ parser = argparse.ArgumentParser(description='Test bidirectional object referenc
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--address', metavar='\'<tcp "HOST" PORT>\'',
help='transport address of the server',
default='<ws "ws://localhost:9001/">')
default='<ws "ws://localhost:8001/">')
parser.add_argument('--cap', metavar='\'<ref ...>\'',
help='capability for the dataspace on the server',
default='<ref "syndicate" [] #[acowDB2/oI+6aSEC3YIxGg==]>')
default='<ref "syndicate" [] #[pkgN9TBmEd3Q04grVG4Zdw==]>')
parser.add_argument('--start',
help='make this instance kick off the procedure',
action='store_true')
@ -41,65 +41,46 @@ args = parser.parse_args()
#
# ----Three()--->
#
# Here's a trace from a live session of this running against syndicate-rs:
#
# B --> server: [[1, <assert <Boot #:⌜141/402:00007f3e50021ef0⌝> 3>]]
#
# A --> server: [[1, <assert <Observe <rec Boot [<bind <_>>]> #:⌜151/422:00007f3e50025090⌝> 3>]]
# A <-- server: [[1, <assert [#:⌜141/402:00007f3e50021ef0⌝] 633>]]
# A --> server: [[2, <assert <One #:⌜151/422:00007f3e5c009b00⌝> 5>]]
#
# B <-- server: [[1, <assert <One #:⌜151/422:00007f3e5c009b00⌝> 643>]]
# B --> server: [[1, <retract 3>], [2, <assert <Two> 5>]]
#
# A <-- server: [[2, <assert <Two> 653>]]
# A <-- server: [[1, <retract 633>]]
# A --> server: [[2, <message <Three>>]]
#
# B <-- server: [[1, <message <Three>>]]
#
Boot = Record.makeConstructor('Boot', 'b')
One = Record.makeConstructor('One', 'a')
Two = Record.makeConstructor('Two', '')
Three = Record.makeConstructor('Three', '')
@actor.run_system(name = 'bidi-gc', debug = False)
def main():
root_facet = turn.active_facet()
def main(turn):
root_facet = turn._facet
@syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)),
@syndicate.relay.connect(turn, args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)),
on_disconnected = lambda _relay, _did_connect: sys.exit(1))
def on_connected(ds):
def on_connected(turn, ds):
if args.start:
# We are "A".
@dataspace.observe(ds, P.rec('Boot', P.CAPTURE))
@dataspace.observe(turn, ds, P.rec('Boot', P.CAPTURE))
@Handler().add_handler
def on_b(b):
def on_b(turn, b):
print('A got B', b)
@Handler().add_handler
def a(two):
def a(turn, two):
print('A got assertion:', two)
turn.send(b.embeddedValue, Three())
def on_two_retracted():
def on_two_retracted(turn):
print('Assertion', two, 'from B went')
turn.retract(one_handle)
return on_two_retracted
one_handle = turn.publish(b.embeddedValue, One(Embedded(turn.ref(a))))
return lambda: print('B\'s Boot record went')
return lambda turn: print('B\'s Boot record went')
else:
# We are "B".
@Handler().add_handler
def b(one):
def b(turn, one):
print('B got assertion:', one)
print('boot_handle =', boot_handle)
turn.retract(boot_handle)
turn.publish(One._a(one).embeddedValue, Two())
return lambda: print('B facet stopping')
return lambda turn: print('B facet stopping')
@b.msg_handler
def b_msg(three):
print('B got message:', three)
def b_msg(turn, three):
print('B got message: ', three)
boot_handle = turn.publish(ds, Boot(Embedded(turn.ref(b))))

View File

@ -1 +0,0 @@
´³bundle·µ³chat„´³schema·³version°³ definitions·³Says´³rec´³lit³Says„´³tupleµ´³named³who´³atom³String„„´³named³what´³atom³String„„„„„³Present´³rec´³lit³Present„´³tupleµ´³named³username´³atom³String„„„„„„³ embeddedType€„„„„

View File

@ -1,4 +0,0 @@
version 1 .
Present = <Present @username string>.
Says = <Says @who string @what string>.

37
chat.py
View File

@ -3,50 +3,45 @@ import argparse
import asyncio
import random
import syndicate
from syndicate import patterns as P, actor, dataspace, turn
from syndicate.schema import sturdy
from preserves.schema import load_schema_file
simpleChatProtocol = load_schema_file('./chat.bin').chat
from syndicate import patterns as P, actor, dataspace
from syndicate.schema import simpleChatProtocol, sturdy
parser = argparse.ArgumentParser(description='Simple dataspace-server-mediated text chat.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--address', metavar='\'<tcp "HOST" PORT>\'',
help='transport address of the server',
default='<ws "ws://localhost:9001/">')
default='<ws "ws://localhost:8001/">')
parser.add_argument('--cap', metavar='\'<ref ...>\'',
help='capability for the dataspace on the server',
default='<ref {oid: "syndicate" sig: #[acowDB2/oI+6aSEC3YIxGg==]}>')
default='<ref "syndicate" [] #[pkgN9TBmEd3Q04grVG4Zdw==]>')
args = parser.parse_args()
Present = simpleChatProtocol.Present
Says = simpleChatProtocol.Says
@actor.run_system(name = 'chat', debug = False)
def main():
root_facet = turn.active_facet()
@syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)))
def on_connected(ds):
turn.on_stop(lambda: turn.stop(root_facet))
def main(turn):
root_facet = turn._facet
@syndicate.relay.connect(turn, args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)))
def on_connected(turn, ds):
me = 'user_' + str(random.randint(10, 1000))
turn.publish(ds, Present(me))
@dataspace.during(ds, P.rec('Present', P.CAPTURE), inert_ok=True)
def on_presence(who):
@dataspace.during(turn, ds, P.rec('Present', P.CAPTURE), inert_ok=True)
def on_presence(turn, who):
print('%s joined' % (who,))
turn.on_stop(lambda: print('%s left' % (who,)))
turn.on_stop(lambda turn: print('%s left' % (who,)))
@dataspace.on_message(ds, P.rec('Says', P.CAPTURE, P.CAPTURE))
def on_says(who, what):
@dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE))
def on_says(turn, who, what):
print('%s says %r' % (who, what))
@turn.linked_task()
async def accept_input(f):
reader = asyncio.StreamReader()
await f.loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
while line := (await reader.readline()).decode('utf-8'):
turn.external(f, lambda: turn.send(ds, Says(me, line.strip())))
actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip())))
actor.Turn.external(f, lambda turn: turn.stop(root_facet))

View File

@ -1,3 +0,0 @@
let ?root_ds = dataspace
<require-service <relay-listener <tcp "0.0.0.0" 9001> $gatekeeper>>
<bind <ref { oid: "syndicate" key: #x"" }> $root_ds #f>

8
inf.py
View File

@ -1,9 +1,9 @@
from syndicate import relay, turn
from syndicate import relay
from syndicate.during import During
import logging
@relay.service(name='inf', debug=True)
@During().add_handler
def main(args):
logging.info(f'in main {args}')
turn.on_stop(lambda: logging.info(f'args retracted {args}'))
def main(turn, args):
logging.info(f'in main {turn}, {args}')
turn.on_stop(lambda turn: logging.info(f'args retracted {args}'))

View File

@ -1,9 +0,0 @@
<require-service <daemon inf>>
<daemon inf {
argv: "python inf.py"
protocol: application/syndicate
}>
? <service-object <daemon inf> ?cap> [
$cap += =here-is-your-configuration!
$cap += =here-is-another-configuration!
]

32
ovlinfo.py Normal file
View File

@ -0,0 +1,32 @@
import sys
import asyncio
import random
import threading
import syndicate.mini.core as S
OverlayLink = S.Record.makeConstructor('OverlayLink', 'downNode upNode')
conn = S.Connection.from_url(sys.argv[1])
uplinks = {}
def add_uplink(turn, src, tgt):
uplinks[src] = tgt
summarise_uplinks()
def del_uplink(turn, src, tgt):
del uplinks[src]
summarise_uplinks()
def summarise_uplinks():
print(repr(uplinks))
with conn.turn() as t:
with conn.actor().react(t) as facet:
facet.add(S.Observe(OverlayLink(S.CAPTURE, S.CAPTURE)),
on_add=add_uplink,
on_del=del_uplink)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(conn.reconnecting_main(loop))
loop.stop()
loop.run_forever()
loop.close()

View File

@ -1,5 +0,0 @@
#!/bin/sh
cd "$(dirname "$0")"
. ./.envrc
exec python -c \
'import tomllib; print(tomllib.load(open("pyproject.toml", "rb"))["project"]["version"])'

View File

@ -1,50 +0,0 @@
[project]
name = "syndicate-py"
version = "0.19.1"
description = "Syndicated Actor model and Syndicate network protocol for Python 3"
readme = "README.md"
requires-python = ">=3.6, <4"
license = {text = "GPL-3.0-or-later"}
authors = [
{name = "Tony Garnock-Jones", email = "tonyg@leastfixedpoint.com"},
]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Topic :: Software Development :: Libraries :: Python Modules",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Programming Language :: Python :: 3",
]
# "websockets" isn't listed here, but if you want to use relay.WebsocketTunnelRelay,
# you will need it.
dependencies = [
"preserves",
]
[project.urls]
Homepage = "https://git.syndicate-lang.org/syndicate-lang/syndicate-py"
Issues = "https://git.syndicate-lang.org/syndicate-lang/syndicate-py/issues"
[project.optional-dependencies]
dev = [
"build",
"twine",
]
[tool.setuptools]
packages = [
"syndicate",
"syndicate.protocols",
"syndicate.protocols.schemas",
]
[tool.setuptools.package-data]
"syndicate.protocols" = ["*.bin"]
"syndicate.protocols.schemas" = ["*.prs"]
[build-system]
requires = ["setuptools", "setuptools-scm"]
build-backend = "setuptools.build_meta"

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
websockets
preserves

24
setup.py Normal file
View File

@ -0,0 +1,24 @@
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
setup(
name="syndicate-py",
version="0.2.0",
author="Tony Garnock-Jones",
author_email="tonyg@leastfixedpoint.com",
license="GNU General Public License v3 or later (GPLv3+)",
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Topic :: Software Development :: Libraries :: Python Modules",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Programming Language :: Python :: 3",
],
packages=["syndicate"],
url="https://git.syndicate-lang.org/syndicate-lang/syndicate-py",
description="Syndicated Actor model and Syndicate network protocol for Python 3",
install_requires=['websockets', 'preserves'],
python_requires=">=3.6, <4",
)

View File

@ -3,48 +3,4 @@ __path__ = __import__('pkgutil').extend_path(__path__, __name__)
# This is 'import *' in order to effectively re-export preserves as part of this module's API.
from preserves import *
def __setup():
from .actor import _active, Turn
from .metapy import staticproperty
from types import FunctionType
import sys
class turn:
@staticproperty
def active():
t = getattr(_active, 'turn', False)
if t is False:
t = _active.turn = None
return t
@staticproperty
def log():
return turn.active.log
def run(facet, action):
return Turn.run(facet, action)
def external(facet, action, loop=None):
return Turn.external(facet, action, loop=loop)
def active_facet():
return turn.active._facet
def install_definition(name, definition):
def handler(*args, **kwargs):
return definition(turn.active, *args, **kwargs)
setattr(turn, name, handler)
for (name, definition) in Turn.__dict__.items():
if name[0] == '_':
continue
elif type(definition) == FunctionType:
install_definition(name, definition)
else:
pass
return turn
turn = __setup()
from . import relay

View File

@ -3,13 +3,10 @@ import inspect
import logging
import sys
import traceback
import threading
from preserves import Embedded, preserve
from .idgen import IdGenerator
from .metapy import staticproperty
from .dataflow import Graph, Field
log = logging.getLogger(__name__)
@ -17,67 +14,31 @@ _next_actor_number = IdGenerator()
_next_handle = IdGenerator()
_next_facet_id = IdGenerator()
_active = threading.local()
_active.turn = None
# decorator
def run_system(**kwargs):
return lambda boot_proc: System().run(boot_proc, **kwargs)
return lambda boot_proc: start_actor_system(boot_proc, **kwargs)
class System:
def __init__(self, loop = None):
self.loop = loop or asyncio.get_event_loop()
self.inhabitant_count = 0
self.exit_signal = asyncio.Queue()
def start_actor_system(boot_proc, debug = False, name = None, configure_logging = True):
if configure_logging:
logging.basicConfig(level = logging.DEBUG if debug else logging.INFO)
loop = asyncio.get_event_loop()
if debug:
loop.set_debug(True)
queue_task(lambda: Actor(boot_proc, name = name), loop = loop)
loop.run_forever()
while asyncio.all_tasks(loop):
loop.stop()
loop.run_forever()
loop.close()
def run(self, boot_proc, debug = None, name = None, configure_logging = True):
if configure_logging:
logging.basicConfig(level = logging.DEBUG if debug else logging.INFO)
if debug:
self.loop.set_debug(True)
self.queue_task(lambda: Actor(boot_proc, system = self, name = name))
# From Python 3.12, we may be able to use:
# asyncio.run(self._run, debug=debug, loop_factory=lambda: self.loop)
# but until then:
with asyncio.Runner(debug=debug, loop_factory=lambda: self.loop) as r:
return r.run(self._run())
async def _run(self):
try:
await self.exit_signal.get()
except asyncio.CancelledError:
pass
finally:
log.debug('System._run main loop exit')
def adjust_engine_inhabitant_count(self, delta):
self.inhabitant_count = self.inhabitant_count + delta
if self.inhabitant_count == 0:
log.debug('Inhabitant count reached zero')
self.exit_signal.put_nowait(())
def queue_task(self, thunk):
async def task():
try:
await ensure_awaitable(thunk())
except asyncio.CancelledError:
pass
return self.loop.create_task(task())
def queue_task_threadsafe(self, thunk):
async def task():
try:
await ensure_awaitable(thunk())
except asyncio.CancelledError:
pass
return self.loop.call_soon_threadsafe(lambda: asyncio.run_coroutine_threadsafe(task(), self.loop))
async def ensure_awaitable(value):
if inspect.isawaitable(value):
return await value
else:
return value
def adjust_engine_inhabitant_count(delta):
loop = asyncio.get_running_loop()
if not hasattr(loop, '__syndicate_inhabitant_count'):
loop.__syndicate_inhabitant_count = 0
loop.__syndicate_inhabitant_count = loop.__syndicate_inhabitant_count + delta
if loop.__syndicate_inhabitant_count == 0:
log.debug('Inhabitant count reached zero')
loop.stop()
def remove_noerror(collection, item):
try:
@ -86,18 +47,16 @@ def remove_noerror(collection, item):
pass
class Actor:
def __init__(self, boot_proc, system, name = None, initial_assertions = {}, daemon = False):
def __init__(self, boot_proc, name = None, initial_assertions = {}, daemon = False):
self.name = name or 'a' + str(next(_next_actor_number))
self._system = system
self._daemon = daemon
if not daemon:
system.adjust_engine_inhabitant_count(1)
adjust_engine_inhabitant_count(1)
self.root = Facet(self, None)
self.outbound = initial_assertions or {}
self.exit_reason = None # None -> running, True -> terminated OK, exn -> error
self.exit_hooks = []
self._log = None
self._dataflow_graph = None
Turn.run(Facet(self, self.root, set(self.outbound.keys())), stop_if_inert_after(boot_proc))
def __repr__(self):
@ -111,7 +70,7 @@ class Actor:
def daemon(self, value):
if self._daemon != value:
self._daemon = value
self._system.adjust_engine_inhabitant_count(-1 if value else 1)
adjust_engine_inhabitant_count(-1 if value else 1)
@property
def alive(self):
@ -123,33 +82,27 @@ class Actor:
self._log = logging.getLogger('syndicate.Actor.%s' % (self.name,))
return self._log
@property
def dataflow_graph(self):
if self._dataflow_graph is None:
self._dataflow_graph = Graph()
return self._dataflow_graph
def at_exit(self, hook):
self.exit_hooks.append(hook)
def cancel_at_exit(self, hook):
remove_noerror(self.exit_hooks, hook)
def _repair_dataflow_graph(self):
if self._dataflow_graph is not None:
self._dataflow_graph.repair_damage(lambda a: a())
def _terminate(self, exit_reason):
def terminate(self, turn, exit_reason):
if self.exit_reason is not None: return
self.log.debug('Terminating %r with exit_reason %r', self, exit_reason)
self.exit_reason = exit_reason
if exit_reason != True:
self.log.error('crashed: %s' % (exit_reason,))
for h in self.exit_hooks:
h()
self.root._terminate(exit_reason == True)
if not self._daemon:
self._system.adjust_engine_inhabitant_count(-1)
h(turn)
def finish_termination():
Turn.run(self.root,
lambda turn: self.root._terminate(turn, exit_reason == True),
zombie_turn = True)
if not self._daemon:
adjust_engine_inhabitant_count(-1)
queue_task(finish_termination)
def _pop_outbound(self, handle, clear_from_source_facet):
e = self.outbound.pop(handle)
@ -161,10 +114,6 @@ class Actor:
return e
class Facet:
@staticproperty
def active():
return _active.turn._facet
def __init__(self, actor, parent, initial_handles=None):
self.id = next(_next_facet_id)
self.actor = actor
@ -177,17 +126,10 @@ class Facet:
self.linked_tasks = []
self.alive = True
self.inert_check_preventers = 0
self._log = None
@property
def log(self):
if self._log is None:
if self.parent is None:
p = self.actor.log
else:
p = self.parent.log
self._log = p.getChild(str(self.id))
return self._log
return self.actor.log
def _repr_labels(self):
pieces = []
@ -208,15 +150,6 @@ class Facet:
def cancel_on_stop(self, a):
remove_noerror(self.shutdown_actions, a)
def on_stop_or_crash(self, a):
def cleanup():
self.cancel_on_stop(cleanup)
self.actor.cancel_at_exit(cleanup)
a()
self.on_stop(cleanup)
self.actor.at_exit(cleanup)
return cleanup
def isinert(self):
return \
len(self.children) == 0 and \
@ -234,43 +167,27 @@ class Facet:
self.inert_check_preventers = self.inert_check_preventers - 1
return disarm
@property
def loop(self):
return self.actor._system.loop
def linked_task(self, coro_fn, run_in_executor=False):
def linked_task(self, coro_fn, loop = None):
task = None
if run_in_executor:
inner_coro_fn = coro_fn
async def outer_coro_fn(facet):
await self.loop.run_in_executor(None, lambda: inner_coro_fn(facet))
coro_fn = outer_coro_fn
@self.on_stop_or_crash
def cancel_linked_task():
def cancel_linked_task(turn):
nonlocal task
if task is not None:
remove_noerror(self.linked_tasks, task)
task.cancel()
task = None
self.cancel_on_stop(cancel_linked_task)
self.actor.cancel_at_exit(cancel_linked_task)
async def guarded_task():
should_terminate_facet = True
try:
if await coro_fn(self) is True:
should_terminate_facet = False
except asyncio.CancelledError:
pass
except:
import traceback
traceback.print_exc()
await coro_fn(self)
finally:
if should_terminate_facet:
Turn.external(self, lambda: Turn.active.stop())
else:
Turn.external(self, cancel_linked_task)
task = self.loop.create_task(guarded_task())
Turn.external(self, cancel_linked_task)
task = find_loop(loop).create_task(guarded_task())
self.linked_tasks.append(task)
self.on_stop(cancel_linked_task)
self.actor.at_exit(cancel_linked_task)
def _terminate(self, orderly):
def _terminate(self, turn, orderly):
if not self.alive: return
self.log.debug('%s terminating %r', 'orderly' if orderly else 'disorderly', self)
self.alive = False
@ -279,14 +196,12 @@ class Facet:
if parent:
parent.children.remove(self)
with ActiveFacet(self):
with ActiveFacet(turn, self):
for child in list(self.children):
child._terminate(orderly)
child._terminate(turn, orderly)
if orderly:
with ActiveFacet(self.parent or self):
for h in self.shutdown_actions:
h()
turn = Turn.active
for h in self.shutdown_actions:
h(turn)
for h in self.handles:
# Optimization: don't clear from source facet, the source facet is us and we're
# about to clear our handles in one fell swoop.
@ -296,13 +211,15 @@ class Facet:
if orderly:
if parent:
if parent.isinert():
parent._terminate(True)
Turn.run(parent, lambda turn: parent._terminate(turn, True))
else:
self.actor._terminate(True)
Turn.run(self.actor.root,
lambda turn: self.actor.terminate(turn, True),
zombie_turn = True)
class ActiveFacet:
def __init__(self, facet):
self.turn = Turn.active
def __init__(self, turn, facet):
self.turn = turn
self.outer_facet = None
self.inner_facet = facet
@ -315,17 +232,26 @@ class ActiveFacet:
self.turn._facet = self.outer_facet
self.outer_facet = None
async def ensure_awaitable(value):
if inspect.isawaitable(value):
return await value
else:
return value
def find_loop(loop = None):
return asyncio.get_running_loop() if loop is None else loop
class Turn:
@staticproperty
def active():
t = getattr(_active, 'turn', False)
if t is False:
t = _active.turn = None
return t
def queue_task(thunk, loop = None):
async def task():
await ensure_awaitable(thunk())
return find_loop(loop).create_task(task())
def queue_task_threadsafe(thunk, loop = None):
async def task():
await ensure_awaitable(thunk())
return asyncio.run_coroutine_threadsafe(task(), find_loop(loop))
class Turn:
@classmethod
def run(cls, facet, action, zombie_turn = False):
if not zombie_turn:
@ -333,27 +259,20 @@ class Turn:
if not facet.alive: return
turn = cls(facet)
try:
saved = Turn.active
_active.turn = turn
try:
action()
facet.actor._repair_dataflow_graph()
finally:
_active.turn = saved
action(turn)
except:
ei = sys.exc_info()
facet.log.error('%s', ''.join(traceback.format_exception(*ei)))
Turn.run(facet.actor.root, lambda: facet.actor._terminate(ei[1]))
Turn.run(facet.actor.root, lambda turn: facet.actor.terminate(turn, ei[1]))
else:
turn._deliver()
@classmethod
def external(cls, facet, action, loop = None):
return facet.actor._system.queue_task_threadsafe(lambda: cls.run(facet, action))
return queue_task_threadsafe(lambda: cls.run(facet, action), loop)
def __init__(self, facet):
self._facet = facet
self._system = facet.actor._system
self.queues = {}
@property
@ -366,69 +285,48 @@ class Turn:
# this actually can work as a decorator as well as a normal method!
def facet(self, boot_proc):
new_facet = Facet(self._facet.actor, self._facet)
with ActiveFacet(new_facet):
stop_if_inert_after(boot_proc)()
with ActiveFacet(self, new_facet):
stop_if_inert_after(boot_proc)(self)
return new_facet
def prevent_inert_check(self):
return self._facet.prevent_inert_check()
# decorator
def linked_task(self, **kwargs):
return lambda coro_fn: self._facet.linked_task(coro_fn, **kwargs)
def linked_task(self, loop = None):
return lambda coro_fn: self._facet.linked_task(coro_fn, loop = loop)
def stop(self, facet = None, continuation = None):
if facet is None:
facet = self._facet
if continuation is not None:
facet.on_stop(continuation)
facet._terminate(True)
def action(turn):
facet._terminate(turn, True)
if continuation is not None:
continuation(turn)
self._enqueue(facet.parent, action)
# can also be used as a decorator
def on_stop(self, a):
self._facet.on_stop(a)
# can also be used as a decorator
def on_stop_or_crash(self, a):
self._facet.on_stop_or_crash(a)
def spawn(self, boot_proc, name = None, initial_handles = None, daemon = False):
def action():
def action(turn):
new_outbound = {}
if initial_handles is not None:
for handle in initial_handles:
new_outbound[handle] = \
self._facet.actor._pop_outbound(handle, clear_from_source_facet=True)
self._system.queue_task(lambda: Actor(boot_proc,
system = self._system,
name = name,
initial_assertions = new_outbound,
daemon = daemon))
queue_task(lambda: Actor(boot_proc,
name = name,
initial_assertions = new_outbound,
daemon = daemon))
self._enqueue(self._facet, action)
def stop_actor(self):
self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(True))
self._enqueue(self._facet.actor.root, lambda turn: self._facet.actor.terminate(turn, True))
def crash(self, exn):
self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(exn))
def field(self, initial_value=None, name=None):
return Field(self._facet.actor.dataflow_graph, initial_value, name)
# can also be used as a decorator
def dataflow(self, a):
f = self._facet
f.prevent_inert_check()
def subject():
if not f.alive: return
with ActiveFacet(f):
a()
f.on_stop(lambda: f.actor.dataflow_graph.forget_subject(subject))
f.actor.dataflow_graph.with_subject(subject, lambda: subject())
def publish_dataflow(self, assertion_function):
endpoint = DataflowPublication(assertion_function)
self.dataflow(lambda: endpoint.update())
self._enqueue(self._facet.actor.root, lambda turn: self._facet.actor.terminate(turn, exn))
def publish(self, ref, assertion):
handle = next(_next_handle)
@ -442,10 +340,10 @@ class Turn:
e = OutboundAssertion(facet, handle, ref)
facet.actor.outbound[handle] = e
facet.handles.add(handle)
def action():
def action(turn):
e.established = True
self.log.debug('%r <-- publish %r handle %r', ref, assertion, handle)
ref.entity.on_publish(assertion, handle)
ref.entity.on_publish(turn, assertion, handle)
self._enqueue(ref.facet, action)
def retract(self, handle):
@ -455,99 +353,67 @@ class Turn:
self._retract(e)
def replace(self, ref, handle, assertion):
if assertion is None or ref is None:
new_handle = None
else:
new_handle = self.publish(ref, assertion)
new_handle = None if assertion is None else self.publish(ref, assertion)
self.retract(handle)
return new_handle
def _retract(self, e):
# Assumes e has already been removed from self._facet.actor.outbound and the
# appropriate set of handles
def action():
def action(turn):
if e.established:
e.established = False
self.log.debug('%r <-- retract handle %r', e.ref, e.handle)
e.ref.entity.on_retract(e.handle)
e.ref.entity.on_retract(turn, e.handle)
self._enqueue(e.ref.facet, action)
def sync(self, ref, k):
class SyncContinuation(Entity):
def on_message(self, _value):
k()
def on_message(self, turn, _value):
k(turn)
self._sync(ref, self.ref(SyncContinuation()))
def _sync(self, ref, peer):
peer = preserve(peer)
def action():
def action(turn):
self.log.debug('%r <-- sync peer %r', ref, peer)
ref.entity.on_sync(peer)
ref.entity.on_sync(turn, peer)
self._enqueue(ref.facet, action)
def send(self, ref, message):
# TODO: attenuation
message = preserve(message)
def action():
def action(turn):
self.log.debug('%r <-- message %r', ref, message)
ref.entity.on_message(message)
ref.entity.on_message(turn, message)
self._enqueue(ref.facet, action)
# decorator
def after(self, delay_seconds):
def decorate(action):
@self.linked_task()
async def task(facet):
await asyncio.sleep(delay_seconds)
Turn.external(facet, action)
return decorate
def _enqueue(self, target_facet, action):
target_actor = target_facet.actor
if target_actor not in self.queues:
self.queues[target_actor] = []
self.queues[target_actor].append((target_facet, action))
if target_facet not in self.queues:
self.queues[target_facet] = []
self.queues[target_facet].append(action)
def _deliver(self):
for (actor, q) in self.queues.items():
for (facet, q) in self.queues.items():
# Stupid python scoping bites again
def make_deliver_q(actor, q): # gratuitous
def deliver_q():
turn = Turn.active
saved_facet = turn._facet
for (facet, action) in q:
turn._facet = facet
action()
turn._facet = saved_facet
return lambda: Turn.run(actor.root, deliver_q)
self._system.queue_task(make_deliver_q(actor, q))
def make_deliver_q(facet, q): # gratuitous
def deliver_q(turn):
for action in q:
action(turn)
return lambda: Turn.run(facet, deliver_q)
queue_task(make_deliver_q(facet, q))
self.queues = {}
def stop_if_inert_after(action):
def wrapped_action():
turn = Turn.active
action()
def check_action():
def wrapped_action(turn):
action(turn)
def check_action(turn):
if (turn._facet.parent is not None and not turn._facet.parent.alive) \
or turn._facet.isinert():
turn.stop()
turn._enqueue(turn._facet, check_action)
return wrapped_action
class DataflowPublication:
def __init__(self, assertion_function):
self.assertion_function = assertion_function
self.handle = None
self.target = None
self.assertion = None
def update(self):
(next_target, next_assertion) = self.assertion_function() or (None, None)
if next_target != self.target or next_assertion != self.assertion_function:
self.target = next_target
self.assertion = next_assertion
self.handle = Turn.active.replace(self.target, self.handle, self.assertion)
class Ref:
def __init__(self, facet, entity):
self.facet = facet
@ -569,35 +435,27 @@ class OutboundAssertion:
# Can act as a mixin
class Entity:
def on_publish(self, v, handle):
def on_publish(self, turn, v, handle):
pass
def on_retract(self, handle):
def on_retract(self, turn, handle):
pass
def on_message(self, v):
def on_message(self, turn, v):
pass
def on_sync(self, peer):
Turn.active.send(peer, True)
def on_sync(self, turn, peer):
turn.send(peer, True)
_inert_actor = None
_inert_facet = None
_inert_ref = None
_inert_entity = Entity()
def __boot_inert():
def __boot_inert(turn):
global _inert_actor, _inert_facet, _inert_ref
_inert_actor = Turn.active._facet.actor
_inert_facet = Turn.active._facet
_inert_ref = Turn.active.ref(_inert_entity)
_inert_actor = turn._facet.actor
_inert_facet = turn._facet
_inert_ref = turn.ref(_inert_entity)
async def __run_inert():
Actor(__boot_inert, system = System(), name = '_inert_actor')
def __setup_inert():
def setup_main():
loop = asyncio.new_event_loop()
loop.run_until_complete(__run_inert())
loop.close()
t = threading.Thread(target=setup_main)
t.start()
t.join()
__setup_inert()
Actor(__boot_inert, name = '_inert_actor')
asyncio.get_event_loop().run_until_complete(__run_inert())

View File

@ -1,81 +0,0 @@
from . import mapset
class Graph:
def __init__(self):
self.edges_forward = {}
self.edges_reverse = {}
self.damaged_nodes = set()
self.active_subject = None
def with_subject(self, subject_id, f):
old_subject = self.active_subject
self.active_subject = subject_id
try:
return f()
finally:
self.active_subject = old_subject
def record_observation(self, object_id):
if self.active_subject is not None:
mapset.add(self.edges_forward, object_id, self.active_subject)
mapset.add(self.edges_reverse, self.active_subject, object_id)
def record_damage(self, object_id):
self.damaged_nodes.add(object_id)
def forget_subject(self, subject_id):
for oid in self.edges_reverse.pop(subject_id, set()):
mapset.discard(self.edges_forward, oid, subject_id)
def observers_of(self, object_id):
return list(self.edges_forward.get(object_id, []))
def repair_damage(self, repair_fn):
repaired_this_round = set()
while True:
workset = self.damaged_nodes - repaired_this_round
self.damaged_nodes = set()
if not workset:
break
repaired_this_round = repaired_this_round | workset
updated_subjects = set()
for object_id in workset:
for subject_id in self.observers_of(object_id):
if subject_id not in updated_subjects:
updated_subjects.add(subject_id)
self.forget_subject(subject_id)
self.with_subject(subject_id, lambda: repair_fn(subject_id))
__nextFieldId = 0
class Field:
def __init__(self, graph, initial=None, name=None):
global __nextFieldId
self.id = name
if self.id is None:
self.id = str(__nextFieldId)
__nextFieldId = __nextFieldId + 1
self.graph = graph
self._value = initial
@property
def value(self):
self.graph.record_observation(self)
return self._value
@value.setter
def value(self, new_value):
if self._value != new_value:
self.graph.record_damage(self)
self._value = new_value
@property
def update(self):
self.graph.record_damage(self)
return self.value
def changed(self):
self.graph.record_damage(self)

View File

@ -1,18 +1,17 @@
from .schema import dataspace
from .during import During
from . import turn
# decorator
def observe(ds, pattern):
def observe(turn, ds, pattern):
def publish_observer(entity):
turn.publish(ds, dataspace.Observe(pattern, turn.ref(entity)))
return entity
return publish_observer
# decorator
def on_message(ds, pattern, *args, **kwargs):
return lambda on_msg: observe(ds, pattern)(During(*args, **kwargs).msg_handler(on_msg))
def on_message(turn, ds, pattern, *args, **kwargs):
return lambda on_msg: observe(turn, ds, pattern)(During(*args, **kwargs).msg_handler(on_msg))
# decorator
def during(ds, pattern, *args, **kwargs):
return lambda on_add: observe(ds, pattern)(During(*args, **kwargs).add_handler(on_add))
def during(turn, ds, pattern, *args, **kwargs):
return lambda on_add: observe(turn, ds, pattern)(During(*args, **kwargs).add_handler(on_add))

View File

@ -1,9 +1,9 @@
from . import turn, actor
from . import actor
def _ignore(*args, **kwargs):
pass
def _default_sync(peer):
def _default_sync(turn, peer):
turn.send(peer, True)
class Handler(actor.Entity):
@ -27,21 +27,21 @@ class Handler(actor.Entity):
def _wrap_add_handler(self, handler):
return handler
def on_publish(self, v, handle):
retraction_handler = self._on_add(*self._wrap(v))
def on_publish(self, turn, v, handle):
retraction_handler = self._on_add(turn, *self._wrap(v))
if retraction_handler is not None:
self.retraction_handlers[handle] = retraction_handler
def on_retract(self, handle):
def on_retract(self, turn, handle):
retraction_handler = self.retraction_handlers.pop(handle, None)
if retraction_handler is not None:
retraction_handler()
retraction_handler(turn)
def on_message(self, v):
self._on_msg(*self._wrap(v))
def on_message(self, turn, v):
self._on_msg(turn, *self._wrap(v))
def on_sync(self, peer):
self._on_sync(peer)
def on_sync(self, turn, peer):
self._on_sync(turn, peer)
# decorator
def add_handler(self, on_add):
@ -60,13 +60,13 @@ class Handler(actor.Entity):
class During(Handler):
def _wrap_add_handler(self, handler):
def facet_handler(*args):
def facet_handler(turn, *args):
@turn.facet
def facet():
def facet(turn):
if self.inert_ok:
turn.prevent_inert_check()
maybe_stop_action = handler(*args)
maybe_stop_action = handler(turn, *args)
if maybe_stop_action is not None:
turn.on_stop(maybe_stop_action)
return lambda: turn.stop(facet)
return lambda turn: turn.stop(facet)
return facet_handler

View File

@ -1,20 +1,16 @@
from .schema import gatekeeper
from .during import During
from . import turn
# decorator
def resolve(gk, cap, *args, **kwargs):
def resolve(turn, gk, cap, *args, **kwargs):
def configure_handler(handler):
def unwrapping_handler(r):
resolved = gatekeeper.Resolved.decode(r)
if resolved.VARIANT.name == 'accepted':
return handler(resolved.responderSession)
raise Exception('Could not resolve reference: ' + repr(resolved))
return _resolve(gk, cap)(During(*args, **kwargs).add_handler(unwrapping_handler))
def unwrapping_handler(turn, wrapped_ref):
return handler(turn, wrapped_ref.embeddedValue)
return _resolve(turn, gk, cap)(During(*args, **kwargs).add_handler(unwrapping_handler))
return configure_handler
# decorator
def _resolve(gk, cap):
def _resolve(turn, gk, cap):
def publish_resolution_request(entity):
turn.publish(gk, gatekeeper.Resolve(cap, turn.ref(entity)))
return entity

View File

@ -1,14 +0,0 @@
def add(m, k, v):
s = m.get(k)
if s is None:
s = set()
m[k] = s
s.add(v)
def discard(m, k, v):
s = m.get(k)
if s is None:
return
s.discard(v)
if not s:
m.pop(k)

View File

@ -1,15 +0,0 @@
class staticproperty:
"""For use as @staticproperty, like @property, but for static properties of classes.
Read-only for now."""
def __init__(self, getter):
self.getter = getter
def __get__(self, inst, cls=None):
return self.getter()
class classproperty:
"""For use as @classproperty, like @property, but for class-side properties of classes.
Read-only for now."""
def __init__(self, getter):
self.getter = getter
def __get__(self, inst, cls=None):
return self.getter(cls)

View File

@ -1,111 +1,32 @@
from .schema import dataspacePatterns as P
from . import Symbol, Record
from preserves import preserve
from . import Symbol
_dict = dict ## we're about to shadow the builtin
_ = P.Pattern.discard()
_ = P.Pattern.DDiscard(P.DDiscard())
def bind(p):
return P.Pattern.bind(p)
return P.Pattern.DBind(P.DBind(p))
CAPTURE = bind(_)
class unquote:
def __init__(self, pattern):
self.pattern = pattern
def __escape_schema__(self):
return self
uCAPTURE = unquote(CAPTURE)
u_ = unquote(_)
# Given
#
# Run = <run @name string @input any @output any>
#
# then these all produce the same pattern:
#
# P.rec('Observe', P.quote(P.rec('run', P.lit('N'), P.uCAPTURE, P.bind(P.u_))), P._)
#
# P.rec('Observe', P.quote(P.quote(Run('N', P.unquote(P.uCAPTURE), P.unquote(P.bind(P.u_))))), P._)
#
# P.quote(Record(Symbol('Observe'),
# [P.quote(Run('N', P.unquote(P.uCAPTURE), P.unquote(P.bind(P.u_)))),
# P.u_]))
# Simple, stupid single-level quasiquotation.
def quote(p):
if isinstance(p, unquote):
return p.pattern
p = preserve(p)
if isinstance(p, list) or isinstance(p, tuple):
return arr(*map(quote, p))
elif isinstance(p, set) or isinstance(p, frozenset):
raise Exception('Cannot represent literal set in dataspace pattern')
elif isinstance(p, _dict):
return dict(*((k, quote(pp)) for (k, pp) in p.items()))
elif isinstance(p, Record):
return _rec(p.key, *map(quote, p.fields))
else:
return P.Pattern.lit(P.AnyAtom.decode(p))
def lit(v):
if isinstance(v, list) or isinstance(v, tuple):
return arr(*map(lit, v))
elif isinstance(v, set) or isinstance(v, frozenset):
raise Exception('Cannot represent literal set in dataspace pattern')
elif isinstance(v, _dict):
return dict(*((k, lit(vv)) for (k, vv) in v.items()))
elif isinstance(v, Record):
return _rec(v.key, *map(lit, v.fields))
else:
return P.Pattern.lit(P.AnyAtom.decode(v))
def seq_entries(seq):
entries = {}
for i, p in enumerate(seq):
if p.VARIANT != P.Pattern.discard.VARIANT:
entries[i] = p
np = len(seq)
if np > 0 and (np - 1) not in entries:
entries[np - 1] = P.Pattern.discard()
return entries
def unlit_seq(entries):
seq = []
if len(entries) > 0:
try:
max_k = max(entries.keys())
except TypeError:
raise Exception('Pattern entries do not represent a gap-free sequence')
for i in range(max_k + 1):
seq.append(unlit(entries[i]))
return seq
def unlit(p):
if not hasattr(p, 'VARIANT'):
p = P.Pattern.decode(p)
if p.VARIANT == P.Pattern.lit.VARIANT:
return p.value.value
if p.VARIANT != P.Pattern.group.VARIANT:
raise Exception('Pattern does not represent a literal value')
if p.type.VARIANT == P.GroupType.rec.VARIANT:
return Record(p.type.label, unlit_seq(p.entries))
if p.type.VARIANT == P.GroupType.arr.VARIANT:
return list(unlit_seq(p.entries))
if p.type.VARIANT == P.GroupType.dict.VARIANT:
return _dict(map(lambda kv: (kv[0], unlit(kv[1])), p.entries.items()))
raise Exception('unreachable')
return P.Pattern.DLit(P.DLit(v))
def rec(labelstr, *members):
return _rec(Symbol(labelstr), *members)
def _rec(label, *members):
return P.Pattern.group(P.GroupType.rec(label), seq_entries(members))
return P.Pattern.DCompound(P.DCompound.rec(
P.CRec(label, len(members)),
_dict(enumerate(members))))
def arr(*members):
return P.Pattern.group(P.GroupType.arr(), seq_entries(members))
return P.Pattern.DCompound(P.DCompound.arr(
P.CArr(len(members)),
_dict(enumerate(members))))
def dict(*kvs):
return P.Pattern.group(P.GroupType.dict(), _dict(kvs))
return P.Pattern.DCompound(P.DCompound.dict(
P.CDict(),
_dict(kvs)))

View File

@ -1,8 +0,0 @@
all: schema-bundle.bin
clean:
rm -f schema-bundle.bin
schema-bundle.bin: schemas/*.prs
preserves-schemac schemas > $@.tmp
mv $@.tmp $@

View File

@ -1,44 +0,0 @@
´³bundle·µ³tcp„´³schema·³version°³ definitions·³TcpLocal´³rec´³lit³ tcp-local„´³tupleµ´³named³host´³atom³String„„´³named³port´³atom³ SignedInteger„„„„„³ TcpRemote´³rec´³lit³
tcp-remote„´³tupleµ´³named³host´³atom³String„„´³named³port´³atom³ SignedInteger„„„„„³ TcpPeerInfo´³rec´³lit³tcp-peer„´³tupleµ´³named³handle´³embedded³any„„´³named³local´³refµ„³TcpLocal„„´³named³remote´³refµ„³ TcpRemote„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³http„´³schema·³version°³ definitions·³Chunk´³orµµ±string´³atom³String„„µ±bytes´³atom³
ByteString„„„„³Headers´³dictof´³atom³Symbol„´³atom³String„„³MimeType´³atom³Symbol„³
QueryValue´³orµµ±string´³atom³String„„µ±file´³rec´³lit³file„´³tupleµ´³named³filename´³atom³String„„´³named³headers´³refµ„³Headers„„´³named³body´³atom³
ByteString„„„„„„„„³ HostPattern´³orµµ±host´³atom³String„„µ±any´³lit€„„„„³ HttpBinding´³rec´³lit³ http-bind„´³tupleµ´³named³host´³refµ„³ HostPattern„„´³named³port´³atom³ SignedInteger„„´³named³method´³refµ„³ MethodPattern„„´³named³path´³refµ„³ PathPattern„„´³named³handler´³embedded´³refµ„³ HttpRequest„„„„„„³ HttpContext´³rec´³lit³request„´³tupleµ´³named³req´³refµ„³ HttpRequest„„´³named³res´³embedded´³refµ„³ HttpResponse„„„„„„³ HttpRequest´³rec´³lit³ http-request„´³tupleµ´³named³sequenceNumber´³atom³ SignedInteger„„´³named³host´³refµ„³ RequestHost„„´³named³port´³atom³ SignedInteger„„´³named³method´³atom³Symbol„„´³named³path´³seqof´³atom³String„„„´³named³headers´³refµ„³Headers„„´³named³query´³dictof´³atom³Symbol„´³seqof´³refµ„³
QueryValue„„„„´³named³body´³refµ„³ RequestBody„„„„„³ HttpService´³rec´³lit³ http-service„´³tupleµ´³named³host´³refµ„³ HostPattern„„´³named³port´³atom³ SignedInteger„„´³named³method´³refµ„³ MethodPattern„„´³named³path´³refµ„³ PathPattern„„„„„³ PathPattern´³seqof´³refµ„³PathPatternElement„„³ RequestBody´³orµµ±absent´³lit€„„µ±present´³atom³
ByteString„„„„³ RequestHost´³orµµ±absent´³lit€„„µ±present´³atom³String„„„„³ HttpListener´³rec´³lit³ http-listener„´³tupleµ´³named³port´³atom³ SignedInteger„„„„„³ HttpResponse´³orµµ±status´³rec´³lit³status„´³tupleµ´³named³code´³atom³ SignedInteger„„´³named³message´³atom³String„„„„„„µ±header´³rec´³lit³header„´³tupleµ´³named³name´³atom³Symbol„„´³named³value´³atom³String„„„„„„µ±chunk´³rec´³lit³chunk„´³tupleµ´³named³chunk´³refµ„³Chunk„„„„„„µ±done´³rec´³lit³done„´³tupleµ´³named³chunk´³refµ„³Chunk„„„„„„„„³ MethodPattern´³orµµ±any´³lit€„„µ±specific´³atom³Symbol„„„„³PathPatternElement´³orµµ±label´³atom³String„„µ±wildcard´³lit³_„„µ±rest´³lit³...„„„„„³ embeddedType€„„µ³noise„´³schema·³version°³ definitions·³Packet´³orµµ±complete´³atom³
ByteString„„µ±
fragmented´³seqof´³atom³
ByteString„„„„„³ Initiator´³rec´³lit³ initiator„´³tupleµ´³named³initiatorSession´³embedded´³refµ„³Packet„„„„„„³ NoiseSpec´³andµ´³dict·³key´³named³key´³atom³
ByteString„„³service´³named³service´³refµ„³ServiceSelector„„„„´³named³protocol´³refµ„³ NoiseProtocol„„´³named³ preSharedKeys´³refµ„³NoisePreSharedKeys„„„„³ SessionItem´³orµµ± Initiator´³refµ„³ Initiator„„µ±Packet´³refµ„³Packet„„„„³ NoiseProtocol´³orµµ±present´³dict·³protocol´³named³protocol´³atom³String„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³ NoiseStepType´³lit³noise„³SecretKeyField´³orµµ±present´³dict·³ secretKey´³named³ secretKey´³atom³
ByteString„„„„„µ±invalid´³dict·³ secretKey´³named³ secretKey³any„„„„µ±absent´³dict·„„„„„³DefaultProtocol´³lit±!Noise_NK_25519_ChaChaPoly_BLAKE2s„³NoiseStepDetail´³refµ„³ServiceSelector„³ServiceSelector³any³NoiseServiceSpec´³andµ´³named³base´³refµ„³ NoiseSpec„„´³named³ secretKey´³refµ„³SecretKeyField„„„„³NoisePreSharedKeys´³orµµ±present´³dict·³ preSharedKeys´³named³ preSharedKeys´³seqof´³atom³
ByteString„„„„„„µ±invalid´³dict·³ preSharedKeys´³named³ preSharedKeys³any„„„„µ±absent´³dict·„„„„„³NoisePathStepDetail´³refµ„³ NoiseSpec„³NoiseDescriptionDetail´³refµ„³NoiseServiceSpec„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³timer„´³schema·³version°³ definitions·³SetTimer´³rec´³lit³ set-timer„´³tupleµ´³named³label³any„´³named³seconds´³atom³Double„„´³named³kind´³refµ„³ TimerKind„„„„„³ LaterThan´³rec´³lit³
later-than„´³tupleµ´³named³seconds´³atom³Double„„„„„³ TimerKind´³orµµ±relative´³lit³relative„„µ±absolute´³lit³absolute„„µ±clear´³lit³clear„„„„³ TimerExpired´³rec´³lit³ timer-expired„´³tupleµ´³named³label³any„´³named³seconds´³atom³Double„„„„„„³ embeddedType€„„µ³trace„´³schema·³version°³ definitions·³Oid³any³Name´³orµµ± anonymous´³rec´³lit³ anonymous„´³tupleµ„„„„µ±named´³rec´³lit³named„´³tupleµ´³named³name³any„„„„„„„³Target´³rec´³lit³entity„´³tupleµ´³named³actor´³refµ„³ActorId„„´³named³facet´³refµ„³FacetId„„´³named³oid´³refµ„³Oid„„„„„³TaskId³any³TurnId³any³ActorId³any³FacetId³any³ TurnCause´³orµµ±turn´³rec´³lit³ caused-by„´³tupleµ´³named³id´³refµ„³TurnId„„„„„„µ±cleanup´³rec´³lit³cleanup„´³tupleµ„„„„µ±linkedTaskRelease´³rec´³lit³linked-task-release„´³tupleµ´³named³id´³refµ„³TaskId„„´³named³reason´³refµ„³LinkedTaskReleaseReason„„„„„„µ±periodicActivation´³rec´³lit³periodic-activation„´³tupleµ´³named³period´³atom³Double„„„„„„µ±delay´³rec´³lit³delay„´³tupleµ´³named³ causingTurn´³refµ„³TurnId„„´³named³amount´³atom³Double„„„„„„µ±external´³rec´³lit³external„´³tupleµ´³named³ description³any„„„„„„„³ TurnEvent´³orµµ±assert´³rec´³lit³assert„´³tupleµ´³named³ assertion´³refµ„³AssertionDescription„„´³named³handle´³refµ³protocol„³Handle„„„„„„µ±retract´³rec´³lit³retract„´³tupleµ´³named³handle´³refµ³protocol„³Handle„„„„„„µ±message´³rec´³lit³message„´³tupleµ´³named³body´³refµ„³AssertionDescription„„„„„„µ±sync´³rec´³lit³sync„´³tupleµ´³named³peer´³refµ„³Target„„„„„„µ± breakLink´³rec´³lit³
break-link„´³tupleµ´³named³source´³refµ„³ActorId„„´³named³handle´³refµ³protocol„³Handle„„„„„„„„³
ExitStatus´³orµµ±ok´³lit³ok„„µ±Error´³refµ³protocol„³Error„„„„³
TraceEntry´³rec´³lit³trace„´³tupleµ´³named³ timestamp´³atom³Double„„´³named³actor´³refµ„³ActorId„„´³named³item´³refµ„³ActorActivation„„„„„³ActorActivation´³orµµ±start´³rec´³lit³start„´³tupleµ´³named³ actorName´³refµ„³Name„„„„„„µ±turn´³refµ„³TurnDescription„„µ±stop´³rec´³lit³stop„´³tupleµ´³named³status´³refµ„³
ExitStatus„„„„„„„„³FacetStopReason´³orµµ±explicitAction´³lit³explicit-action„„µ±inert´³lit³inert„„µ±parentStopping´³lit³parent-stopping„„µ± actorStopping´³lit³actor-stopping„„„„³TurnDescription´³rec´³lit³turn„´³tupleµ´³named³id´³refµ„³TurnId„„´³named³cause´³refµ„³ TurnCause„„´³named³actions´³seqof´³refµ„³ActionDescription„„„„„„³ActionDescription´³orµµ±dequeue´³rec´³lit³dequeue„´³tupleµ´³named³event´³refµ„³TargetedTurnEvent„„„„„„µ±enqueue´³rec´³lit³enqueue„´³tupleµ´³named³event´³refµ„³TargetedTurnEvent„„„„„„µ±dequeueInternal´³rec´³lit³dequeue-internal„´³tupleµ´³named³event´³refµ„³TargetedTurnEvent„„„„„„µ±enqueueInternal´³rec´³lit³enqueue-internal„´³tupleµ´³named³event´³refµ„³TargetedTurnEvent„„„„„„µ±spawn´³rec´³lit³spawn„´³tupleµ´³named³link´³atom³Boolean„„´³named³id´³refµ„³ActorId„„„„„„µ±link´³rec´³lit³link„´³tupleµ´³named³ parentActor´³refµ„³ActorId„„´³named³ childToParent´³refµ³protocol„³Handle„„´³named³
childActor´³refµ„³ActorId„„´³named³ parentToChild´³refµ³protocol„³Handle„„„„„„µ±
facetStart´³rec´³lit³ facet-start„´³tupleµ´³named³path´³seqof´³refµ„³FacetId„„„„„„„µ± facetStop´³rec´³lit³
facet-stop„´³tupleµ´³named³path´³seqof´³refµ„³FacetId„„„´³named³reason´³refµ„³FacetStopReason„„„„„„µ±linkedTaskStart´³rec´³lit³linked-task-start„´³tupleµ´³named³taskName´³refµ„³Name„„´³named³id´³refµ„³TaskId„„„„„„„„³TargetedTurnEvent´³rec´³lit³event„´³tupleµ´³named³target´³refµ„³Target„„´³named³detail´³refµ„³ TurnEvent„„„„„³AssertionDescription´³orµµ±value´³rec´³lit³value„´³tupleµ´³named³value³any„„„„„µ±opaque´³rec´³lit³opaque„´³tupleµ´³named³ description³any„„„„„„„³LinkedTaskReleaseReason´³orµµ± cancelled´³lit³ cancelled„„µ±normal´³lit³normal„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³stdenv„´³schema·³version°³ definitions·³ StandardRoute´³orµµ±standard´³ tuplePrefixµ´³named³
transports´³seqof´³refµ„³StandardTransport„„„´³named³key´³atom³
ByteString„„´³named³service³any„´³named³sig´³atom³
ByteString„„´³named³oid³any„„´³named³caveats´³seqof´³refµ³sturdy„³Caveat„„„„„µ±general´³refµ³
gatekeeper„³Route„„„„³StandardTransport´³orµµ±wsUrl´³atom³String„„µ±other³any„„„„³ embeddedType€„„µ³stream„´³schema·³version°³ definitions·³Mode´³orµµ±bytes´³lit³bytes„„µ±lines´³refµ„³LineMode„„µ±packet´³rec´³lit³packet„´³tupleµ´³named³size´³atom³ SignedInteger„„„„„„µ±object´³rec´³lit³object„´³tupleµ´³named³ description³any„„„„„„„³Sink´³orµµ±source´³rec´³lit³source„´³tupleµ´³named³
controller´³embedded´³refµ„³Source„„„„„„„µ± StreamError´³refµ„³ StreamError„„µ±data´³rec´³lit³data„´³tupleµ´³named³payload³any„´³named³mode´³refµ„³Mode„„„„„„µ±eof´³rec´³lit³eof„´³tupleµ„„„„„„³Source´³orµµ±sink´³rec´³lit³sink„´³tupleµ´³named³
controller´³embedded´³refµ„³Sink„„„„„„„µ± StreamError´³refµ„³ StreamError„„µ±credit´³rec´³lit³credit„´³tupleµ´³named³amount´³refµ„³ CreditAmount„„´³named³mode´³refµ„³Mode„„„„„„„„³LineMode´³orµµ±lf´³lit³lf„„µ±crlf´³lit³crlf„„„„³ StreamError´³rec´³lit³error„´³tupleµ´³named³message´³atom³String„„„„„³ CreditAmount´³orµµ±count´³atom³ SignedInteger„„µ± unbounded´³lit³ unbounded„„„„³StreamConnection´³rec´³lit³stream-connection„´³tupleµ´³named³source´³embedded´³refµ„³Source„„„´³named³sink´³embedded´³refµ„³Sink„„„´³named³spec³any„„„„³StreamListenerError´³rec´³lit³stream-listener-error„´³tupleµ´³named³spec³any„´³named³message´³atom³String„„„„„³StreamListenerReady´³rec´³lit³stream-listener-ready„´³tupleµ´³named³spec³any„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³sturdy„´³schema·³version°³ definitions·³Lit´³rec´³lit³lit„´³tupleµ´³named³value³any„„„„³Oid´³atom³ SignedInteger„³Alts´³rec´³lit³or„´³tupleµ´³named³ alternatives´³seqof´³refµ„³Rewrite„„„„„„³PAnd´³rec´³lit³and„´³tupleµ´³named³patterns´³seqof´³refµ„³Pattern„„„„„„³PNot´³rec´³lit³not„´³tupleµ´³named³pattern´³refµ„³Pattern„„„„„³TRef´³rec´³lit³ref„´³tupleµ´³named³binding´³atom³ SignedInteger„„„„„³PAtom´³orµµ±Boolean´³lit³Boolean„„µ±Double´³lit³Double„„µ± SignedInteger´³lit³ SignedInteger„„µ±String´³lit³String„„µ±
ByteString´³lit³
ByteString„„µ±Symbol´³lit³Symbol„„„„³PBind´³rec´³lit³bind„´³tupleµ´³named³pattern´³refµ„³Pattern„„„„„³Caveat´³orµµ±Rewrite´³refµ„³Rewrite„„µ±Alts´³refµ„³Alts„„µ±Reject´³refµ„³Reject„„µ±unknown³any„„„³Reject´³rec´³lit³reject„´³tupleµ´³named³pattern´³refµ„³Pattern„„„„„³Pattern´³orµµ±PDiscard´³refµ„³PDiscard„„µ±PAtom´³refµ„³PAtom„„µ± PEmbedded´³refµ„³ PEmbedded„„µ±PBind´³refµ„³PBind„„µ±PAnd´³refµ„³PAnd„„µ±PNot´³refµ„³PNot„„µ±Lit´³refµ„³Lit„„µ± PCompound´³refµ„³ PCompound„„„„³Rewrite´³rec´³lit³rewrite„´³tupleµ´³named³pattern´³refµ„³Pattern„„´³named³template´³refµ„³Template„„„„„³WireRef´³orµµ±mine´³tupleµ´³lit°„´³named³oid´³refµ„³Oid„„„„„µ±yours´³ tuplePrefixµ´³lit°„´³named³oid´³refµ„³Oid„„„´³named³ attenuation´³seqof´³refµ„³Caveat„„„„„„„³PDiscard´³rec´³lit³_„´³tupleµ„„„³Template´³orµµ±
TAttenuate´³refµ„³
TAttenuate„„µ±TRef´³refµ„³TRef„„µ±Lit´³refµ„³Lit„„µ± TCompound´³refµ„³ TCompound„„„„³ PCompound´³orµµ±rec´³rec´³lit³rec„´³tupleµ´³named³label³any„´³named³fields´³seqof´³refµ„³Pattern„„„„„„„µ±arr´³rec´³lit³arr„´³tupleµ´³named³items´³seqof´³refµ„³Pattern„„„„„„„µ±dict´³rec´³lit³dict„´³tupleµ´³named³entries´³dictof³any´³refµ„³Pattern„„„„„„„„„³ PEmbedded´³lit³Embedded„³ SturdyRef´³rec´³lit³ref„´³tupleµ´³named³
parameters´³refµ„³
Parameters„„„„„³ TCompound´³orµµ±rec´³rec´³lit³rec„´³tupleµ´³named³label³any„´³named³fields´³seqof´³refµ„³Template„„„„„„„µ±arr´³rec´³lit³arr„´³tupleµ´³named³items´³seqof´³refµ„³Template„„„„„„„µ±dict´³rec´³lit³dict„´³tupleµ´³named³entries´³dictof³any´³refµ„³Template„„„„„„„„„³
Parameters´³andµ´³dict·³oid´³named³oid³any„³sig´³named³sig´³atom³
ByteString„„„„´³named³caveats´³refµ„³ CaveatsField„„„„³
TAttenuate´³rec´³lit³ attenuate„´³tupleµ´³named³template´³refµ„³Template„„´³named³ attenuation´³seqof´³refµ„³Caveat„„„„„„³ CaveatsField´³orµµ±present´³dict·³caveats´³named³caveats´³seqof´³refµ„³Caveat„„„„„„µ±invalid´³dict·³caveats´³named³caveats³any„„„„µ±absent´³dict·„„„„„³SturdyStepType´³lit³ref„³SturdyStepDetail´³refµ„³
Parameters„³SturdyPathStepDetail´³refµ„³
Parameters„³SturdyDescriptionDetail´³dict·³key´³named³key´³atom³
ByteString„„³oid´³named³oid³any„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³worker„´³schema·³version°³ definitions·³Instance´³rec´³lit³Instance„´³tupleµ´³named³name´³atom³String„„´³named³argument³any„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³service„´³schema·³version°³ definitions·³State´³orµµ±started´³lit³started„„µ±ready´³lit³ready„„µ±failed´³lit³failed„„µ±complete´³lit³complete„„µ± userDefined³any„„„³
RunService´³rec´³lit³ run-service„´³tupleµ´³named³ serviceName³any„„„„³ ServiceState´³rec´³lit³ service-state„´³tupleµ´³named³ serviceName³any„´³named³state´³refµ„³State„„„„„³ ServiceObject´³rec´³lit³service-object„´³tupleµ´³named³ serviceName³any„´³named³object³any„„„„³RequireService´³rec´³lit³require-service„´³tupleµ´³named³ serviceName³any„„„„³RestartService´³rec´³lit³restart-service„´³tupleµ´³named³ serviceName³any„„„„³ServiceDependency´³rec´³lit³
depends-on„´³tupleµ´³named³depender³any„´³named³dependee´³refµ„³ ServiceState„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³protocol„´³schema·³version°³ definitions·³Nop´³lit€„³Oid´³atom³ SignedInteger„³Sync´³rec´³lit³S„´³tupleµ´³named³peer´³embedded´³lit<69>„„„„„„³Turn´³seqof´³refµ„³ TurnEvent„„³Error´³rec´³lit³error„´³tupleµ´³named³message´³atom³String„„´³named³detail³any„„„„³Event´³orµµ±Assert´³refµ„³Assert„„µ±Retract´³refµ„³Retract„„µ±Message´³refµ„³Message„„µ±Sync´³refµ„³Sync„„„„³Assert´³rec´³lit³A„´³tupleµ´³named³ assertion´³refµ„³ Assertion„„´³named³handle´³refµ„³Handle„„„„„³Handle´³atom³ SignedInteger„³Packet´³orµµ±Turn´³refµ„³Turn„„µ±Error´³refµ„³Error„„µ± Extension´³refµ„³ Extension„„µ±Nop´³refµ„³Nop„„„„³Message´³rec´³lit³M„´³tupleµ´³named³body´³refµ„³ Assertion„„„„„³Retract´³rec´³lit³R„´³tupleµ´³named³handle´³refµ„³Handle„„„„„³ Assertion³any³ Extension´³rec´³named³label³any„´³named³fields´³seqof³any„„„³ TurnEvent´³tupleµ´³named³oid´³refµ„³Oid„„´³named³event´³refµ„³Event„„„„„³ embeddedType€„„µ³ dataspace„´³schema·³version°³ definitions·³Observe´³rec´³lit³Observe„´³tupleµ´³named³pattern´³refµ³dataspacePatterns„³Pattern„„´³named³observer´³embedded³any„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³
gatekeeper„´³schema·³version°³ definitions·³Bind´³rec´³lit³bind„´³tupleµ´³named³ description´³refµ„³ Description„„´³named³target´³embedded³any„„´³named³observer´³refµ„³ BindObserver„„„„„³Step´³rec´³named³stepType´³atom³Symbol„„´³tupleµ´³named³detail³any„„„„³Bound´³orµµ±bound´³rec´³lit³bound„´³tupleµ´³named³pathStep´³refµ„³PathStep„„„„„„µ±Rejected´³refµ„³Rejected„„„„³Route´³rec´³lit³route„´³ tuplePrefixµ´³named³
transports´³seqof³any„„„´³named³ pathSteps´³seqof´³refµ„³PathStep„„„„„³Resolve´³rec´³lit³resolve„´³tupleµ´³named³step´³refµ„³Step„„´³named³observer´³embedded´³refµ„³Resolved„„„„„„³PathStep´³rec´³named³stepType´³atom³Symbol„„´³tupleµ´³named³detail³any„„„„³Rejected´³rec´³lit³rejected„´³tupleµ´³named³detail³any„„„„³Resolved´³orµµ±accepted´³rec´³lit³accepted„´³tupleµ´³named³responderSession´³embedded³any„„„„„„µ±Rejected´³refµ„³Rejected„„„„³ Description´³rec´³named³stepType´³atom³Symbol„„´³tupleµ´³named³detail³any„„„„³ ResolvePath´³rec´³lit³ resolve-path„´³tupleµ´³named³route´³refµ„³Route„„´³named³addr³any„´³named³control´³embedded´³refµ„³TransportControl„„„´³named³resolved´³refµ„³Resolved„„„„„³ BindObserver´³orµµ±present´³embedded´³refµ„³Bound„„„µ±absent´³lit€„„„„³ForceDisconnect´³rec´³lit³force-disconnect„´³tupleµ„„„³ResolvedPathStep´³rec´³lit³ path-step„´³tupleµ´³named³origin´³embedded´³refµ„³Resolve„„„´³named³pathStep´³refµ„³PathStep„„´³named³resolved´³refµ„³Resolved„„„„„³TransportControl´³refµ„³ForceDisconnect„³TransportConnection´³rec´³lit³connect-transport„´³tupleµ´³named³addr³any„´³named³control´³embedded´³refµ„³TransportControl„„„´³named³resolved´³refµ„³Resolved„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³transportAddress„´³schema·³version°³ definitions·³Tcp´³rec´³lit³tcp„´³tupleµ´³named³host´³atom³String„„´³named³port´³atom³ SignedInteger„„„„„³Unix´³rec´³lit³unix„´³tupleµ´³named³path´³atom³String„„„„„³Stdio´³rec´³lit³stdio„´³tupleµ„„„³ WebSocket´³rec´³lit³ws„´³tupleµ´³named³url´³atom³String„„„„„„³ embeddedType€„„µ³dataspacePatterns„´³schema·³version°³ definitions·³AnyAtom´³orµµ±bool´³atom³Boolean„„µ±double´³atom³Double„„µ±int´³atom³ SignedInteger„„µ±string´³atom³String„„µ±bytes´³atom³
ByteString„„µ±symbol´³atom³Symbol„„µ±embedded´³embedded³any„„„„³Pattern´³orµµ±discard´³rec´³lit³_„´³tupleµ„„„„µ±bind´³rec´³lit³bind„´³tupleµ´³named³pattern´³refµ„³Pattern„„„„„„µ±lit´³rec´³lit³lit„´³tupleµ´³named³value´³refµ„³AnyAtom„„„„„„µ±group´³rec´³lit³group„´³tupleµ´³named³type´³refµ„³ GroupType„„´³named³entries´³dictof³any´³refµ„³Pattern„„„„„„„„„³ GroupType´³orµµ±rec´³rec´³lit³rec„´³tupleµ´³named³label³any„„„„„µ±arr´³rec´³lit³arr„´³tupleµ„„„„µ±dict´³rec´³lit³dict„´³tupleµ„„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„„„

View File

@ -1,4 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
Observe = <Observe @pattern dataspacePatterns.Pattern @observer #:any>.

View File

@ -1,30 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
# Dataspace patterns: *almost* a sublanguage of attenuation patterns.
#
# One key difference is that Dataspace patterns are extensible, in that
# they ignore fields not mentioned in group patterns.
Pattern =
/ @discard <_>
/ <bind @pattern Pattern>
/ <lit @value AnyAtom>
/ <group @type GroupType @entries { any: Pattern ...:... }>
.
GroupType =
/ <rec @label any>
/ <arr>
/ <dict>
.
AnyAtom =
/ @bool bool
/ @double double
/ @int int
/ @string string
/ @bytes bytes
/ @symbol symbol
/ @embedded #:any
.

View File

@ -1,87 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
# ---------------------------------------------------------------------------
# Protocol at *gatekeeper* entities
# Assertion. Gatekeeper will attempt to resolve `step`, responding with a `Resolved` to
# `observer`.
Resolve = <resolve @step Step @observer #:Resolved> .
Resolved = <accepted @responderSession #:any> / Rejected .
Step = <<rec> @stepType symbol [@detail any]> .
# ---------------------------------------------------------------------------
# Protocol at dataspaces *associated* with gatekeeper entities
# ## Handling `Resolve` requests
#
# When the gatekeeper entity receives a `Resolve` assertion (call it R1), it
#
# 1. asserts a `Resolve` (call it R2) into its associated dataspace that
# is the same as R1 except it has a different `observer`; and
#
# 2. observes a `Bind` with `description` matching the `step` of R1/R2
# according to `stepType` (e.g. treatment of SturdyStepType is not the
# same as treatment of NoiseStepType).
#
# Normally, an appropriate `Bind` is expected to exist. If the gatekeeper
# sees the `Bind` first, it takes the `target` from it and does whatever
# `stepType` mandates before replying to R1's observer.
#
# However, if a `Resolved` is asserted to R2's observer before a `Bind`
# appears, that resolution is relayed on to R1's observer directly, be it
# positive or negative, and the gatekeeper stops waiting for a `Bind`.
#
# This way, entities can keep an eye out for `Resolve` requests that will
# never complete, and answer `Rejected` to them even when no matching
# `Bind` exists. Entities could also use `Resolve` requests to synthesize a
# `Bind` in a "just-in-time" fashion.
#
# ## General treatment of `Bind` assertions
#
# When the gatekeeper sees a `Bind`, independently of any potential
# `Resolve` requests, it computes an appropriate PathStep from
# `description` pointing at `target`, and responds with a `Bound` to
# `observer` (if supplied).
#
Bind = <bind @description Description @target #:any @observer BindObserver> .
Description = <<rec> @stepType symbol [@detail any]> .
BindObserver = @present #:Bound / @absent #f .
Bound = <bound @pathStep PathStep> / Rejected .
# ---------------------------------------------------------------------------
# Protocol at client-side dataspaces, for resolution utilities
# Assertion. In response to observation of this with appropriate captures/wildcards in `addr`
# and `resolved`, respondent will follow `route.pathSteps` starting from one of the
# `route.transports`, asserting `ResolvePath` with the final `Resolved` as well as the selected
# transport `addr` and a `control` for it.
ResolvePath = <resolve-path @route Route @addr any @control #:TransportControl @resolved Resolved> .
TransportConnection = <connect-transport @addr any @control #:TransportControl @resolved Resolved> .
ResolvedPathStep = <path-step @origin #:Resolve @pathStep PathStep @resolved Resolved> .
PathStep = <<rec> @stepType symbol [@detail any]> .
# A `Route` describes a network path that can be followed to reach some target entity.
#
# It starts with a set of zero or more possible non-Syndicate `transports`. These could be
# `transportAddress.Tcp` values or similar. They are just suggestions; it's quite possible the
# endpoint is reachable by some means not listed. The network outside Syndicate is, after all,
# pretty diverse! In particular, *zero* `transports` may be provided, in which case some
# out-of-band means has to be used to make that first connection.
#
# The `transports` give instructions for contacting the first entity in the `Route` path. Often
# this will be a `gatekeeper`, or a `noise` protocol endpoint, or both. Occasionally, it may
# even be the desired target entity. Subsequent `pathSteps` describe how to proceed from the
# initial entity to the target.
#
# (`transports` should by rights be a set, not a sequence, but that opens up a Can Of Worms
# regarding dataspace patterns including literal sets that I can't deal with right now.)
Route = <route @transports [any ...] @pathSteps PathStep ...> .
TransportControl = ForceDisconnect .
ForceDisconnect = <force-disconnect> .
# ---------------------------------------------------------------------------
Rejected = <rejected @detail any> .

View File

@ -1,62 +0,0 @@
version 1 .
# Assertion in driver DS
# Causes creation of server and route
HttpBinding = <http-bind @host HostPattern @port int @method MethodPattern @path PathPattern @handler #:HttpRequest> .
# Assertion in driver DS
# Describes active server and route
HttpService = <http-service @host HostPattern @port int @method MethodPattern @path PathPattern> .
# Assertion in driver DS
# Describes active listener
HttpListener = <http-listener @port int> .
HostPattern = @host string / @any #f .
PathPattern = [PathPatternElement ...] .
PathPatternElement = @label string / @wildcard =_ / @rest =... .
MethodPattern = @any #f / @specific @"Lowercase" symbol .
# Assertion in driver DS
HttpRequest = <http-request
@sequenceNumber int
@host RequestHost
@port int
@method @"Lowercase" symbol
@path [string ...]
@headers Headers
@query {symbol: [QueryValue ...] ...:...}
@body RequestBody> .
Headers = {@"Lowercase" symbol: string ...:...} .
QueryValue = @string string / <file @filename string @headers Headers @body bytes> .
RequestBody = @absent #f / @present bytes .
RequestHost = @absent #f / @present string .
# Assertion to handler entity
HttpContext = <request @req HttpRequest @res #:HttpResponse> .
# HttpResponse protocol. Delivered to the `res` ref in `HttpContext`.
#
# (status | header)* . chunk* . done
#
# Done triggers completion of the response and retraction of the frame by the peer. If the
# HttpBinding responsible for the request is withdrawn mid-way through a response (i.e. when
# chunked transfer is used and at least one chunk has been sent) the request is abruptly
# closed; if it is withdrawn at any other moment in the lifetime of the request, a 500 Internal
# Server Error is send to the client.
#
@<TODO "trailers?">
HttpResponse =
# Messages.
/ <status @code int @message string>
/ <header @name symbol @value string>
/ <chunk @chunk Chunk>
/ <done @chunk Chunk>
.
Chunk = @string string / @bytes bytes .
# e.g. text/plain, text/html, application/json
MimeType = symbol .

View File

@ -1,83 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
# https://noiseprotocol.org/
# ---------------------------------------------------------------------------
# Binding and connection
NoiseStepType = =noise .
# In a gatekeeper.Step, use ServiceSelector as detail.
NoiseStepDetail = ServiceSelector .
# In a gatekeeper.PathStep, use a NoiseSpec as detail.
NoisePathStepDetail = NoiseSpec .
# In a gatekeeper.Description, use a NoiseServiceSpec as detail.
NoiseDescriptionDetail = NoiseServiceSpec .
# ---------------------------------------------------------------------------
# Specification of target and bind addresses
ServiceSelector = any .
NoiseSpec = {
# The `serviceSelector` to use in a `NoiseStep` for `gatekeeper.Resolve`.
service: ServiceSelector,
# The responder's static public key. If not required (uncommon!), supply the empty ByteString.
key: bytes,
}
& @protocol NoiseProtocol
& @preSharedKeys NoisePreSharedKeys
.
NoiseServiceSpec = @base NoiseSpec & @secretKey SecretKeyField .
SecretKeyField = @present { secretKey: bytes } / @invalid { secretKey: any } / @absent {} .
# If absent, a default of DefaultProtocol is used. Most services will speak the default.
NoiseProtocol = @present { protocol: string } / @invalid { protocol: any } / @absent {} .
DefaultProtocol = "Noise_NK_25519_ChaChaPoly_BLAKE2s" .
# If present, Noise pre-shared-keys (PSKs) are drawn from the sequence as required; if the
# sequence is exhausted or not supplied, an all-zeros key is used each time a PSK is needed.
NoisePreSharedKeys = @present { preSharedKeys: [bytes ...] } / @invalid { preSharedKeys: any } / @absent {} .
# ---------------------------------------------------------------------------
# Handshaking and running a session
# 1. initiator asserts <resolve <noise ServiceSelector> #:A> at Gatekeeper
# 2. gatekeeper asserts <accepted #:B> at #:A
# 3. initiator asserts <initiator #:C> at #:B and then sends `Packet`s to #:B
# 4. responder sends `Packet`s to #:C
#
# Sessions begin with introduction of initiator (#:C) and responder (#:B) to each other, and
# then proceed by sending `Packet`s (from #:C) to #:B and (from #:B) to #:C according to
# the Noise protocol definition. Each `Packet` represents a complete logical unit of
# communication; for example, a complete Turn when layering the Syndicate protocol over Noise.
# Note well the restriction on Noise messages: no individual complete packet or packet fragment
# may exceed 65535 bytes (N.B. not 65536!). When `fragmented`, each portion of a `Packet` is a
# complete Noise "transport message"; when `complete`, the whole thing is likewise a complete
# "transport message".
#
# Retraction of the `Initiator` ends the session from the initiator-side; retraction of the
# `<accepted ...>` assertion ends the session from the responder-side.
SessionItem = Initiator / Packet .
# Assertion
Initiator = <initiator @initiatorSession #:Packet> .
# Message
Packet = @complete bytes / @fragmented [bytes ...] .
# When layering Syndicate protocol over noise,
#
# - the canonical encoding of the serviceSelector is the prologue
# - protocol.Packets MUST be encoded using the machine-oriented Preserves syntax
# - zero or more Turns are permitted per noise.Packet
# - each Turn must fit inside a single noise.Packet (fragment if needed)
# - payloads inside a noise.Packet may be padded at the end with byte 0x80 (128), which
# encodes `#f` in the machine-oriented Preserves syntax.
#
# In summary, each noise.Packet, once (reassembled and) decrypted, will be a sequence of zero
# or more machine-encoded protocol.Packets, followed by zero or more 0x80 bytes.
.

View File

@ -1,20 +0,0 @@
version 1 .
Packet = Turn / Error / Extension / Nop .
Extension = <<rec> @label any @fields [any ...]> .
Nop = #f .
Error = <error @message string @detail any>.
Assertion = any .
Handle = int .
Event = Assert / Retract / Message / Sync .
Oid = int .
Turn = [TurnEvent ...].
TurnEvent = [@oid Oid @event Event].
Assert = <A @assertion Assertion @handle Handle>.
Retract = <R @handle Handle>.
Message = <M @body Assertion>.
Sync = <S @peer #:#t>.

View File

@ -1,51 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
# Asserts that a service should begin (and stay) running after waiting
# for its dependencies and considering reverse-dependencies, blocks,
# and so on.
RequireService = <require-service @serviceName any>.
# Asserts that a service should begin (and stay) running RIGHT NOW,
# without considering its dependencies.
RunService = <run-service @serviceName any>.
# Asserts one or more current states of service `serviceName`. The
# overall state of the service is the union of asserted `state`s.
#
# Only a few combinations make sense:
# - `started`
# - `started` + `ready`
# - `failed`
# - `complete`
#
ServiceState = <service-state @serviceName any @state State>.
# A running service publishes zero or more of these. The details of
# the object vary by service.
#
ServiceObject = <service-object @serviceName any @object any>.
# Possible service states.
State =
/ # The service has begun its startup routine, and may or may not be
# ready to take requests from other parties.
=started
/ # The service is ready to take requests from other parties.
# (This state is special in that it is asserted *in addition* to `started`.)
=ready
/ # The service has failed.
=failed
/ # The service has completed execution.
=complete
/ # Extension or user-defined state
@userDefined any
.
# Asserts that, when `depender` is `require-service`d, it should not be started until
# `dependee` has been asserted, and also that `dependee`'s `serviceName` should be
# `require-service`d.
ServiceDependency = <depends-on @depender any @dependee ServiceState>.
# Message. Triggers a service restart.
RestartService = <restart-service @serviceName any>.

View File

@ -1,31 +0,0 @@
version 1 .
# A "standard" route is
#
# - a collection of websocket urls, for transport.
# - a noise tunnel, for server authentication, confidentiality and integrity.
# - a macaroon, for authorization.
#
# Making these choices allows a compact representation. Encoding a binary-syntax representation
# of a standard route using base64 produces a somewhat-convenient blob of text representing
# access to a network object that users can cut and paste.
#
# A `stdenv.StandardRoute.standard` can be rewritten to a `gatekeeper.Route` like this (with
# `$caveats`, if any, added as appropriate):
#
# <route $transports <noise { service: $service key: $key }> <ref { sig: $sig oid: $oid }>>
#
StandardRoute =
/ @standard [@transports [StandardTransport ...]
@key bytes
@service any
@sig bytes
@oid any
@caveats sturdy.Caveat ...]
/ @general gatekeeper.Route
.
StandardTransport =
/ @wsUrl string
/ @other any
.

View File

@ -1,38 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
# Assertion:
StreamConnection = <stream-connection @source #:Source @sink #:Sink @spec any>.
# Assertions:
StreamListenerReady = <stream-listener-ready @spec any>.
StreamListenerError = <stream-listener-error @spec any @message string>.
# Assertion:
StreamError = <error @message string>.
Source =
# Assertions:
/ <sink @controller #:Sink>
/ StreamError
# Messages:
/ <credit @amount CreditAmount @mode Mode>
.
Sink =
# Assertions:
/ <source @controller #:Source>
/ StreamError
# Messages:
/ <data @payload any @mode Mode>
/ <eof>
.
# Value:
CreditAmount = @count int / @unbounded =unbounded .
# Value:
Mode = =bytes / @lines LineMode / <packet @size int> / <object @description any>.
LineMode = =lf / =crlf .

View File

@ -1,70 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
# ---------------------------------------------------------------------------
# Binding and connection
SturdyStepType = =ref .
# In a gatekeeper.Step or gatekeeper.PathStep, use Parameters as detail.
SturdyStepDetail = Parameters .
SturdyPathStepDetail = Parameters .
# In a gatekeeper.Description, use the following detail.
SturdyDescriptionDetail = {
oid: any,
key: bytes,
} .
# ---------------------------------------------------------------------------
# Macaroons
# The sequence of Caveats is run RIGHT-TO-LEFT.
# That is, the newest Caveats are at the right.
#
# Let f(k,d) = HMAC-BLAKE2s-256(k,d)[0..16),
# e = canonical machine-oriented serialization of some preserves value, and
# k = the original secret key for the ref.
#
# The `sig` is then f(f(f(f(k, e(oid)), ...), e(Caveat)), ...).
#
SturdyRef = <ref @parameters Parameters> .
Parameters = {
oid: any,
sig: bytes,
} & @caveats CaveatsField .
CaveatsField = @present { caveats: [Caveat ...] } / @invalid { caveats: any } / @absent {} .
# embodies 1st-party caveats over assertion structure, but nothing else
# can add 3rd-party caveats and richer predicates later
Caveat = Rewrite / Alts / Reject / @unknown any .
Rewrite = <rewrite @pattern Pattern @template Template> .
Reject = <reject @pattern Pattern> .
Alts = <or @alternatives [Rewrite ...]>.
Oid = int .
WireRef = @mine [0 @oid Oid] / @yours [1 @oid Oid @attenuation Caveat ...].
# ---------------------------------------------------------------------------
Lit = <lit @value any>.
Pattern = PDiscard / PAtom / PEmbedded / PBind / PAnd / PNot / Lit / PCompound .
PDiscard = <_>.
PAtom = =Boolean / =Double / =SignedInteger / =String / =ByteString / =Symbol .
PEmbedded = =Embedded .
PBind = <bind @pattern Pattern>.
PAnd = <and @patterns [Pattern ...]>.
PNot = <not @pattern Pattern>.
PCompound =
/ @rec <rec @label any @fields [Pattern ...]>
/ @arr <arr @items [Pattern ...]>
/ @dict <dict @entries { any: Pattern ...:... }> .
Template = TAttenuate / TRef / Lit / TCompound .
TAttenuate = <attenuate @template Template @attenuation [Caveat ...]>.
TRef = <ref @binding int>.
TCompound =
/ @rec <rec @label any @fields [Template ...]>
/ @arr <arr @items [Template ...]>
/ @dict <dict @entries { any: Template ...:... }> .

View File

@ -1,7 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
TcpRemote = <tcp-remote @host string @port int>.
TcpLocal = <tcp-local @host string @port int>.
TcpPeerInfo = <tcp-peer @handle #:any @local TcpLocal @remote TcpRemote>.

View File

@ -1,7 +0,0 @@
version 1 .
SetTimer = <set-timer @label any @seconds double @kind TimerKind>.
TimerExpired = <timer-expired @label any @seconds double>.
TimerKind = =relative / =absolute / =clear .
LaterThan = <later-than @seconds double>.

View File

@ -1,96 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
TraceEntry = <trace
@timestamp @"seconds since Unix epoch" double
@actor ActorId
@item ActorActivation> .
ActorActivation =
/ <start @actorName Name>
/ @turn TurnDescription
/ <stop @status ExitStatus>
.
Name =
/ <anonymous>
/ <named @name any>
.
ActorId = any .
FacetId = any .
Oid = any .
TaskId = any .
TurnId = any .
ExitStatus = =ok / protocol.Error .
# Trace information associated with a turn.
TurnDescription = <turn @id TurnId @cause TurnCause @actions [ActionDescription ...]> .
# The cause of a turn.
TurnCause =
/ @turn <caused-by @id TurnId>
/ <cleanup>
/ @linkedTaskRelease <linked-task-release @id TaskId @reason LinkedTaskReleaseReason>
/ @periodicActivation <periodic-activation @"`period` is in seconds" @period double>
/ <delay @causingTurn TurnId @"`amount` is in seconds" @amount double>
/ <external @description any>
.
LinkedTaskReleaseReason = =cancelled / =normal .
# An actual event carried within a turn.
TurnEvent =
/ <assert @assertion AssertionDescription @handle protocol.Handle>
/ <retract @handle protocol.Handle>
/ <message @body AssertionDescription>
/ <sync @peer Target>
/ # A souped-up, disguised, special-purpose `retract` event.
@breakLink <break-link @source ActorId @handle protocol.Handle>
.
TargetedTurnEvent = <event @target Target @detail TurnEvent> .
# An action taken during a turn.
ActionDescription =
/ # The active party is processing a new `event` for `target` from the received Turn.
<dequeue @event TargetedTurnEvent>
/ # The active party has queued a new `event` to be processed later by `target`.
<enqueue @event TargetedTurnEvent>
/ # The active party is processing an internally-queued event for one of its own entities.
@dequeueInternal <dequeue-internal @event TargetedTurnEvent>
/ # The active party has scheduled an internally-queued event for one of its own entities.
@enqueueInternal <enqueue-internal @event TargetedTurnEvent>
/ <spawn @link bool @id ActorId>
/ <link
@parentActor ActorId
@childToParent protocol.Handle
@childActor ActorId
@parentToChild protocol.Handle>
/ @facetStart <facet-start @path [FacetId ...]>
/ @facetStop <facet-stop @path [FacetId ...] @reason FacetStopReason>
/ @linkedTaskStart <linked-task-start @taskName Name @id TaskId>
.
# An assertion or the body of a message: either a Preserves value, or
# some opaque system-internal value, represented according to the
# system concerned.
AssertionDescription =
/ <value @value any>
/ <opaque @description any>
.
FacetStopReason =
/ @explicitAction =explicit-action
/ =inert
/ @parentStopping =parent-stopping
/ @actorStopping =actor-stopping
.
Target = <entity @actor ActorId @facet FacetId @oid Oid> .
# For the future: consider including information about `protocol`-level `Turn`s etc sent to
# peers over e.g. Websockets or TCP/IP, allowing cross-correlation of traces from different
# processes and implementations with each other to form a large overall picture.
.

View File

@ -1,6 +0,0 @@
version 1 .
Tcp = <tcp @host string @port int>.
Unix = <unix @path string>.
WebSocket = <ws @url string>.
Stdio = <stdio>.

View File

@ -1,4 +0,0 @@
version 1 .
embeddedType EntityRef.Cap .
Instance = <Instance @name string @argument any>.

View File

@ -1,12 +1,14 @@
import sys
import asyncio
import websockets
import logging
from preserves import Embedded, stringify
from preserves.fold import map_embeddeds
from . import actor, encode, transport, Decoder, gatekeeper, turn
from . import actor, encode, transport, Decoder, gatekeeper
from .during import During
from .actor import _inert_ref
from .actor import _inert_ref, Turn
from .idgen import IdGenerator
from .schema import protocol, sturdy, transportAddress
@ -16,6 +18,8 @@ class InboundAssertion:
self.local_handle = local_handle
self.pins = pins
_next_local_oid = IdGenerator()
class WireSymbol:
def __init__(self, oid, ref, membrane):
self.oid = oid
@ -64,6 +68,7 @@ def drop_all(wss):
# There are other kinds of relay. This one has exactly two participants connected to each other.
class TunnelRelay:
def __init__(self,
turn,
address,
gatekeeper_peer = None,
gatekeeper_oid = 0,
@ -71,19 +76,17 @@ class TunnelRelay:
publish_oid = 0,
on_connected = None,
on_disconnected = None,
connection_timeout = None,
):
self.facet = turn.active_facet()
self.facet = turn._facet
self.facet.on_stop(self._shutdown)
self.address = address
self.gatekeeper_peer = gatekeeper_peer
self.gatekeeper_oid = gatekeeper_oid
self.publish_service = publish_service
self.publish_oid = publish_oid
self.connection_timeout = connection_timeout
self._reset()
self.facet.linked_task(
lambda facet: self._reconnecting_main(facet.actor._system,
lambda facet: self._reconnecting_main(asyncio.get_running_loop(),
on_connected = on_connected,
on_disconnected = on_disconnected))
@ -95,10 +98,7 @@ class TunnelRelay:
self.pending_turn = []
self._connected = False
self.gatekeeper_handle = None
if self.publish_service is None:
self.next_local_oid = IdGenerator(initial_value=0)
else:
self.next_local_oid = IdGenerator(initial_value=(self.publish_oid + 1))
if self.publish_service is not None:
# Very specific specialization of logic in rewrite_ref_out
ws = WireSymbol(self.publish_oid, self.publish_service, self.exported_references)
self.exported_references.get_ref([], self.publish_service, False, lambda: ws)
@ -107,7 +107,7 @@ class TunnelRelay:
def connected(self):
return self._connected
def _shutdown(self):
def _shutdown(self, turn):
self._disconnect()
def deregister(self, handle):
@ -138,17 +138,17 @@ class TunnelRelay:
return sturdy.WireRef.yours(sturdy.Oid(r.entity.oid), ())
else:
ws = self.exported_references.get_ref(
pins, r, is_transient, lambda: WireSymbol(next(self.next_local_oid), r,
pins, r, is_transient, lambda: WireSymbol(next(_next_local_oid), r,
self.exported_references))
return sturdy.WireRef.mine(sturdy.Oid(ws.oid))
def rewrite_in(self, assertion, pins):
def rewrite_in(self, turn, assertion, pins):
rewritten = map_embeddeds(
lambda wire_ref: Embedded(self.rewrite_ref_in(wire_ref, pins)),
lambda wire_ref: Embedded(self.rewrite_ref_in(turn, wire_ref, pins)),
assertion)
return rewritten
def rewrite_ref_in(self, wire_ref, pins):
def rewrite_ref_in(self, turn, wire_ref, pins):
if wire_ref.VARIANT.name == 'mine':
oid = wire_ref.oid.value
ws = self.imported_references.get_oid(
@ -165,60 +165,59 @@ class TunnelRelay:
def _on_disconnected(self):
self._connected = False
def retract_inbound():
def retract_inbound(turn):
for ia in self.inbound_assertions.values():
turn.retract(ia.local_handle)
if self.gatekeeper_handle is not None:
turn.retract(self.gatekeeper_handle)
self._reset()
turn.run(self.facet, retract_inbound)
Turn.run(self.facet, retract_inbound)
self._disconnect()
def _on_connected(self):
self._connected = True
if self.gatekeeper_peer is not None:
def connected_action():
gk = self.rewrite_ref_in(sturdy.WireRef.mine(sturdy.Oid(self.gatekeeper_oid)), [])
def connected_action(turn):
gk = self.rewrite_ref_in(turn,
sturdy.WireRef.mine(sturdy.Oid(self.gatekeeper_oid)),
[])
self.gatekeeper_handle = turn.publish(self.gatekeeper_peer, Embedded(gk))
turn.run(self.facet, connected_action)
Turn.run(self.facet, connected_action)
def _on_event(self, v):
turn.run(self.facet, lambda: self._handle_event(v))
Turn.run(self.facet, lambda turn: self._handle_event(turn, v))
def _handle_event(self, v):
def _handle_event(self, turn, v):
packet = protocol.Packet.decode(v)
# self.facet.log.info('IN: %r', packet)
variant = packet.VARIANT.name
if variant == 'Turn': self._handle_turn_events(packet.value.value)
elif variant == 'Error': self._on_error(packet.value.message, packet.value.detail)
elif variant == 'Extension': pass
elif variant == 'Nop': pass
if variant == 'Turn': self._handle_turn_events(turn, packet.value.value)
elif variant == 'Error': self._on_error(turn, packet.value.message, packet.value.detail)
def _on_error(self, message, detail):
def _on_error(self, turn, message, detail):
self.facet.log.error('Error from server: %r (detail: %r)', message, detail)
self._disconnect()
def _handle_turn_events(self, events):
def _handle_turn_events(self, turn, events):
for e in events:
pins = []
ref = self._lookup_exported_oid(e.oid.value, pins)
event = e.event
variant = event.VARIANT.name
if variant == 'Assert':
self._handle_publish(pins, ref, event.value.assertion.value, event.value.handle.value)
self._handle_publish(pins, turn, ref, event.value.assertion.value, event.value.handle.value)
elif variant == 'Retract':
self._handle_retract(pins, ref, event.value.handle.value)
self._handle_retract(pins, turn, ref, event.value.handle.value)
elif variant == 'Message':
self._handle_message(pins, ref, event.value.body.value)
self._handle_message(pins, turn, ref, event.value.body.value)
elif variant == 'Sync':
self._handle_sync(pins, ref, event.value.peer)
self._handle_sync(pins, turn, ref, event.value.peer)
def _handle_publish(self, pins, ref, assertion, remote_handle):
assertion = self.rewrite_in(assertion, pins)
def _handle_publish(self, pins, turn, ref, assertion, remote_handle):
assertion = self.rewrite_in(turn, assertion, pins)
self.inbound_assertions[remote_handle] = \
InboundAssertion(remote_handle, turn.publish(ref, assertion), pins)
def _handle_retract(self, pins, ref, remote_handle):
def _handle_retract(self, pins, turn, ref, remote_handle):
ia = self.inbound_assertions.pop(remote_handle, None)
if ia is None:
raise ValueError('Peer retracted invalid handle %s' % (remote_handle,))
@ -226,29 +225,28 @@ class TunnelRelay:
drop_all(pins)
turn.retract(ia.local_handle)
def _handle_message(self, pins, ref, message):
message = self.rewrite_in(message, pins)
def _handle_message(self, pins, turn, ref, message):
message = self.rewrite_in(turn, message, pins)
for ws in pins:
if ws.count == 1:
raise ValueError('Cannot receive transient reference')
turn.send(ref, message)
drop_all(pins)
def _handle_sync(self, pins, ref, wire_peer):
peer = self.rewrite_ref_in(wire_peer, pins)
def done():
def _handle_sync(self, pins, turn, ref, wire_peer):
peer = self.rewrite_ref_in(turn, wire_peer, pins)
def done(turn):
turn.send(peer, True)
drop_all(pins)
turn.sync(ref, done)
def _send(self, remote_oid, turn_event):
if len(self.pending_turn) == 0:
def flush_pending():
def flush_pending(turn):
packet = protocol.Packet.Turn(protocol.Turn(self.pending_turn))
self.pending_turn = []
# self.facet.log.info('OUT: %r', packet)
self._send_bytes(encode(packet))
self.facet.actor._system.queue_task(lambda: turn.run(self.facet, flush_pending))
actor.queue_task(lambda: Turn.run(self.facet, flush_pending))
self.pending_turn.append(protocol.TurnEvent(protocol.Oid(remote_oid), turn_event))
def _send_bytes(self, bs):
@ -257,26 +255,24 @@ class TunnelRelay:
def _disconnect(self):
raise Exception('subclassresponsibility')
async def _reconnecting_main(self, system, on_connected=None, on_disconnected=None):
async def _reconnecting_main(self, loop, on_connected=None, on_disconnected=None):
should_run = True
while should_run and self.facet.alive:
did_connect = await self.main(system, on_connected=(on_connected or _default_on_connected))
did_connect = await self.main(loop, on_connected=(on_connected or _default_on_connected))
should_run = await (on_disconnected or _default_on_disconnected)(self, did_connect)
@staticmethod
def from_str(conn_str, **kwargs):
return transport.connection_from_str(conn_str, **kwargs)
def from_str(turn, conn_str, **kwargs):
return transport.connection_from_str(turn, conn_str, **kwargs)
# decorator
def connect(conn_str, cap = None, **kwargs):
def connect(turn, conn_str, cap, **kwargs):
def prepare_resolution_handler(handler):
@During().add_handler
def handle_gatekeeper(gk):
if cap is None:
handler(gk.embeddedValue)
else:
gatekeeper.resolve(gk.embeddedValue, cap)(handler)
def handle_gatekeeper(turn, gk):
gatekeeper.resolve(turn, gk.embeddedValue, cap)(handler)
return transport.connection_from_str(
turn,
conn_str,
gatekeeper_peer = turn.ref(handle_gatekeeper),
**kwargs)
@ -293,20 +289,20 @@ class RelayEntity(actor.Entity):
def _send(self, e):
self.relay._send(self.oid, e)
def on_publish(self, assertion, handle):
def on_publish(self, turn, assertion, handle):
self._send(protocol.Event.Assert(protocol.Assert(
protocol.Assertion(self.relay.register(self.oid, assertion, handle)),
protocol.Handle(handle))))
def on_retract(self, handle):
def on_retract(self, turn, handle):
self.relay.deregister(handle)
self._send(protocol.Event.Retract(protocol.Retract(protocol.Handle(handle))))
def on_message(self, message):
def on_message(self, turn, message):
self._send(protocol.Event.Message(protocol.Message(
protocol.Assertion(self.relay.register(self.oid, message, None)))))
def on_sync(self, peer):
def on_sync(self, turn, peer):
pins = []
self.relay.register_imported_oid(self.oid, pins)
entity = SyncPeerEntity(self.relay, peer, pins)
@ -319,7 +315,7 @@ class SyncPeerEntity(actor.Entity):
self.peer = peer
self.pins = pins
def on_message(self, body):
def on_message(self, turn, body):
drop_all(self.pins)
turn.send(self.peer, body)
@ -335,8 +331,8 @@ async def _default_on_disconnected(relay, did_connect):
return True
class _StreamTunnelRelay(TunnelRelay, asyncio.Protocol):
def __init__(self, address, **kwargs):
super().__init__(address, **kwargs)
def __init__(self, turn, address, **kwargs):
super().__init__(turn, address, **kwargs)
self.decoder = None
self.stop_signal = None
self.transport = None
@ -344,6 +340,10 @@ class _StreamTunnelRelay(TunnelRelay, asyncio.Protocol):
def connection_lost(self, exc):
self._on_disconnected()
def connection_made(self, transport):
self.transport = transport
self._on_connected()
def data_received(self, chunk):
self.decoder.extend(chunk)
while True:
@ -364,88 +364,74 @@ class _StreamTunnelRelay(TunnelRelay, asyncio.Protocol):
pass
self.stop_signal.get_loop().call_soon_threadsafe(set_stop_signal)
async def _create_connection(self, system):
async def _create_connection(self, loop):
raise Exception('subclassresponsibility')
async def main(self, system, on_connected=None):
async def main(self, loop, on_connected=None):
if self.transport is not None:
raise Exception('Cannot run connection twice!')
self.decoder = Decoder(decode_embedded = sturdy.WireRef.decode)
self.stop_signal = system.loop.create_future()
self.stop_signal = loop.create_future()
try:
try:
transport, _protocol = await asyncio.wait_for(
self._create_connection(system), timeout=self.connection_timeout)
except asyncio.TimeoutError:
self.facet.log.error(
'%s: Timeout connecting to server' % (self.__class__.__qualname__,))
return False
except OSError as e:
self.facet.log.error(
'%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
return False
_transport, _protocol = await self._create_connection(loop)
except OSError as e:
log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
return False
self.transport = transport
self._on_connected()
try:
if on_connected: await on_connected(self)
await self.stop_signal
return True
finally:
if self.transport:
self.transport.close()
self.transport.close()
self.transport = None
self.stop_signal = None
self.decoder = None
@transport.address(transportAddress.Tcp)
class TcpTunnelRelay(_StreamTunnelRelay):
async def _create_connection(self, system):
return await system.loop.create_connection(lambda: self, self.address.host, self.address.port)
async def _create_connection(self, loop):
return await loop.create_connection(lambda: self, self.address.host, self.address.port)
@transport.address(transportAddress.Unix)
class UnixSocketTunnelRelay(_StreamTunnelRelay):
async def _create_connection(self, system):
return await system.loop.create_unix_connection(lambda: self, self.address.path)
async def _create_connection(self, loop):
return await loop.create_unix_connection(lambda: self, self.address.path)
@transport.address(transportAddress.WebSocket)
class WebsocketTunnelRelay(TunnelRelay):
def __init__(self, address, **kwargs):
super().__init__(address, **kwargs)
self.system = None
def __init__(self, turn, address, **kwargs):
super().__init__(turn, address, **kwargs)
self.loop = None
self.ws = None
def _send_bytes(self, bs):
if self.system:
if self.loop:
def _do_send():
if self.ws:
self.system.queue_task(lambda: self.ws.send(bs))
self.system.loop.call_soon_threadsafe(_do_send)
self.loop.create_task(self.ws.send(bs))
self.loop.call_soon_threadsafe(_do_send)
def _disconnect(self):
if self.system:
if self.loop:
def _do_disconnect():
if self.ws:
self.system.queue_task(lambda: self.ws.close())
self.system.loop.call_soon_threadsafe(_do_disconnect)
self.loop.create_task(self.ws.close())
self.loop.call_soon_threadsafe(_do_disconnect)
def __connection_error(self, e):
self.facet.log.error('Could not connect to server: %s' % (e,))
return False
async def main(self, system, on_connected=None):
import websockets
async def main(self, loop, on_connected=None):
if self.ws is not None:
raise Exception('Cannot run connection twice!')
self.system = system
self.loop = loop
try:
self.ws = await websockets.connect(
self.address.url, open_timeout=self.connection_timeout)
except asyncio.TimeoutError:
return self.__connection_error('timeout')
self.ws = await websockets.connect(self.address.url)
except OSError as e:
return self.__connection_error(e)
except websockets.exceptions.InvalidHandshake as e:
@ -464,28 +450,29 @@ class WebsocketTunnelRelay(TunnelRelay):
if self.ws:
await self.ws.close()
self.system = None
self.loop = None
self.ws = None
return True
@transport.address(transportAddress.Stdio)
class PipeTunnelRelay(_StreamTunnelRelay):
def __init__(self, address, input_fileobj = sys.stdin, output_fileobj = sys.stdout, **kwargs):
super().__init__(address, **kwargs)
def __init__(self, turn, address, input_fileobj = sys.stdin, output_fileobj = sys.stdout, **kwargs):
super().__init__(turn, address, **kwargs)
self.input_fileobj = input_fileobj
self.output_fileobj = output_fileobj
self.reader = asyncio.StreamReader()
async def _create_connection(self, system):
return await system.loop.connect_read_pipe(lambda: self, self.input_fileobj)
async def _create_connection(self, loop):
return await loop.connect_read_pipe(lambda: self, self.input_fileobj)
def _send_bytes(self, bs):
self.output_fileobj.buffer.write(bs)
self.output_fileobj.buffer.flush()
def run_stdio_service(entity):
PipeTunnelRelay(transportAddress.Stdio(), publish_service=turn.ref(entity))
def run_stdio_service(turn, entity):
PipeTunnelRelay(turn, transportAddress.Stdio(), publish_service=turn.ref(entity))
# decorator
def service(**kwargs):
return lambda entity: actor.run_system(**kwargs)(lambda: run_stdio_service(entity))
return lambda entity: \
actor.start_actor_system(lambda turn: run_stdio_service(turn, entity), **kwargs)

View File

@ -2,6 +2,6 @@ def __load():
from preserves.schema import load_schema_file
import pathlib
for (n, ns) in load_schema_file(pathlib.Path(__file__).parent /
'protocols/schema-bundle.bin')._items().items():
'../../syndicate-protocols/schema-bundle.bin')._items().items():
globals()[n] = ns
__load()

View File

@ -11,10 +11,10 @@ def address(address_class):
return connection_factory_class
return k
def connection_from_str(s, **kwargs):
def connection_from_str(turn, s, **kwargs):
address = parse(s)
for (address_class, factory_class) in constructors.items():
decoded_address = address_class.try_decode(address)
if decoded_address is not None:
return factory_class(decoded_address, **kwargs)
return factory_class(turn, decoded_address, **kwargs)
raise InvalidTransportAddress('Invalid transport address', address)