;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
|
;;; SPDX-FileCopyrightText: Copyright © 2010-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
|
|
|
#lang syndicate
|
|
|
|
(provide (struct-out mcds-inbound)
|
|
(struct-out mcds-outbound))
|
|
|
|
(require/activate syndicate/drivers/timer)
|
|
(require/activate syndicate/drivers/udp)
|
|
(require racket/random file/sha1)
|
|
(require syndicate/skeleton)
|
|
(require syndicate/term)
|
|
(require preserves)
|
|
|
|
;; Defines the `mcds` logger; this file reports through it with log-mcds-info.
(define-logger mcds)
|
|
|
|
;; Exported wrapper for data received from a remote peer: asserted for each
;; incoming '+ packet and sent as a message for each incoming '! packet.
(struct mcds-inbound (assertion) #:prefab)
|
|
;; Exported wrapper for data that local code wants relayed to the multicast
;; group: as an assertion it is announced with '+/'- packets (while some peer
;; is recorded as interested); as a message it is sent as a '! packet.
(struct mcds-outbound (assertion) #:prefab)
|
|
|
|
;; Internal message carrying one decoded packet: the sender's peer id, the
;; packet type (this file handles '+, '- and '!), and the payload.
(struct mcds-change (peer type assertion) #:transparent)
|
|
;; Internal assertion signalling that the multicast driver actor is needed;
;; asserted while anything observes mcds-inbound or asserts mcds-outbound.
(struct mcds-demand () #:transparent)
|
|
|
|
;; Internal assertion recording that `peer` is interested in `assertion`;
;; an outbound assertion is only transmitted while such a record exists.
(struct mcds-relevant (assertion peer) #:transparent)
|
|
|
|
;; IPv4 multicast group that the driver's UDP listener joins.
(define group-address "239.192.57.49") ;; falls within Organization Local Scope (see RFC 2365)
|
|
;; UDP port used both for listening and as the destination port of the group.
(define group-port 5999) ;; make sure your firewall is open to UDP on this port
|
|
;; Destination address of every packet emitted by send-packet!.
(define group-target (udp-remote-address group-address group-port))
|
|
|
|
;; How long a received assertion is kept before it expires. It is added to
;; (current-inexact-milliseconds), so the unit is milliseconds.
(define *assertion-lifetime* 30000)
|
|
;; Interval, in milliseconds, between re-sends of each outbound assertion:
;; 90% of the lifetime (an inexact number, because of the 0.9 factor).
;; NOTE(review): presumably shorter than the lifetime so that a refresh
;; arrives before receivers expire the assertion -- confirm.
(define *assertion-refresh* (* 0.9 *assertion-lifetime*))
|
|
|
|
;; send-packet! : handle any -> void
;; Serialises `packet` with preserve->bytes and emits it as a udp-packet
;; message from the local handle `h` to the multicast group target.
(define (send-packet! h packet)
  (define encoded (preserve->bytes packet))
  (send! (udp-packet h group-target encoded)))
|
|
|
|
;; packet-statistics : handle -> void
;; Installs, in the calling facet, two counters (packets and payload bytes
;; received on `h`) and a periodic report: every `report-period`
;; milliseconds the totals and per-second rates are logged at info level
;; and the counters are reset to zero.
(define (packet-statistics h)
  (define report-period 10000)
  ;; Length of one reporting window in seconds; divisor for the rates.
  (define report-seconds (/ report-period 1000.0))
  (define (per-second total) (/ total report-seconds))

  (field [packet-count 0]
         [byte-count 0]
         [reset-time (+ (current-inexact-milliseconds) report-period)])

  ;; Count every packet arriving on our handle, along with its size.
  (on (message (udp-packet _ h $payload))
      (packet-count (add1 (packet-count)))
      (byte-count (+ (byte-count) (bytes-length payload))))

  ;; When the current window closes: schedule the next one, report, reset.
  (on (asserted (later-than (reset-time)))
      (reset-time (+ (reset-time) report-period))
      (log-mcds-info "~a packets, ~a bytes received in ~a ms = ~a Hz, ~a bytes/s"
                     (packet-count)
                     (byte-count)
                     report-period
                     (per-second (packet-count))
                     (per-second (byte-count)))
      (packet-count 0)
      (byte-count 0)))
|
|
|
|
;; Multicast driver.
;;
;; The outer actor turns client interest into demand: while anything observes
;; (mcds-inbound _) or asserts (mcds-outbound _), it asserts (mcds-demand).
;; While demand exists, a second actor owns a UDP listener joined to the
;; multicast group and relays between packets and the dataspace:
;;
;;   packet (list peer '+ a)  ->  (mcds-inbound a) asserted until a matching
;;                                '- packet, expiry, or loss of demand
;;   packet (list peer '! m)  ->  (mcds-inbound m) sent as a message
;;   assertion (mcds-outbound a), with some (mcds-relevant a _) present
;;                            ->  '+ packet now and on every refresh,
;;                                '- packet when it ends
;;   message (mcds-outbound m) -> '! packet
(spawn (during (observe (mcds-inbound _)) (assert (mcds-demand)))
       (during (mcds-outbound _) (assert (mcds-demand)))

       (during/spawn (mcds-demand)
         ;; Random 8-byte identity placed in every packet we send.
         (define me (crypto-random-bytes 8))
         (define h (udp-listener group-port))
         (during h
           (assert (udp-multicast-group-member h group-address #f))
           ;; Loopback is enabled, so our own packets are received here too.
           (assert (udp-multicast-loopback h #t))

           (packet-statistics h)

           ;; Decode each packet in its own short-lived actor, so that a
           ;; decoding failure in bytes->preserve only affects that actor.
           (on (message (udp-packet _ h $body))
               (spawn*
                ;; (log-mcds-info "received: ~v" body)
                (match (bytes->preserve body)
                  [(list peer type assertion)
                   ;; (log-mcds-info "~v ~v ~v" peer type assertion)
                   (send! (mcds-change peer type assertion))]
                  ;; Anything that decodes but is not a three-element list is
                  ;; not one of our packets: report it and drop it instead of
                  ;; failing with a `match` error.
                  [other
                   (log-mcds-warning "ignoring malformed packet: ~v" other)])))

           ;; A peer announced an assertion: hold it in a dedicated actor.
           (on (message (mcds-change $peer '+ $assertion))
               (spawn
                (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*))
                (assert (mcds-inbound assertion))

                ;; If the peer's assertion is itself an observation, watch for
                ;; local outbound assertions matching its pattern and record
                ;; that this peer is interested in each of them.
                (when (observe? assertion)
                  (define pattern (observe-specification assertion))
                  (define x (mcds-outbound pattern))
                  (add-observer-endpoint!
                   (lambda () x)
                   #:on-add
                   (lambda (captured-values)
                     ;; TODO: flawed?? Needs visibility-restriction, or some other way of
                     ;; ignoring the opaque placeholders!
                     (assert! (mcds-relevant (instantiate-term->value pattern
                                                                      captured-values
                                                                      #:visibility-restriction-proj
                                                                      #f)
                                             peer)))))

                ;; The held assertion ends on explicit retraction by the same
                ;; peer, on expiry, or when the driver is no longer needed.
                (stop-when (message (mcds-change peer '- assertion)))
                (stop-when (asserted (later-than expiry)))
                (stop-when (retracted (mcds-demand)))))

           ;; Local interest in inbound data is itself published to the group
           ;; as an outbound observation, and marked relevant on our behalf.
           (during (observe (mcds-inbound $pattern))
             (assert (mcds-relevant (observe pattern) me))
             (assert (mcds-outbound (observe pattern))))

           ;; Transmit an outbound assertion only while someone is recorded
           ;; as interested in it.
           (during (mcds-relevant $assertion _)
             (during (mcds-outbound assertion)
               (define (refresh!) (send-packet! h (list me '+ assertion)))
               (on-start (refresh!))
               (on-stop (send-packet! h (list me '- assertion)))

               ;; Re-announce periodically, every *assertion-refresh* ms.
               (field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)])
               (on (asserted (later-than (deadline)))
                   (refresh!)
                   (deadline (+ (deadline) *assertion-refresh*)))

               ;; When another interested peer appears, pull the next refresh
               ;; forward to at most 100 ms from now.
               (on (asserted (mcds-relevant assertion $peer))
                   ;; (log-mcds-info "Peer ~a cares about outbound assertion ~v" peer assertion)
                   (define soon (+ (current-inexact-milliseconds) 100))
                   (when (> (deadline) soon) (deadline soon)))))

           ;; Messages are relayed one-shot in both directions.
           (on (message (mcds-change $peer '! $body))
               (send! (mcds-inbound body)))

           (on (message (mcds-outbound $body))
               (send-packet! h (list me '! body))))))
|