From 7dacb5ba6e21879e583c7f51e62f47daaeb279ad Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 19 Aug 2018 22:13:42 +0100 Subject: [PATCH] Better codec --- syndicate/mc/codec.rkt | 323 +++++++++++++++++++++++++++++++++ syndicate/mc/udp-dataspace.rkt | 27 +-- 2 files changed, 338 insertions(+), 12 deletions(-) create mode 100644 syndicate/mc/codec.rkt diff --git a/syndicate/mc/codec.rkt b/syndicate/mc/codec.rkt new file mode 100644 index 0000000..3e49268 --- /dev/null +++ b/syndicate/mc/codec.rkt @@ -0,0 +1,323 @@ +#lang racket/base + +(provide encode + decode + wire-value) + +(require racket/match) +(require bitsyntax) +(require syndicate/support/struct) + +(require imperative-syndicate/assertions) +(require imperative-syndicate/pattern) + +(define version 1) + +(define (encode v) + (bit-string->bytes (bit-string (version :: bits 8) (v :: (wire-value))))) + +(define (decode bs [on-fail (lambda () (error 'decode "Invalid encoding: ~v" bs))]) + (bit-string-case bs + ([ (= version :: bits 8) (v :: (wire-value)) ] v) + (else (on-fail)))) + +(define-syntax wire-value + (syntax-rules () + [(_ #t input ks kf) (decode-value input ks kf)] + [(_ #f v) (encode-value v)])) + +(define-syntax wire-length + (syntax-rules () + [(_ #t input ks kf) (decode-wire-length input ks kf)] + [(_ #f v) (encode-wire-length v)])) + +;; MM NN LLLL +;; +;; 00 00 0000 discard +;; 00 01 0001 capture +;; 00 10 0001 observe +;; 00 11 nnnn any other struct +;; +;; 01 00 nnnn list +;; 01 01 nnnn vector +;; +;; 10 00 nnnn signed integer, bigendian +;; 10 01 nnnn string +;; 10 10 nnnn bytes +;; 10 11 nnnn symbol +;; +;; 11 00 0000 #f +;; 11 00 0001 #t +;; 11 00 0010 (32 bits) single +;; 11 00 0011 (64 bits) double +;; +;; When nnnn = 1111, following bytes are real length +;; +;; The following bytes are a chain of big-endian, high-bit-continuation-bit chunks + +;;--------------------------------------------------------------------------- + +(define (encode-wire-length v) + (when (negative? v) (error 'encode-wire-length "Cannot encode negative wire-length ~v" v)) + (if (< v #b1111) + (bit-string (v :: bits 4)) + (bit-string (#b1111 :: bits 4) ((encode-varint v) :: binary)))) + +(define (encode-varint v) + (if (< v 128) + (bytes v) + (bit-string ((+ (modulo v 128) 128) :: bits 8) + ((encode-varint (quotient v 128)) :: binary)))) + +(define (encode-array-like major minor fields) + (bit-string (major :: bits 2) + (minor :: bits 2) + ((length fields) :: (wire-length)) + ((apply bit-string-append (map encode-value fields)) :: binary))) + +(define (encode-binary-like minor bs) + (bit-string (#b10 :: bits 2) + (minor :: bits 2) + ((bytes-length bs) :: (wire-length)) + (bs :: binary))) + +(define (encode-value v) + (match v + [(discard) (encode-array-like 0 0 '())] + [(capture s) (encode-array-like 0 1 (list s))] + [(observe s) (encode-array-like 0 2 (list s))] + [(? non-object-struct?) + (define key (prefab-struct-key v)) + (when (not key) (error 'encode-value "Cannot encode non-prefab struct ~v" v)) + (define fields (cdr (vector->list (struct->vector v)))) + (encode-array-like 0 3 (cons key fields))] + + [(? list?) (encode-array-like 1 0 v)] + [(? vector?) (encode-array-like 1 1 (vector->list v))] + + [(? single-flonum?) (bit-string #b11000010 (v :: float bits 32))] + [(? double-flonum?) (bit-string #b11000011 (v :: float bits 64))] + + [0 (bytes #b10000000)] + [(? integer?) + (define raw-bit-count (+ (integer-length v) 1)) ;; at least one sign bit + (define byte-count (quotient (+ raw-bit-count 7) 8)) + (bit-string (#b1000 :: bits 4) (byte-count :: (wire-length)) (v :: integer bytes byte-count))] + [(? string?) (encode-binary-like 1 (string->bytes/utf-8 v))] + [(? bytes?) (encode-binary-like 2 v)] + [(? symbol?) (encode-binary-like 3 (string->bytes/utf-8 (symbol->string v)))] + + [#f (bytes #b11000000)] + [#t (bytes #b11000001)] + + [_ (error 'encode-value "Cannot encode value ~v" v)])) + +;;--------------------------------------------------------------------------- + +(define (decode-wire-length bs ks kf) + (bit-string-case bs + ([ (= #b1111 :: bits 4) (rest :: binary) ] + (decode-varint rest + (lambda (v tail) + (if (< v #b1111) + (kf) + (ks v tail))) + kf)) + ([ (v :: bits 4) (rest :: binary) ] (ks v rest)) + (else (kf)))) + +(define (decode-varint bs ks kf) + (bit-string-case bs + ([ (= 1 :: bits 1) (v :: bits 7) (rest :: binary) ] + (decode-varint rest (lambda (acc tail) (ks (+ (* acc 128) v) tail)) kf)) + ([ (= 0 :: bits 1) (v :: bits 7) (rest :: binary) ] + (ks v rest)) + (else + (kf)))) + +(define (decode-values n acc-rev bs ks kf) + (if (zero? n) + (ks (reverse acc-rev) bs) + (bit-string-case bs + ([ (v :: (wire-value)) (rest :: binary) ] + (decode-values (- n 1) (cons v acc-rev) rest ks kf)) + (else (kf))))) + +(define (decode-value bs ks kf) + (bit-string-case bs + ([ (= #b00 :: bits 2) (minor :: bits 2) (field-count :: (wire-length)) (rest :: binary) ] + (decode-values field-count '() rest + (lambda (vs bs) + (match* (minor vs) + [(0 '()) (ks (discard) bs)] + [(1 (list s)) (ks (capture s) bs)] + [(2 (list s)) (ks (observe s) bs)] + [(3 (list* key fs)) (ks (apply make-prefab-struct key fs) bs)] + [(_ _) (kf)])) + kf)) + ([ (= #b01 :: bits 2) (minor :: bits 2) (count :: (wire-length)) (rest :: binary) ] + (decode-values count '() rest + (lambda (vs bs) + (match minor + [0 (ks vs bs)] + [1 (ks (list->vector vs) bs)] + [_ (kf)])) + kf)) + ([ (= #b10000000 :: bits 8) (rest :: binary) ] + (ks 0 rest)) ;; because a signed 0-bit integer == -1 ! + ([ (= #b1000 :: bits 4) (byte-count :: (wire-length)) + (v :: integer signed bytes byte-count) + (rest :: binary) ] + (ks v rest)) + ([ (= #b10 :: bits 2) (minor :: bits 2) (byte-count :: (wire-length)) + (bits :: binary bytes byte-count) + (rest :: binary) ] + (define bs (bit-string->bytes bits)) + (match minor + [2 (ks bs rest)] + [(or 1 3) + ((with-handlers [(exn:fail:contract? (lambda (e) kf))] + (define s (bytes->string/utf-8 bs)) + (lambda () (ks (if (= minor 3) (string->symbol s) s) rest))))])) + ([ (= #b11000000 :: bits 8) (rest :: binary) ] (ks #f rest)) + ([ (= #b11000001 :: bits 8) (rest :: binary) ] (ks #t rest)) + ([ (= #b11000010 :: bits 8) (v :: float bits 32) (rest :: binary) ] (ks v rest)) + ([ (= #b11000011 :: bits 8) (v :: float bits 64) (rest :: binary) ] (ks v rest)) + (else (kf)))) + +;;--------------------------------------------------------------------------- + +(module+ test + (require rackunit) + + (check-equal? (bit-string->bytes (encode-varint 0)) (bytes 0)) + (check-equal? (bit-string->bytes (encode-varint 1)) (bytes 1)) + (check-equal? (bit-string->bytes (encode-varint 127)) (bytes 127)) + (check-equal? (bit-string->bytes (encode-varint 128)) (bytes 128 1)) + (check-equal? (bit-string->bytes (encode-varint 255)) (bytes 255 1)) + (check-equal? (bit-string->bytes (encode-varint 256)) (bytes 128 2)) + (check-equal? (bit-string->bytes (encode-varint 300)) (bytes #b10101100 #b00000010)) + (check-equal? (bit-string->bytes (encode-varint 1000000000)) (bytes 128 148 235 220 3)) + + (define (ks* v rest) (list v (bit-string->bytes rest))) + (define (kf*) (void)) + + (check-equal? (decode-varint (bytes 0) ks* kf*) (list 0 (bytes))) + (check-equal? (decode-varint (bytes 0 99) ks* kf*) (list 0 (bytes 99))) + (check-equal? (decode-varint (bytes 1) ks* kf*) (list 1 (bytes))) + (check-equal? (decode-varint (bytes 127) ks* kf*) (list 127 (bytes))) + (check-equal? (decode-varint (bytes 128 1) ks* kf*) (list 128 (bytes))) + (check-equal? (decode-varint (bytes 128 1 99) ks* kf*) (list 128 (bytes 99))) + (check-equal? (decode-varint (bytes 255 1) ks* kf*) (list 255 (bytes))) + (check-equal? (decode-varint (bytes 128 2) ks* kf*) (list 256 (bytes))) + (check-equal? (decode-varint (bytes #b10101100 #b00000010) ks* kf*) (list 300 (bytes))) + (check-equal? (decode-varint (bytes 128 148 235 220 3) ks* kf*) (list 1000000000 (bytes))) + (check-equal? (decode-varint (bytes 128 148 235 220 3 99) ks* kf*) (list 1000000000 (bytes 99))) + + (check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (0 :: (wire-length)))) (bytes 0)) + (check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (3 :: (wire-length)))) (bytes 3)) + (check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (14 :: (wire-length)))) (bytes 14)) + (check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (15 :: (wire-length)))) (bytes 15 15)) + (check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (100 :: (wire-length)))) + (bytes 15 100)) + (check-equal? (bit-string->bytes (bit-string (0 :: bits 4) (300 :: (wire-length)))) + (bytes 15 #b10101100 #b00000010)) + + (define (dwl bs) + (bit-string-case bs + ([ (= 0 :: bits 4) (w :: (wire-length)) ] w) + (else (void)))) + + (check-equal? (dwl (bytes 0)) 0) + (check-equal? (dwl (bytes 3)) 3) + (check-equal? (dwl (bytes 14)) 14) + (check-equal? (dwl (bytes 15)) (void)) + (check-equal? (dwl (bytes 15 9)) (void)) ;; not canonical + (check-equal? (dwl (bytes 15 15)) 15) + (check-equal? (dwl (bytes 15 100)) 100) + (check-equal? (dwl (bytes 15 #b10101100 #b00000010)) 300) + + (struct speak (who what) #:prefab) + + (check-equal? (encode (capture (discard))) (bytes version 17 0)) + (check-equal? (encode (observe (speak (discard) (capture (discard))))) + (bytes version 33 51 181 115 112 101 97 107 0 17 0)) + (check-equal? (encode '(1 2 3 4)) (bytes version 68 129 1 129 2 129 3 129 4)) + (check-equal? (encode '#(-2 -1 0 1)) (bytes version 84 129 254 129 255 128 129 1)) + (check-equal? (encode '("hello" there #"world" () #() #t #f)) + (bit-string->bytes + (bit-string 1 + 71 + #b10010101 (#"hello" :: binary) + #b10110101 (#"there" :: binary) + #b10100101 (#"world" :: binary) + 64 + 80 + #b11000001 + #b11000000))) + (check-equal? (encode -257) (bytes version 130 254 255)) + (check-equal? (encode -256) (bytes version 130 255 0)) + (check-equal? (encode -255) (bytes version 130 255 1)) + (check-equal? (encode -254) (bytes version 130 255 2)) + (check-equal? (encode -129) (bytes version 130 255 127)) + (check-equal? (encode -128) (bytes version 129 128)) + (check-equal? (encode -127) (bytes version 129 129)) + (check-equal? (encode -2) (bytes version 129 254)) + (check-equal? (encode -1) (bytes version 129 255)) + (check-equal? (encode 0) (bytes version 128)) + (check-equal? (encode 1) (bytes version 129 1)) + (check-equal? (encode 127) (bytes version 129 127)) + (check-equal? (encode 128) (bytes version 130 0 128)) + (check-equal? (encode 255) (bytes version 130 0 255)) + (check-equal? (encode 256) (bytes version 130 1 0)) + (check-equal? (encode 32767) (bytes version 130 127 255)) + (check-equal? (encode 32768) (bytes version 131 0 128 0)) + (check-equal? (encode 65535) (bytes version 131 0 255 255)) + (check-equal? (encode 65536) (bytes version 131 1 0 0)) + (check-equal? (encode 131072) (bytes version 131 2 0 0)) + (check-equal? (encode 1.0f0) (bytes version #b11000010 #b00111111 #b10000000 0 0)) + (check-equal? (encode 1.0) (bytes version #b11000011 #b00111111 #b11110000 0 0 0 0 0 0)) + + (define (d bs) (decode bs void)) + + (check-equal? (d (bytes version 17 0)) (capture (discard))) + (check-equal? (d (bytes (+ version 1) 17 0)) (void)) + + (check-equal? (d (bytes version 68 129 1 129 2 129 3 129 4)) '(1 2 3 4)) + (check-equal? (d (bytes version 84 129 254 129 255 128 129 1)) '#(-2 -1 0 1)) + (check-equal? (d (bit-string->bytes + (bit-string 1 + 71 + #b10010101 (#"hello" :: binary) + #b10110101 (#"there" :: binary) + #b10100101 (#"world" :: binary) + 64 + 80 + #b11000001 + #b11000000))) + '("hello" there #"world" () #() #t #f)) + (check-equal? (d (bytes version 33 51 181 115 112 101 97 107 0 17 0)) + (observe (speak (discard) (capture (discard))))) + (check-equal? (d (bytes version 130 254 255)) -257) + (check-equal? (d (bytes version 130 255 0)) -256) + (check-equal? (d (bytes version 130 255 1)) -255) + (check-equal? (d (bytes version 130 255 2)) -254) + (check-equal? (d (bytes version 130 255 127)) -129) + (check-equal? (d (bytes version 129 128)) -128) + (check-equal? (d (bytes version 129 129)) -127) + (check-equal? (d (bytes version 129 254)) -2) + (check-equal? (d (bytes version 129 255)) -1) + (check-equal? (d (bytes version 128)) 0) + (check-equal? (d (bytes version 129 1)) 1) + (check-equal? (d (bytes version 129 127)) 127) + (check-equal? (d (bytes version 130 0 128)) 128) + (check-equal? (d (bytes version 130 0 255)) 255) + (check-equal? (d (bytes version 130 1 0)) 256) + (check-equal? (d (bytes version 130 127 255)) 32767) + (check-equal? (d (bytes version 131 0 128 0)) 32768) + (check-equal? (d (bytes version 131 0 255 255)) 65535) + (check-equal? (d (bytes version 131 1 0 0)) 65536) + (check-equal? (d (bytes version 131 2 0 0)) 131072) + (check-equal? (d (bytes version #b11000010 #b00111111 #b10000000 0 0)) 1.0) + (check-equal? (d (bytes version #b11000011 #b00111111 #b11110000 0 0 0 0 0 0)) 1.0) + ) diff --git a/syndicate/mc/udp-dataspace.rkt b/syndicate/mc/udp-dataspace.rkt index 0f482a0..b22b7bf 100644 --- a/syndicate/mc/udp-dataspace.rkt +++ b/syndicate/mc/udp-dataspace.rkt @@ -8,6 +8,7 @@ (require racket/random file/sha1) (require imperative-syndicate/skeleton) (require imperative-syndicate/term) +(require "codec.rkt") (define-logger mcds) @@ -27,7 +28,7 @@ (define *assertion-refresh* (* 0.9 *assertion-lifetime*)) (define (send-packet! h packet) - (send! (udp-packet h group-target (string->bytes/utf-8 (format "~s" packet))))) + (send! (udp-packet h group-target (encode packet)))) (define (packet-statistics h) (define report-period 10000) @@ -52,7 +53,7 @@ (during (mcds-outbound _) (assert (mcds-demand))) (during/spawn (mcds-demand) - (define me (bytes->hex-string (crypto-random-bytes 8))) + (define me (crypto-random-bytes 8)) (define h (udp-listener group-port)) (during h (assert (udp-multicast-group-member h group-address #f)) @@ -61,12 +62,14 @@ (packet-statistics h) (on (message (udp-packet _ h $body)) - (spawn* (match (read (open-input-string (bytes->string/utf-8 body))) - [(list peer type assertion) - ;; (log-mcds-info "~v ~v ~v" peer type assertion) - (send! (mcds-change peer type assertion))]))) + (spawn* + ;; (log-mcds-info "received: ~v" body) + (match (decode body) + [(list peer type assertion) + ;; (log-mcds-info "~v ~v ~v" peer type assertion) + (send! (mcds-change peer type assertion))]))) - (on (message (mcds-change $peer 'asserted $assertion)) + (on (message (mcds-change $peer '+ $assertion)) (spawn (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*)) (assert (mcds-inbound assertion)) @@ -92,7 +95,7 @@ #t (lambda () (values (observe x) i)))) - (stop-when (message (mcds-change peer 'retracted assertion))) + (stop-when (message (mcds-change peer '- assertion))) (stop-when (asserted (later-than expiry))) (stop-when (retracted (mcds-demand))))) @@ -102,9 +105,9 @@ (during (mcds-relevant $assertion _) (during (mcds-outbound assertion) - (define (refresh!) (send-packet! h (list me 'asserted assertion))) + (define (refresh!) (send-packet! h (list me '+ assertion))) (on-start (refresh!)) - (on-stop (send-packet! h (list me 'retracted assertion))) + (on-stop (send-packet! h (list me '- assertion))) (field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)]) (on (asserted (later-than (deadline))) @@ -116,8 +119,8 @@ (define soon (+ (current-inexact-milliseconds) 100)) (when (> (deadline) soon) (deadline soon))))) - (on (message (mcds-change $peer 'message $body)) + (on (message (mcds-change $peer '! $body)) (send! (mcds-inbound body))) (on (message (mcds-outbound $body)) - (send-packet! h (list me 'message body)))))) + (send-packet! h (list me '! body))))))