From 7afc67e912ba1da7416fb5c5fae63932ba670a52 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Fri, 10 Jun 2022 11:49:36 -0500 Subject: [PATCH] Shit happens --- protocol.prs | 4 + src/protocol.nim | 13 ++ src/syndicate_actor_tox.nim | 403 ++++++++++++++++++++---------------- syndicate_actor_tox.nimble | 2 +- syndicate_server_config.pr | 4 +- 5 files changed, 250 insertions(+), 176 deletions(-) diff --git a/protocol.prs b/protocol.prs index 692838f..d7b6ffb 100644 --- a/protocol.prs +++ b/protocol.prs @@ -17,6 +17,8 @@ ToxSelfStatus = . ToxSelfStatusMessage = . +ToxFriendRequest = . + ToxFriendKey = . ToxFriendLastOnline = . @@ -30,3 +32,5 @@ ToxFriendTyping = . ToxFriendAdd = . ToxFriendSend = . + +ToxFriendBacklog = . diff --git a/src/protocol.nim b/src/protocol.nim index fbae8f0..87bfced 100644 --- a/src/protocol.nim +++ b/src/protocol.nim @@ -22,6 +22,11 @@ type ToxSelfStatus* {.preservesRecord: "ToxSelfStatus".} = object `status`*: Status + ToxFriendBacklog* {.preservesRecord: "ToxFriendBacklog".} = object + `num`*: int + `start`*: float64 + `stop`*: float64 + BootstrapNode* {.preservesRecord: "bootstrap".} = object `publicKey`*: string `host`*: string @@ -53,6 +58,10 @@ type `num`*: int `message`*: string + ToxFriendRequest* {.preservesRecord: "ToxFriendRequest".} = object + `key`*: seq[byte] + `message`*: string + ToxFriendName* {.preservesRecord: "ToxFriendName".} = object `num`*: int `name`*: string @@ -69,6 +78,7 @@ proc encode*[E](x: ListenOn[E]): seq[byte] = proc `$`*(x: ToxFriendSend | ToxCoreVersion | ToxFriendLastOnline | ToxSelfConnectionStatus | ToxSelfStatus | + ToxFriendBacklog | BootstrapNode | ToxSelfStatusMessage | ToxSelfName | @@ -76,6 +86,7 @@ proc `$`*(x: ToxFriendSend | ToxCoreVersion | ToxFriendLastOnline | ToxSelfAddress | ToxFriendAdd | ToxFriendStatusMessage | + ToxFriendRequest | ToxFriendName | ToxFriendTyping): string = `$`(toPreserve(x)) @@ -83,6 +94,7 @@ proc `$`*(x: ToxFriendSend | ToxCoreVersion | ToxFriendLastOnline | proc encode*(x: ToxFriendSend | ToxCoreVersion | ToxFriendLastOnline | ToxSelfConnectionStatus | ToxSelfStatus | + ToxFriendBacklog | BootstrapNode | ToxSelfStatusMessage | ToxSelfName | @@ -90,6 +102,7 @@ proc encode*(x: ToxFriendSend | ToxCoreVersion | ToxFriendLastOnline | ToxSelfAddress | ToxFriendAdd | ToxFriendStatusMessage | + ToxFriendRequest | ToxFriendName | ToxFriendTyping): seq[byte] = encode(toPreserve(x)) diff --git a/src/syndicate_actor_tox.nim b/src/syndicate_actor_tox.nim index 3476dbb..c22353c 100644 --- a/src/syndicate_actor_tox.nim +++ b/src/syndicate_actor_tox.nim @@ -1,14 +1,19 @@ # SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, logging, parseopt, strutils, tables] -from std/os import fileExists, moveFile +import std/[asyncdispatch, logging, parseopt, streams, strutils, tables] +from std/os import getEnv, fileExists, moveFile from std/sequtils import toSeq from std/times import inMilliseconds -import syndicate -import toxcore +import syndicate, syndicate/actors + +import cbor, toxcore import ./protocol +# import ./logging + +addHandler(newConsoleLogger(useStderr = true)) + # register global logger to stderr var alive: bool setControlCHook do: @@ -17,14 +22,11 @@ setControlCHook do: quit "exited without saving Tox data" alive = false -proc console_log_callback( +proc logging_callback( core: Core; level: Log_Level; file: cstring; line: uint32; `func`: cstring; message: cstring; user_data: pointer) {.exportc, cdecl.} = - stderr.writeLine "console_log_callback: ", `func` - #[ - let console = cast[ConsoleLogger](user_data) let lvl = case level of TOX_LOG_LEVEL_TRACE: lvlDebug of TOX_LOG_LEVEL_DEBUG: lvlDebug @@ -32,131 +34,47 @@ proc console_log_callback( of TOX_LOG_LEVEL_WARNING: lvlWarn of TOX_LOG_LEVEL_ERROR: lvlError if lvl != lvlNone: - log(console, lvl, `func`, ": ", message) - ]# + log(lvl, `func`, ": ", message) -type - SelfHandles = object - connectionStatus, name, status, statusMessage: Handle - FriendHandles = object - key, lastOnline, name, statusMessage, typing: Handle +proc saveFilePath(): string = + for kind, key, val in getopt(): + if kind == cmdLongOption and key == "save-file" and val != "": + result = val - ToxActor = ref object - console: ConsoleLogger - facet: Facet - `ref`: Ref - saveFilePath: string - core: Tox - self: SelfHandles - friends: Table[Friend, FriendHandles] +type GlobalState = ref object + core: Tox + statusCounts: array[3, int] -template debug(actor: ToxActor; args: varargs[string, `$`]) = - actor.console.log(lvlDebug, args) - -template info(actor: ToxActor; args: varargs[string, `$`]) = - actor.console.log(lvlInfo, args) - -template warn(actor: ToxActor; args: varargs[string, `$`]) = - actor.console.log(lvlWarn, args) - -template error(actor: ToxActor; args: varargs[string, `$`]) = - actor.console.log(lvlError, args) - -proc writeSaveData(actor: ToxActor) = - if actor.saveFilePath != "": - let - path = actor.saveFilePath - tmpPath = path & ".tmp" - actor.debug("Duming data to ", tmpPath) - writeFile(tmpPath, actor.core.saveData) +proc writeSaveData(state: GlobalState) = + let path = saveFilePath() + if path != "": + let tmpPath = path & ".tmp" + debug("Dumping save data to ", tmpPath) + var stream = newFileStream(tmpPath, fmWrite) + stream.writeCbor(state.core.saveData) + close(stream) moveFile(tmpPath, path) - actor.debug("Data saved to ", path) + debug("Data saved to ", path) -proc publish[T](actor: ToxActor; assertion: T): Handle = - actor.info("publish ", assertion) - var h: Handle - actor.facet.run do (turn: var Turn): - h = publish(turn, actor.`ref`, assertion) - h - -proc retract(actor: ToxActor; h: Handle) = - actor.facet.run do (turn: var Turn): retract(turn, h) - -template replace[T](actor: ToxActor; h: var Handle; assertion: T) = - actor.facet.run do (turn: var Turn): replace(turn, actor.`ref`, h, assertion) - -proc installCallbacks(actor: ToxActor; turn: var Turn) = - actor.core.status = TOX_USER_STATUS_AWAY - - discard publish(turn, actor.`ref`, - ToxCoreVersion( - major: int version_major(), - minor: int version_minor(), - patch: int version_patch())) - discard publish(turn, actor.`ref`, - ToxSelfAddress(address: actor.core.address.bytes.toSeq)) - - actor.self.name = - publish(turn, actor.`ref`, - ToxSelfName(name: actor.core.name)) - - actor.self.statusMessage = - publish(turn, actor.`ref`, - ToxSelfStatusMessage(message: actor.core.statusMessage)) - - actor.core.onSelfConnectionStatus do (status: Connection): - let sym = - case status - of TOX_CONNECTION_NONE: Symbol"none" - of TOX_CONNECTION_TCP: Symbol"tcp" - of TOX_CONNECTION_UDP: Symbol"udp" - actor.replace(actor.self.connectionStatus, - ToxSelfConnectionStatus(status: sym)) - - for num in actor.core.friends: - var handles: FriendHandles - handles.key = publish(turn, actor.`ref`, - ToxFriendKey(num: int num, key: actor.core.publicKey(num).bytes.toSeq)) - - let lastOnline = actor.core.lastOnline(num) - if lastOnline > 0: - handles.lastOnline = publish(turn, actor.`ref`, - ToxFriendLastOnline(num: int num, unixEpoch: float64 lastOnline)) - - handles.name = publish(turn, actor.`ref`, - ToxFriendName(num: int num, name: actor.core.name(num))) - - handles.statusMessage = publish(turn, actor.`ref`, - ToxFriendStatusMessage(num: int num, message: actor.core.statusMessage(num))) - actor.friends[num] = handles - - actor.core.onFriendName do (num: Friend; name: string): - actor.replace(actor.friends[num].name, ToxFriendName(num: int num, name: name)) - - actor.core.onFriendStatusMessage do (num: Friend; msg: string): - actor.replace( - actor.friends[num].statusMessage, - ToxFriendStatusMessage(num: int num, message: msg)) - - actor.core.onFriendTyping do (num: Friend; typing: bool): - if typing: - actor.friends[num].typing = - actor.publish( - ToxFriendTyping(num: int num)) - else: - actor.retract(actor.friends[num].typing) - actor.friends[num].typing = 0 - -proc newToxActor(`ref`: Ref; turn: var Turn): ToxActor = - result = ToxActor( - console: newConsoleLogger(useStderr=true), - facet: turn.facet, - `ref`: `ref`) - let actor = result - result.core = initTox do (opts: toxcore.Options): - opts.log_callback = console_log_callback - # opts.log_user_data = addr actor.console[] - actor.debug "parsing command-line options…" +proc newGlobalState(): GlobalState = + new result + let saveFile = saveFilePath() + var saveData: seq[byte] + if fileExists saveFile: + block: + var + stream = newFileStream(saveFile, fmRead) + parser: CborParser + dbPath: string + open(parser, stream) + parser.next() + if not saveData.fromCbor(parser.nextNode()): + raise newException(ValueError, "failed to parse Tox save data") + close(stream) + var proxy_host: string + result.core = newTox do (opts: toxcore.Options): + opts.log_callback = logging_callback + debug "parsing command-line options…" proc parseBoolParam(key, val: string): bool = if val == "": result = true else: @@ -167,7 +85,6 @@ proc newToxActor(`ref`: Ref; turn: var Turn): ToxActor = try: result = uint16 parsebool(val) except: quit("failed to parse " & key & " as port: " & val) - var saveDataLoaded = false for kind, key, val in getopt(): case kind of cmdLongOption: @@ -186,7 +103,8 @@ proc newToxActor(`ref`: Ref; turn: var Turn): ToxActor = else: quit("unhandled proxy type: " & val) of "proxy-host": - opts.proxy_host = val + proxy_host = val + opts.proxy_host = proxy_host of "proxy-port": opts.proxy_port = parsePortParam(key, val) of "start-port": @@ -197,83 +115,220 @@ proc newToxActor(`ref`: Ref; turn: var Turn): ToxActor = opts.tcp_port = parsePortParam(key, val) of "hole-punching": opts.hole_punching_enabled = parseBoolParam(key, val) - of "save-file": - actor.saveFilePath = val + of "save-file": discard else: quit("unhandled command-line parameter: " & key) of cmdShortOption, cmdArgument: quit("unhandled command-line parameter: " & key) of cmdEnd: discard - if actor.saveFilePath != "" and fileExists actor.saveFilePath: - actor.info "Loading saved data from ", actor.saveFilePath - opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE - opts.savedata = readFile(actor.saveFilePath) - installCallbacks(result, turn) + if saveData.len > 0: + opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE + opts.savedata = saveData let ms = result.core.iterationInterval.inMilliseconds.int oneshot = false + global = result addTimer(ms, oneshot) do (fd: AsyncFD) -> bool: if not alive: - actor.writeSaveData() + writeSaveData(global) quit() - iterate actor.core - result.info "Tox actor ready" + iterate global.core + writeSaveData(global) + info "Tox ready" + info result.core.dhtId, "@:", result.core.udpPort + +type + SelfHandles = object + connectionStatus, name, status, statusMessage: Handle + FriendHandles = object + `ref`: Ref + key, lastOnline, name, statusMessage, typing: Handle + + ToxRelay = ref object + global: GlobalState + facet: Facet + `ref`: Ref + self: SelfHandles + friendRequests: Table[PublicKey, Handle] + friends: Table[Friend, FriendHandles] + +proc publish[T](relay: ToxRelay; assertion: T): Handle = + info "publish ", assertion + var h: Handle + relay.facet.run do (turn: var Turn): + h = publish(turn, relay.`ref`, assertion) + h + +proc retract(relay: ToxRelay; h: Handle) = + relay.facet.run do (turn: var Turn): retract(turn, h) + +template replace[T](relay: ToxRelay; h: var Handle; assertion: T) = + relay.facet.run do (turn: var Turn): + replace(turn, relay.`ref`, h, assertion) + +proc installCallbacks(relay: ToxRelay; turn: var Turn) = + relay.global.core.status = TOX_USER_STATUS_AWAY + + discard publish(turn, relay.`ref`, + ToxCoreVersion( + major: int version_major(), + minor: int version_minor(), + patch: int version_patch())) + discard publish(turn, relay.`ref`, + ToxSelfAddress(address: relay.global.core.address.bytes.toSeq)) + + if relay.global.core.name != "": + relay.self.name = + publish(turn, relay.`ref`, + ToxSelfName(name: relay.global.core.name)) + + if relay.global.core.statusMessage != "": + relay.self.statusMessage = + publish(turn, relay.`ref`, + ToxSelfStatusMessage(message: relay.global.core.statusMessage)) + + relay.global.core.onSelfConnectionStatus do (status: Connection): + let sym = + case status + of TOX_CONNECTION_NONE: Symbol"none" + of TOX_CONNECTION_TCP: Symbol"tcp" + of TOX_CONNECTION_UDP: Symbol"udp" + relay.replace(relay.self.connectionStatus, + ToxSelfConnectionStatus(status: sym)) + + for num in relay.global.core.friends: + var handles = FriendHandles(`ref`: newRef(turn, relay.`ref`.target)) + handles.key = publish(turn, relay.`ref`, + ToxFriendKey(num: int num, key: relay.global.core.publicKey(num).bytes.toSeq)) + + let lastOnline = relay.global.core.lastOnline(num) + if lastOnline > 0: + handles.lastOnline = publish(turn, relay.`ref`, + ToxFriendLastOnline(num: int num, unixEpoch: float64 lastOnline)) + + handles.name = publish(turn, relay.`ref`, + ToxFriendName(num: int num, name: relay.global.core.name(num))) + + handles.statusMessage = publish(turn, relay.`ref`, + ToxFriendStatusMessage(num: int num, message: relay.global.core.statusMessage(num))) + relay.friends[num] = handles + + relay.global.core.onFriendName do (num: Friend; name: string): + relay.replace(relay.friends[num].name, ToxFriendName(num: int num, name: name)) + + relay.global.core.onFriendStatusMessage do (num: Friend; msg: string): + relay.replace( + relay.friends[num].statusMessage, + ToxFriendStatusMessage(num: int num, message: msg)) + + relay.global.core.onFriendTyping do (num: Friend; typing: bool): + if typing: + relay.friends[num].typing = + relay.publish( + ToxFriendTyping(num: int num)) + else: + relay.retract(relay.friends[num].typing) + relay.friends[num].typing = 0 + + relay.global.core.onFriendRequest do (key: PublicKey; msg: string): + var a = ToxFriendRequest(key: key.bytes.toSeq, message: msg) + if relay.friendRequests.hasKey key: + relay.replace(relay.friendRequests[key], a) + else: + relay.friendRequests[key] = relay.publish(a) + +proc newToxRelay(global: GlobalState; `ref`: Ref; turn: var Turn): ToxRelay = + result = ToxRelay( + global: global, + facet: turn.facet, + `ref`: `ref`) + installCallbacks(result, turn) bootDataspace("main") do (root: Ref; turn: var Turn): - connectStdio(root, turn) alive = true + var global = newGlobalState() - onPublish(turn, root, ?ListenOn[Ref]) do (a: Assertion): + connectStdio(root, turn) + + onPublish(turn, root, ?BootstrapNode) do (key: string; host: string; port: int): + info "Bootstrapping from ", key, "@", host, ":", port + try: global.core.bootstrap(host, key.toPublicKey, uint16 port) + except ToxError as e: + error "failed to bootstrap: ", e.msg + + during(turn, root, ?ListenOn[Ref]) do (a: Assertion): let ds = unembed a - actor = newToxActor(ds, turn) + relay = newToxRelay(global, ds, turn) - onPublish(turn, root, ?BootstrapNode) do (key: string; host: string; port: int): - actor.info("Bootstrapping from ", key, "@", host, ":", port) - try: actor.core.bootstrap(host, key.toPublicKey, uint16 port) - except ToxError as e: - actor.error "failed to bootstrap: ", e.msg - - onMessage(turn, ds, ?ToxSelfStatus) do (status: Status): - ## TODO: Take the status of the core from the frontend. - ## Set to away when the frontend retracts its status. - actor.debug "ToxSelfStatus ", rawValues - actor.core.status = case status + during(turn, ds, ?ToxSelfStatus) do (status: Status): + let e = case status of Status.online: TOX_USER_STATUS_NONE of Status.away: TOX_USER_STATUS_AWAY of Status.busy: TOX_USER_STATUS_BUSY - #turn.facet.onStop do (turn: var Turn): - # actor.core.status = TOX_USER_STATUS_AWAY + inc relay.global.statusCounts[int e] + let most = max(relay.global.statusCounts) + if relay.global.statusCounts[int e] == most: + relay.global.core.status = e + do: + relay.global.statusCounts[int e] = max(0, pred relay.global.statusCounts[int e]) + let most = max(relay.global.statusCounts) + for e in [ + TOX_USER_STATUS_BUSY, + TOX_USER_STATUS_NONE, + TOX_USER_STATUS_AWAY]: + if relay.global.statusCounts[int e] == most: + relay.global.core.status = e + break - onMessage(turn, ds, ?ToxSelfName) do (name: string): - actor.debug "ToxSelfName ", rawValues - actor.core.name = name - replace(turn, ds, actor.self.name, ToxSelfName(name: actor.core.name)) - actor.writeSaveData() + onPublish(turn, ds, ?ToxSelfName) do (name: string): + debug "ToxSelfName ", rawValues + relay.global.core.name = name + replace(turn, ds, relay.self.name, ToxSelfName(name: relay.global.core.name)) + writeSaveData(relay.global) - onMessage(turn, ds, ?ToxSelfStatusMessage) do (msg: string): - actor.debug "ToxSelfStatusMessage ", rawValues - actor.core.statusMessage = msg - replace(turn, ds, actor.self.statusMessage, - ToxSelfStatusMessage(message: actor.core.statusMessage)) - actor.writeSaveData() + onPublish(turn, ds, ?ToxSelfStatusMessage) do (msg: string): + debug "ToxSelfStatusMessage ", rawValues + relay.global.core.statusMessage = msg + replace(turn, ds, relay.self.statusMessage, + ToxSelfStatusMessage(message: relay.global.core.statusMessage)) + relay.global.writeSaveData() + + onMessage(turn, ds, ?ToxFriendRequest) do (bytes: seq[byte]; msg: string): + info "got a ToxFriendRequest with a ", bytes.len, " byte key" + if bytes.len == TOX_PUBLIC_KEY_SIZE: + var key: PublicKey + copyMem(addr key.bytes[0], addr bytes[0], key.bytes.len) + var h: Handle + if relay.friendRequests.pop(key, h): + relay.retract(h) + try: + debug "addFriendNoRequest ", key + let friend = relay.global.core.addFriendNoRequest(key) + relay.friends[friend] = FriendHandles() + relay.global.writeSaveData() + except ToxError as e: + error "failed to add friend: ", e.msg onMessage(turn, ds, ?ToxFriendAdd) do (toxid: seq[byte]; msg: string): - actor.debug "ToxFriendAdd ", rawValues var address: Address if toxid.len != address.bytes.len: - actor.error "invalid Tox address: ", toxid + error "invalid Tox address: ", toxid else: copyMem(addr address.bytes[0], addr toxid[0], address.bytes.len) try: - discard actor.core.addFriend(address, msg) - # TODO: assert a pending friend? + let friend = relay.global.core.addFriend(address, msg) + relay.friends[friend] = FriendHandles() + relay.global.writeSaveData() + # TODO: assert a pending friend? except ToxError as e: - actor.error "failed to add friend: ", e.msg + error "failed to add friend: ", e.msg onMessage(turn, ds, ?ToxFriendSend) do (friend: Friend; msg: string): - actor.debug "ToxFriendSend ", rawValues - discard actor.core.send(friend, msg) + discard relay.global.core.send(friend, msg) # TODO: assert pending messages? + do: + info "facet stopped within ListenOn body" + runForever() diff --git a/syndicate_actor_tox.nimble b/syndicate_actor_tox.nimble index e25b78f..f71f32a 100644 --- a/syndicate_actor_tox.nimble +++ b/syndicate_actor_tox.nimble @@ -10,4 +10,4 @@ bin = @["syndicate_actor_tox"] # Dependencies -requires "nim >= 1.6.2", "syndicate >= 1.2.1" +requires "nim >= 1.6.2", "syndicate >= 1.2.1", "cbor", "toxcore" diff --git a/syndicate_server_config.pr b/syndicate_server_config.pr index abfba24..c03a657 100644 --- a/syndicate_server_config.pr +++ b/syndicate_server_config.pr @@ -1,13 +1,15 @@ let ?root_ds = dataspace + $gatekeeper>> >