Add connectNet

Use Taps to connect to Syndicate peers over TCP. Taps is now a
depedency.
This commit is contained in:
Emery Hemingway 2022-12-13 22:57:17 -06:00
parent bb4ba36ff7
commit 47da042671
4 changed files with 217 additions and 107 deletions

7
shell.nix Normal file
View File

@ -0,0 +1,7 @@
{ pkgs ? (builtins.getFlake "github:nixos/nixpkgs/release-22.11").legacyPackages.x86_64-linux }:
with pkgs;
mkShell {
packages = [ pkg-config getdns ];
inputsFrom = [ nim-unwrapped ];
}

View File

@ -29,11 +29,16 @@ import preserves
import ./syndicate/[actors, dataspaces, durings, patterns]
import ./syndicate/protocols/dataspace
from ./syndicate/relays import connectStdio, connectUnix
when defined(posix):
from ./syndicate/relays import connectStdio, connectUnix, SturdyRef
export connectStdio, connectUnix
else:
from ./syndicate/relays import SturdyRef
export SturdyRef
export Actor, Assertion, Facet, Handle, Ref, Symbol, Turn, TurnAction,
`$`, `?`, addCallback, analyse, asyncCheck, bootDataspace, connectStdio,
connectUnix, drop, facet, future, grab, message, newDataspace, publish,
`$`, `?`, addCallback, analyse, asyncCheck, bootDataspace,
drop, facet, future, grab, message, newDataspace, publish,
retract, replace, run, stop, unembed
type

91
src/syndicate/peers.nim Normal file
View File

@ -0,0 +1,91 @@
# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway
# SPDX-License-Identifier: Unlicense
## Module for peering with remote dataspaces over network.
import std/[asyncdispatch, net, options, streams, tables]
import preserves
import ./actors, ./durings, ./relays, ./protocols/protocol
import taps
export `$`
type
Turn = actors.Turn
Assertion = Preserve[Ref]
Value = Preserve[void]
proc connectTcp(remote: RemoteSpecifier): Connection =
var transport = newTransportProperties()
transport.require("reliability")
transport.require("preserve-order")
var preConn = newPreConnection(
transport = some transport,
remote = some remote)
preconn.initiate()
proc connectNet*(turn: var Turn; remote: RemoteSpecifier; cap: SturdyRef; bootProc: ConnectProc) =
let
facet = turn.facet
reenable = facet.preventInertCheck()
connectionClosedRef = newRef(turn, ShutdownEntity())
conn = connectTcp(remote)
conn.onReady do:
discard bootActor("net") do (turn: var Turn):
var shutdownRef: Ref
proc tapsWriter(pkt: sink Packet): Future[void] =
let fut = newFuture[void]("tapsWriter")
send(conn, encode(pkt))
onSent(conn) do (ctx: MessageContext):
complete(fut)
onSendError(conn) do (ctx: MessageContext; reason: ref Exception):
fail(fut, reason)
fut
var ops = RelayActorOptions(
packetWriter: tapsWriter,
initialOid: 0.Oid.some)
let relayFut = spawnRelay("net", turn, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet
facet.actor.atExit do (turn: var Turn):
close(conn)
conn.onConnectionError do (reason: ref Exception):
terminate(facet, reason)
conn.onReceiveError do (ctx: MessageContext; reason: ref Exception):
terminate(facet, reason)
conn.onClosed do:
run(facet) do (turn: var Turn):
stopActor(turn)
var wireBuf = newBufferedDecoder()
conn.onReceived do (buf: seq[byte]; ctx: MessageContext):
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success: dispatch(relay, pr)
receive(conn)
receive(conn)
discard publish(turn, connectionClosedRef, true)
shutdownRef = newRef(turn, ShutdownEntity())
relayFut.addCallback do (refFut: Future[Ref]):
let gatekeeper = read refFut
run(gatekeeper.relay) do (turn: var Turn):
reenable()
discard publish(turn, shutdownRef, true)
proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction =
let facet = facet(turn) do (turn: var Turn):
bootProc(turn, unembed a)
proc action(turn: var Turn) =
stop(turn, facet)
result = action
var res = Resolve(
sturdyref: cap,
observer: newRef(turn, during(duringCallback)))
discard publish(turn, gatekeeper, res)
proc connectNet*(turn: var Turn; host: string; port: Port; cap: SturdyRef; bootProc: ConnectProc) =
var remote = newRemoteEndpoint()
remote.with(port)
if isIpAddress host:
remote.with(parseIpAddress(host))
else:
remote.withHostname(host)
connectNet(turn, remote, cap, bootProc)

View File

@ -6,10 +6,15 @@ import preserves
import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy]
when defined(traceSyndicate):
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
when defined(posix):
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
else:
template trace(args: varargs[untyped]): untyped = echo(args)
else:
template trace(args: varargs[untyped]): untyped = discard
export `$`
type Oid = sturdy.Oid
type
@ -23,7 +28,7 @@ type
PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
Relay = ref object of RootObj
Relay* = ref object of RootObj
facet: Facet
inboundAssertions: Table[Handle,
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
@ -187,7 +192,7 @@ proc rewriteIn(relay; facet; v: Value):
proc close(r: Relay) = discard
proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) =
proc dispatch*(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) =
case event.orKind
of EventKind.Assert:
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
@ -215,7 +220,7 @@ proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) =
for e in imported: relay.imported.del e
]#
proc dispatch(relay: Relay; v: Value) =
proc dispatch*(relay: Relay; v: Value) =
trace "S: ", v
run(relay.facet) do (t: var Turn):
var pkt: Packet
@ -235,13 +240,13 @@ proc dispatch(relay: Relay; v: Value) =
stderr.writeLine("discarding undecoded packet ", v)
type
RelayOptions = object of RootObj
packetWriter: PacketWriter
untrusted: bool
RelayActorOptions = object of RelayOptions
initialOid: Option[Oid]
initialRef: Ref
nextLocalOid: Option[Oid]
RelayOptions* = object of RootObj
packetWriter*: PacketWriter
untrusted*: bool
RelayActorOptions* = object of RelayOptions
initialOid*: Option[Oid]
initialRef*: Ref
nextLocalOid*: Option[Oid]
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
result = Relay(
@ -251,7 +256,7 @@ proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
discard result.facet.preventInertCheck()
setup(turn, result)
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup): Future[Ref] =
proc spawnRelay*(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup): Future[Ref] =
var fut = newFuture[Ref]"spawnRelay"
spawn(name, turn) do (turn: var Turn):
let relay = newRelay(turn, opts, setup)
@ -272,110 +277,112 @@ proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: Re
else: oid
fut
import std/asyncnet
from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol
when defined(posix):
import std/asyncnet
from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol
import protocols/gatekeeper
type ShutdownEntity = ref object of Entity
type ShutdownEntity* = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(turn)
type
SturdyRef = sturdy.SturdyRef[Ref]
Resolve = gatekeeper.Resolve[Ref]
SturdyRef* = sturdy.SturdyRef[Ref]
Resolve* = gatekeeper.Resolve[Ref]
ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.}
proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) =
var socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
proc socketWriter(packet: sink Packet): Future[void] =
socket.send(cast[string](encode(packet)))
const recvSize = 0x2000
var shutdownRef: Ref
let reenable = turn.facet.preventInertCheck()
let connectionClosedRef = newRef(turn, ShutdownEntity())
var fut = newFuture[void]"connectUnix"
connectUnix(socket, path).addCallback do (f: Future[void]):
read f
discard bootActor("unix") do (turn: var Turn):
var ops = RelayActorOptions(
packetWriter: socketWriter,
initialOid: 0.Oid.some)
let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet
var wireBuf = newBufferedDecoder()
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if pktFut.failed:
run(facet) do (turn: var Turn): stopActor(turn)
else:
var buf = pktFut.read
if buf.len == 0:
when defined(posix):
proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) =
var socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
proc socketWriter(packet: sink Packet): Future[void] =
socket.send(cast[string](encode(packet)))
const recvSize = 0x2000
var shutdownRef: Ref
let reenable = turn.facet.preventInertCheck()
let connectionClosedRef = newRef(turn, ShutdownEntity())
var fut = newFuture[void]"connectUnix"
connectUnix(socket, path).addCallback do (f: Future[void]):
read f
discard bootActor("unix") do (turn: var Turn):
var ops = RelayActorOptions(
packetWriter: socketWriter,
initialOid: 0.Oid.some)
let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet
var wireBuf = newBufferedDecoder()
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if pktFut.failed:
run(facet) do (turn: var Turn): stopActor(turn)
else:
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
callSoon:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
turn.facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, connectionClosedRef, true)
shutdownRef = newRef(turn, ShutdownEntity())
relayFut.addCallback do (refFut: Future[Ref]):
let gatekeeper = read refFut
run(gatekeeper.relay) do (turn: var Turn):
reenable()
discard publish(turn, shutdownRef, true)
proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction =
let facet = facet(turn) do (turn: var Turn):
bootProc(turn, unembed a)
proc action(turn: var Turn) =
stop(turn, facet)
result = action
var res = Resolve(
sturdyref: cap,
observer: newRef(turn, during(duringCallback)))
discard publish(turn, gatekeeper, res)
fut.complete()
asyncCheck(turn, fut)
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
callSoon:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
turn.facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, connectionClosedRef, true)
shutdownRef = newRef(turn, ShutdownEntity())
relayFut.addCallback do (refFut: Future[Ref]):
let gatekeeper = read refFut
run(gatekeeper.relay) do (turn: var Turn):
reenable()
discard publish(turn, shutdownRef, true)
proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction =
let facet = facet(turn) do (turn: var Turn):
bootProc(turn, unembed a)
proc action(turn: var Turn) =
stop(turn, facet)
result = action
var res = Resolve(
sturdyref: cap,
observer: newRef(turn, during(duringCallback)))
discard publish(turn, gatekeeper, res)
fut.complete()
asyncCheck(turn, fut)
import std/asyncfile
import std/asyncfile
const stdinReadSize = 128
const stdinReadSize = 128
proc connectStdio*(ds: Ref; turn: var Turn) =
## Connect to an external dataspace over stdin and stdout.
proc stdoutWriter(packet: sink Packet): Future[void] {.async.} =
var buf = encode(packet)
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
flushFile(stdout)
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
initialRef: ds,
initialOid: 0.Oid.some)
asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = openAsync("/dev/stdin")
facet.actor.atExit do (turn: var Turn):
close(asyncStdin)
var wireBuf = newBufferedDecoder()
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb)
# do not process the next line immediately
asyncStdin.read(stdinReadSize).addCallback(recvCb)
proc connectStdio*(ds: Ref; turn: var Turn) =
## Connect to an external dataspace over stdin and stdout.
proc stdoutWriter(packet: sink Packet): Future[void] {.async.} =
var buf = encode(packet)
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
flushFile(stdout)
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
initialRef: ds,
initialOid: 0.Oid.some)
asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = openAsync("/dev/stdin")
facet.actor.atExit do (turn: var Turn):
close(asyncStdin)
var wireBuf = newBufferedDecoder()
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb)
# do not process the next line immediately
asyncStdin.read(stdinReadSize).addCallback(recvCb)