From ec42282c69f09b231f870303338c71d431db65a4 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 15 May 2021 22:34:58 +0200 Subject: [PATCH] Update for latest preserves API --- syndicate/distributed/wire-protocol.rkt | 21 ++++++++++++--------- syndicate/mc/udp-dataspace.rkt | 4 ++-- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/syndicate/distributed/wire-protocol.rkt b/syndicate/distributed/wire-protocol.rkt index f5a45b7..bfbb973 100644 --- a/syndicate/distributed/wire-protocol.rkt +++ b/syndicate/distributed/wire-protocol.rkt @@ -35,13 +35,10 @@ ;; establish `observe`s, i.e. subscriptions. (define (decode bs) - (bit-string-case bs - #:on-short (lambda (fail) (values #f bs)) - ([ (v :: (preserves:wire-value)) (rest :: binary) ] (values v (bit-string->bytes rest))) - (else (error 'decode "Invalid wire message")))) + (preserves:bytes->preserve bs)) (define (encode v) - (preserves:encode v)) + (preserves:preserve->bytes v)) (define (ping-interval) (* 1000 (min 60 ;; reasonable default? @@ -58,9 +55,15 @@ (define (packet-accumulator handle-packet!) (field [buffer #""]) (begin/dataflow - (define-values (packet remainder) (decode (buffer))) - (when packet - (buffer remainder) - (handle-packet! packet))) + (define p (open-input-bytes (buffer))) + (let read-more () + (define start-pos (file-position p)) + (match (preserves:read-preserve/binary p #:on-short (lambda () eof)) + [(? eof-object?) + (when (positive? start-pos) + (buffer (subbytes (buffer) start-pos)))] + [packet + (handle-packet! packet) + (read-more)]))) (lambda (chunk) (buffer (bytes-append (buffer) chunk)))) diff --git a/syndicate/mc/udp-dataspace.rkt b/syndicate/mc/udp-dataspace.rkt index e2c0faf..eb40604 100644 --- a/syndicate/mc/udp-dataspace.rkt +++ b/syndicate/mc/udp-dataspace.rkt @@ -28,7 +28,7 @@ (define *assertion-refresh* (* 0.9 *assertion-lifetime*)) (define (send-packet! h packet) - (send! (udp-packet h group-target (encode packet)))) + (send! (udp-packet h group-target (preserve->bytes packet)))) (define (packet-statistics h) (define report-period 10000) @@ -64,7 +64,7 @@ (on (message (udp-packet _ h $body)) (spawn* ;; (log-mcds-info "received: ~v" body) - (match (decode body) + (match (bytes->preserve body) [(list peer type assertion) ;; (log-mcds-info "~v ~v ~v" peer type assertion) (send! (mcds-change peer type assertion))])))