2019-03-18 15:34:14 +00:00
|
|
|
#lang imperative-syndicate
|
|
|
|
|
|
|
|
(provide (all-defined-out))
|
|
|
|
|
|
|
|
(require (prefix-in preserves: preserves))
|
|
|
|
(require bitsyntax)
|
2019-03-18 23:29:12 +00:00
|
|
|
(require (only-in net/rfc6455 ws-idle-timeout))
|
2019-05-12 12:07:38 +00:00
|
|
|
(require imperative-syndicate/protocol/credit)
|
2019-03-18 15:34:14 +00:00
|
|
|
|
2019-05-07 11:56:22 +00:00
|
|
|
;; Enrolment
|
|
|
|
(message-struct Connect (scope)) ;; Client --> Server
|
|
|
|
(message-struct Peer (scope)) ;; Peer --> Peer
|
|
|
|
|
2019-05-05 15:37:03 +00:00
|
|
|
;; Actions; Client --> Server (and Peer --> Peer, except for Message)
|
2019-03-18 15:34:14 +00:00
|
|
|
(message-struct Assert (endpoint-name assertion))
|
|
|
|
(message-struct Clear (endpoint-name))
|
|
|
|
(message-struct Message (body))
|
|
|
|
|
2019-05-05 15:37:03 +00:00
|
|
|
;; Events; Server --> Client (and Peer --> Peer)
|
2019-03-18 15:34:14 +00:00
|
|
|
(message-struct Add (endpoint-name captures))
|
|
|
|
(message-struct Del (endpoint-name captures))
|
|
|
|
(message-struct Msg (endpoint-name captures))
|
2019-05-07 11:56:22 +00:00
|
|
|
(message-struct Err (detail))
|
2019-03-18 15:34:14 +00:00
|
|
|
|
2019-05-05 15:37:03 +00:00
|
|
|
;; Transport-related; Bidirectional
|
2019-03-18 23:29:12 +00:00
|
|
|
(message-struct Ping ())
|
|
|
|
(message-struct Pong ())
|
|
|
|
|
2019-05-07 11:56:22 +00:00
|
|
|
;; Peering
|
|
|
|
;; =======
|
|
|
|
;;
|
|
|
|
;; To peer, send `(Peer Scope)` at the start of a connection instead
|
|
|
|
;; of the usual `(Connect Scope)`.
|
|
|
|
;;
|
|
|
|
;; In peer mode, *actions* and *events* travel in *both* directions,
|
|
|
|
;; but `Message`s do not appear and (for now) `Assert` is only used to
|
|
|
|
;; establish `observe`s, i.e. subscriptions.
|
|
|
|
|
2019-03-18 15:34:14 +00:00
|
|
|
(define (decode bs)
|
|
|
|
(parameterize ((preserves:short-form-labels '#(discard capture observe)))
|
|
|
|
(bit-string-case bs
|
|
|
|
#:on-short (lambda (fail) (values #f bs))
|
|
|
|
([ (v :: (preserves:wire-value)) (rest :: binary) ] (values v (bit-string->bytes rest)))
|
|
|
|
(else (error 'decode "Invalid wire message")))))
|
|
|
|
|
|
|
|
(define (encode v)
|
|
|
|
(parameterize ((preserves:short-form-labels '#(discard capture observe)))
|
|
|
|
(preserves:encode v)))
|
2019-03-18 23:29:12 +00:00
|
|
|
|
|
|
|
(define (ping-interval)
|
|
|
|
(* 1000 (max (- (ws-idle-timeout) 10)
|
|
|
|
(* (ws-idle-timeout) 0.8))))
|
|
|
|
|
|
|
|
(define (packet-accumulator handle-packet!)
|
|
|
|
(field [buffer #""])
|
|
|
|
(begin/dataflow
|
|
|
|
(define-values (packet remainder) (decode (buffer)))
|
|
|
|
(when packet
|
|
|
|
(buffer remainder)
|
|
|
|
(handle-packet! packet)))
|
|
|
|
(lambda (chunk)
|
|
|
|
(buffer (bytes-append (buffer) chunk))))
|