#lang imperative-syndicate
;; Wire-protocol definitions shared by broker clients and the broker:
;; the packet structures, their encoding/decoding, keep-alive timing,
;; and a helper that reassembles packets from a stream of byte chunks.

(provide (all-defined-out))

(require (prefix-in preserves: preserves))
(require bitsyntax)
(require (only-in net/rfc6455 ws-idle-timeout))

;;---------------------------------------------------------------------------
;; Packets sent from client to broker

(message-struct Assert (endpoint-name assertion))
(message-struct Clear (endpoint-name))
(message-struct Message (body))

;;---------------------------------------------------------------------------
;; Packets sent from broker to client

(message-struct Add (endpoint-name captures))
(message-struct Del (endpoint-name captures))
(message-struct Msg (endpoint-name captures))

;;---------------------------------------------------------------------------
;; Packets sent in either direction

(message-struct Ping ())
(message-struct Pong ())

;;---------------------------------------------------------------------------
;; Encoding and decoding

;; decode : bit-string -> (values (or/c #f value) bit-string)
;;
;; Tries to read a single wire value from the front of `bs`, with the
;; short-form labels `discard`, `capture` and `observe` in effect.
;;
;;  - On success, yields the decoded value and the unconsumed input
;;    converted to bytes.
;;  - When the `#:on-short` handler fires (input too short to match),
;;    yields #f together with `bs` itself, untouched.
;;  - Otherwise raises an error tagged 'decode.
(define (decode bs)
  (parameterize ([preserves:short-form-labels '#(discard capture observe)])
    (bit-string-case bs
      #:on-short (lambda (_fail) (values #f bs))
      ([(value :: (preserves:wire-value))
        (tail :: binary)]
       (values value (bit-string->bytes tail)))
      (else
       (error 'decode "Invalid wire message")))))

;; encode : value -> encoded form
;;
;; Runs `preserves:encode` on `v` with the same short-form labels that
;; `decode` installs.
(define (encode v)
  (parameterize ([preserves:short-form-labels '#(discard capture observe)])
    (preserves:encode v)))

;;---------------------------------------------------------------------------
;; Keep-alive timing

;; ping-interval : -> number
;;
;; Scales the current `ws-idle-timeout` into a ping period: 1000 times
;; the larger of (timeout - 10) and 80% of the timeout.
;; NOTE(review): presumably the result is in milliseconds, with the
;; timeout expressed in seconds -- confirm against callers.
(define (ping-interval)
  (let ([timeout (ws-idle-timeout)])
    (* 1000
       (max (- timeout 10)
            (* timeout 0.8)))))

;;---------------------------------------------------------------------------
;; Stream reassembly

;; packet-accumulator : (packet -> any) -> (bytes -> any)
;;
;; Declares a `buffer` field, initially empty, plus a dataflow block
;; that attempts to decode one packet from the buffered bytes. When a
;; packet is obtained, the leftover bytes are written back to `buffer`
;; BEFORE the packet is handed to `handle-packet!`.
;; NOTE(review): draining several packets from one chunk relies on the
;; dataflow block re-running after `buffer` is updated -- confirm.
;;
;; The returned procedure appends a freshly received chunk to `buffer`.
(define (packet-accumulator handle-packet!)
  (field [buffer #""])
  (begin/dataflow
    (define-values (pkt leftover) (decode (buffer)))
    (when pkt
      ;; Store the remainder first, then dispatch the packet.
      (buffer leftover)
      (handle-packet! pkt)))
  (lambda (incoming)
    (buffer (bytes-append (buffer) incoming))))

;;---------------------------------------------------------------------------
;; Relay of received packets

;; Received packets from broker are relayed via one of these.
(message-struct broker-packet (address packet))