Improve multicast protocol

This commit is contained in:
Tony Garnock-Jones 2018-08-19 17:54:32 +01:00
parent c125564f5f
commit 77b0addcd0
1 changed files with 67 additions and 33 deletions

View File

@ -6,6 +6,7 @@
(require/activate imperative-syndicate/drivers/timer) (require/activate imperative-syndicate/drivers/timer)
(require/activate imperative-syndicate/drivers/udp) (require/activate imperative-syndicate/drivers/udp)
(require racket/random file/sha1) (require racket/random file/sha1)
(require imperative-syndicate/skeleton)
(require imperative-syndicate/term) (require imperative-syndicate/term)
(define-logger mcds) (define-logger mcds)
@ -16,16 +17,37 @@
(struct mcds-change (peer type assertion) #:transparent) (struct mcds-change (peer type assertion) #:transparent)
(struct mcds-demand () #:transparent) (struct mcds-demand () #:transparent)
(struct mcds-relevant (assertion peer) #:transparent)
(define group-address "239.192.57.49") ;; falls within Organization Local Scope (see RFC 2365) (define group-address "239.192.57.49") ;; falls within Organization Local Scope (see RFC 2365)
(define group-port 5999) ;; make sure your firewall is open to UDP on this port (define group-port 5999) ;; make sure your firewall is open to UDP on this port
(define group-target (udp-remote-address group-address group-port)) (define group-target (udp-remote-address group-address group-port))
(define *assertion-lifetime* 5000) (define *assertion-lifetime* 30000)
(define *assertion-refresh* (* 0.9 *assertion-lifetime*)) (define *assertion-refresh* (* 0.9 *assertion-lifetime*))
(define (send-packet! h packet) (define (send-packet! h packet)
(send! (udp-packet h group-target (string->bytes/utf-8 (format "~s" packet))))) (send! (udp-packet h group-target (string->bytes/utf-8 (format "~s" packet)))))
(define (packet-statistics h)
(define report-period 10000)
(field [packet-count 0]
[byte-count 0]
[reset-time (+ (current-inexact-milliseconds) report-period)])
(on (message (udp-packet _ h $body))
(packet-count (+ (packet-count) 1))
(byte-count (+ (byte-count) (bytes-length body))))
(on (asserted (later-than (reset-time)))
(reset-time (+ (reset-time) report-period))
(log-mcds-info "~a packets, ~a bytes received in ~a ms = ~a Hz, ~a bytes/s"
(packet-count)
(byte-count)
report-period
(/ (packet-count) (/ report-period 1000.0))
(/ (byte-count) (/ report-period 1000.0)))
(packet-count 0)
(byte-count 0)))
(spawn (during (observe (mcds-inbound _)) (assert (mcds-demand))) (spawn (during (observe (mcds-inbound _)) (assert (mcds-demand)))
(during (mcds-outbound _) (assert (mcds-demand))) (during (mcds-outbound _) (assert (mcds-demand)))
@ -36,51 +58,63 @@
(assert (udp-multicast-group-member h group-address #f)) (assert (udp-multicast-group-member h group-address #f))
(assert (udp-multicast-loopback h #t)) (assert (udp-multicast-loopback h #t))
(define report-period 10000) (packet-statistics h)
(field [packet-count 0]
[byte-count 0]
[reset-time (+ (current-inexact-milliseconds) report-period)])
(on (asserted (later-than (reset-time)))
(reset-time (+ (reset-time) report-period))
(log-mcds-info "~a packets, ~a bytes received in ~a ms = ~a Hz, ~a bytes/s"
(packet-count)
(byte-count)
report-period
(/ (packet-count) (/ report-period 1000.0))
(/ (byte-count) (/ report-period 1000.0)))
(packet-count 0)
(byte-count 0))
(on (message (udp-packet _ h $body)) (on (message (udp-packet _ h $body))
(packet-count (+ (packet-count) 1))
(byte-count (+ (byte-count) (bytes-length body)))
(spawn* (match (read (open-input-string (bytes->string/utf-8 body))) (spawn* (match (read (open-input-string (bytes->string/utf-8 body)))
[(list peer type assertion) [(list peer type assertion)
;; (log-mcds-info "~v ~v ~v" peer type assertion)
(send! (mcds-change peer type assertion))]))) (send! (mcds-change peer type assertion))])))
(on (message (mcds-change $peer 'asserted $assertion)) (on (message (mcds-change $peer 'asserted $assertion))
(spawn (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*)) (spawn
(assert (mcds-inbound assertion)) (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*))
(stop-when (message (mcds-change peer 'retracted assertion))) (assert (mcds-inbound assertion))
(stop-when (asserted (later-than expiry)))
(stop-when (retracted (mcds-demand))))) (when (observe? assertion)
(define pattern (observe-specification assertion))
(define x (mcds-outbound pattern))
(define i (skeleton-interest
(term->skeleton x)
(term->skeleton-proj x)
(term->key x)
(term->capture-proj x)
(lambda (op . captured-values)
(when (eq? op '+)
(define term (instantiate-term->value pattern captured-values))
(schedule-script!
(current-actor)
(lambda ()
(assert! (mcds-relevant term peer))))))
void))
(add-endpoint! (current-facet)
"udp-dataspace (mcds-inbound (observe ...))"
#t
(lambda () (values (observe x) i))))
(stop-when (message (mcds-change peer 'retracted assertion)))
(stop-when (asserted (later-than expiry)))
(stop-when (retracted (mcds-demand)))))
(during (observe (mcds-inbound $pattern)) (during (observe (mcds-inbound $pattern))
(assert (mcds-relevant (observe pattern) me))
(assert (mcds-outbound (observe pattern)))) (assert (mcds-outbound (observe pattern))))
(during (mcds-outbound $assertion) (during (mcds-relevant $assertion _)
(on-start (send-packet! h (list me 'asserted assertion))) (during (mcds-outbound assertion)
(on-stop (send-packet! h (list me 'retracted assertion))) (define (refresh!) (send-packet! h (list me 'asserted assertion)))
(when (observe? assertion) (on-start (refresh!))
(on-stop (send-packet! h (list me 'retracted assertion)))
(field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)]) (field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)])
(on (asserted (later-than (deadline))) (on (asserted (later-than (deadline)))
(send-packet! h (list me 'asserted assertion)) (refresh!)
(deadline (+ (deadline) *assertion-refresh*)))) (deadline (+ (deadline) *assertion-refresh*)))
(on (message (mcds-change _ 'asserted (observe $pattern)))
(term-intersect assertion pattern (on (asserted (mcds-relevant assertion $peer))
(lambda (intersection) ;; (log-mcds-info "Peer ~a cares about outbound assertion ~v" peer assertion)
(send-packet! h (list me 'asserted assertion))) (define soon (+ (current-inexact-milliseconds) 100))
void))) (when (> (deadline) soon) (deadline soon)))))
(on (message (mcds-change $peer 'message $body)) (on (message (mcds-change $peer 'message $body))
(send! (mcds-inbound body))) (send! (mcds-inbound body)))