From 27107ebd713d545ba9df94b91b41b56a0eaec30a Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Wed, 3 Nov 2021 19:11:47 +0100 Subject: [PATCH] Enqueue multiple files, README --- README.md | 18 ++++++++++ src/mpv_syndicate.nim | 79 +++++++++++++++++++++++++++---------------- 2 files changed, 68 insertions(+), 29 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..959fd78 --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +# mpv Syndicate + +A proof-of-concept for a new kind of +[plumbing](http://doc.cat-v.org/plan_9/4th_edition/papers/plumb). + +The `mpv_syndicate` program starts an idle mpv instance and waits for assertions +that enqueue videos. When this program is renamed (symlinked to `mpv_syndicate`) +then it will enqueue each URI passed as command arguments. + +```sh +nimble build +ln -s mpv_syndicate enque_video + +./enque_video file:///tmp/foo.mkv & + # exits when all videos are enqueue + +./mpv_syndicate +``` diff --git a/src/mpv_syndicate.nim b/src/mpv_syndicate.nim index 2ff3511..7686946 100644 --- a/src/mpv_syndicate.nim +++ b/src/mpv_syndicate.nim @@ -1,9 +1,9 @@ # SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, json, osproc, strutils] +import std/[asyncdispatch, json, osproc, sets, tables] import preserves, preserves/parse -import syndicate, syndicate/[actors, capabilities, dataspaces, patterns, relay], syndicate/protocols/service +import syndicate, syndicate/[actors, capabilities, dataspaces, patterns, relay] from std/os import getCurrentProcessId, commandLineParams, extractFilename, paramStr, sleep @@ -17,16 +17,14 @@ type VideoEnqueued* {.preservesRecord: "video-enqueued".} = object uri*: string - pos*: int - -const videoPlayback = "video-playback" + id*: int import std/net from std/nativesockets import AF_UNIX, SOCK_DGRAM, Protocol type MpvProcess = ref object process: Process - socket: Socket + socket: Socket # TODO AsyncSocket proc newMpvProcess(): MpvProcess = let socketPath = "/tmp/mpvsocket-" & $getCurrentProcessId() @@ -41,43 +39,57 @@ proc newMpvProcess(): MpvProcess = buffered = false) connectUnix(result.socket, socketPath) -proc send(mpv: MpvProcess; command: varargs[string]) = - let js = %* {"command": command} - mpv.socket.send($js & "\n") - proc recv(mpv: MpvProcess): JsonNode = mpv.socket.recvLine.parseJson +proc send(mpv: MpvProcess; command: varargs[string, `$`]): JsonNode = + let js = %* {"command": command} + mpv.socket.send($js & "\n") + while true: # TODO: dispatch responses asynchronously + let resp = mpv.recv + if resp.kind == JObject and resp.contains "error": + let errmsg = getStr resp["error"] + if errmsg != "success": + raise newException(CatchableError, errmsg) + result = resp["data"] + break + +proc playlist(mpv: MpvProcess): JsonNode = + result = mpv.send("get_property", "playlist") + proc enqueue(mpv: MpvProcess; uri: string): int = - mpv.send "loadfile", uri, "append-play" - let js = mpv.recv() + let resp = mpv.send("loadfile", uri, "append-play") + getInt resp["playlist_entry_id"] proc frontend() = - proc getUriParam(): string = - var params = commandLineParams() - case params.len - of 0: result = execProcess"wl-paste" + proc getUriParams(): seq[string] = + result = commandLineParams() + if result.len == 0: + result.add execProcess"wl-paste" # take the clipboard if nothing on the command line - of 1: result = params[0] - else: - quit "too many parameters" waitFor runActor("chat") do (turn: var Turn): let cap = mint() - let uri = getUriParam() connectUnix(turn, "/run/syndicate/ds", cap) do (turn: var Turn; a: Assertion) -> TurnAction: - let ds = unembed a + let + ds = unembed a + uriToAssertions = newTable[string, Handle]() - discard publish(turn, ds, RequireService[Ref](serviceName: toPreserve(videoPlayback, Ref))) + # TODO: assert a dependency on a video playback service - onPublish(turn, ds, VideoEnqueued ? { 0: ?uri, 1: `?*`() }) do (pos: int): - # querying when a video is enqueued impiles that it should be enqueued? - echo uri, " enqueued at position ", pos - stopActor(turn) + for uri in getUriParams(): + uriToAssertions[uri] = publish(turn, ds, VideoEnqueue(uri: uri)) - discard publish(turn, ds, VideoEnqueue(uri: uri)) + onPublish(turn, ds, VideoEnqueued ? { 0: `?*`(), 1: `?*`() }) do (uri: string; id: int): + # querying when a video is enqueued implies that it should be enqueued? + echo uri, " enqueued with id ", id + var handle: Handle + if pop(uriToAssertions, uri, handle): + retract(turn, handle) + if uriToAssertions.len == 0: + stopActor(turn) proc backend() = waitFor runActor("chat") do (turn: var Turn): @@ -88,10 +100,19 @@ proc backend() = let ds = unembed a mpv = newMpvProcess() + playlistAssertions = newTable[int, Handle]() onPublish(turn, ds, VideoEnqueue ? { 0: `?*`() }) do (uri: string): - let pos = mpv.enqueue(uri) - discard publish(turn, ds, VideoEnqueued(uri: uri, pos: pos)) + let id = mpv.enqueue(uri) + playlistAssertions[id] = publish(turn, ds, VideoEnqueued(uri: uri, id: id)) + block: + # retract assertions not corresponding to the playlist + # TODO: do this from asyncronous mpv IPC events + var idSet: HashSet[int] + for e in mpv.playlist: + idSet.incl getInt(e["id"]) + for (id, handle) in playlistAssertions.pairs: + if id notin idSet: retract(turn, handle) case extractFilename(paramStr 0) of "mpv_syndicate": backend()