Better codec

This commit is contained in:
Tony Garnock-Jones 2018-08-19 22:13:42 +01:00
parent 77b0addcd0
commit 7dacb5ba6e
2 changed files with 338 additions and 12 deletions

323
syndicate/mc/codec.rkt Normal file
View File

@ -0,0 +1,323 @@
#lang racket/base
(provide encode
decode
wire-value)
(require racket/match)
(require bitsyntax)
(require syndicate/support/struct)
(require imperative-syndicate/assertions)
(require imperative-syndicate/pattern)
;; Codec version number. `encode` writes it as the first byte (8 bits) of every
;; encoded message, and `decode` only accepts input whose first byte equals it.
(define version 1)
;; encode : any -> bytes
;; Frames `v` as a one-byte `version` header followed by its wire-value
;; encoding, then flattens the resulting bit string into a byte string.
;; Errors raised by `encode-value` for unsupported values propagate unchanged.
(define (encode v)
  (define framed
    (bit-string (version :: bits 8)
                (v :: (wire-value))))
  (bit-string->bytes framed))
;; decode : bit-string [(-> X)] -> (or value X)
;; Parses `bs` as a `version` byte followed by a single wire value and returns
;; the decoded value. If the input does not match that shape, the result of
;; calling `on-fail` (a thunk) is returned instead; by default that thunk
;; raises an error that mentions the offending input.
(define (decode bs [on-fail (lambda () (error 'decode "Invalid encoding: ~v" bs))])
  (bit-string-case bs
    ([ (= version :: bits 8) (payload :: (wire-value)) ]
     payload)
    (else
     (on-fail))))
;; Custom segment type so that `(x :: (wire-value))` can be written inside
;; both `bit-string` and `bit-string-case` forms in this file.
;;  - the two-argument `#f` form delegates to `encode-value`;
;;  - the four-argument `#t` form delegates to `decode-value`, passing along
;;    the input plus the success and failure continuations.
(define-syntax wire-value
  (syntax-rules ()
    [(_ #f val)
     (encode-value val)]
    [(_ #t in on-success on-failure)
     (decode-value in on-success on-failure)]))
;; Custom segment type so that `(n :: (wire-length))` can be written inside
;; both `bit-string` and `bit-string-case` forms in this file.
;;  - the two-argument `#f` form delegates to `encode-wire-length`;
;;  - the four-argument `#t` form delegates to `decode-wire-length`, passing
;;    along the input plus the success and failure continuations.
(define-syntax wire-length
  (syntax-rules ()
    [(_ #f len)
     (encode-wire-length len)]
    [(_ #t in on-success on-failure)
     (decode-wire-length in on-success on-failure)]))
;; MM NN LLLL
;;
;; 00 00 0000 discard
;; 00 01 0001 capture
;; 00 10 0001 observe
;; 00 11 nnnn any other struct
;;
;; 01 00 nnnn list
;; 01 01 nnnn vector
;;
;; 10 00 nnnn signed integer, bigendian
;; 10 01 nnnn string
;; 10 10 nnnn bytes
;; 10 11 nnnn symbol
;;
;; 11 00 0000 #f
;; 11 00 0001 #t
;; 11 00 0010 (32 bits) single
;; 11 00 0011 (64 bits) double
;;
;; When nnnn = 1111, following bytes are real length
;;
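;; The following bytes are a chain of 7-bit chunks, least-significant chunk
;; first (little-endian); the high bit of each byte is set when another chunk
;; follows and clear on the final chunk. E.g. 300 is #b10101100 #b00000010.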
;;---------------------------------------------------------------------------
;; encode-wire-length : integer -> bit-string
;; Produces the 4-bit length nibble, optionally followed by a varint:
;;  - negative lengths are rejected with an error;
;;  - lengths below 15 fit directly into the nibble;
;;  - anything else is written as the escape nibble #b1111 followed by the
;;    full length as produced by `encode-varint`.
(define (encode-wire-length v)
  (cond
    [(negative? v)
     (error 'encode-wire-length "Cannot encode negative wire-length ~v" v)]
    [(< v #b1111)
     (bit-string (v :: bits 4))]
    [else
     (bit-string (#b1111 :: bits 4)
                 ((encode-varint v) :: binary))]))
;; encode-varint : integer -> bit-string
;; Variable-length integer encoding in 7-bit groups, least-significant group
;; first. Every byte except the last has its high bit set (value + 128) to
;; signal that more groups follow; values below 128 are a single plain byte.
(define (encode-varint v)
  (cond
    [(< v 128) (bytes v)]
    [else
     (define low-group (modulo v 128))
     (define higher-groups (quotient v 128))
     (bit-string ((+ low-group 128) :: bits 8)
                 ((encode-varint higher-groups) :: binary))]))
;; encode-array-like : integer integer list -> bit-string
;; Writes a header of 2-bit `major`, 2-bit `minor` and the wire-length of
;; `fields`, followed by the concatenated `encode-value` encoding of each
;; element of `fields` in order.
(define (encode-array-like major minor fields)
  (define count (length fields))
  (define payload (apply bit-string-append (map encode-value fields)))
  (bit-string (major :: bits 2)
              (minor :: bits 2)
              (count :: (wire-length))
              (payload :: binary)))
;; encode-binary-like : integer bytes -> bit-string
;; Writes a header with the fixed major tag #b10, the given 2-bit `minor`
;; and the byte length of `bs` as a wire-length, followed by `bs` itself.
(define (encode-binary-like minor bs)
  (define byte-count (bytes-length bs))
  (bit-string (#b10 :: bits 2)
              (minor :: bits 2)
              (byte-count :: (wire-length))
              (bs :: binary)))
;; encode-value : any -> bit-string
;; Encodes a single value according to the MM NN LLLL header layout used by
;; this codec. Raises an error for non-prefab structs and for any value that
;; matches none of the clauses.
;;
;; NOTE: clause order is significant. `match` takes the first matching clause,
;; so the specific struct patterns precede the generic struct clause, the
;; flonum clauses precede `integer?` (which also accepts integral flonums such
;; as 1.0), and the literal 0 precedes the general integer clause.
(define (encode-value v)
(match v
;; Major 00, minors 0/1/2: the three dedicated struct shapes, encoded as
;; array-likes holding zero or one field.
[(discard) (encode-array-like 0 0 '())]
[(capture s) (encode-array-like 0 1 (list s))]
[(observe s) (encode-array-like 0 2 (list s))]
;; Major 00, minor 3: any other struct accepted by `non-object-struct?`
;; (imported from syndicate/support/struct). Only prefab structs can be
;; encoded: the prefab key is emitted as the first element, followed by the
;; fields.
[(? non-object-struct?)
(define key (prefab-struct-key v))
(when (not key) (error 'encode-value "Cannot encode non-prefab struct ~v" v))
;; Drop the first element of the struct->vector result; the rest are the fields.
(define fields (cdr (vector->list (struct->vector v))))
(encode-array-like 0 3 (cons key fields))]
;; Major 01: lists (minor 0) and vectors (minor 1).
[(? list?) (encode-array-like 1 0 v)]
[(? vector?) (encode-array-like 1 1 (vector->list v))]
;; Major 11: floats, written as a full header byte plus a 32- or 64-bit payload.
[(? single-flonum?) (bit-string #b11000010 (v :: float bits 32))]
[(? double-flonum?) (bit-string #b11000011 (v :: float bits 64))]
;; Zero is an integer header (major 10, minor 00) with a zero-length payload.
[0 (bytes #b10000000)]
;; Any other integer: the smallest whole number of bytes that can hold the
;; value together with a sign bit.
[(? integer?)
(define raw-bit-count (+ (integer-length v) 1)) ;; at least one sign bit
(define byte-count (quotient (+ raw-bit-count 7) 8))
(bit-string (#b1000 :: bits 4) (byte-count :: (wire-length)) (v :: integer bytes byte-count))]
;; Major 10, minors 1/2/3: strings and symbols are written as UTF-8 bytes,
;; byte strings are written as-is.
[(? string?) (encode-binary-like 1 (string->bytes/utf-8 v))]
[(? bytes?) (encode-binary-like 2 v)]
[(? symbol?) (encode-binary-like 3 (string->bytes/utf-8 (symbol->string v)))]
;; Major 11, minor 00: booleans as single header bytes.
[#f (bytes #b11000000)]
[#t (bytes #b11000001)]
[_ (error 'encode-value "Cannot encode value ~v" v)]))
;;---------------------------------------------------------------------------
;; decode-wire-length : bit-string (integer bit-string -> X) (-> X) -> X
;; Reads a 4-bit length nibble from the front of `bs`.
;;  - A nibble of #b1111 is an escape: the real length follows as a varint.
;;    An escaped length below 15 is rejected via `kf`, because such a length
;;    should have been written directly in the nibble (non-canonical form).
;;  - Any other nibble is the length itself.
;; On success calls `(ks length remaining-input)`; otherwise calls `(kf)`.
(define (decode-wire-length bs ks kf)
  (define (accept-extended len tail)
    (if (>= len #b1111)
        (ks len tail)
        (kf)))
  (bit-string-case bs
    ([ (= #b1111 :: bits 4) (rest :: binary) ]
     (decode-varint rest accept-extended kf))
    ([ (short-len :: bits 4) (rest :: binary) ]
     (ks short-len rest))
    (else
     (kf))))
;; decode-varint : bit-string (integer bit-string -> X) (-> X) -> X
;; Inverse of `encode-varint`: reads 7-bit groups, least-significant first.
;; A byte with its high bit clear is the final group; a byte with its high
;; bit set contributes its low seven bits and is followed by the encoding of
;; the higher-order groups. Calls `(ks value remaining-input)` on success and
;; `(kf)` when the input ends before a final group is seen.
(define (decode-varint bs ks kf)
  (bit-string-case bs
    ;; Final group: nothing more to read.
    ([ (= 0 :: bits 1) (group :: bits 7) (rest :: binary) ]
     (ks group rest))
    ;; Continuation: decode the higher-order groups, then add this one below them.
    ([ (= 1 :: bits 1) (group :: bits 7) (rest :: binary) ]
     (decode-varint rest
                    (lambda (higher tail)
                      (ks (+ group (* 128 higher)) tail))
                    kf))
    (else
     (kf))))
;; decode-values : integer list bit-string (list bit-string -> X) (-> X) -> X
;; Decodes `n` consecutive wire values from the front of `bs`. `acc-rev`
;; accumulates the values decoded so far in reverse order. When the count
;; reaches zero, calls `(ks values-in-order remaining-input)`; if any value
;; fails to decode, calls `(kf)`.
(define (decode-values n acc-rev bs ks kf)
  (cond
    [(zero? n)
     (ks (reverse acc-rev) bs)]
    [else
     (bit-string-case bs
       ([ (item :: (wire-value)) (remaining :: binary) ]
        (decode-values (sub1 n) (cons item acc-rev) remaining ks kf))
       (else
        (kf)))]))
;; decode-value : bit-string (value bit-string -> X) (-> X) -> X
;; Decodes one wire value from the front of `bs`, dispatching on the
;; MM NN LLLL header byte. On success calls `(ks value remaining-input)`;
;; on any malformed input calls `(kf)`.
;;
;; Decoding problems are reported through `kf`, not by raising: invalid UTF-8
;; and invalid prefab keys / field counts are caught here. The handlers wrap
;; only the conversion step and return a thunk that is invoked outside the
;; handler, so exceptions raised by `ks` itself are never swallowed.
(define (decode-value bs ks kf)
  (bit-string-case bs
    ;; Major 00: struct-like values (discard / capture / observe / prefab struct).
    ([ (= #b00 :: bits 2) (minor :: bits 2) (field-count :: (wire-length)) (rest :: binary) ]
     (decode-values field-count '() rest
                    (lambda (vs bs)
                      (match* (minor vs)
                        [(0 '()) (ks (discard) bs)]
                        [(1 (list s)) (ks (capture s) bs)]
                        [(2 (list s)) (ks (observe s) bs)]
                        [(3 (list* key fs))
                         ;; The key and fields come straight off the wire, so the
                         ;; key may not be a valid prefab key and the field count
                         ;; may not match it. `make-prefab-struct` signals both
                         ;; with a contract error; treat that as a decode failure.
                         ((with-handlers [(exn:fail:contract? (lambda (e) kf))]
                            (define s (apply make-prefab-struct key fs))
                            (lambda () (ks s bs))))]
                        [(_ _) (kf)]))
                    kf))
    ;; Major 01: lists (minor 0) and vectors (minor 1).
    ([ (= #b01 :: bits 2) (minor :: bits 2) (count :: (wire-length)) (rest :: binary) ]
     (decode-values count '() rest
                    (lambda (vs bs)
                      (match minor
                        [0 (ks vs bs)]
                        [1 (ks (list->vector vs) bs)]
                        [_ (kf)]))
                    kf))
    ;; Major 10, minor 00: signed integers. Zero has its own clause.
    ([ (= #b10000000 :: bits 8) (rest :: binary) ]
     (ks 0 rest)) ;; because a signed 0-bit integer == -1 !
    ([ (= #b1000 :: bits 4) (byte-count :: (wire-length))
       (v :: integer signed bytes byte-count)
       (rest :: binary) ]
     (ks v rest))
    ;; Major 10, other minors: strings (1), byte strings (2) and symbols (3).
    ([ (= #b10 :: bits 2) (minor :: bits 2) (byte-count :: (wire-length))
       (bits :: binary bytes byte-count)
       (rest :: binary) ]
     (define bs (bit-string->bytes bits))
     (match minor
       [2 (ks bs rest)]
       [(or 1 3)
        ;; Invalid UTF-8 is a decode failure, not an exception.
        ((with-handlers [(exn:fail:contract? (lambda (e) kf))]
           (define s (bytes->string/utf-8 bs))
           (lambda () (ks (if (= minor 3) (string->symbol s) s) rest))))]
       ;; Minor 0 is normally consumed by the integer clauses above; fail
       ;; cleanly instead of raising a match error if it ever reaches here.
       [_ (kf)]))
    ;; Major 11: booleans and floats.
    ([ (= #b11000000 :: bits 8) (rest :: binary) ] (ks #f rest))
    ([ (= #b11000001 :: bits 8) (rest :: binary) ] (ks #t rest))
    ([ (= #b11000010 :: bits 8) (v :: float bits 32) (rest :: binary) ] (ks v rest))
    ([ (= #b11000011 :: bits 8) (v :: float bits 64) (rest :: binary) ] (ks v rest))
    (else (kf))))
;;---------------------------------------------------------------------------
;; Unit tests for the codec: varint and wire-length primitives first, then
;; whole-value encoding, then decoding of the same byte sequences.
(module+ test
(require rackunit)
;; --- encode-varint: 7-bit groups, least-significant group first ---
(check-equal? (bit-string->bytes (encode-varint 0)) (bytes 0))
(check-equal? (bit-string->bytes (encode-varint 1)) (bytes 1))
(check-equal? (bit-string->bytes (encode-varint 127)) (bytes 127))
(check-equal? (bit-string->bytes (encode-varint 128)) (bytes 128 1))
(check-equal? (bit-string->bytes (encode-varint 255)) (bytes 255 1))
(check-equal? (bit-string->bytes (encode-varint 256)) (bytes 128 2))
(check-equal? (bit-string->bytes (encode-varint 300)) (bytes #b10101100 #b00000010))
(check-equal? (bit-string->bytes (encode-varint 1000000000)) (bytes 128 148 235 220 3))
;; Test continuations: success yields (list value remaining-bytes), failure yields void.
(define (ks* v rest) (list v (bit-string->bytes rest)))
(define (kf*) (void))
;; --- decode-varint, including inputs with trailing bytes left unconsumed ---
(check-equal? (decode-varint (bytes 0) ks* kf*) (list 0 (bytes)))
(check-equal? (decode-varint (bytes 0 99) ks* kf*) (list 0 (bytes 99)))
(check-equal? (decode-varint (bytes 1) ks* kf*) (list 1 (bytes)))
(check-equal? (decode-varint (bytes 127) ks* kf*) (list 127 (bytes)))
(check-equal? (decode-varint (bytes 128 1) ks* kf*) (list 128 (bytes)))
(check-equal? (decode-varint (bytes 128 1 99) ks* kf*) (list 128 (bytes 99)))
(check-equal? (decode-varint (bytes 255 1) ks* kf*) (list 255 (bytes)))
(check-equal? (decode-varint (bytes 128 2) ks* kf*) (list 256 (bytes)))
(check-equal? (decode-varint (bytes #b10101100 #b00000010) ks* kf*) (list 300 (bytes)))
(check-equal? (decode-varint (bytes 128 148 235 220 3) ks* kf*) (list 1000000000 (bytes)))
(check-equal? (decode-varint (bytes 128 148 235 220 3 99) ks* kf*) (list 1000000000 (bytes 99)))
;; --- wire-length encoding: a 4-bit zero pad makes the nibble byte-aligned ---
(check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (0 :: (wire-length)))) (bytes 0))
(check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (3 :: (wire-length)))) (bytes 3))
(check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (14 :: (wire-length)))) (bytes 14))
(check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (15 :: (wire-length)))) (bytes 15 15))
(check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (100 :: (wire-length))))
(bytes 15 100))
(check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (300 :: (wire-length))))
(bytes 15 #b10101100 #b00000010))
;; --- wire-length decoding: dwl returns the length, or void on failure ---
(define (dwl bs)
(bit-string-case bs
([ (= 0 :: bits 4) (w :: (wire-length)) ] w)
(else (void))))
(check-equal? (dwl (bytes 0)) 0)
(check-equal? (dwl (bytes 3)) 3)
(check-equal? (dwl (bytes 14)) 14)
(check-equal? (dwl (bytes 15)) (void))
(check-equal? (dwl (bytes 15 9)) (void)) ;; not canonical
(check-equal? (dwl (bytes 15 15)) 15)
(check-equal? (dwl (bytes 15 100)) 100)
(check-equal? (dwl (bytes 15 #b10101100 #b00000010)) 300)
;; --- encode: structs, lists, vectors and binary-like values ---
(struct speak (who what) #:prefab)
(check-equal? (encode (capture (discard))) (bytes version 17 0))
(check-equal? (encode (observe (speak (discard) (capture (discard)))))
(bytes version 33 51 181 115 112 101 97 107 0 17 0))
(check-equal? (encode '(1 2 3 4)) (bytes version 68 129 1 129 2 129 3 129 4))
(check-equal? (encode '#(-2 -1 0 1)) (bytes version 84 129 254 129 255 128 129 1))
(check-equal? (encode '("hello" there #"world" () #() #t #f))
(bit-string->bytes
(bit-string 1
71
#b10010101 (#"hello" :: binary)
#b10110101 (#"there" :: binary)
#b10100101 (#"world" :: binary)
64
80
#b11000001
#b11000000)))
;; --- encode: signed integers around the byte-width boundaries ---
(check-equal? (encode -257) (bytes version 130 254 255))
(check-equal? (encode -256) (bytes version 130 255 0))
(check-equal? (encode -255) (bytes version 130 255 1))
(check-equal? (encode -254) (bytes version 130 255 2))
(check-equal? (encode -129) (bytes version 130 255 127))
(check-equal? (encode -128) (bytes version 129 128))
(check-equal? (encode -127) (bytes version 129 129))
(check-equal? (encode -2) (bytes version 129 254))
(check-equal? (encode -1) (bytes version 129 255))
(check-equal? (encode 0) (bytes version 128))
(check-equal? (encode 1) (bytes version 129 1))
(check-equal? (encode 127) (bytes version 129 127))
(check-equal? (encode 128) (bytes version 130 0 128))
(check-equal? (encode 255) (bytes version 130 0 255))
(check-equal? (encode 256) (bytes version 130 1 0))
(check-equal? (encode 32767) (bytes version 130 127 255))
(check-equal? (encode 32768) (bytes version 131 0 128 0))
(check-equal? (encode 65535) (bytes version 131 0 255 255))
(check-equal? (encode 65536) (bytes version 131 1 0 0))
(check-equal? (encode 131072) (bytes version 131 2 0 0))
;; --- encode: single- and double-precision floats ---
(check-equal? (encode 1.0f0) (bytes version #b11000010 #b00111111 #b10000000 0 0))
(check-equal? (encode 1.0) (bytes version #b11000011 #b00111111 #b11110000 0 0 0 0 0 0))
;; --- decode: `d` returns void instead of raising when decoding fails ---
(define (d bs) (decode bs void))
(check-equal? (d (bytes version 17 0)) (capture (discard)))
;; A mismatched version byte is rejected.
(check-equal? (d (bytes (+ version 1) 17 0)) (void))
(check-equal? (d (bytes version 68 129 1 129 2 129 3 129 4)) '(1 2 3 4))
(check-equal? (d (bytes version 84 129 254 129 255 128 129 1)) '#(-2 -1 0 1))
(check-equal? (d (bit-string->bytes
(bit-string 1
71
#b10010101 (#"hello" :: binary)
#b10110101 (#"there" :: binary)
#b10100101 (#"world" :: binary)
64
80
#b11000001
#b11000000)))
'("hello" there #"world" () #() #t #f))
(check-equal? (d (bytes version 33 51 181 115 112 101 97 107 0 17 0))
(observe (speak (discard) (capture (discard)))))
;; --- decode: signed integers, mirroring the encode cases above ---
(check-equal? (d (bytes version 130 254 255)) -257)
(check-equal? (d (bytes version 130 255 0)) -256)
(check-equal? (d (bytes version 130 255 1)) -255)
(check-equal? (d (bytes version 130 255 2)) -254)
(check-equal? (d (bytes version 130 255 127)) -129)
(check-equal? (d (bytes version 129 128)) -128)
(check-equal? (d (bytes version 129 129)) -127)
(check-equal? (d (bytes version 129 254)) -2)
(check-equal? (d (bytes version 129 255)) -1)
(check-equal? (d (bytes version 128)) 0)
(check-equal? (d (bytes version 129 1)) 1)
(check-equal? (d (bytes version 129 127)) 127)
(check-equal? (d (bytes version 130 0 128)) 128)
(check-equal? (d (bytes version 130 0 255)) 255)
(check-equal? (d (bytes version 130 1 0)) 256)
(check-equal? (d (bytes version 130 127 255)) 32767)
(check-equal? (d (bytes version 131 0 128 0)) 32768)
(check-equal? (d (bytes version 131 0 255 255)) 65535)
(check-equal? (d (bytes version 131 1 0 0)) 65536)
(check-equal? (d (bytes version 131 2 0 0)) 131072)
;; --- decode: floats; both widths are expected to compare equal to 1.0 ---
(check-equal? (d (bytes version #b11000010 #b00111111 #b10000000 0 0)) 1.0)
(check-equal? (d (bytes version #b11000011 #b00111111 #b11110000 0 0 0 0 0 0)) 1.0)
)

View File

@ -8,6 +8,7 @@
(require racket/random file/sha1)
(require imperative-syndicate/skeleton)
(require imperative-syndicate/term)
(require "codec.rkt")
(define-logger mcds)
@ -27,7 +28,7 @@
(define *assertion-refresh* (* 0.9 *assertion-lifetime*))
(define (send-packet! h packet)
(send! (udp-packet h group-target (string->bytes/utf-8 (format "~s" packet)))))
(send! (udp-packet h group-target (encode packet))))
(define (packet-statistics h)
(define report-period 10000)
@ -52,7 +53,7 @@
(during (mcds-outbound _) (assert (mcds-demand)))
(during/spawn (mcds-demand)
(define me (bytes->hex-string (crypto-random-bytes 8)))
(define me (crypto-random-bytes 8))
(define h (udp-listener group-port))
(during h
(assert (udp-multicast-group-member h group-address #f))
@ -61,12 +62,14 @@
(packet-statistics h)
(on (message (udp-packet _ h $body))
(spawn* (match (read (open-input-string (bytes->string/utf-8 body)))
[(list peer type assertion)
;; (log-mcds-info "~v ~v ~v" peer type assertion)
(send! (mcds-change peer type assertion))])))
(spawn*
;; (log-mcds-info "received: ~v" body)
(match (decode body)
[(list peer type assertion)
;; (log-mcds-info "~v ~v ~v" peer type assertion)
(send! (mcds-change peer type assertion))])))
(on (message (mcds-change $peer 'asserted $assertion))
(on (message (mcds-change $peer '+ $assertion))
(spawn
(define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*))
(assert (mcds-inbound assertion))
@ -92,7 +95,7 @@
#t
(lambda () (values (observe x) i))))
(stop-when (message (mcds-change peer 'retracted assertion)))
(stop-when (message (mcds-change peer '- assertion)))
(stop-when (asserted (later-than expiry)))
(stop-when (retracted (mcds-demand)))))
@ -102,9 +105,9 @@
(during (mcds-relevant $assertion _)
(during (mcds-outbound assertion)
(define (refresh!) (send-packet! h (list me 'asserted assertion)))
(define (refresh!) (send-packet! h (list me '+ assertion)))
(on-start (refresh!))
(on-stop (send-packet! h (list me 'retracted assertion)))
(on-stop (send-packet! h (list me '- assertion)))
(field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)])
(on (asserted (later-than (deadline)))
@ -116,8 +119,8 @@
(define soon (+ (current-inexact-milliseconds) 100))
(when (> (deadline) soon) (deadline soon)))))
(on (message (mcds-change $peer 'message $body))
(on (message (mcds-change $peer '! $body))
(send! (mcds-inbound body)))
(on (message (mcds-outbound $body))
(send-packet! h (list me 'message body))))))
(send-packet! h (list me '! body))))))