#lang imperative-syndicate
;; Relays assertions and messages between dataspaces over UDP multicast.
;;
;; Local interface:
;;  - Assert `(mcds-outbound X)` to have X announced to the multicast group
;;    (it is only transmitted while some `mcds-relevant` record exists for X).
;;  - Observe `(mcds-inbound X)` to see X when a peer announces it.
;;  - Send the message `(mcds-outbound X)` to transmit X as a one-shot message;
;;    such messages arriving from peers are delivered as `(mcds-inbound X)`
;;    messages.
;;
;; Wire format: each datagram is `(encode (list peer-id type payload))`, where
;; `type` is '+ (assert/refresh), '- (retract) or '! (message).

(provide (struct-out mcds-inbound)
         (struct-out mcds-outbound))

(require/activate imperative-syndicate/drivers/timer)
(require/activate imperative-syndicate/drivers/udp)
(require racket/random file/sha1)
(require imperative-syndicate/skeleton)
(require imperative-syndicate/term)
(require preserves)

(define-logger mcds)

;; Public wrappers: what peers have told us / what we want peers to be told.
(struct mcds-inbound (assertion) #:prefab)
(struct mcds-outbound (assertion) #:prefab)

;; Internal records.
;;  mcds-change   : one decoded packet, broadcast as a message inside the dataspace.
;;  mcds-demand   : asserted while anything locally uses the relay.
;;  mcds-relevant : `peer` has shown interest matching `assertion`.
(struct mcds-change (peer type assertion) #:transparent)
(struct mcds-demand () #:transparent)
(struct mcds-relevant (assertion peer) #:transparent)

(define group-address "239.192.57.49") ;; falls within Organization Local Scope (see RFC 2365)
(define group-port 5999) ;; make sure your firewall is open to UDP on this port
(define group-target (udp-remote-address group-address group-port))

;; Milliseconds a received '+ announcement is kept before it expires.
(define *assertion-lifetime* 30000)
;; Milliseconds between re-announcements of our own assertions (90% of lifetime).
(define *assertion-refresh* (* 0.9 *assertion-lifetime*))

;; send-packet! : listener-handle any -> void
;; Encodes `packet` and sends it from socket `h` to the multicast group.
(define (send-packet! h packet)
  (send! (udp-packet h group-target (encode packet))))

;; packet-statistics : listener-handle -> void
;; Installs endpoints in the calling context that count packets and bytes
;; received on `h` and log a summary every `report-period` milliseconds.
(define (packet-statistics h)
  (define report-period 10000)
  (field [packet-count 0]
         [byte-count 0]
         [reset-time (+ (current-inexact-milliseconds) report-period)])
  (on (message (udp-packet _ h $body))
      (packet-count (+ (packet-count) 1))
      (byte-count (+ (byte-count) (bytes-length body))))
  (on (asserted (later-than (reset-time)))
      ;; Advance the deadline first so the timer is re-armed for the next period.
      (reset-time (+ (reset-time) report-period))
      (log-mcds-info "~a packets, ~a bytes received in ~a ms = ~a Hz, ~a bytes/s"
                     (packet-count)
                     (byte-count)
                     report-period
                     (/ (packet-count) (/ report-period 1000.0))
                     (/ (byte-count) (/ report-period 1000.0)))
      (packet-count 0)
      (byte-count 0)))

(spawn
 ;; The relay is demand-driven: it runs only while someone observes inbound
 ;; assertions or asserts an outbound one.
 (during (observe (mcds-inbound _)) (assert (mcds-demand)))
 (during (mcds-outbound _) (assert (mcds-demand)))

 (during/spawn (mcds-demand)
   ;; Random 8-byte identity used to tag every packet we send.
   (define me (crypto-random-bytes 8))
   (define h (udp-listener group-port))
   ;; NOTE(review): presumably the UDP driver asserts the listener record once
   ;; the socket is open -- confirm against the udp driver.
   (during h
     (assert (udp-multicast-group-member h group-address #f))
     (assert (udp-multicast-loopback h #t))

     (packet-statistics h)

     ;; Decode each datagram in its own short-lived actor, so that an exception
     ;; raised while decoding terminates only that actor.
     (on (message (udp-packet _ h $body))
         (spawn*
          ;; (log-mcds-info "received: ~v" body)
          (match (decode body)
            [(list peer type assertion)
             ;; (log-mcds-info "~v ~v ~v" peer type assertion)
             (send! (mcds-change peer type assertion))]
            [other
             ;; Anything may arrive on a multicast port: drop values that
             ;; are not a (peer type assertion) triple instead of failing
             ;; the match.
             (log-mcds-warning "ignoring malformed packet: ~v" other)])))

     ;; A peer announced (or refreshed) an assertion: hold it as `mcds-inbound`
     ;; in a dedicated actor until the peer retracts it, it expires, or the
     ;; relay is no longer in demand.
     (on (message (mcds-change $peer '+ $assertion))
         (spawn
          (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*))
          (assert (mcds-inbound assertion))
          ;; If the peer is announcing interest (an `observe`), watch local
          ;; `mcds-outbound` assertions matching its pattern and record each
          ;; match as relevant to that peer. The pattern is only known at run
          ;; time, so the endpoint is built by hand from skeleton pieces.
          (when (observe? assertion)
            (define pattern (observe-specification assertion))
            (define x (mcds-outbound pattern))
            (define i (skeleton-interest
                       (term->skeleton x)
                       (term->skeleton-proj x)
                       (term->key x)
                       (term->capture-proj x)
                       (lambda (op . captured-values)
                         ;; Only additions are handled here; the resulting
                         ;; assertion is made from a scheduled script rather
                         ;; than directly inside the callback.
                         (when (eq? op '+)
                           (define term (instantiate-term->value pattern captured-values))
                           (schedule-script!
                            (current-actor)
                            (lambda ()
                              (assert! (mcds-relevant term peer))))))
                       #f))
            (add-endpoint! (current-facet)
                           "udp-dataspace (mcds-inbound (observe ...))"
                           #t
                           (lambda () (values (observe x) i))))
          (stop-when (message (mcds-change peer '- assertion)))
          (stop-when (asserted (later-than expiry)))
          (stop-when (retracted (mcds-demand)))))

     ;; Local interest in inbound assertions is itself announced to peers, and
     ;; is marked relevant to ourselves so that it is actually transmitted.
     (during (observe (mcds-inbound $pattern))
       (assert (mcds-relevant (observe pattern) me))
       (assert (mcds-outbound (observe pattern))))

     ;; Transmit an outbound assertion only while at least one peer finds it
     ;; relevant: announce on start, retract on stop, refresh periodically.
     (during (mcds-relevant $assertion _)
       (during (mcds-outbound assertion)
         (define (refresh!) (send-packet! h (list me '+ assertion)))
         (on-start (refresh!))
         (on-stop (send-packet! h (list me '- assertion)))
         (field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)])
         (on (asserted (later-than (deadline)))
             (refresh!)
             (deadline (+ (deadline) *assertion-refresh*)))
         ;; When another peer becomes interested, pull the next refresh forward
         ;; to at most 100 ms from now instead of sending immediately.
         (on (asserted (mcds-relevant assertion $peer))
             ;; (log-mcds-info "Peer ~a cares about outbound assertion ~v" peer assertion)
             (define soon (+ (current-inexact-milliseconds) 100))
             (when (> (deadline) soon)
               (deadline soon)))))

     ;; One-shot messages: relay in both directions without any state.
     (on (message (mcds-change $peer '! $body))
         (send! (mcds-inbound body)))

     (on (message (mcds-outbound $body))
         (send-packet! h (list me '! body))))))