Add file transfer conversations

This commit is contained in:
Emery Hemingway 2022-07-07 08:20:34 -05:00
parent 2f83d0ef62
commit 72d959dd6e
5 changed files with 202 additions and 53 deletions

View File

@ -23,4 +23,13 @@ FriendRequest = <request @key bytes @msg string> .
; Asserted to the core to accept a friend request.
FriendAccept = <accept @key bytes> .
; Messages sent by friend entities.
FriendMessage = <msg @body string @kind int> .
; Asserted by a friend while a transfer is active.
TransferDataspace = <transfer @kind int @size int @filename string @entity #!any> .
; Asserted to the transfer entity to indicate the transfer should be saved to a file
TransferSink = <sink @path string> .
BootstrapNode = <bootstrap @publicKey string @host string @port int> .

View File

@ -21,16 +21,26 @@ type
`publicKey`*: seq[byte]
`entity`*: Preserve[E]
FriendMessage* {.preservesRecord: "msg".} = object
`body`*: string
`kind`*: BiggestInt
Typing* {.preservesRecord: "typing".} = object
BootstrapNode* {.preservesRecord: "bootstrap".} = object
`publicKey`*: string
`host`*: string
`port`*: int
`port`*: BiggestInt
StatusMessage* {.preservesRecord: "status-message".} = object
`msg`*: string
TransferDataspace*[E] {.preservesRecord: "transfer".} = ref object
`kind`*: BiggestInt
`size`*: BiggestInt
`filename`*: string
`entity`*: Preserve[E]
Status* {.preservesRecord: "status".} = object
`status`*: Connection
@ -39,27 +49,35 @@ type
`Connection`* {.preservesOr, pure.} = enum
`none`, `tcp`, `udp`
CoreVersion* {.preservesRecord: "core".} = object
`major`*: int
`minor`*: int
`patch`*: int
TransferSink* {.preservesRecord: "sink".} = object
`path`*: string
proc `$`*[E](x: FriendDataspace[E] | ToxDataspace[E]): string =
CoreVersion* {.preservesRecord: "core".} = object
`major`*: BiggestInt
`minor`*: BiggestInt
`patch`*: BiggestInt
proc `$`*[E](x: FriendDataspace[E] | ToxDataspace[E] | TransferDataspace[E]): string =
`$`(toPreserve(x, E))
proc encode*[E](x: FriendDataspace[E] | ToxDataspace[E]): seq[byte] =
proc encode*[E](x: FriendDataspace[E] | ToxDataspace[E] | TransferDataspace[E]): seq[
byte] =
encode(toPreserve(x, E))
proc `$`*(x: Name | FriendRequest | Address | Typing | BootstrapNode |
proc `$`*(x: Name | FriendRequest | Address | FriendMessage | Typing |
BootstrapNode |
StatusMessage |
Status |
FriendAccept |
TransferSink |
CoreVersion): string =
`$`(toPreserve(x))
proc encode*(x: Name | FriendRequest | Address | Typing | BootstrapNode |
proc encode*(x: Name | FriendRequest | Address | FriendMessage | Typing |
BootstrapNode |
StatusMessage |
Status |
FriendAccept |
TransferSink |
CoreVersion): seq[byte] =
encode(toPreserve(x))

View File

@ -1,7 +1,8 @@
# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, logging, parseopt, strutils, tables]
import std/[asyncdispatch, asyncfile, logging, parseopt, strutils, tables]
from std/os import fileExists, copyFile, moveFile
from std/sequtils import toSeq
from std/times import inMilliseconds
@ -28,16 +29,35 @@ proc logging_callback(
if lvl != lvlNone:
log(lvl, `func`, ": ", message)
proc saveFilePath(): string =
for kind, key, val in getopt():
if kind == cmdLongOption and key == "save-file" and val != "":
result = val
proc writeSaveData(core: Tox) =
let path = saveFilePath()
if path != "":
let tmpPath = path & ".tmp"
writeFile(tmpPath, core.saveData)
moveFile(tmpPath, path)
debug("Data saved to ", path)
type
Entity = ref object of RootObj
facet: Facet
ds: Ref
TransferEntity {.final.} = ref object of Entity
dsHandle: Handle
sinks: OrderedTable[string, AsyncFile]
size: uint64
FriendHandles = object
name, statusMessage, lastOnline, typing: Handle
FriendEntity {.final.} = ref object of Entity
handles: FriendHandles
transfers: Table[FileTransfer, TransferEntity]
CoreHandles = object
address, name, statusMessage, connectionStatus: Handle
@ -52,10 +72,50 @@ proc init(e: Entity; turn: var Turn; parent: Ref): Handle =
e.facet = turn.facet
e.ds = newDataspace(turn)
proc copyExisting(te: TransferEntity; path: string) =
if te.sinks.len > 0:
var err: ref Exception
for existingPath in te.sinks.keys:
try:
copyFile(existingPath, path)
# TODO: async copy, don't block tox iterate
return
except Exception as e:
err = e
raise err
proc openSink(te: TransferEntity; path: string; size: int64) =
copyExisting(te, path)
var file: AsyncFile
try:
file = openAsync(path, fmReadWriteExisting)
except:
file = openAsync(path, fmReadWrite)
# Stupid file modes
setFileSize(file, size)
te.sinks[path] = file
proc closeSink(te: TransferEntity; path: string) =
var file: AsyncFile
if te.sinks.pop(path, file):
close file
proc closeSinks(te: TransferEntity) =
for file in te.sinks.values: close(file)
proc write(te: TransferEntity; pos: uint64; data: pointer; size: int): Future[void] =
var futs: seq[Future[void]]
for file in te.sinks.values:
file.setFilePos(int64 pos)
futs.add file.writeBuffer(data, size)
result = all futs
proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
assert entity.core.isNil
block: # Tox initialization
var proxy_host: cstring
var
proxy_host: cstring
saveIsFresh = false
entity.core = newTox do (opts: toxcore.Options):
opts.log_callback = logging_callback
debug "parsing command-line options…"
@ -99,12 +159,19 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
opts.tcp_port = parsePortParam(key, val)
of "hole-punching":
opts.hole_punching_enabled = parseBoolParam(key, val)
of "save-file": discard
of "save-file":
let saveFilePath = val
if fileExists saveFilePath:
opts.savedata = cast[seq[byte]](readFile saveFilePath)
opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE
else: saveIsFresh = true
else:
quit("unhandled command-line parameter: " & key)
of cmdShortOption, cmdArgument:
quit("unhandled command-line parameter: " & key)
of cmdEnd: discard
if saveIsFresh:
writeSaveData(entity.core)
block: # Syndicate entity initialization
discard init(entity, turn, parentRef)
discard publish(turn, parentRef, ToxDataspace[Ref](
@ -128,7 +195,9 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
discard publish(turn, entity.ds, FriendDataspace[Ref](
publicKey: entity.core.publicKey(fn).bytes.toSeq,
entity: fe.ds.embed))
fe.handles.name = publish(turn, fe.ds, Name(name: entity.core.name(fn)))
let name = entity.core.name(fn)
if name != "":
fe.handles.name = publish(turn, fe.ds, Name(name: name))
entity.friends[fn] = fe
for fn in entity.core.friends: createFriend(turn, fn)
@ -168,10 +237,54 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
onPublish(turn, entity.ds, ?FriendAccept(key: pk.bytes.toSeq)) do:
createFriend(turn, entity.core.addFriendNoRequest(pk))
retract(turn, reqHandle)
writeSaveData(entity.core)
# TODO: stop watching for the accept assertion
entity.core.onFriendMessage do (num: Friend; msg: string; kind: MessageType):
let fe = entity.friends[num]
run(fe.facet) do (turn: var Turn):
message(turn, fe.ds, FriendMessage(body: msg, kind: int kind))
entity.core.onFriendLosslessPacket do (
num: Friend; data: pointer; len: int):
info "friend sent a lossy packet of ", len, " bytes"
entity.core.onFileRecv do (fn: Friend; tn: FileTransfer; kind: uint32; size: uint64; filename: string):
let fe = entity.friends[fn]
run(fe.facet) do (turn: var Turn):
let te = TransferEntity(size: size)
discard init(te, turn, fe.ds)
te.dsHandle = publish(turn, fe.ds, TransferDataspace[Ref](
kind: int kind,
size: BiggestInt size,
filename: filename,
entity: te.ds.embed))
during(turn, te.ds, ?TransferSink) do (path: string):
te.openSink(path, int64 size)
if te.sinks.len == 1:
entity.core.control(fn, tn, TOX_FILE_CONTROL_RESUME)
do:
te.closeSink(path)
if te.sinks.len == 0:
entity.core.control(fn, tn, TOX_FILE_CONTROL_CANCEL)
fe.transfers[tn] = te
entity.core.onFileRecvChunk do (fn: Friend; tn: FileTransfer; pos: uint64; data: pointer; size: int):
let
fe = entity.friends[fn]
te = fe.transfers[tn]
if size != 0:
waitFor te.write(pos, data, size)
# wait for all the writes to complete within the lifetime of the callback
else:
te.closeSinks()
run(fe.facet) do (turn: var Turn):
retract(turn, te.dsHandle)
fe.transfers.del(tn)
var alive: bool
setControlCHook do:
info "quiting"
if not alive: quit()
alive = false
@ -188,8 +301,10 @@ proc run(entity: CoreEntity) =
error "failed to bootstrap: ", e.msg
poll()
writeSaveData(entity.core)
while alive:
iterate entity.core
poll(entity.core.iterationInterval.inMilliseconds.int)
writeSaveData(entity.core)
run(new CoreEntity)

View File

@ -1,24 +0,0 @@
let ?root_ds = dataspace
<bind "syndicate" #x"" $root_ds>
<require-service <relay-listener <tcp "0.0.0.0" 9001> $gatekeeper>>
<require-service <daemon tox_actor>>
<daemon tox_actor {
argv: [
"/home/repo/syndicate/syndicate_actor_tox/src/syndicate_actor_tox"
"--ipv6"
"--local-discovery"
"--udp"
]
protocol: text/syndicate
}>
? <service-object <daemon tox_actor> ?cap> [
$cap [
<listen-on $root_ds>
<bootstrap "1D13A037DEEC07BA10A3ABA54A3D09075C51A81FA4A9939271CB11245C16A510" "201:7d01:2539:fb46:a575:bad1:98dd:d7ed" 33445>
<bootstrap "6EF679EBD205E8DF9B6975D21CD157D046287700CADDF86F94B7ED243DC26A30" "20a:c3d2:8cf8:f8e5:80fe:9194:3800:87e6" 33445>
<bootstrap "D527E5847F8330D628DAB1814F0A422F6DC9D0A300E6C357634EE2DA88C35463" "tox.novg.net" 33445>
]
]

View File

@ -1,29 +1,60 @@
<require-service <daemon tox_bot>>
<daemon tox_bot {
argv: "/home/repo/syndicate/syndicate_actor_tox/syndicate_actor_tox"
dir: "/home/repo/syndicate/syndicate_actor_tox"
argv: [
"/home/repo/syndicate/syndicate_actor_tox/syndicate_actor_tox"
"--save-file:/home/emery/lib/syndicate/tox.save"
"--local-discovery:true"
]
protocol: text/syndicate
clearEnv: #t
}>
<require-service <daemon tox_eris_ingester>>
<daemon tox_eris_ingester {
argv: [ "/home/repo/syndicate/syndicate_actor_tox/tox_eris_ingester" ]
protocol: text/syndicate
clearEnv: #t
}>
; wait for the tox_bot to come up and announce itself
? <service-object <daemon tox_bot> ?tox> [
$config ? <service-object <daemon freedesktop_notifier> ?notifier> [
$tox [
<bootstrap "6EF679EBD205E8DF9B6975D21CD157D046287700CADDF86F94B7ED243DC26A30" "20a:c3d2:8cf8:f8e5:80fe:9194:3800:87e6" 33445>
? <tox ?pk ?core> $core [
? <address ?addr> $log ! <log "-" { line: ["tox address" $addr] }>
? <request ?pk ?msg> [
$notifier ! <notify "friend request" $msg 0 Low>
<accept $pk>
$tox <bootstrap "6EF679EBD205E8DF9B6975D21CD157D046287700CADDF86F94B7ED243DC26A30" "192.168.144.1" 33445>
; wait for the core capability to be announced
$tox ? <tox ?pk ?core> $core [
; log the address of the core
? <address ?addr> $log ! <log "-" { line: ["tox address" $addr] }>
; notify on friend request
? <request ?pk ?msg> [
; auto-accept
<accept $pk>
]
; wait for capability to a friend
? <friend ?pk ?friend> $friend [
; get the friend name
? <name ?name> [
; get the status message
? <status-message ?msg> [ $log ! <log "-" { line: [$name $msg]}> ]
?? <msg ?body ?kind> [
$log ! <log "-" { line: [$name $kind $body]}>
]
? <friend ?pk ?friend> $friend [
$notifier ! <notify "new friend" $pk 0 Low>
? <name ?name> [
$notifier ! <notify "friend name" $name 0 Low>
? <status-message ?msg> [ $notifier ! <notify $name $msg 0 Low> ]
]
]
? <transfer ?kind ?size ?filename ?transfer> [
$log ! <log "-" { line: [$kind $size $filename $transfer]}>
$config ? <service-object <daemon tox_eris_ingester> ?ingester> [
$log ! <log "-" { line: ["ingester is up at " $ingester] }>
$ingester <transfer $kind $size $filename $transfer>
]
]
]
]
]