syndicate-nim/src/syndicate/protocols/stream.nim

269 lines
8.6 KiB
Nim

import
std/typetraits, preserves
type
StreamConnection*[E = void] {.record: "stream-connection".} = ref object ## ``<stream-connection @source #!Source @sink #!Sink @spec any>``
`source`*: Source[E]
`sink`*: Sink[E]
`spec`*: Preserve[E]
StreamListenerReady*[E = void] {.record: "stream-listener-ready".} = ref object ## ``<stream-listener-ready @spec any>``
`spec`*: Preserve[E]
StreamListenerError*[E = void] {.record: "stream-listener-error".} = ref object ## ``<stream-listener-error @spec any @message string>``
`spec`*: Preserve[E]
`message`*: string
StreamError* {.record: "error".} = ref object ## ``<error @message string>``
`message`*: string
SourceKind* {.pure.} = enum
`sink`, `Streamerror`, `credit`
Sourcesink*[E = void] {.record: "sink".} = ref object
`controller`*: Sink[E]
Sourcecredit*[E = void] {.record: "credit".} = ref object
`amount`*: CreditAmount
`mode`*: Mode[E]
Source*[E = void] = ref object ## ``/ <sink @controller #!Sink> / StreamError / <credit @amount CreditAmount @mode Mode>``
case kind*: SourceKind
of SourceKind.`sink`:
`sink`*: Preserve[E]
of SourceKind.`Streamerror`:
`streamerror`*: StreamError
of SourceKind.`credit`:
`credit`*: Sourcecredit[E]
SinkKind* {.pure.} = enum
`source`, `Streamerror`, `data`, `eof`
Sinksource*[E = void] {.record: "source".} = ref object
`controller`*: Source[E]
Sinkdata*[E = void] {.record: "data".} = ref object
`payload`*: Preserve[E]
`mode`*: Mode[E]
Sinkeof* {.record: "eof".} = object
discard
Sink*[E = void] = ref object ## ``/ <source @controller #!Source> / StreamError / <data @payload any @mode Mode> / <eof>``
case kind*: SinkKind
of SinkKind.`source`:
`source`*: Preserve[E]
of SinkKind.`Streamerror`:
`streamerror`*: StreamError
of SinkKind.`data`:
`data`*: Sinkdata[E]
of SinkKind.`eof`:
`eof`*: Sinkeof
CreditamountKind* {.pure.} = enum
`count`, `unbounded`
CreditAmountcount* = BiggestInt
CreditAmountunbounded* = string
CreditAmount* = ref object ## ``/ @count int / =<<lit>unbounded>``
case kind*: CreditamountKind
of CreditamountKind.`count`:
`count`*: CreditAmountcount
of CreditamountKind.`unbounded`:
discard
ModeKind* {.pure.} = enum
`bytes`, `lines`, `packet`, `object`
Modebytes* = string
Modepacket* {.record: "packet".} = ref object
`size`*: BiggestInt
Modeobject*[E = void] {.record: "object".} = ref object
`description`*: Preserve[E]
Mode*[E = void] = ref object ## ``/ =<<lit>bytes> / LineMode / <packet @size int> / <object @description any>``
case kind*: ModeKind
of ModeKind.`bytes`:
discard
of ModeKind.`lines`:
`lines`*: LineMode
of ModeKind.`packet`:
`packet`*: BiggestInt
of ModeKind.`object`:
`object`*: Preserve[E]
LineMode* {.pure.} = enum ## ``/ =<<lit>lf> / =<<lit>crlf>``
`lf`, `crlf`
proc `streamConnection`*[E = void](`source`: Preserve[E]; `sink`: Preserve[E];
`spec`: Preserve[E]): Preserve[E] =
## Preserves constructor for ``StreamConnection``.
initRecord[E](symbol[E]("stream-connection"), toPreserve(`source`, E),
toPreserve(`sink`, E), toPreserve(`spec`, E))
proc toPreserveHook*(`streamconnection`: StreamConnection; E: typedesc): Preserve[
E] =
initRecord[E](symbol[E]("stream-connection"),
toPreserve(`streamconnection`.`source`, E),
toPreserve(`streamconnection`.`sink`, E),
toPreserve(`streamconnection`.`spec`, E))
proc `streamListenerReady`*[E = void](`spec`: Preserve[E]): Preserve[E] =
## Preserves constructor for ``StreamListenerReady``.
initRecord[E](symbol[E]("stream-listener-ready"), toPreserve(`spec`, E))
proc toPreserveHook*(`streamlistenerready`: StreamListenerReady; E: typedesc): Preserve[
E] =
initRecord[E](symbol[E]("stream-listener-ready"),
toPreserve(`streamlistenerready`.`spec`, E))
proc `streamListenerError`*[E = void](`spec`: Preserve[E];
`message`: string | Preserve[E]): Preserve[
E] =
## Preserves constructor for ``StreamListenerError``.
initRecord[E](symbol[E]("stream-listener-error"), toPreserve(`spec`, E),
toPreserve(`message`, E))
proc toPreserveHook*(`streamlistenererror`: StreamListenerError; E: typedesc): Preserve[
E] =
initRecord[E](symbol[E]("stream-listener-error"),
toPreserve(`streamlistenererror`.`spec`, E),
toPreserve(`streamlistenererror`.`message`, E))
proc `streamError`*[E = void](`message`: string | Preserve[E]): Preserve[E] =
## Preserves constructor for ``StreamError``.
initRecord[E](symbol[E]("error"), toPreserve(`message`, E))
proc toPreserveHook*(`streamerror`: StreamError; E: typedesc): Preserve[E] =
initRecord[E](symbol[E]("error"), toPreserve(`streamerror`.`message`, E))
proc toPreserveHook*(v: Source; E: typedesc): Preserve[E] =
case v.kind
of SourceKind.`sink`:
toPreserve(v.`sink`, E)
of SourceKind.`Streamerror`:
toPreserve(v.`streamerror`, E)
of SourceKind.`credit`:
toPreserve(v.`credit`, E)
proc fromPreserveHook*[E](v: var Source; pr: Preserve[E]): bool =
if isRecord(pr) and pr.label.isSymbol("sink"):
v = Source(kind: SourceKind.`sink`)
result = fromPreserve(v.`sink`, pr)
elif isRecord(pr) and pr.label.isSymbol("StreamError"):
v = Source(kind: SourceKind.`Streamerror`)
result = fromPreserve(v.`streamerror`, pr)
elif isRecord(pr) and pr.label.isSymbol("credit"):
v = Source(kind: SourceKind.`credit`)
result = fromPreserve(v.`credit`, pr)
proc toPreserveHook*(v: Sink; E: typedesc): Preserve[E] =
case v.kind
of SinkKind.`source`:
toPreserve(v.`source`, E)
of SinkKind.`Streamerror`:
toPreserve(v.`streamerror`, E)
of SinkKind.`data`:
toPreserve(v.`data`, E)
of SinkKind.`eof`:
toPreserve(v.`eof`, E)
proc fromPreserveHook*[E](v: var Sink; pr: Preserve[E]): bool =
if isRecord(pr) and pr.label.isSymbol("source"):
v = Sink(kind: SinkKind.`source`)
result = fromPreserve(v.`source`, pr)
elif isRecord(pr) and pr.label.isSymbol("StreamError"):
v = Sink(kind: SinkKind.`Streamerror`)
result = fromPreserve(v.`streamerror`, pr)
elif isRecord(pr) and pr.label.isSymbol("data"):
v = Sink(kind: SinkKind.`data`)
result = fromPreserve(v.`data`, pr)
elif isRecord(pr) and pr.label.isSymbol("eof"):
v = Sink(kind: SinkKind.`eof`)
result = fromPreserve(v.`eof`, pr)
proc toPreserveHook*(v: CreditAmount; E: typedesc): Preserve[E] =
case v.kind
of CreditAmountKind.`count`:
toPreserve(v.`count`, E)
of CreditAmountKind.`unbounded`:
Preserve[E](kind: pkSymbol, symbol: "unbounded")
proc fromPreserveHook*[E](v: var CreditAmount; pr: Preserve[E]): bool =
if false:
discard
elif pr.kind == pkSymbol and pr.symbol == "unbounded":
v = CreditAmount(kind: CreditAmountKind.`unbounded`)
result = true
proc toPreserveHook*(v: Mode; E: typedesc): Preserve[E] =
case v.kind
of ModeKind.`bytes`:
Preserve[E](kind: pkSymbol, symbol: "bytes")
of ModeKind.`lines`:
toPreserve(v.`lines`, E)
of ModeKind.`packet`:
toPreserve(v.`packet`, E)
of ModeKind.`object`:
toPreserve(v.`object`, E)
proc fromPreserveHook*[E](v: var Mode; pr: Preserve[E]): bool =
if pr.kind == pkSymbol and pr.symbol == "bytes":
v = Mode(kind: ModeKind.`bytes`)
result = true
elif false: ## snkOr - / =<<lit>lf> / =<<lit>crlf>
discard
elif isRecord(pr) and pr.label.isSymbol("packet"):
v = Mode(kind: ModeKind.`packet`)
result = fromPreserve(v.`packet`, pr)
elif isRecord(pr) and pr.label.isSymbol("object"):
v = Mode(kind: ModeKind.`object`)
result = fromPreserve(v.`object`, pr)
proc toPreserveHook*(v: LineMode; E: typedesc): Preserve[E] =
case v
of LineMode.`lf`:
symbol[E]("lf")
of LineMode.`crlf`:
symbol[E]("crlf")
proc fromPreserveHook*[E](v: var LineMode; pr: Preserve[E]): bool =
if isSymbol(pr):
case pr.symbol
of "lf":
v = LineMode.`lf`
result = true
of "crlf":
v = LineMode.`crlf`
result = true
proc `$`*[E](x: StreamConnection[E] | StreamListenerReady[E] |
StreamListenerError[E] |
StreamError |
Source[E] |
Sink[E] |
CreditAmount |
Mode[E] |
LineMode): string =
`$`(toPreserve(x, E))
proc `encode`*[E](x: StreamConnection[E] | StreamListenerReady[E] |
StreamListenerError[E] |
StreamError |
Source[E] |
Sink[E] |
CreditAmount |
Mode[E] |
LineMode): seq[byte] =
encode(toPreserve(x, E))