Lost and found

This commit is contained in:
Emery Hemingway 2022-10-02 18:07:09 -05:00
parent 72d959dd6e
commit 4451114781
6 changed files with 103 additions and 26 deletions

View File

@ -23,8 +23,9 @@ FriendRequest = <request @key bytes @msg string> .
; Asserted to the core to accept a friend request.
FriendAccept = <accept @key bytes> .
; Messages sent by friend entities.
FriendMessage = <msg @body string @kind int> .
; Messages sent and received by friend entities.
MessageRecv = <msgrecv @body string @kind int> .
MessageSend = <msgsend @body string @kind int> .
; Asserted by a friend while a transfer is active.
TransferDataspace = <transfer @kind int @size int @filename string @entity #!any> .
@ -32,4 +33,6 @@ TransferDataspace = <transfer @kind int @size int @filename string @entity #!any
; Asserted to the transfer entity to indicate the transfer should be saved to a file
TransferSink = <sink @path string> .
Completion = <complete> .
BootstrapNode = <bootstrap @publicKey string @host string @port int> .

View File

@ -3,6 +3,10 @@ import
std/typetraits, preserves
type
MessageSend* {.preservesRecord: "msgsend".} = object
`body`*: string
`kind`*: BiggestInt
Name* {.preservesRecord: "name".} = object
`name`*: string
@ -21,7 +25,7 @@ type
`publicKey`*: seq[byte]
`entity`*: Preserve[E]
FriendMessage* {.preservesRecord: "msg".} = object
MessageRecv* {.preservesRecord: "msgrecv".} = object
`body`*: string
`kind`*: BiggestInt
@ -47,6 +51,8 @@ type
FriendAccept* {.preservesRecord: "accept".} = object
`key`*: seq[byte]
Completion* {.preservesRecord: "complete".} = object
`Connection`* {.preservesOr, pure.} = enum
`none`, `tcp`, `udp`
TransferSink* {.preservesRecord: "sink".} = object
@ -64,20 +70,23 @@ proc encode*[E](x: FriendDataspace[E] | ToxDataspace[E] | TransferDataspace[E]):
byte] =
encode(toPreserve(x, E))
proc `$`*(x: Name | FriendRequest | Address | FriendMessage | Typing |
proc `$`*(x: MessageSend | Name | FriendRequest | Address | MessageRecv | Typing |
BootstrapNode |
StatusMessage |
Status |
FriendAccept |
Completion |
TransferSink |
CoreVersion): string =
`$`(toPreserve(x))
proc encode*(x: Name | FriendRequest | Address | FriendMessage | Typing |
proc encode*(x: MessageSend | Name | FriendRequest | Address | MessageRecv |
Typing |
BootstrapNode |
StatusMessage |
Status |
FriendAccept |
Completion |
TransferSink |
CoreVersion): seq[byte] =
encode(toPreserve(x))

View File

@ -44,7 +44,6 @@ proc writeSaveData(core: Tox) =
type
Entity = ref object of RootObj
facet: Facet
ds: Ref
TransferEntity {.final.} = ref object of Entity
@ -68,8 +67,6 @@ type
friends: Table[Friend, FriendEntity]
proc init(e: Entity; turn: var Turn; parent: Ref): Handle =
assert e.facet.isNil
e.facet = turn.facet
e.ds = newDataspace(turn)
proc copyExisting(te: TransferEntity; path: string) =
@ -99,6 +96,7 @@ proc closeSink(te: TransferEntity; path: string) =
var file: AsyncFile
if te.sinks.pop(path, file):
close file
stderr.writeLine("closed sink on ", path)
proc closeSinks(te: TransferEntity) =
for file in te.sinks.values: close(file)
@ -111,7 +109,6 @@ proc write(te: TransferEntity; pos: uint64; data: pointer; size: int): Future[vo
result = all futs
proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
assert entity.core.isNil
block: # Tox initialization
var
proxy_host: cstring
@ -162,7 +159,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
of "save-file":
let saveFilePath = val
if fileExists saveFilePath:
opts.savedata = cast[seq[byte]](readFile saveFilePath)
opts.savedata = readFile saveFilePath
opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE
else: saveIsFresh = true
else:
@ -198,12 +195,15 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
let name = entity.core.name(fn)
if name != "":
fe.handles.name = publish(turn, fe.ds, Name(name: name))
onMessage(turn, fe.ds, ?MessageSend) do (body: string, kind: int):
stderr.writeLine "got message ", body
discard entity.core.send(fn, body, MessageType(kind))
entity.friends[fn] = fe
for fn in entity.core.friends: createFriend(turn, fn)
entity.core.onSelfConnectionStatus do (status: toxcore.Connection):
run(entity.facet) do (turn: var Turn):
run(entity.ds) do (turn: var Turn):
let conn = case status
of TOX_CONNECTION_NONE: protocol.Connection.none
of TOX_CONNECTION_TCP: protocol.Connection.tcp
@ -212,7 +212,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
Status(status: conn))
template update[T](fe: FriendEntity; h: var Handle; a: T) =
run(fe.facet) do (turn: var Turn): replace(turn, fe.ds, h, a)
run(fe.ds) do (turn: var Turn): replace(turn, fe.ds, h, a)
entity.core.onFriendName do (num: Friend; name: string):
let fe = entity.friends[num]
@ -227,11 +227,11 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
if typing:
update(fe, fe.handles.typing, Typing())
else:
run(fe.facet) do (turn: var Turn):
run(fe.ds) do (turn: var Turn):
retract(turn, fe.handles.typing)
entity.core.onFriendRequest do (pk: PublicKey; msg: string):
run(entity.facet) do (turn: var Turn):
run(entity.ds) do (turn: var Turn):
let reqHandle = publish(turn, entity.ds,
FriendRequest(key: pk.bytes.toSeq, msg: msg))
onPublish(turn, entity.ds, ?FriendAccept(key: pk.bytes.toSeq)) do:
@ -242,8 +242,8 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
entity.core.onFriendMessage do (num: Friend; msg: string; kind: MessageType):
let fe = entity.friends[num]
run(fe.facet) do (turn: var Turn):
message(turn, fe.ds, FriendMessage(body: msg, kind: int kind))
run(fe.ds) do (turn: var Turn):
message(turn, fe.ds, MessageRecv(body: msg, kind: int kind))
entity.core.onFriendLosslessPacket do (
num: Friend; data: pointer; len: int):
@ -251,7 +251,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
entity.core.onFileRecv do (fn: Friend; tn: FileTransfer; kind: uint32; size: uint64; filename: string):
let fe = entity.friends[fn]
run(fe.facet) do (turn: var Turn):
run(fe.ds) do (turn: var Turn):
let te = TransferEntity(size: size)
discard init(te, turn, fe.ds)
te.dsHandle = publish(turn, fe.ds, TransferDataspace[Ref](
@ -267,6 +267,8 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
te.closeSink(path)
if te.sinks.len == 0:
entity.core.control(fn, tn, TOX_FILE_CONTROL_CANCEL)
stderr.writeLine "all sinks retracted, retract transfer"
retract(turn, te.dsHandle)
fe.transfers[tn] = te
entity.core.onFileRecvChunk do (fn: Friend; tn: FileTransfer; pos: uint64; data: pointer; size: int):
@ -277,10 +279,9 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
waitFor te.write(pos, data, size)
# wait for all the writes to complete within the lifetime of the callback
else:
te.closeSinks()
run(fe.facet) do (turn: var Turn):
retract(turn, te.dsHandle)
fe.transfers.del(tn)
run(te.ds) do (turn: var Turn):
discard publish(turn, te.ds, Completion())
fe.transfers.del(tn)
var alive: bool
setControlCHook do:

63
src/tox_eris_ingester.nim Normal file
View File

@ -0,0 +1,63 @@
# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncfile, logging, strutils]
from std/os import `/`, getTempDir, lastPathPart, removeFile
import preserves, syndicate
import eris
import ./protocol
addHandler(newConsoleLogger(useStderr = true))
# register global logger to stderr
proc ingestFile(
friend, transfer: Ref; sinkHandle: Handle;
store: ErisStore; path: string; fileSize: uint64) {.async.} =
var
file = openAsync(path, fmRead)
count: uint64
if file.getFileSize.uint64 == fileSize:
var
blockSize = recommendedBlockSize(fileSize)
buf = newSeq[byte](blockSize.int)
ingest = newErisIngest(store, blockSize)
while count < fileSize and buf.len > 0:
buf.setLen(await readBuffer(file, addr buf[0], buf.len))
await ingest.append(buf)
count.inc buf.len
if count == fileSize:
var
cap = await ingest.cap
fileName = path.lastPathPart
urn = $cap & "&dn=" & filename & "&xl=" & $fileSize
run(friend) do (turn: var Turn):
let ass = MessageSend(body: urn)
message(turn, friend, ass)
info "close and remove ", path
close(file)
removeFile(path)
if count != fileSize:
error "failed to ingest ", path, ", read ", count, " bytes of ", fileSize
run(transfer) do (turn: var Turn):
info "retract sink handle"
retract(turn, sinkHandle)
bootDataspace("main") do (root: Ref; turn: var Turn):
connectStdio(root, turn)
during(turn, root, FriendDataspace[Ref] ? { 1: grab() }) do (friend: Ref):
info "got a friend ref ", friend
during(turn, friend, ?TransferDataspace[Ref]) do (
kind: int, size: uint64, filename: string, transfer: Ref):
info "got a transfer for ", filename
let
path = getTempDir() / filename
sinkHandle = publish(turn, transfer, TransferSink(path: path))
onPublish(turn, transfer, ?Completion) do:
asyncCheck ingestFile(friend, transfer, sinkHandle, newDiscardStore(), path, size)
do:
info "transfer retracted"
do:
info "friend retracted"
runForever()

View File

@ -5,9 +5,10 @@ author = "Emery Hemingway"
description = "Tox chat actor for Syndicate"
license = "Unlicense"
srcDir = "src"
bin = @["syndicate_actor_tox"]
bin = @["syndicate_actor_tox", "tox_eris_ingester"]
backend = "cpp"
# Dependencies
requires "nim >= 1.6.2", "syndicate >= 1.2.1", "cbor", "toxcore"
requires "nim >= 1.6.2", "syndicate >= 1.2.1", "cbor", "toxcore", "eris", "eris_protocols", "eris_tkrzw"

View File

@ -49,9 +49,9 @@
? <transfer ?kind ?size ?filename ?transfer> [
$log ! <log "-" { line: [$kind $size $filename $transfer]}>
$config ? <service-object <daemon tox_eris_ingester> ?ingester> [
$log ! <log "-" { line: ["ingester is up at " $ingester] }>
$ingester <transfer $kind $size $filename $transfer>
$config ? <service-object <daemon tox_eris_ingester> ?ingester> $ingester [
; don't leak the public key to the ingester
<friend #[] $friend>
]
]
]