From 389fd04c4a12e7a239dbb015aba75d962331699b Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Fri, 10 Jun 2022 17:43:20 -0500 Subject: [PATCH] Reduce for a redo, use entities core at each friend --- protocol.prs | 43 +--- src/protocol.nim | 106 ++------- src/syndicate_actor_tox.nim | 414 +++++++++++------------------------- 3 files changed, 152 insertions(+), 411 deletions(-) diff --git a/protocol.prs b/protocol.prs index d7b6ffb..1f158e1 100644 --- a/protocol.prs +++ b/protocol.prs @@ -1,36 +1,15 @@ version 1 . -ListenOn = . +ToxDataspace = . + +FriendDataspace = . + +CoreVersion = . + +Address =
. + +Name = . + +StatusMessage = . BootstrapNode = . - -ToxCoreVersion = . - -ToxSelfAddress = . - -ToxSelfConnectionStatus = . - -ToxSelfName = . - -Status = =online / =away / =busy . -ToxSelfStatus = . - -ToxSelfStatusMessage = . - -ToxFriendRequest = . - -ToxFriendKey = . - -ToxFriendLastOnline = . - -ToxFriendName = . - -ToxFriendStatusMessage = . - -ToxFriendTyping = . - -ToxFriendAdd = . - -ToxFriendSend = . - -ToxFriendBacklog = . diff --git a/src/protocol.nim b/src/protocol.nim index 87bfced..351a707 100644 --- a/src/protocol.nim +++ b/src/protocol.nim @@ -3,106 +3,42 @@ import std/typetraits, preserves type - ToxFriendSend* {.preservesRecord: "ToxFriendSend".} = object - `num`*: int - `message`*: string + Name* {.preservesRecord: "name".} = object + `name`*: string - ToxCoreVersion* {.preservesRecord: "ToxCoreVersion".} = object - `major`*: int - `minor`*: int - `patch`*: int + FriendDataspace*[E] {.preservesRecord: "friend".} = ref object + `publicKey`*: seq[byte] + `entity`*: Preserve[E] - ToxFriendLastOnline* {.preservesRecord: "ToxFriendLastNnline".} = object - `num`*: int - `unixEpoch`*: float64 + Address* {.preservesRecord: "address".} = object + `text`*: string - ToxSelfConnectionStatus* {.preservesRecord: "ToxSelfConnectionStatus".} = object - `status`*: Symbol - - ToxSelfStatus* {.preservesRecord: "ToxSelfStatus".} = object - `status`*: Status - - ToxFriendBacklog* {.preservesRecord: "ToxFriendBacklog".} = object - `num`*: int - `start`*: float64 - `stop`*: float64 + ToxDataspace*[E] {.preservesRecord: "tox".} = ref object + `publicKey`*: seq[byte] + `entity`*: Preserve[E] BootstrapNode* {.preservesRecord: "bootstrap".} = object `publicKey`*: string `host`*: string `port`*: int - ToxSelfStatusMessage* {.preservesRecord: "ToxSelfStatusMessage".} = object - `message`*: string + StatusMessage* {.preservesRecord: "status-message".} = object + `msg`*: string - `Status`* {.preservesOr, pure.} = enum - `online`, `away`, `busy` - ToxSelfName* {.preservesRecord: "ToxSelfName".} = object - `name`*: string + CoreVersion* {.preservesRecord: "core".} = object + `major`*: int + `minor`*: int + `patch`*: int - ToxFriendKey* {.preservesRecord: "ToxFriendkey".} = object - `num`*: int - `key`*: seq[byte] - - ListenOn*[E] {.preservesRecord: "listen-on".} = ref object - `dataspace`*: Preserve[E] - - ToxSelfAddress* {.preservesRecord: "ToxSelfAddress".} = object - `address`*: seq[byte] - - ToxFriendAdd* {.preservesRecord: "ToxFriendAdd".} = object - `address`*: seq[byte] - `message`*: string - - ToxFriendStatusMessage* {.preservesRecord: "ToxFriendStatusMessage".} = object - `num`*: int - `message`*: string - - ToxFriendRequest* {.preservesRecord: "ToxFriendRequest".} = object - `key`*: seq[byte] - `message`*: string - - ToxFriendName* {.preservesRecord: "ToxFriendName".} = object - `num`*: int - `name`*: string - - ToxFriendTyping* {.preservesRecord: "ToxFriendTyping".} = object - `num`*: int - -proc `$`*[E](x: ListenOn[E]): string = +proc `$`*[E](x: FriendDataspace[E] | ToxDataspace[E]): string = `$`(toPreserve(x, E)) -proc encode*[E](x: ListenOn[E]): seq[byte] = +proc encode*[E](x: FriendDataspace[E] | ToxDataspace[E]): seq[byte] = encode(toPreserve(x, E)) -proc `$`*(x: ToxFriendSend | ToxCoreVersion | ToxFriendLastOnline | - ToxSelfConnectionStatus | - ToxSelfStatus | - ToxFriendBacklog | - BootstrapNode | - ToxSelfStatusMessage | - ToxSelfName | - ToxFriendKey | - ToxSelfAddress | - ToxFriendAdd | - ToxFriendStatusMessage | - ToxFriendRequest | - ToxFriendName | - ToxFriendTyping): string = +proc `$`*(x: Name | Address | BootstrapNode | StatusMessage | CoreVersion): string = `$`(toPreserve(x)) -proc encode*(x: ToxFriendSend | ToxCoreVersion | ToxFriendLastOnline | - ToxSelfConnectionStatus | - ToxSelfStatus | - ToxFriendBacklog | - BootstrapNode | - ToxSelfStatusMessage | - ToxSelfName | - ToxFriendKey | - ToxSelfAddress | - ToxFriendAdd | - ToxFriendStatusMessage | - ToxFriendRequest | - ToxFriendName | - ToxFriendTyping): seq[byte] = +proc encode*(x: Name | Address | BootstrapNode | StatusMessage | CoreVersion): seq[ + byte] = encode(toPreserve(x)) diff --git a/src/syndicate_actor_tox.nim b/src/syndicate_actor_tox.nim index c22353c..0e99b83 100644 --- a/src/syndicate_actor_tox.nim +++ b/src/syndicate_actor_tox.nim @@ -1,27 +1,19 @@ # SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, logging, parseopt, streams, strutils, tables] -from std/os import getEnv, fileExists, moveFile +import std/[asyncdispatch, logging, parseopt, strutils, tables] from std/sequtils import toSeq from std/times import inMilliseconds -import syndicate, syndicate/actors +import preserves, syndicate, syndicate/actors -import cbor, toxcore +import toxcore import ./protocol # import ./logging addHandler(newConsoleLogger(useStderr = true)) # register global logger to stderr -var alive: bool -setControlCHook do: - if not alive: - # hard stop - quit "exited without saving Tox data" - alive = false - proc logging_callback( core: Core; level: Log_Level; file: cstring; line: uint32; @@ -36,299 +28,133 @@ proc logging_callback( if lvl != lvlNone: log(lvl, `func`, ": ", message) -proc saveFilePath(): string = - for kind, key, val in getopt(): - if kind == cmdLongOption and key == "save-file" and val != "": - result = val - -type GlobalState = ref object - core: Tox - statusCounts: array[3, int] - -proc writeSaveData(state: GlobalState) = - let path = saveFilePath() - if path != "": - let tmpPath = path & ".tmp" - debug("Dumping save data to ", tmpPath) - var stream = newFileStream(tmpPath, fmWrite) - stream.writeCbor(state.core.saveData) - close(stream) - moveFile(tmpPath, path) - debug("Data saved to ", path) - -proc newGlobalState(): GlobalState = - new result - let saveFile = saveFilePath() - var saveData: seq[byte] - if fileExists saveFile: - block: - var - stream = newFileStream(saveFile, fmRead) - parser: CborParser - dbPath: string - open(parser, stream) - parser.next() - if not saveData.fromCbor(parser.nextNode()): - raise newException(ValueError, "failed to parse Tox save data") - close(stream) - var proxy_host: string - result.core = newTox do (opts: toxcore.Options): - opts.log_callback = logging_callback - debug "parsing command-line options…" - proc parseBoolParam(key, val: string): bool = - if val == "": result = true - else: - try: result = parsebool(val) - except: - quit("failed to parse " & key & " as boolean: " & val) - proc parsePortParam(key, val: string): uint16 = - try: result = uint16 parsebool(val) - except: - quit("failed to parse " & key & " as port: " & val) - for kind, key, val in getopt(): - case kind - of cmdLongOption: - case key - of "ipv6": - opts.ipv6_enabled = parseBoolParam(key, val) - of "udp": - opts.udp_enabled = parseBoolParam(key, val) - of "local-discovery": - opts.local_discovery_enabled = parseBoolParam(key, val) - of "proxy": - case val - of "none": opts.proxy_type = TOX_PROXY_TYPE_NONE - of "http": opts.proxy_type = TOX_PROXY_TYPE_HTTP - of "socks5": opts.proxy_type = TOX_PROXY_TYPE_SOCKS5 - else: - quit("unhandled proxy type: " & val) - of "proxy-host": - proxy_host = val - opts.proxy_host = proxy_host - of "proxy-port": - opts.proxy_port = parsePortParam(key, val) - of "start-port": - opts.start_port = parsePortParam(key, val) - of "end-port": - opts.end_port = parsePortParam(key, val) - of "tcp-port": - opts.tcp_port = parsePortParam(key, val) - of "hole-punching": - opts.hole_punching_enabled = parseBoolParam(key, val) - of "save-file": discard - else: - quit("unhandled command-line parameter: " & key) - of cmdShortOption, cmdArgument: - quit("unhandled command-line parameter: " & key) - of cmdEnd: discard - if saveData.len > 0: - opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE - opts.savedata = saveData - let - ms = result.core.iterationInterval.inMilliseconds.int - oneshot = false - global = result - addTimer(ms, oneshot) do (fd: AsyncFD) -> bool: - if not alive: - writeSaveData(global) - quit() - iterate global.core - writeSaveData(global) - info "Tox ready" - info result.core.dhtId, "@:", result.core.udpPort - type - SelfHandles = object - connectionStatus, name, status, statusMessage: Handle - FriendHandles = object - `ref`: Ref - key, lastOnline, name, statusMessage, typing: Handle - - ToxRelay = ref object - global: GlobalState + Entity = ref object of RootObj facet: Facet - `ref`: Ref - self: SelfHandles - friendRequests: Table[PublicKey, Handle] - friends: Table[Friend, FriendHandles] + ds: Ref -proc publish[T](relay: ToxRelay; assertion: T): Handle = - info "publish ", assertion - var h: Handle - relay.facet.run do (turn: var Turn): - h = publish(turn, relay.`ref`, assertion) - h + FriendHandles = object + name, statusMessage, lastOnline, typing: Handle -proc retract(relay: ToxRelay; h: Handle) = - relay.facet.run do (turn: var Turn): retract(turn, h) + FriendEntity {.final.} = ref object of Entity + handles: FriendHandles -template replace[T](relay: ToxRelay; h: var Handle; assertion: T) = - relay.facet.run do (turn: var Turn): - replace(turn, relay.`ref`, h, assertion) + CoreHandles = object + address, name, statusMessage, connectionStatus: Handle + CoreEntity {.final.} = ref object of Entity + core: Tox + statusCounts: array[3, int] + handles: CoreHandles + friendRequests: Table[toxcore.PublicKey, Handle] + friendEntities: seq[FriendEntity] -proc installCallbacks(relay: ToxRelay; turn: var Turn) = - relay.global.core.status = TOX_USER_STATUS_AWAY +proc init(e: Entity; turn: var Turn; parent: Ref): Handle = + assert e.facet.isNil + e.facet = turn.facet + e.ds = newRef(turn, parent.target) - discard publish(turn, relay.`ref`, - ToxCoreVersion( - major: int version_major(), - minor: int version_minor(), - patch: int version_patch())) - discard publish(turn, relay.`ref`, - ToxSelfAddress(address: relay.global.core.address.bytes.toSeq)) +proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) = + assert entity.core.isNil + block: # Tox initialization + var proxy_host: cstring + entity.core = newTox do (opts: toxcore.Options): + opts.log_callback = logging_callback + debug "parsing command-line options…" + proc parseBoolParam(key, val: string): bool = + if val == "": result = true + else: + try: result = parsebool(val) + except: + quit("failed to parse " & key & " as boolean: " & val) + proc parsePortParam(key, val: string): uint16 = + try: result = uint16 parsebool(val) + except: + quit("failed to parse " & key & " as port: " & val) + for kind, key, val in getopt(): + case kind + of cmdLongOption: + case key + of "ipv6": + opts.ipv6_enabled = parseBoolParam(key, val) + of "udp": + opts.udp_enabled = parseBoolParam(key, val) + of "local-discovery": + opts.local_discovery_enabled = parseBoolParam(key, val) + of "proxy": + case val + of "none": opts.proxy_type = TOX_PROXY_TYPE_NONE + of "http": opts.proxy_type = TOX_PROXY_TYPE_HTTP + of "socks5": opts.proxy_type = TOX_PROXY_TYPE_SOCKS5 + else: + quit("unhandled proxy type: " & val) + of "proxy-host": + proxy_host = val + opts.proxy_host = proxy_host + of "proxy-port": + opts.proxy_port = parsePortParam(key, val) + of "start-port": + opts.start_port = parsePortParam(key, val) + of "end-port": + opts.end_port = parsePortParam(key, val) + of "tcp-port": + opts.tcp_port = parsePortParam(key, val) + of "hole-punching": + opts.hole_punching_enabled = parseBoolParam(key, val) + of "save-file": discard + else: + quit("unhandled command-line parameter: " & key) + of cmdShortOption, cmdArgument: + quit("unhandled command-line parameter: " & key) + of cmdEnd: discard + block: # Syndicate entity initialization + discard init(entity, turn, parentRef) + discard publish(turn, parentRef, ToxDataspace[Ref]( + publicKey: entity.core.publicKey.bytes.toSeq, + entity: entity.ds.embed)) + discard publish(turn, entity.ds, + CoreVersion( + major: int version_major(), + minor: int version_minor(), + patch: int version_patch())) + entity.handles.address = publish(turn, entity.ds, + protocol.Address(text: $entity.core.address)) + entity.handles.name = publish(turn, entity.ds, + Name(name: entity.core.name)) + entity.handles.statusMessage = publish(turn, entity.ds, + StatusMessage(msg: entity.core.statusMessage)) + block: # Friends initialization + var friendNums = entity.core.friends + entity.friendEntities.setLen(friendNums.len) + for fn in friendNums: + var fe = new FriendEntity + discard init(fe, turn, entity.ds) + discard publish(turn, entity.ds, FriendDataspace[Ref]( + publicKey: entity.core.publicKey(fn).bytes.toSeq, + entity: fe.ds.embed)) + if fn.int > entity.friendEntities.len: + entity.friendEntities.setLen(fn.int.succ) + entity.friendEntities[int fn] = fe - if relay.global.core.name != "": - relay.self.name = - publish(turn, relay.`ref`, - ToxSelfName(name: relay.global.core.name)) +var alive: bool +setControlCHook do: + if not alive: quit() + alive = false - if relay.global.core.statusMessage != "": - relay.self.statusMessage = - publish(turn, relay.`ref`, - ToxSelfStatusMessage(message: relay.global.core.statusMessage)) - - relay.global.core.onSelfConnectionStatus do (status: Connection): - let sym = - case status - of TOX_CONNECTION_NONE: Symbol"none" - of TOX_CONNECTION_TCP: Symbol"tcp" - of TOX_CONNECTION_UDP: Symbol"udp" - relay.replace(relay.self.connectionStatus, - ToxSelfConnectionStatus(status: sym)) - - for num in relay.global.core.friends: - var handles = FriendHandles(`ref`: newRef(turn, relay.`ref`.target)) - handles.key = publish(turn, relay.`ref`, - ToxFriendKey(num: int num, key: relay.global.core.publicKey(num).bytes.toSeq)) - - let lastOnline = relay.global.core.lastOnline(num) - if lastOnline > 0: - handles.lastOnline = publish(turn, relay.`ref`, - ToxFriendLastOnline(num: int num, unixEpoch: float64 lastOnline)) - - handles.name = publish(turn, relay.`ref`, - ToxFriendName(num: int num, name: relay.global.core.name(num))) - - handles.statusMessage = publish(turn, relay.`ref`, - ToxFriendStatusMessage(num: int num, message: relay.global.core.statusMessage(num))) - relay.friends[num] = handles - - relay.global.core.onFriendName do (num: Friend; name: string): - relay.replace(relay.friends[num].name, ToxFriendName(num: int num, name: name)) - - relay.global.core.onFriendStatusMessage do (num: Friend; msg: string): - relay.replace( - relay.friends[num].statusMessage, - ToxFriendStatusMessage(num: int num, message: msg)) - - relay.global.core.onFriendTyping do (num: Friend; typing: bool): - if typing: - relay.friends[num].typing = - relay.publish( - ToxFriendTyping(num: int num)) - else: - relay.retract(relay.friends[num].typing) - relay.friends[num].typing = 0 - - relay.global.core.onFriendRequest do (key: PublicKey; msg: string): - var a = ToxFriendRequest(key: key.bytes.toSeq, message: msg) - if relay.friendRequests.hasKey key: - relay.replace(relay.friendRequests[key], a) - else: - relay.friendRequests[key] = relay.publish(a) - -proc newToxRelay(global: GlobalState; `ref`: Ref; turn: var Turn): ToxRelay = - result = ToxRelay( - global: global, - facet: turn.facet, - `ref`: `ref`) - installCallbacks(result, turn) - -bootDataspace("main") do (root: Ref; turn: var Turn): +proc run(entity: CoreEntity) = alive = true - var global = newGlobalState() + bootDataspace("main") do (ds: Ref; turn: var Turn): + connectStdio(ds, turn) + initCore(entity, turn, ds) - connectStdio(root, turn) + onPublish(turn, ds, ?BootstrapNode) do (key: string; host: string; port: int): + info "Bootstrapping from ", key, "@", host, ":", port + try: entity.core.bootstrap(host, key.toPublicKey, uint16 port) + except ToxError as e: + error "failed to bootstrap: ", e.msg - onPublish(turn, root, ?BootstrapNode) do (key: string; host: string; port: int): - info "Bootstrapping from ", key, "@", host, ":", port - try: global.core.bootstrap(host, key.toPublicKey, uint16 port) - except ToxError as e: - error "failed to bootstrap: ", e.msg + poll() + while alive: + iterate entity.core + poll(entity.core.iterationInterval.inMilliseconds.int) - during(turn, root, ?ListenOn[Ref]) do (a: Assertion): - let - ds = unembed a - relay = newToxRelay(global, ds, turn) - - during(turn, ds, ?ToxSelfStatus) do (status: Status): - let e = case status - of Status.online: TOX_USER_STATUS_NONE - of Status.away: TOX_USER_STATUS_AWAY - of Status.busy: TOX_USER_STATUS_BUSY - inc relay.global.statusCounts[int e] - let most = max(relay.global.statusCounts) - if relay.global.statusCounts[int e] == most: - relay.global.core.status = e - do: - relay.global.statusCounts[int e] = max(0, pred relay.global.statusCounts[int e]) - let most = max(relay.global.statusCounts) - for e in [ - TOX_USER_STATUS_BUSY, - TOX_USER_STATUS_NONE, - TOX_USER_STATUS_AWAY]: - if relay.global.statusCounts[int e] == most: - relay.global.core.status = e - break - - onPublish(turn, ds, ?ToxSelfName) do (name: string): - debug "ToxSelfName ", rawValues - relay.global.core.name = name - replace(turn, ds, relay.self.name, ToxSelfName(name: relay.global.core.name)) - writeSaveData(relay.global) - - onPublish(turn, ds, ?ToxSelfStatusMessage) do (msg: string): - debug "ToxSelfStatusMessage ", rawValues - relay.global.core.statusMessage = msg - replace(turn, ds, relay.self.statusMessage, - ToxSelfStatusMessage(message: relay.global.core.statusMessage)) - relay.global.writeSaveData() - - onMessage(turn, ds, ?ToxFriendRequest) do (bytes: seq[byte]; msg: string): - info "got a ToxFriendRequest with a ", bytes.len, " byte key" - if bytes.len == TOX_PUBLIC_KEY_SIZE: - var key: PublicKey - copyMem(addr key.bytes[0], addr bytes[0], key.bytes.len) - var h: Handle - if relay.friendRequests.pop(key, h): - relay.retract(h) - try: - debug "addFriendNoRequest ", key - let friend = relay.global.core.addFriendNoRequest(key) - relay.friends[friend] = FriendHandles() - relay.global.writeSaveData() - except ToxError as e: - error "failed to add friend: ", e.msg - - onMessage(turn, ds, ?ToxFriendAdd) do (toxid: seq[byte]; msg: string): - var address: Address - if toxid.len != address.bytes.len: - error "invalid Tox address: ", toxid - else: - copyMem(addr address.bytes[0], addr toxid[0], address.bytes.len) - try: - let friend = relay.global.core.addFriend(address, msg) - relay.friends[friend] = FriendHandles() - relay.global.writeSaveData() - # TODO: assert a pending friend? - except ToxError as e: - error "failed to add friend: ", e.msg - - onMessage(turn, ds, ?ToxFriendSend) do (friend: Friend; msg: string): - discard relay.global.core.send(friend, msg) - # TODO: assert pending messages? - - do: - info "facet stopped within ListenOn body" - -runForever() +run(new CoreEntity)