#lang imperative-syndicate (provide (struct-out mcds-inbound) (struct-out mcds-outbound)) (require/activate imperative-syndicate/drivers/timer) (require/activate imperative-syndicate/drivers/udp) (require racket/random file/sha1) (require imperative-syndicate/term) (define-logger mcds) (struct mcds-inbound (assertion) #:prefab) (struct mcds-outbound (assertion) #:prefab) (struct mcds-change (peer type assertion) #:transparent) (struct mcds-demand () #:transparent) (define group-address "239.192.57.49") ;; falls within Organization Local Scope (see RFC 2365) (define group-port 5999) ;; make sure your firewall is open to UDP on this port (define group-target (udp-remote-address group-address group-port)) (define *assertion-lifetime* 5000) (define *assertion-refresh* (* 0.9 *assertion-lifetime*)) (define (send-packet! h packet) (send! (udp-packet h group-target (string->bytes/utf-8 (format "~s" packet))))) (spawn (during (observe (mcds-inbound _)) (assert (mcds-demand))) (during (mcds-outbound _) (assert (mcds-demand))) (during/spawn (mcds-demand) (define me (bytes->hex-string (crypto-random-bytes 8))) (define h (udp-listener group-port)) (during h (assert (udp-multicast-group-member h group-address #f)) (assert (udp-multicast-loopback h #t)) (define report-period 10000) (field [packet-count 0] [byte-count 0] [reset-time (+ (current-inexact-milliseconds) report-period)]) (on (asserted (later-than (reset-time))) (reset-time (+ (reset-time) report-period)) (log-mcds-info "~a packets, ~a bytes received in ~a ms = ~a Hz, ~a bytes/s" (packet-count) (byte-count) report-period (/ (packet-count) (/ report-period 1000.0)) (/ (byte-count) (/ report-period 1000.0))) (packet-count 0) (byte-count 0)) (on (message (udp-packet _ h $body)) (packet-count (+ (packet-count) 1)) (byte-count (+ (byte-count) (bytes-length body))) (spawn* (match (read (open-input-string (bytes->string/utf-8 body))) [(list peer type assertion) (send! (mcds-change peer type assertion))]))) (on (message (mcds-change $peer 'asserted $assertion)) (spawn (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*)) (assert (mcds-inbound assertion)) (stop-when (message (mcds-change peer 'retracted assertion))) (stop-when (asserted (later-than expiry))) (stop-when (retracted (mcds-demand))))) (during (observe (mcds-inbound $pattern)) (assert (mcds-outbound (observe pattern)))) (during (mcds-outbound $assertion) (on-start (send-packet! h (list me 'asserted assertion))) (on-stop (send-packet! h (list me 'retracted assertion))) (when (observe? assertion) (field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)]) (on (asserted (later-than (deadline))) (send-packet! h (list me 'asserted assertion)) (deadline (+ (deadline) *assertion-refresh*)))) (on (message (mcds-change _ 'asserted (observe $pattern))) (term-intersect assertion pattern (lambda (intersection) (send-packet! h (list me 'asserted assertion))) void))) (on (message (mcds-change $peer 'message $body)) (send! (mcds-inbound body))) (on (message (mcds-outbound $body)) (send-packet! h (list me 'message body))))))