Compare commits
1 Commits
trunk
...
lostandfou
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | 4451114781 |
|
@ -23,8 +23,9 @@ FriendRequest = <request @key bytes @msg string> .
|
||||||
; Asserted to the core to accept a friend request.
|
; Asserted to the core to accept a friend request.
|
||||||
FriendAccept = <accept @key bytes> .
|
FriendAccept = <accept @key bytes> .
|
||||||
|
|
||||||
; Messages sent by friend entities.
|
; Messages sent and received by friend entities.
|
||||||
FriendMessage = <msg @body string @kind int> .
|
MessageRecv = <msgrecv @body string @kind int> .
|
||||||
|
MessageSend = <msgsend @body string @kind int> .
|
||||||
|
|
||||||
; Asserted by a friend while a transfer is active.
|
; Asserted by a friend while a transfer is active.
|
||||||
TransferDataspace = <transfer @kind int @size int @filename string @entity #!any> .
|
TransferDataspace = <transfer @kind int @size int @filename string @entity #!any> .
|
||||||
|
@ -32,4 +33,6 @@ TransferDataspace = <transfer @kind int @size int @filename string @entity #!any
|
||||||
; Asserted to the transfer entity to indicate the transfer should be saved to a file
|
; Asserted to the transfer entity to indicate the transfer should be saved to a file
|
||||||
TransferSink = <sink @path string> .
|
TransferSink = <sink @path string> .
|
||||||
|
|
||||||
|
Completion = <complete> .
|
||||||
|
|
||||||
BootstrapNode = <bootstrap @publicKey string @host string @port int> .
|
BootstrapNode = <bootstrap @publicKey string @host string @port int> .
|
||||||
|
|
|
@ -3,6 +3,10 @@ import
|
||||||
std/typetraits, preserves
|
std/typetraits, preserves
|
||||||
|
|
||||||
type
|
type
|
||||||
|
MessageSend* {.preservesRecord: "msgsend".} = object
|
||||||
|
`body`*: string
|
||||||
|
`kind`*: BiggestInt
|
||||||
|
|
||||||
Name* {.preservesRecord: "name".} = object
|
Name* {.preservesRecord: "name".} = object
|
||||||
`name`*: string
|
`name`*: string
|
||||||
|
|
||||||
|
@ -21,7 +25,7 @@ type
|
||||||
`publicKey`*: seq[byte]
|
`publicKey`*: seq[byte]
|
||||||
`entity`*: Preserve[E]
|
`entity`*: Preserve[E]
|
||||||
|
|
||||||
FriendMessage* {.preservesRecord: "msg".} = object
|
MessageRecv* {.preservesRecord: "msgrecv".} = object
|
||||||
`body`*: string
|
`body`*: string
|
||||||
`kind`*: BiggestInt
|
`kind`*: BiggestInt
|
||||||
|
|
||||||
|
@ -47,6 +51,8 @@ type
|
||||||
FriendAccept* {.preservesRecord: "accept".} = object
|
FriendAccept* {.preservesRecord: "accept".} = object
|
||||||
`key`*: seq[byte]
|
`key`*: seq[byte]
|
||||||
|
|
||||||
|
Completion* {.preservesRecord: "complete".} = object
|
||||||
|
|
||||||
`Connection`* {.preservesOr, pure.} = enum
|
`Connection`* {.preservesOr, pure.} = enum
|
||||||
`none`, `tcp`, `udp`
|
`none`, `tcp`, `udp`
|
||||||
TransferSink* {.preservesRecord: "sink".} = object
|
TransferSink* {.preservesRecord: "sink".} = object
|
||||||
|
@ -64,20 +70,23 @@ proc encode*[E](x: FriendDataspace[E] | ToxDataspace[E] | TransferDataspace[E]):
|
||||||
byte] =
|
byte] =
|
||||||
encode(toPreserve(x, E))
|
encode(toPreserve(x, E))
|
||||||
|
|
||||||
proc `$`*(x: Name | FriendRequest | Address | FriendMessage | Typing |
|
proc `$`*(x: MessageSend | Name | FriendRequest | Address | MessageRecv | Typing |
|
||||||
BootstrapNode |
|
BootstrapNode |
|
||||||
StatusMessage |
|
StatusMessage |
|
||||||
Status |
|
Status |
|
||||||
FriendAccept |
|
FriendAccept |
|
||||||
|
Completion |
|
||||||
TransferSink |
|
TransferSink |
|
||||||
CoreVersion): string =
|
CoreVersion): string =
|
||||||
`$`(toPreserve(x))
|
`$`(toPreserve(x))
|
||||||
|
|
||||||
proc encode*(x: Name | FriendRequest | Address | FriendMessage | Typing |
|
proc encode*(x: MessageSend | Name | FriendRequest | Address | MessageRecv |
|
||||||
|
Typing |
|
||||||
BootstrapNode |
|
BootstrapNode |
|
||||||
StatusMessage |
|
StatusMessage |
|
||||||
Status |
|
Status |
|
||||||
FriendAccept |
|
FriendAccept |
|
||||||
|
Completion |
|
||||||
TransferSink |
|
TransferSink |
|
||||||
CoreVersion): seq[byte] =
|
CoreVersion): seq[byte] =
|
||||||
encode(toPreserve(x))
|
encode(toPreserve(x))
|
||||||
|
|
|
@ -44,7 +44,6 @@ proc writeSaveData(core: Tox) =
|
||||||
|
|
||||||
type
|
type
|
||||||
Entity = ref object of RootObj
|
Entity = ref object of RootObj
|
||||||
facet: Facet
|
|
||||||
ds: Ref
|
ds: Ref
|
||||||
|
|
||||||
TransferEntity {.final.} = ref object of Entity
|
TransferEntity {.final.} = ref object of Entity
|
||||||
|
@ -68,8 +67,6 @@ type
|
||||||
friends: Table[Friend, FriendEntity]
|
friends: Table[Friend, FriendEntity]
|
||||||
|
|
||||||
proc init(e: Entity; turn: var Turn; parent: Ref): Handle =
|
proc init(e: Entity; turn: var Turn; parent: Ref): Handle =
|
||||||
assert e.facet.isNil
|
|
||||||
e.facet = turn.facet
|
|
||||||
e.ds = newDataspace(turn)
|
e.ds = newDataspace(turn)
|
||||||
|
|
||||||
proc copyExisting(te: TransferEntity; path: string) =
|
proc copyExisting(te: TransferEntity; path: string) =
|
||||||
|
@ -99,6 +96,7 @@ proc closeSink(te: TransferEntity; path: string) =
|
||||||
var file: AsyncFile
|
var file: AsyncFile
|
||||||
if te.sinks.pop(path, file):
|
if te.sinks.pop(path, file):
|
||||||
close file
|
close file
|
||||||
|
stderr.writeLine("closed sink on ", path)
|
||||||
|
|
||||||
proc closeSinks(te: TransferEntity) =
|
proc closeSinks(te: TransferEntity) =
|
||||||
for file in te.sinks.values: close(file)
|
for file in te.sinks.values: close(file)
|
||||||
|
@ -111,7 +109,6 @@ proc write(te: TransferEntity; pos: uint64; data: pointer; size: int): Future[vo
|
||||||
result = all futs
|
result = all futs
|
||||||
|
|
||||||
proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||||
assert entity.core.isNil
|
|
||||||
block: # Tox initialization
|
block: # Tox initialization
|
||||||
var
|
var
|
||||||
proxy_host: cstring
|
proxy_host: cstring
|
||||||
|
@ -162,7 +159,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||||
of "save-file":
|
of "save-file":
|
||||||
let saveFilePath = val
|
let saveFilePath = val
|
||||||
if fileExists saveFilePath:
|
if fileExists saveFilePath:
|
||||||
opts.savedata = cast[seq[byte]](readFile saveFilePath)
|
opts.savedata = readFile saveFilePath
|
||||||
opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE
|
opts.savedata_type = TOX_SAVEDATA_TYPE_TOX_SAVE
|
||||||
else: saveIsFresh = true
|
else: saveIsFresh = true
|
||||||
else:
|
else:
|
||||||
|
@ -198,12 +195,15 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||||
let name = entity.core.name(fn)
|
let name = entity.core.name(fn)
|
||||||
if name != "":
|
if name != "":
|
||||||
fe.handles.name = publish(turn, fe.ds, Name(name: name))
|
fe.handles.name = publish(turn, fe.ds, Name(name: name))
|
||||||
|
onMessage(turn, fe.ds, ?MessageSend) do (body: string, kind: int):
|
||||||
|
stderr.writeLine "got message ", body
|
||||||
|
discard entity.core.send(fn, body, MessageType(kind))
|
||||||
entity.friends[fn] = fe
|
entity.friends[fn] = fe
|
||||||
|
|
||||||
for fn in entity.core.friends: createFriend(turn, fn)
|
for fn in entity.core.friends: createFriend(turn, fn)
|
||||||
|
|
||||||
entity.core.onSelfConnectionStatus do (status: toxcore.Connection):
|
entity.core.onSelfConnectionStatus do (status: toxcore.Connection):
|
||||||
run(entity.facet) do (turn: var Turn):
|
run(entity.ds) do (turn: var Turn):
|
||||||
let conn = case status
|
let conn = case status
|
||||||
of TOX_CONNECTION_NONE: protocol.Connection.none
|
of TOX_CONNECTION_NONE: protocol.Connection.none
|
||||||
of TOX_CONNECTION_TCP: protocol.Connection.tcp
|
of TOX_CONNECTION_TCP: protocol.Connection.tcp
|
||||||
|
@ -212,7 +212,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||||
Status(status: conn))
|
Status(status: conn))
|
||||||
|
|
||||||
template update[T](fe: FriendEntity; h: var Handle; a: T) =
|
template update[T](fe: FriendEntity; h: var Handle; a: T) =
|
||||||
run(fe.facet) do (turn: var Turn): replace(turn, fe.ds, h, a)
|
run(fe.ds) do (turn: var Turn): replace(turn, fe.ds, h, a)
|
||||||
|
|
||||||
entity.core.onFriendName do (num: Friend; name: string):
|
entity.core.onFriendName do (num: Friend; name: string):
|
||||||
let fe = entity.friends[num]
|
let fe = entity.friends[num]
|
||||||
|
@ -227,11 +227,11 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||||
if typing:
|
if typing:
|
||||||
update(fe, fe.handles.typing, Typing())
|
update(fe, fe.handles.typing, Typing())
|
||||||
else:
|
else:
|
||||||
run(fe.facet) do (turn: var Turn):
|
run(fe.ds) do (turn: var Turn):
|
||||||
retract(turn, fe.handles.typing)
|
retract(turn, fe.handles.typing)
|
||||||
|
|
||||||
entity.core.onFriendRequest do (pk: PublicKey; msg: string):
|
entity.core.onFriendRequest do (pk: PublicKey; msg: string):
|
||||||
run(entity.facet) do (turn: var Turn):
|
run(entity.ds) do (turn: var Turn):
|
||||||
let reqHandle = publish(turn, entity.ds,
|
let reqHandle = publish(turn, entity.ds,
|
||||||
FriendRequest(key: pk.bytes.toSeq, msg: msg))
|
FriendRequest(key: pk.bytes.toSeq, msg: msg))
|
||||||
onPublish(turn, entity.ds, ?FriendAccept(key: pk.bytes.toSeq)) do:
|
onPublish(turn, entity.ds, ?FriendAccept(key: pk.bytes.toSeq)) do:
|
||||||
|
@ -242,8 +242,8 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||||
|
|
||||||
entity.core.onFriendMessage do (num: Friend; msg: string; kind: MessageType):
|
entity.core.onFriendMessage do (num: Friend; msg: string; kind: MessageType):
|
||||||
let fe = entity.friends[num]
|
let fe = entity.friends[num]
|
||||||
run(fe.facet) do (turn: var Turn):
|
run(fe.ds) do (turn: var Turn):
|
||||||
message(turn, fe.ds, FriendMessage(body: msg, kind: int kind))
|
message(turn, fe.ds, MessageRecv(body: msg, kind: int kind))
|
||||||
|
|
||||||
entity.core.onFriendLosslessPacket do (
|
entity.core.onFriendLosslessPacket do (
|
||||||
num: Friend; data: pointer; len: int):
|
num: Friend; data: pointer; len: int):
|
||||||
|
@ -251,7 +251,7 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||||
|
|
||||||
entity.core.onFileRecv do (fn: Friend; tn: FileTransfer; kind: uint32; size: uint64; filename: string):
|
entity.core.onFileRecv do (fn: Friend; tn: FileTransfer; kind: uint32; size: uint64; filename: string):
|
||||||
let fe = entity.friends[fn]
|
let fe = entity.friends[fn]
|
||||||
run(fe.facet) do (turn: var Turn):
|
run(fe.ds) do (turn: var Turn):
|
||||||
let te = TransferEntity(size: size)
|
let te = TransferEntity(size: size)
|
||||||
discard init(te, turn, fe.ds)
|
discard init(te, turn, fe.ds)
|
||||||
te.dsHandle = publish(turn, fe.ds, TransferDataspace[Ref](
|
te.dsHandle = publish(turn, fe.ds, TransferDataspace[Ref](
|
||||||
|
@ -267,6 +267,8 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||||
te.closeSink(path)
|
te.closeSink(path)
|
||||||
if te.sinks.len == 0:
|
if te.sinks.len == 0:
|
||||||
entity.core.control(fn, tn, TOX_FILE_CONTROL_CANCEL)
|
entity.core.control(fn, tn, TOX_FILE_CONTROL_CANCEL)
|
||||||
|
stderr.writeLine "all sinks retracted, retract transfer"
|
||||||
|
retract(turn, te.dsHandle)
|
||||||
fe.transfers[tn] = te
|
fe.transfers[tn] = te
|
||||||
|
|
||||||
entity.core.onFileRecvChunk do (fn: Friend; tn: FileTransfer; pos: uint64; data: pointer; size: int):
|
entity.core.onFileRecvChunk do (fn: Friend; tn: FileTransfer; pos: uint64; data: pointer; size: int):
|
||||||
|
@ -277,10 +279,9 @@ proc initCore(entity: CoreEntity; turn: var Turn; parentRef: Ref) =
|
||||||
waitFor te.write(pos, data, size)
|
waitFor te.write(pos, data, size)
|
||||||
# wait for all the writes to complete within the lifetime of the callback
|
# wait for all the writes to complete within the lifetime of the callback
|
||||||
else:
|
else:
|
||||||
te.closeSinks()
|
run(te.ds) do (turn: var Turn):
|
||||||
run(fe.facet) do (turn: var Turn):
|
discard publish(turn, te.ds, Completion())
|
||||||
retract(turn, te.dsHandle)
|
fe.transfers.del(tn)
|
||||||
fe.transfers.del(tn)
|
|
||||||
|
|
||||||
var alive: bool
|
var alive: bool
|
||||||
setControlCHook do:
|
setControlCHook do:
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway
|
||||||
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
|
import std/[asyncdispatch, asyncfile, logging, strutils]
|
||||||
|
from std/os import `/`, getTempDir, lastPathPart, removeFile
|
||||||
|
import preserves, syndicate
|
||||||
|
import eris
|
||||||
|
import ./protocol
|
||||||
|
|
||||||
|
addHandler(newConsoleLogger(useStderr = true))
|
||||||
|
# register global logger to stderr
|
||||||
|
|
||||||
|
proc ingestFile(
|
||||||
|
friend, transfer: Ref; sinkHandle: Handle;
|
||||||
|
store: ErisStore; path: string; fileSize: uint64) {.async.} =
|
||||||
|
var
|
||||||
|
file = openAsync(path, fmRead)
|
||||||
|
count: uint64
|
||||||
|
if file.getFileSize.uint64 == fileSize:
|
||||||
|
var
|
||||||
|
blockSize = recommendedBlockSize(fileSize)
|
||||||
|
buf = newSeq[byte](blockSize.int)
|
||||||
|
ingest = newErisIngest(store, blockSize)
|
||||||
|
while count < fileSize and buf.len > 0:
|
||||||
|
buf.setLen(await readBuffer(file, addr buf[0], buf.len))
|
||||||
|
await ingest.append(buf)
|
||||||
|
count.inc buf.len
|
||||||
|
if count == fileSize:
|
||||||
|
var
|
||||||
|
cap = await ingest.cap
|
||||||
|
fileName = path.lastPathPart
|
||||||
|
urn = $cap & "&dn=" & filename & "&xl=" & $fileSize
|
||||||
|
run(friend) do (turn: var Turn):
|
||||||
|
let ass = MessageSend(body: urn)
|
||||||
|
message(turn, friend, ass)
|
||||||
|
info "close and remove ", path
|
||||||
|
close(file)
|
||||||
|
removeFile(path)
|
||||||
|
if count != fileSize:
|
||||||
|
error "failed to ingest ", path, ", read ", count, " bytes of ", fileSize
|
||||||
|
run(transfer) do (turn: var Turn):
|
||||||
|
info "retract sink handle"
|
||||||
|
retract(turn, sinkHandle)
|
||||||
|
|
||||||
|
bootDataspace("main") do (root: Ref; turn: var Turn):
|
||||||
|
connectStdio(root, turn)
|
||||||
|
|
||||||
|
during(turn, root, FriendDataspace[Ref] ? { 1: grab() }) do (friend: Ref):
|
||||||
|
info "got a friend ref ", friend
|
||||||
|
during(turn, friend, ?TransferDataspace[Ref]) do (
|
||||||
|
kind: int, size: uint64, filename: string, transfer: Ref):
|
||||||
|
info "got a transfer for ", filename
|
||||||
|
let
|
||||||
|
path = getTempDir() / filename
|
||||||
|
sinkHandle = publish(turn, transfer, TransferSink(path: path))
|
||||||
|
onPublish(turn, transfer, ?Completion) do:
|
||||||
|
asyncCheck ingestFile(friend, transfer, sinkHandle, newDiscardStore(), path, size)
|
||||||
|
do:
|
||||||
|
info "transfer retracted"
|
||||||
|
do:
|
||||||
|
info "friend retracted"
|
||||||
|
|
||||||
|
runForever()
|
|
@ -5,9 +5,10 @@ author = "Emery Hemingway"
|
||||||
description = "Tox chat actor for Syndicate"
|
description = "Tox chat actor for Syndicate"
|
||||||
license = "Unlicense"
|
license = "Unlicense"
|
||||||
srcDir = "src"
|
srcDir = "src"
|
||||||
bin = @["syndicate_actor_tox"]
|
bin = @["syndicate_actor_tox", "tox_eris_ingester"]
|
||||||
|
backend = "cpp"
|
||||||
|
|
||||||
|
|
||||||
# Dependencies
|
# Dependencies
|
||||||
|
|
||||||
requires "nim >= 1.6.2", "syndicate >= 1.2.1", "cbor", "toxcore"
|
requires "nim >= 1.6.2", "syndicate >= 1.2.1", "cbor", "toxcore", "eris", "eris_protocols", "eris_tkrzw"
|
||||||
|
|
|
@ -49,9 +49,9 @@
|
||||||
|
|
||||||
? <transfer ?kind ?size ?filename ?transfer> [
|
? <transfer ?kind ?size ?filename ?transfer> [
|
||||||
$log ! <log "-" { line: [$kind $size $filename $transfer]}>
|
$log ! <log "-" { line: [$kind $size $filename $transfer]}>
|
||||||
$config ? <service-object <daemon tox_eris_ingester> ?ingester> [
|
$config ? <service-object <daemon tox_eris_ingester> ?ingester> $ingester [
|
||||||
$log ! <log "-" { line: ["ingester is up at " $ingester] }>
|
; don't leak the public key to the ingester
|
||||||
$ingester <transfer $kind $size $filename $transfer>
|
<friend #[] $friend>
|
||||||
]
|
]
|
||||||
]
|
]
|
||||||
]
|
]
|
||||||
|
|
Loading…
Reference in New Issue