Compare commits

...

1 Commits

Author SHA1 Message Date
Emery Hemingway 4451114781 Lost and found 2022-10-02 18:07:09 -05:00
6 changed files with 103 additions and 26 deletions

View File

@ -23,8 +23,9 @@ FriendRequest = <request @key bytes @msg string> .
; Asserted to the core to accept a friend request. ; Asserted to the core to accept a friend request.
FriendAccept = <accept @key bytes> . FriendAccept = <accept @key bytes> .
; Messages sent by friend entities. ; Messages sent and received by friend entities.
FriendMessage = <msg @body string @kind int> . MessageRecv = <msgrecv @body string @kind int> .
MessageSend = <msgsend @body string @kind int> .
; Asserted by a friend while a transfer is active. ; Asserted by a friend while a transfer is active.
TransferDataspace = <transfer @kind int @size int @filename string @entity #!any> . TransferDataspace = <transfer @kind int @size int @filename string @entity #!any> .
@ -32,4 +33,6 @@ TransferDataspace = <transfer @kind int @size int @filename string @entity #!any
; Asserted to the transfer entity to indicate the transfer should be saved to a file ; Asserted to the transfer entity to indicate the transfer should be saved to a file
TransferSink = <sink @path string> . TransferSink = <sink @path string> .
Completion = <complete> .
BootstrapNode = <bootstrap @publicKey string @host string @port int> . BootstrapNode = <bootstrap @publicKey string @host string @port int> .

View File

@ -3,6 +3,10 @@ import
std/typetraits, preserves std/typetraits, preserves
type type
MessageSend* {.preservesRecord: "msgsend".} = object
`body`*: string
`kind`*: BiggestInt
Name* {.preservesRecord: "name".} = object Name* {.preservesRecord: "name".} = object
`name`*: string `name`*: string
@ -21,7 +25,7 @@ type
`publicKey`*: seq[byte] `publicKey`*: seq[byte]
`entity`*: Preserve[E] `entity`*: Preserve[E]
FriendMessage* {.preservesRecord: "msg".} = object MessageRecv* {.preservesRecord: "msgrecv".} = object
`body`*: string `body`*: string
`kind`*: BiggestInt `kind`*: BiggestInt
@ -47,6 +51,8 @@ type
FriendAccept* {.preservesRecord: "accept".} = object FriendAccept* {.preservesRecord: "accept".} = object
`key`*: seq[byte] `key`*: seq[byte]
Completion* {.preservesRecord: "complete".} = object
`Connection`* {.preservesOr, pure.} = enum `Connection`* {.preservesOr, pure.} = enum
`none`, `tcp`, `udp` `none`, `tcp`, `udp`
TransferSink* {.preservesRecord: "sink".} = object TransferSink* {.preservesRecord: "sink".} = object
@ -64,20 +70,23 @@ proc encode*[E](x: FriendDataspace[E] | ToxDataspace[E] | TransferDataspace[E]):
byte] = byte] =
encode(toPreserve(x, E)) encode(toPreserve(x, E))
proc `$`*(x: Name | FriendRequest | Address | FriendMessage | Typing | proc `$`*(x: MessageSend | Name | FriendRequest | Address | MessageRecv | Typing |
BootstrapNode | BootstrapNode |
StatusMessage | StatusMessage |
Status | Status |
FriendAccept | FriendAccept |
Completion |
TransferSink | TransferSink |
CoreVersion): string = CoreVersion): string =
`$`(toPreserve(x)) `$`(toPreserve(x))
proc encode*(x: Name | FriendRequest | Address | FriendMessage | Typing | proc encode*(x: MessageSend | Name | FriendRequest | Address | MessageRecv |
Typing |
BootstrapNode | BootstrapNode |
StatusMessage | StatusMessage |
Status | Status |
FriendAccept | FriendAccept |
Completion |
TransferSink | TransferSink |
CoreVersion): seq[byte] = CoreVersion): seq[byte] =
encode(toPreserve(x)) encode(toPreserve(x))

View File

@ -44,7 +44,6 @@ proc writeSaveData(core: Tox) =
type type
Entity = ref object of RootObj Entity = ref object of RootObj
facet: Facet
ds: Ref ds: Ref
TransferEntity {.final.} = ref object of Entity TransferEntity {.final.} = ref object of Entity
@ -68,8 +67,6 @@ type
friends: Table[Friend, FriendEntity] friends: Table[Friend, FriendEntity]
proc init(e: Entity; turn: var Turn; parent: Ref): Handle = proc init(e: Entity; turn: var Turn; parent: Ref): Handle =
assert e.facet.isNil
e.facet = turn.facet
e.ds = newDataspace(turn) e.ds = newDataspace(turn)
proc copyExisting(te: TransferEntity; path: string) = proc copyExisting(te: TransferEntity; path: string) =
@ -99,6 +96,7 @@ proc closeSink(te: TransferEntity; path: string) =
var file: AsyncFile var file: AsyncFile
if te.sinks.pop(path, file): if te.sinks.pop(path, file):
close file close file
stderr.writeLine("closed sink on ", path)
proc closeSinks(te: TransferEntity) = proc closeSinks(te: TransferEntity) =
for file in te.sinks.values: close(file) for file in te.sinks.values: close(file)
@ -111,7 +109,6 @@ proc write(te: TransferEntity; pos: uint64; data: pointer; size: int): Future[vo
result = all futs result = all futs
proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) = proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
assert entity.core.isNil
block: # Tox initialization block: # Tox initialization
var var
proxy_host: cstring proxy_host: cstring
@ -162,7 +159,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
of "save-file": of "save-file":
let saveFilePath = val let saveFilePath = val
if fileExists saveFilePath: if fileExists saveFilePath:
opts.savedata = cast[seq[byte]](readFile saveFilePath) opts.savedata = readFile saveFilePath
opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE
else: saveIsFresh = true else: saveIsFresh = true
else: else:
@ -198,12 +195,15 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
let name = entity.core.name(fn) let name = entity.core.name(fn)
if name != "": if name != "":
fe.handles.name = publish(turn, fe.ds, Name(name: name)) fe.handles.name = publish(turn, fe.ds, Name(name: name))
onMessage(turn, fe.ds, ?MessageSend) do (body: string, kind: int):
stderr.writeLine "got message ", body
discard entity.core.send(fn, body, MessageType(kind))
entity.friends[fn] = fe entity.friends[fn] = fe
for fn in entity.core.friends: createFriend(turn, fn) for fn in entity.core.friends: createFriend(turn, fn)
entity.core.onSelfConnectionStatus do (status: toxcore.Connection): entity.core.onSelfConnectionStatus do (status: toxcore.Connection):
run(entity.facet) do (turn: var Turn): run(entity.ds) do (turn: var Turn):
let conn = case status let conn = case status
of TOX_CONNECTION_NONE: protocol.Connection.none of TOX_CONNECTION_NONE: protocol.Connection.none
of TOX_CONNECTION_TCP: protocol.Connection.tcp of TOX_CONNECTION_TCP: protocol.Connection.tcp
@ -212,7 +212,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
Status(status: conn)) Status(status: conn))
template update[T](fe: FriendEntity; h: var Handle; a: T) = template update[T](fe: FriendEntity; h: var Handle; a: T) =
run(fe.facet) do (turn: var Turn): replace(turn, fe.ds, h, a) run(fe.ds) do (turn: var Turn): replace(turn, fe.ds, h, a)
entity.core.onFriendName do (num: Friend; name: string): entity.core.onFriendName do (num: Friend; name: string):
let fe = entity.friends[num] let fe = entity.friends[num]
@ -227,11 +227,11 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
if typing: if typing:
update(fe, fe.handles.typing, Typing()) update(fe, fe.handles.typing, Typing())
else: else:
run(fe.facet) do (turn: var Turn): run(fe.ds) do (turn: var Turn):
retract(turn, fe.handles.typing) retract(turn, fe.handles.typing)
entity.core.onFriendRequest do (pk: PublicKey; msg: string): entity.core.onFriendRequest do (pk: PublicKey; msg: string):
run(entity.facet) do (turn: var Turn): run(entity.ds) do (turn: var Turn):
let reqHandle = publish(turn, entity.ds, let reqHandle = publish(turn, entity.ds,
FriendRequest(key: pk.bytes.toSeq, msg: msg)) FriendRequest(key: pk.bytes.toSeq, msg: msg))
onPublish(turn, entity.ds, ?FriendAccept(key: pk.bytes.toSeq)) do: onPublish(turn, entity.ds, ?FriendAccept(key: pk.bytes.toSeq)) do:
@ -242,8 +242,8 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
entity.core.onFriendMessage do (num: Friend; msg: string; kind: MessageType): entity.core.onFriendMessage do (num: Friend; msg: string; kind: MessageType):
let fe = entity.friends[num] let fe = entity.friends[num]
run(fe.facet) do (turn: var Turn): run(fe.ds) do (turn: var Turn):
message(turn, fe.ds, FriendMessage(body: msg, kind: int kind)) message(turn, fe.ds, MessageRecv(body: msg, kind: int kind))
entity.core.onFriendLosslessPacket do ( entity.core.onFriendLosslessPacket do (
num: Friend; data: pointer; len: int): num: Friend; data: pointer; len: int):
@ -251,7 +251,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
entity.core.onFileRecv do (fn: Friend; tn: FileTransfer; kind: uint32; size: uint64; filename: string): entity.core.onFileRecv do (fn: Friend; tn: FileTransfer; kind: uint32; size: uint64; filename: string):
let fe = entity.friends[fn] let fe = entity.friends[fn]
run(fe.facet) do (turn: var Turn): run(fe.ds) do (turn: var Turn):
let te = TransferEntity(size: size) let te = TransferEntity(size: size)
discard init(te, turn, fe.ds) discard init(te, turn, fe.ds)
te.dsHandle = publish(turn, fe.ds, TransferDataspace[Ref]( te.dsHandle = publish(turn, fe.ds, TransferDataspace[Ref](
@ -267,6 +267,8 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
te.closeSink(path) te.closeSink(path)
if te.sinks.len == 0: if te.sinks.len == 0:
entity.core.control(fn, tn, TOX_FILE_CONTROL_CANCEL) entity.core.control(fn, tn, TOX_FILE_CONTROL_CANCEL)
stderr.writeLine "all sinks retracted, retract transfer"
retract(turn, te.dsHandle)
fe.transfers[tn] = te fe.transfers[tn] = te
entity.core.onFileRecvChunk do (fn: Friend; tn: FileTransfer; pos: uint64; data: pointer; size: int): entity.core.onFileRecvChunk do (fn: Friend; tn: FileTransfer; pos: uint64; data: pointer; size: int):
@ -277,10 +279,9 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
waitFor te.write(pos, data, size) waitFor te.write(pos, data, size)
# wait for all the writes to complete within the lifetime of the callback # wait for all the writes to complete within the lifetime of the callback
else: else:
te.closeSinks() run(te.ds) do (turn: var Turn):
run(fe.facet) do (turn: var Turn): discard publish(turn, te.ds, Completion())
retract(turn, te.dsHandle) fe.transfers.del(tn)
fe.transfers.del(tn)
var alive: bool var alive: bool
setControlCHook do: setControlCHook do:

63
src/tox_eris_ingester.nim Normal file
View File

@ -0,0 +1,63 @@
# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncfile, logging, strutils]
from std/os import `/`, getTempDir, lastPathPart, removeFile
import preserves, syndicate
import eris
import ./protocol
addHandler(newConsoleLogger(useStderr = true))
# register global logger to stderr
proc ingestFile(
friend, transfer: Ref; sinkHandle: Handle;
store: ErisStore; path: string; fileSize: uint64) {.async.} =
var
file = openAsync(path, fmRead)
count: uint64
if file.getFileSize.uint64 == fileSize:
var
blockSize = recommendedBlockSize(fileSize)
buf = newSeq[byte](blockSize.int)
ingest = newErisIngest(store, blockSize)
while count < fileSize and buf.len > 0:
buf.setLen(await readBuffer(file, addr buf[0], buf.len))
await ingest.append(buf)
count.inc buf.len
if count == fileSize:
var
cap = await ingest.cap
fileName = path.lastPathPart
urn = $cap & "&dn=" & filename & "&xl=" & $fileSize
run(friend) do (turn: var Turn):
let ass = MessageSend(body: urn)
message(turn, friend, ass)
info "close and remove ", path
close(file)
removeFile(path)
if count != fileSize:
error "failed to ingest ", path, ", read ", count, " bytes of ", fileSize
run(transfer) do (turn: var Turn):
info "retract sink handle"
retract(turn, sinkHandle)
bootDataspace("main") do (root: Ref; turn: var Turn):
connectStdio(root, turn)
during(turn, root, FriendDataspace[Ref] ? { 1: grab() }) do (friend: Ref):
info "got a friend ref ", friend
during(turn, friend, ?TransferDataspace[Ref]) do (
kind: int, size: uint64, filename: string, transfer: Ref):
info "got a transfer for ", filename
let
path = getTempDir() / filename
sinkHandle = publish(turn, transfer, TransferSink(path: path))
onPublish(turn, transfer, ?Completion) do:
asyncCheck ingestFile(friend, transfer, sinkHandle, newDiscardStore(), path, size)
do:
info "transfer retracted"
do:
info "friend retracted"
runForever()

View File

@ -5,9 +5,10 @@ author = "Emery Hemingway"
description = "Tox chat actor for Syndicate" description = "Tox chat actor for Syndicate"
license = "Unlicense" license = "Unlicense"
srcDir = "src" srcDir = "src"
bin = @["syndicate_actor_tox"] bin = @["syndicate_actor_tox", "tox_eris_ingester"]
backend = "cpp"
# Dependencies # Dependencies
requires "nim >= 1.6.2", "syndicate >= 1.2.1", "cbor", "toxcore" requires "nim >= 1.6.2", "syndicate >= 1.2.1", "cbor", "toxcore", "eris", "eris_protocols", "eris_tkrzw"

View File

@ -49,9 +49,9 @@
? <transfer ?kind ?size ?filename ?transfer> [ ? <transfer ?kind ?size ?filename ?transfer> [
$log ! <log "-" { line: [$kind $size $filename $transfer]}> $log ! <log "-" { line: [$kind $size $filename $transfer]}>
$config ? <service-object <daemon tox_eris_ingester> ?ingester> [ $config ? <service-object <daemon tox_eris_ingester> ?ingester> $ingester [
$log ! <log "-" { line: ["ingester is up at " $ingester] }> ; don't leak the public key to the ingester
$ingester <transfer $kind $size $filename $transfer> <friend #[] $friend>
] ]
] ]
] ]