Update for latest preserves API

This commit is contained in:
Tony Garnock-Jones 2021-05-15 22:34:58 +02:00
parent 90c4c60699
commit ec42282c69
2 changed files with 14 additions and 11 deletions

View File

@ -35,13 +35,10 @@
;; establish `observe`s, i.e. subscriptions. ;; establish `observe`s, i.e. subscriptions.
(define (decode bs) (define (decode bs)
(bit-string-case bs (preserves:bytes->preserve bs))
#:on-short (lambda (fail) (values #f bs))
([ (v :: (preserves:wire-value)) (rest :: binary) ] (values v (bit-string->bytes rest)))
(else (error 'decode "Invalid wire message"))))
(define (encode v) (define (encode v)
(preserves:encode v)) (preserves:preserve->bytes v))
(define (ping-interval) (define (ping-interval)
(* 1000 (min 60 ;; reasonable default? (* 1000 (min 60 ;; reasonable default?
@ -58,9 +55,15 @@
(define (packet-accumulator handle-packet!) (define (packet-accumulator handle-packet!)
(field [buffer #""]) (field [buffer #""])
(begin/dataflow (begin/dataflow
(define-values (packet remainder) (decode (buffer))) (define p (open-input-bytes (buffer)))
(when packet (let read-more ()
(buffer remainder) (define start-pos (file-position p))
(handle-packet! packet))) (match (preserves:read-preserve/binary p #:on-short (lambda () eof))
[(? eof-object?)
(when (positive? start-pos)
(buffer (subbytes (buffer) start-pos)))]
[packet
(handle-packet! packet)
(read-more)])))
(lambda (chunk) (lambda (chunk)
(buffer (bytes-append (buffer) chunk)))) (buffer (bytes-append (buffer) chunk))))

View File

@ -28,7 +28,7 @@
(define *assertion-refresh* (* 0.9 *assertion-lifetime*)) (define *assertion-refresh* (* 0.9 *assertion-lifetime*))
(define (send-packet! h packet) (define (send-packet! h packet)
(send! (udp-packet h group-target (encode packet)))) (send! (udp-packet h group-target (preserve->bytes packet))))
(define (packet-statistics h) (define (packet-statistics h)
(define report-period 10000) (define report-period 10000)
@ -64,7 +64,7 @@
(on (message (udp-packet _ h $body)) (on (message (udp-packet _ h $body))
(spawn* (spawn*
;; (log-mcds-info "received: ~v" body) ;; (log-mcds-info "received: ~v" body)
(match (decode body) (match (bytes->preserve body)
[(list peer type assertion) [(list peer type assertion)
;; (log-mcds-info "~v ~v ~v" peer type assertion) ;; (log-mcds-info "~v ~v ~v" peer type assertion)
(send! (mcds-change peer type assertion))]))) (send! (mcds-change peer type assertion))])))