Compare commits
1 Commits
trunk
...
lostandfou
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | 4451114781 |
|
@ -23,8 +23,9 @@ FriendRequest = <request @key bytes @msg string> .
|
|||
; Asserted to the core to accept a friend request.
|
||||
FriendAccept = <accept @key bytes> .
|
||||
|
||||
; Messages sent by friend entities.
|
||||
FriendMessage = <msg @body string @kind int> .
|
||||
; Messages sent and received by friend entities.
|
||||
MessageRecv = <msgrecv @body string @kind int> .
|
||||
MessageSend = <msgsend @body string @kind int> .
|
||||
|
||||
; Asserted by a friend while a transfer is active.
|
||||
TransferDataspace = <transfer @kind int @size int @filename string @entity #!any> .
|
||||
|
@ -32,4 +33,6 @@ TransferDataspace = <transfer @kind int @size int @filename string @entity #!any
|
|||
; Asserted to the transfer entity to indicate the transfer should be saved to a file
|
||||
TransferSink = <sink @path string> .
|
||||
|
||||
Completion = <complete> .
|
||||
|
||||
BootstrapNode = <bootstrap @publicKey string @host string @port int> .
|
||||
|
|
|
@ -3,6 +3,10 @@ import
|
|||
std/typetraits, preserves
|
||||
|
||||
type
|
||||
MessageSend* {.preservesRecord: "msgsend".} = object
|
||||
`body`*: string
|
||||
`kind`*: BiggestInt
|
||||
|
||||
Name* {.preservesRecord: "name".} = object
|
||||
`name`*: string
|
||||
|
||||
|
@ -21,7 +25,7 @@ type
|
|||
`publicKey`*: seq[byte]
|
||||
`entity`*: Preserve[E]
|
||||
|
||||
FriendMessage* {.preservesRecord: "msg".} = object
|
||||
MessageRecv* {.preservesRecord: "msgrecv".} = object
|
||||
`body`*: string
|
||||
`kind`*: BiggestInt
|
||||
|
||||
|
@ -47,6 +51,8 @@ type
|
|||
FriendAccept* {.preservesRecord: "accept".} = object
|
||||
`key`*: seq[byte]
|
||||
|
||||
Completion* {.preservesRecord: "complete".} = object
|
||||
|
||||
`Connection`* {.preservesOr, pure.} = enum
|
||||
`none`, `tcp`, `udp`
|
||||
TransferSink* {.preservesRecord: "sink".} = object
|
||||
|
@ -64,20 +70,23 @@ proc encode*[E](x: FriendDataspace[E] | ToxDataspace[E] | TransferDataspace[E]):
|
|||
byte] =
|
||||
encode(toPreserve(x, E))
|
||||
|
||||
proc `$`*(x: Name | FriendRequest | Address | FriendMessage | Typing |
|
||||
proc `$`*(x: MessageSend | Name | FriendRequest | Address | MessageRecv | Typing |
|
||||
BootstrapNode |
|
||||
StatusMessage |
|
||||
Status |
|
||||
FriendAccept |
|
||||
Completion |
|
||||
TransferSink |
|
||||
CoreVersion): string =
|
||||
`$`(toPreserve(x))
|
||||
|
||||
proc encode*(x: Name | FriendRequest | Address | FriendMessage | Typing |
|
||||
proc encode*(x: MessageSend | Name | FriendRequest | Address | MessageRecv |
|
||||
Typing |
|
||||
BootstrapNode |
|
||||
StatusMessage |
|
||||
Status |
|
||||
FriendAccept |
|
||||
Completion |
|
||||
TransferSink |
|
||||
CoreVersion): seq[byte] =
|
||||
encode(toPreserve(x))
|
||||
|
|
|
@ -44,7 +44,6 @@ proc writeSaveData(core: Tox) =
|
|||
|
||||
type
|
||||
Entity = ref object of RootObj
|
||||
facet: Facet
|
||||
ds: Ref
|
||||
|
||||
TransferEntity {.final.} = ref object of Entity
|
||||
|
@ -68,8 +67,6 @@ type
|
|||
friends: Table[Friend, FriendEntity]
|
||||
|
||||
proc init(e: Entity; turn: var Turn; parent: Ref): Handle =
|
||||
assert e.facet.isNil
|
||||
e.facet = turn.facet
|
||||
e.ds = newDataspace(turn)
|
||||
|
||||
proc copyExisting(te: TransferEntity; path: string) =
|
||||
|
@ -99,6 +96,7 @@ proc closeSink(te: TransferEntity; path: string) =
|
|||
var file: AsyncFile
|
||||
if te.sinks.pop(path, file):
|
||||
close file
|
||||
stderr.writeLine("closed sink on ", path)
|
||||
|
||||
proc closeSinks(te: TransferEntity) =
|
||||
for file in te.sinks.values: close(file)
|
||||
|
@ -111,7 +109,6 @@ proc write(te: TransferEntity; pos: uint64; data: pointer; size: int): Future[vo
|
|||
result = all futs
|
||||
|
||||
proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||
assert entity.core.isNil
|
||||
block: # Tox initialization
|
||||
var
|
||||
proxy_host: cstring
|
||||
|
@ -162,7 +159,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
|||
of "save-file":
|
||||
let saveFilePath = val
|
||||
if fileExists saveFilePath:
|
||||
opts.savedata = cast[seq[byte]](readFile saveFilePath)
|
||||
opts.savedata = readFile saveFilePath
|
||||
opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE
|
||||
else: saveIsFresh = true
|
||||
else:
|
||||
|
@ -198,12 +195,15 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
|||
let name = entity.core.name(fn)
|
||||
if name != "":
|
||||
fe.handles.name = publish(turn, fe.ds, Name(name: name))
|
||||
onMessage(turn, fe.ds, ?MessageSend) do (body: string, kind: int):
|
||||
stderr.writeLine "got message ", body
|
||||
discard entity.core.send(fn, body, MessageType(kind))
|
||||
entity.friends[fn] = fe
|
||||
|
||||
for fn in entity.core.friends: createFriend(turn, fn)
|
||||
|
||||
entity.core.onSelfConnectionStatus do (status: toxcore.Connection):
|
||||
run(entity.facet) do (turn: var Turn):
|
||||
run(entity.ds) do (turn: var Turn):
|
||||
let conn = case status
|
||||
of TOX_CONNECTION_NONE: protocol.Connection.none
|
||||
of TOX_CONNECTION_TCP: protocol.Connection.tcp
|
||||
|
@ -212,7 +212,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
|||
Status(status: conn))
|
||||
|
||||
template update[T](fe: FriendEntity; h: var Handle; a: T) =
|
||||
run(fe.facet) do (turn: var Turn): replace(turn, fe.ds, h, a)
|
||||
run(fe.ds) do (turn: var Turn): replace(turn, fe.ds, h, a)
|
||||
|
||||
entity.core.onFriendName do (num: Friend; name: string):
|
||||
let fe = entity.friends[num]
|
||||
|
@ -227,11 +227,11 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
|||
if typing:
|
||||
update(fe, fe.handles.typing, Typing())
|
||||
else:
|
||||
run(fe.facet) do (turn: var Turn):
|
||||
run(fe.ds) do (turn: var Turn):
|
||||
retract(turn, fe.handles.typing)
|
||||
|
||||
entity.core.onFriendRequest do (pk: PublicKey; msg: string):
|
||||
run(entity.facet) do (turn: var Turn):
|
||||
run(entity.ds) do (turn: var Turn):
|
||||
let reqHandle = publish(turn, entity.ds,
|
||||
FriendRequest(key: pk.bytes.toSeq, msg: msg))
|
||||
onPublish(turn, entity.ds, ?FriendAccept(key: pk.bytes.toSeq)) do:
|
||||
|
@ -242,8 +242,8 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
|||
|
||||
entity.core.onFriendMessage do (num: Friend; msg: string; kind: MessageType):
|
||||
let fe = entity.friends[num]
|
||||
run(fe.facet) do (turn: var Turn):
|
||||
message(turn, fe.ds, FriendMessage(body: msg, kind: int kind))
|
||||
run(fe.ds) do (turn: var Turn):
|
||||
message(turn, fe.ds, MessageRecv(body: msg, kind: int kind))
|
||||
|
||||
entity.core.onFriendLosslessPacket do (
|
||||
num: Friend; data: pointer; len: int):
|
||||
|
@ -251,7 +251,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
|||
|
||||
entity.core.onFileRecv do (fn: Friend; tn: FileTransfer; kind: uint32; size: uint64; filename: string):
|
||||
let fe = entity.friends[fn]
|
||||
run(fe.facet) do (turn: var Turn):
|
||||
run(fe.ds) do (turn: var Turn):
|
||||
let te = TransferEntity(size: size)
|
||||
discard init(te, turn, fe.ds)
|
||||
te.dsHandle = publish(turn, fe.ds, TransferDataspace[Ref](
|
||||
|
@ -267,6 +267,8 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
|||
te.closeSink(path)
|
||||
if te.sinks.len == 0:
|
||||
entity.core.control(fn, tn, TOX_FILE_CONTROL_CANCEL)
|
||||
stderr.writeLine "all sinks retracted, retract transfer"
|
||||
retract(turn, te.dsHandle)
|
||||
fe.transfers[tn] = te
|
||||
|
||||
entity.core.onFileRecvChunk do (fn: Friend; tn: FileTransfer; pos: uint64; data: pointer; size: int):
|
||||
|
@ -277,10 +279,9 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
|||
waitFor te.write(pos, data, size)
|
||||
# wait for all the writes to complete within the lifetime of the callback
|
||||
else:
|
||||
te.closeSinks()
|
||||
run(fe.facet) do (turn: var Turn):
|
||||
retract(turn, te.dsHandle)
|
||||
fe.transfers.del(tn)
|
||||
run(te.ds) do (turn: var Turn):
|
||||
discard publish(turn, te.ds, Completion())
|
||||
fe.transfers.del(tn)
|
||||
|
||||
var alive: bool
|
||||
setControlCHook do:
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, asyncfile, logging, strutils]
|
||||
from std/os import `/`, getTempDir, lastPathPart, removeFile
|
||||
import preserves, syndicate
|
||||
import eris
|
||||
import ./protocol
|
||||
|
||||
addHandler(newConsoleLogger(useStderr = true))
|
||||
# register global logger to stderr
|
||||
|
||||
proc ingestFile(
|
||||
friend, transfer: Ref; sinkHandle: Handle;
|
||||
store: ErisStore; path: string; fileSize: uint64) {.async.} =
|
||||
var
|
||||
file = openAsync(path, fmRead)
|
||||
count: uint64
|
||||
if file.getFileSize.uint64 == fileSize:
|
||||
var
|
||||
blockSize = recommendedBlockSize(fileSize)
|
||||
buf = newSeq[byte](blockSize.int)
|
||||
ingest = newErisIngest(store, blockSize)
|
||||
while count < fileSize and buf.len > 0:
|
||||
buf.setLen(await readBuffer(file, addr buf[0], buf.len))
|
||||
await ingest.append(buf)
|
||||
count.inc buf.len
|
||||
if count == fileSize:
|
||||
var
|
||||
cap = await ingest.cap
|
||||
fileName = path.lastPathPart
|
||||
urn = $cap & "&dn=" & filename & "&xl=" & $fileSize
|
||||
run(friend) do (turn: var Turn):
|
||||
let ass = MessageSend(body: urn)
|
||||
message(turn, friend, ass)
|
||||
info "close and remove ", path
|
||||
close(file)
|
||||
removeFile(path)
|
||||
if count != fileSize:
|
||||
error "failed to ingest ", path, ", read ", count, " bytes of ", fileSize
|
||||
run(transfer) do (turn: var Turn):
|
||||
info "retract sink handle"
|
||||
retract(turn, sinkHandle)
|
||||
|
||||
bootDataspace("main") do (root: Ref; turn: var Turn):
|
||||
connectStdio(root, turn)
|
||||
|
||||
during(turn, root, FriendDataspace[Ref] ? { 1: grab() }) do (friend: Ref):
|
||||
info "got a friend ref ", friend
|
||||
during(turn, friend, ?TransferDataspace[Ref]) do (
|
||||
kind: int, size: uint64, filename: string, transfer: Ref):
|
||||
info "got a transfer for ", filename
|
||||
let
|
||||
path = getTempDir() / filename
|
||||
sinkHandle = publish(turn, transfer, TransferSink(path: path))
|
||||
onPublish(turn, transfer, ?Completion) do:
|
||||
asyncCheck ingestFile(friend, transfer, sinkHandle, newDiscardStore(), path, size)
|
||||
do:
|
||||
info "transfer retracted"
|
||||
do:
|
||||
info "friend retracted"
|
||||
|
||||
runForever()
|
|
@ -5,9 +5,10 @@ author = "Emery Hemingway"
|
|||
description = "Tox chat actor for Syndicate"
|
||||
license = "Unlicense"
|
||||
srcDir = "src"
|
||||
bin = @["syndicate_actor_tox"]
|
||||
bin = @["syndicate_actor_tox", "tox_eris_ingester"]
|
||||
backend = "cpp"
|
||||
|
||||
|
||||
# Dependencies
|
||||
|
||||
requires "nim >= 1.6.2", "syndicate >= 1.2.1", "cbor", "toxcore"
|
||||
requires "nim >= 1.6.2", "syndicate >= 1.2.1", "cbor", "toxcore", "eris", "eris_protocols", "eris_tkrzw"
|
||||
|
|
|
@ -49,9 +49,9 @@
|
|||
|
||||
? <transfer ?kind ?size ?filename ?transfer> [
|
||||
$log ! <log "-" { line: [$kind $size $filename $transfer]}>
|
||||
$config ? <service-object <daemon tox_eris_ingester> ?ingester> [
|
||||
$log ! <log "-" { line: ["ingester is up at " $ingester] }>
|
||||
$ingester <transfer $kind $size $filename $transfer>
|
||||
$config ? <service-object <daemon tox_eris_ingester> ?ingester> $ingester [
|
||||
; don't leak the public key to the ingester
|
||||
<friend #[] $friend>
|
||||
]
|
||||
]
|
||||
]
|
||||
|
|
Loading…
Reference in New Issue