92 lines
3.2 KiB
Nim
92 lines
3.2 KiB
Nim
# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway
|
|
# SPDX-License-Identifier: Unlicense
|
|
|
|
## Module for peering with remote dataspaces over network.
|
|
|
|
import std/[asyncdispatch, net, options, tables]
|
|
import preserves
|
|
import ./actors, ./durings, ./relays, ./protocols/protocol
|
|
|
|
import taps
|
|
|
|
export `$`
|
|
|
|
type
|
|
Turn = actors.Turn
|
|
Assertion = Preserve[Ref]
|
|
Value = Preserve[void]
|
|
|
|
proc connectTcp(remote: RemoteSpecifier): Connection =
|
|
var transport = newTransportProperties()
|
|
transport.require("reliability")
|
|
transport.require("preserve-order")
|
|
var preConn = newPreConnection(
|
|
transport = some transport,
|
|
remote = @[remote])
|
|
preconn.initiate()
|
|
|
|
proc connectNet*(turn: var Turn; remote: RemoteSpecifier; cap: SturdyRef; bootProc: ConnectProc) =
|
|
let
|
|
facet = turn.facet
|
|
reenable = facet.preventInertCheck()
|
|
connectionClosedRef = newRef(turn, ShutdownEntity())
|
|
conn = connectTcp(remote)
|
|
conn.onReady do ():
|
|
discard bootActor("net") do (turn: var Turn):
|
|
var shutdownRef: Ref
|
|
proc tapsWriter(pkt: sink Packet): Future[void] =
|
|
let fut = newFuture[void]("tapsWriter")
|
|
send(conn, encode(pkt))
|
|
onSent(conn) do (ctx: MessageContext):
|
|
complete(fut)
|
|
onSendError(conn) do (ctx: MessageContext; reason: ref Exception):
|
|
fail(fut, reason)
|
|
fut
|
|
var ops = RelayActorOptions(
|
|
packetWriter: tapsWriter,
|
|
initialOid: 0.Oid.some)
|
|
let relayFut = spawnRelay("net", turn, ops) do (turn: var Turn; relay: Relay):
|
|
let facet = turn.facet
|
|
facet.actor.atExit do (turn: var Turn):
|
|
close(conn)
|
|
conn.onConnectionError do (reason: ref Exception):
|
|
terminate(facet, reason)
|
|
conn.onReceiveError do (ctx: MessageContext; reason: ref Exception):
|
|
terminate(facet, reason)
|
|
conn.onClosed do ():
|
|
run(facet) do (turn: var Turn):
|
|
stopActor(turn)
|
|
var wireBuf = newBufferedDecoder()
|
|
conn.onReceived do (buf: seq[byte]; ctx: MessageContext):
|
|
feed(wireBuf, buf)
|
|
var (success, pr) = decode(wireBuf)
|
|
if success: dispatch(relay, pr)
|
|
receive(conn)
|
|
receive(conn)
|
|
discard publish(turn, connectionClosedRef, true)
|
|
shutdownRef = newRef(turn, ShutdownEntity())
|
|
relayFut.addCallback do (refFut: Future[Ref]):
|
|
let gatekeeper = read refFut
|
|
run(gatekeeper.relay) do (turn: var Turn):
|
|
reenable()
|
|
discard publish(turn, shutdownRef, true)
|
|
proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction =
|
|
let facet = inFacet(turn) do (turn: var Turn):
|
|
bootProc(turn, unembed a)
|
|
proc action(turn: var Turn) =
|
|
stop(turn, facet)
|
|
result = action
|
|
var res = Resolve(
|
|
sturdyref: cap,
|
|
observer: newRef(turn, during(duringCallback)))
|
|
discard publish(turn, gatekeeper, res)
|
|
|
|
proc connectNet*(turn: var Turn; host: string; port: Port; cap: SturdyRef; bootProc: ConnectProc) =
|
|
var remote = newRemoteEndpoint()
|
|
remote.with(port)
|
|
if isIpAddress host:
|
|
remote.with(parseIpAddress(host))
|
|
else:
|
|
remote.withHostname(host)
|
|
connectNet(turn, remote, cap, bootProc)
|