diff --git a/syndicate/mc/udp-dataspace.rkt b/syndicate/mc/udp-dataspace.rkt index 9822eff..0f482a0 100644 --- a/syndicate/mc/udp-dataspace.rkt +++ b/syndicate/mc/udp-dataspace.rkt @@ -6,6 +6,7 @@ (require/activate imperative-syndicate/drivers/timer) (require/activate imperative-syndicate/drivers/udp) (require racket/random file/sha1) +(require imperative-syndicate/skeleton) (require imperative-syndicate/term) (define-logger mcds) @@ -16,16 +17,37 @@ (struct mcds-change (peer type assertion) #:transparent) (struct mcds-demand () #:transparent) +(struct mcds-relevant (assertion peer) #:transparent) + (define group-address "239.192.57.49") ;; falls within Organization Local Scope (see RFC 2365) (define group-port 5999) ;; make sure your firewall is open to UDP on this port (define group-target (udp-remote-address group-address group-port)) -(define *assertion-lifetime* 5000) +(define *assertion-lifetime* 30000) (define *assertion-refresh* (* 0.9 *assertion-lifetime*)) (define (send-packet! h packet) (send! (udp-packet h group-target (string->bytes/utf-8 (format "~s" packet))))) +(define (packet-statistics h) + (define report-period 10000) + (field [packet-count 0] + [byte-count 0] + [reset-time (+ (current-inexact-milliseconds) report-period)]) + (on (message (udp-packet _ h $body)) + (packet-count (+ (packet-count) 1)) + (byte-count (+ (byte-count) (bytes-length body)))) + (on (asserted (later-than (reset-time))) + (reset-time (+ (reset-time) report-period)) + (log-mcds-info "~a packets, ~a bytes received in ~a ms = ~a Hz, ~a bytes/s" + (packet-count) + (byte-count) + report-period + (/ (packet-count) (/ report-period 1000.0)) + (/ (byte-count) (/ report-period 1000.0))) + (packet-count 0) + (byte-count 0))) + (spawn (during (observe (mcds-inbound _)) (assert (mcds-demand))) (during (mcds-outbound _) (assert (mcds-demand))) @@ -36,51 +58,63 @@ (assert (udp-multicast-group-member h group-address #f)) (assert (udp-multicast-loopback h #t)) - (define report-period 10000) - (field [packet-count 0] - [byte-count 0] - [reset-time (+ (current-inexact-milliseconds) report-period)]) - (on (asserted (later-than (reset-time))) - (reset-time (+ (reset-time) report-period)) - (log-mcds-info "~a packets, ~a bytes received in ~a ms = ~a Hz, ~a bytes/s" - (packet-count) - (byte-count) - report-period - (/ (packet-count) (/ report-period 1000.0)) - (/ (byte-count) (/ report-period 1000.0))) - (packet-count 0) - (byte-count 0)) + (packet-statistics h) (on (message (udp-packet _ h $body)) - (packet-count (+ (packet-count) 1)) - (byte-count (+ (byte-count) (bytes-length body))) (spawn* (match (read (open-input-string (bytes->string/utf-8 body))) [(list peer type assertion) + ;; (log-mcds-info "~v ~v ~v" peer type assertion) (send! (mcds-change peer type assertion))]))) (on (message (mcds-change $peer 'asserted $assertion)) - (spawn (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*)) - (assert (mcds-inbound assertion)) - (stop-when (message (mcds-change peer 'retracted assertion))) - (stop-when (asserted (later-than expiry))) - (stop-when (retracted (mcds-demand))))) + (spawn + (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*)) + (assert (mcds-inbound assertion)) + + (when (observe? assertion) + (define pattern (observe-specification assertion)) + (define x (mcds-outbound pattern)) + (define i (skeleton-interest + (term->skeleton x) + (term->skeleton-proj x) + (term->key x) + (term->capture-proj x) + (lambda (op . captured-values) + (when (eq? op '+) + (define term (instantiate-term->value pattern captured-values)) + (schedule-script! + (current-actor) + (lambda () + (assert! (mcds-relevant term peer)))))) + void)) + (add-endpoint! (current-facet) + "udp-dataspace (mcds-inbound (observe ...))" + #t + (lambda () (values (observe x) i)))) + + (stop-when (message (mcds-change peer 'retracted assertion))) + (stop-when (asserted (later-than expiry))) + (stop-when (retracted (mcds-demand))))) (during (observe (mcds-inbound $pattern)) + (assert (mcds-relevant (observe pattern) me)) (assert (mcds-outbound (observe pattern)))) - (during (mcds-outbound $assertion) - (on-start (send-packet! h (list me 'asserted assertion))) - (on-stop (send-packet! h (list me 'retracted assertion))) - (when (observe? assertion) + (during (mcds-relevant $assertion _) + (during (mcds-outbound assertion) + (define (refresh!) (send-packet! h (list me 'asserted assertion))) + (on-start (refresh!)) + (on-stop (send-packet! h (list me 'retracted assertion))) + (field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)]) (on (asserted (later-than (deadline))) - (send-packet! h (list me 'asserted assertion)) - (deadline (+ (deadline) *assertion-refresh*)))) - (on (message (mcds-change _ 'asserted (observe $pattern))) - (term-intersect assertion pattern - (lambda (intersection) - (send-packet! h (list me 'asserted assertion))) - void))) + (refresh!) + (deadline (+ (deadline) *assertion-refresh*))) + + (on (asserted (mcds-relevant assertion $peer)) + ;; (log-mcds-info "Peer ~a cares about outbound assertion ~v" peer assertion) + (define soon (+ (current-inexact-milliseconds) 100)) + (when (> (deadline) soon) (deadline soon))))) (on (message (mcds-change $peer 'message $body)) (send! (mcds-inbound body)))