diff --git a/protocol.prs b/protocol.prs index 957c140..c8ffd5c 100644 --- a/protocol.prs +++ b/protocol.prs @@ -23,4 +23,13 @@ FriendRequest = . ; Asserted to the core to accept a friend request. FriendAccept = . +; Messages sent by friend entities. +FriendMessage = . + +; Asserted by a friend while a transfer is active. +TransferDataspace = . + +; Asserted to the transfer entity to indicate the transfer should be saved to a file +TransferSink = . + BootstrapNode = . diff --git a/src/protocol.nim b/src/protocol.nim index 7eb69d0..08801cf 100644 --- a/src/protocol.nim +++ b/src/protocol.nim @@ -21,16 +21,26 @@ type `publicKey`*: seq[byte] `entity`*: Preserve[E] + FriendMessage* {.preservesRecord: "msg".} = object + `body`*: string + `kind`*: BiggestInt + Typing* {.preservesRecord: "typing".} = object BootstrapNode* {.preservesRecord: "bootstrap".} = object `publicKey`*: string `host`*: string - `port`*: int + `port`*: BiggestInt StatusMessage* {.preservesRecord: "status-message".} = object `msg`*: string + TransferDataspace*[E] {.preservesRecord: "transfer".} = ref object + `kind`*: BiggestInt + `size`*: BiggestInt + `filename`*: string + `entity`*: Preserve[E] + Status* {.preservesRecord: "status".} = object `status`*: Connection @@ -39,27 +49,35 @@ type `Connection`* {.preservesOr, pure.} = enum `none`, `tcp`, `udp` - CoreVersion* {.preservesRecord: "core".} = object - `major`*: int - `minor`*: int - `patch`*: int + TransferSink* {.preservesRecord: "sink".} = object + `path`*: string -proc `$`*[E](x: FriendDataspace[E] | ToxDataspace[E]): string = + CoreVersion* {.preservesRecord: "core".} = object + `major`*: BiggestInt + `minor`*: BiggestInt + `patch`*: BiggestInt + +proc `$`*[E](x: FriendDataspace[E] | ToxDataspace[E] | TransferDataspace[E]): string = `$`(toPreserve(x, E)) -proc encode*[E](x: FriendDataspace[E] | ToxDataspace[E]): seq[byte] = +proc encode*[E](x: FriendDataspace[E] | ToxDataspace[E] | TransferDataspace[E]): seq[ + byte] = encode(toPreserve(x, E)) -proc `$`*(x: Name | FriendRequest | Address | Typing | BootstrapNode | +proc `$`*(x: Name | FriendRequest | Address | FriendMessage | Typing | + BootstrapNode | StatusMessage | Status | FriendAccept | + TransferSink | CoreVersion): string = `$`(toPreserve(x)) -proc encode*(x: Name | FriendRequest | Address | Typing | BootstrapNode | +proc encode*(x: Name | FriendRequest | Address | FriendMessage | Typing | + BootstrapNode | StatusMessage | Status | FriendAccept | + TransferSink | CoreVersion): seq[byte] = encode(toPreserve(x)) diff --git a/src/syndicate_actor_tox.nim b/src/syndicate_actor_tox.nim index 75b6a52..b448105 100644 --- a/src/syndicate_actor_tox.nim +++ b/src/syndicate_actor_tox.nim @@ -1,7 +1,8 @@ # SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, logging, parseopt, strutils, tables] +import std/[asyncdispatch, asyncfile, logging, parseopt, strutils, tables] +from std/os import fileExists, copyFile, moveFile from std/sequtils import toSeq from std/times import inMilliseconds @@ -28,16 +29,35 @@ proc logging_callback( if lvl != lvlNone: log(lvl, `func`, ": ", message) +proc saveFilePath(): string = + for kind, key, val in getopt(): + if kind == cmdLongOption and key == "save-file" and val != "": + result = val + +proc writeSaveData(core: Tox) = + let path = saveFilePath() + if path != "": + let tmpPath = path & ".tmp" + writeFile(tmpPath, core.saveData) + moveFile(tmpPath, path) + debug("Data saved to ", path) + type Entity = ref object of RootObj facet: Facet ds: Ref + TransferEntity {.final.} = ref object of Entity + dsHandle: Handle + sinks: OrderedTable[string, AsyncFile] + size: uint64 + FriendHandles = object name, statusMessage, lastOnline, typing: Handle FriendEntity {.final.} = ref object of Entity handles: FriendHandles + transfers: Table[FileTransfer, TransferEntity] CoreHandles = object address, name, statusMessage, connectionStatus: Handle @@ -52,10 +72,50 @@ proc init(e: Entity; turn: var Turn; parent: Ref): Handle = e.facet = turn.facet e.ds = newDataspace(turn) +proc copyExisting(te: TransferEntity; path: string) = + if te.sinks.len > 0: + var err: ref Exception + for existingPath in te.sinks.keys: + try: + copyFile(existingPath, path) + # TODO: async copy, don't block tox iterate + return + except Exception as e: + err = e + raise err + +proc openSink(te: TransferEntity; path: string; size: int64) = + copyExisting(te, path) + var file: AsyncFile + try: + file = openAsync(path, fmReadWriteExisting) + except: + file = openAsync(path, fmReadWrite) + # Stupid file modes + setFileSize(file, size) + te.sinks[path] = file + +proc closeSink(te: TransferEntity; path: string) = + var file: AsyncFile + if te.sinks.pop(path, file): + close file + +proc closeSinks(te: TransferEntity) = + for file in te.sinks.values: close(file) + +proc write(te: TransferEntity; pos: uint64; data: pointer; size: int): Future[void] = + var futs: seq[Future[void]] + for file in te.sinks.values: + file.setFilePos(int64 pos) + futs.add file.writeBuffer(data, size) + result = all futs + proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) = assert entity.core.isNil block: # Tox initialization - var proxy_host: cstring + var + proxy_host: cstring + saveIsFresh = false entity.core = newTox do (opts: toxcore.Options): opts.log_callback = logging_callback debug "parsing command-line options…" @@ -99,12 +159,19 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) = opts.tcp_port = parsePortParam(key, val) of "hole-punching": opts.hole_punching_enabled = parseBoolParam(key, val) - of "save-file": discard + of "save-file": + let saveFilePath = val + if fileExists saveFilePath: + opts.savedata = cast[seq[byte]](readFile saveFilePath) + opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE + else: saveIsFresh = true else: quit("unhandled command-line parameter: " & key) of cmdShortOption, cmdArgument: quit("unhandled command-line parameter: " & key) of cmdEnd: discard + if saveIsFresh: + writeSaveData(entity.core) block: # Syndicate entity initialization discard init(entity, turn, parentRef) discard publish(turn, parentRef, ToxDataspace[Ref]( @@ -128,7 +195,9 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) = discard publish(turn, entity.ds, FriendDataspace[Ref]( publicKey: entity.core.publicKey(fn).bytes.toSeq, entity: fe.ds.embed)) - fe.handles.name = publish(turn, fe.ds, Name(name: entity.core.name(fn))) + let name = entity.core.name(fn) + if name != "": + fe.handles.name = publish(turn, fe.ds, Name(name: name)) entity.friends[fn] = fe for fn in entity.core.friends: createFriend(turn, fn) @@ -168,10 +237,54 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) = onPublish(turn, entity.ds, ?FriendAccept(key: pk.bytes.toSeq)) do: createFriend(turn, entity.core.addFriendNoRequest(pk)) retract(turn, reqHandle) + writeSaveData(entity.core) # TODO: stop watching for the accept assertion + entity.core.onFriendMessage do (num: Friend; msg: string; kind: MessageType): + let fe = entity.friends[num] + run(fe.facet) do (turn: var Turn): + message(turn, fe.ds, FriendMessage(body: msg, kind: int kind)) + + entity.core.onFriendLosslessPacket do ( + num: Friend; data: pointer; len: int): + info "friend sent a lossy packet of ", len, " bytes" + + entity.core.onFileRecv do (fn: Friend; tn: FileTransfer; kind: uint32; size: uint64; filename: string): + let fe = entity.friends[fn] + run(fe.facet) do (turn: var Turn): + let te = TransferEntity(size: size) + discard init(te, turn, fe.ds) + te.dsHandle = publish(turn, fe.ds, TransferDataspace[Ref]( + kind: int kind, + size: BiggestInt size, + filename: filename, + entity: te.ds.embed)) + during(turn, te.ds, ?TransferSink) do (path: string): + te.openSink(path, int64 size) + if te.sinks.len == 1: + entity.core.control(fn, tn, TOX_FILE_CONTROL_RESUME) + do: + te.closeSink(path) + if te.sinks.len == 0: + entity.core.control(fn, tn, TOX_FILE_CONTROL_CANCEL) + fe.transfers[tn] = te + + entity.core.onFileRecvChunk do (fn: Friend; tn: FileTransfer; pos: uint64; data: pointer; size: int): + let + fe = entity.friends[fn] + te = fe.transfers[tn] + if size != 0: + waitFor te.write(pos, data, size) + # wait for all the writes to complete within the lifetime of the callback + else: + te.closeSinks() + run(fe.facet) do (turn: var Turn): + retract(turn, te.dsHandle) + fe.transfers.del(tn) + var alive: bool setControlCHook do: + info "quiting" if not alive: quit() alive = false @@ -188,8 +301,10 @@ proc run(entity: CoreEntity) = error "failed to bootstrap: ", e.msg poll() + writeSaveData(entity.core) while alive: iterate entity.core poll(entity.core.iterationInterval.inMilliseconds.int) + writeSaveData(entity.core) run(new CoreEntity) diff --git a/syndicate_server_config.pr b/syndicate_server_config.pr deleted file mode 100644 index c03a657..0000000 --- a/syndicate_server_config.pr +++ /dev/null @@ -1,24 +0,0 @@ -let ?root_ds = dataspace - - $gatekeeper>> - -> - - - -? ?cap> [ - $cap [ - - - - - ] -] diff --git a/tox.config-example.pr b/tox.config-example.pr index 569e147..75b9c89 100644 --- a/tox.config-example.pr +++ b/tox.config-example.pr @@ -1,29 +1,60 @@ > + +> + + + +; wait for the tox_bot to come up and announce itself ? ?tox> [ - $config ? ?notifier> [ - $tox [ - - ? $core [ - ?
$log ! - ? [ - $notifier ! - + $tox + + ; wait for the core capability to be announced + $tox ? $core [ + + ; log the address of the core + ?
$log ! + + ; notify on friend request + ? [ + ; auto-accept + + ] + + ; wait for capability to a friend + ? $friend [ + + ; get the friend name + ? [ + ; get the status message + ? [ $log ! ] + + ?? [ + $log ! ] - ? $friend [ - $notifier ! - ? [ - $notifier ! - ? [ $notifier ! ] - ] + ] + + ? [ + $log ! + $config ? ?ingester> [ + $log ! + $ingester ] ] ] ] + ]