From 76674c77b378f2639c3939f8d624997bb9cba1d0 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 14 Aug 2018 12:35:56 +0100 Subject: [PATCH] Multicast-UDP-based dataspace sketch, from last night --- syndicate/mc/mc-chat-client.rkt | 25 +++++++++ syndicate/mc/udp-dataspace.rkt | 89 +++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 syndicate/mc/mc-chat-client.rkt create mode 100644 syndicate/mc/udp-dataspace.rkt diff --git a/syndicate/mc/mc-chat-client.rkt b/syndicate/mc/mc-chat-client.rkt new file mode 100644 index 0000000..f7f75a5 --- /dev/null +++ b/syndicate/mc/mc-chat-client.rkt @@ -0,0 +1,25 @@ +#lang imperative-syndicate + +(require/activate "udp-dataspace.rkt") +(require/activate imperative-syndicate/drivers/external-event) +(require (only-in racket/port read-bytes-line-evt)) +(require racket/random file/sha1) + +(message-struct speak (who what)) +(assertion-struct present (who)) + +(spawn (define me (bytes->hex-string (crypto-random-bytes 8))) + (define stdin-evt (read-bytes-line-evt (current-input-port) 'any)) + + (assert (mcds-outbound (present me))) + + (on (message (inbound (external-event stdin-evt (list $line)))) + (if (eof-object? line) + (stop-current-facet) + (send! (mcds-outbound (speak me line))))) + + (during (mcds-inbound (present $user)) + (on-start (printf "~a arrived\n" user)) + (on-stop (printf "~a left\n" user)) + (on (message (mcds-inbound (speak user $text))) + (printf "~a says '~a'\n" user text)))) diff --git a/syndicate/mc/udp-dataspace.rkt b/syndicate/mc/udp-dataspace.rkt new file mode 100644 index 0000000..9822eff --- /dev/null +++ b/syndicate/mc/udp-dataspace.rkt @@ -0,0 +1,89 @@ +#lang imperative-syndicate + +(provide (struct-out mcds-inbound) + (struct-out mcds-outbound)) + +(require/activate imperative-syndicate/drivers/timer) +(require/activate imperative-syndicate/drivers/udp) +(require racket/random file/sha1) +(require imperative-syndicate/term) + +(define-logger mcds) + +(struct mcds-inbound (assertion) #:prefab) +(struct mcds-outbound (assertion) #:prefab) + +(struct mcds-change (peer type assertion) #:transparent) +(struct mcds-demand () #:transparent) + +(define group-address "239.192.57.49") ;; falls within Organization Local Scope (see RFC 2365) +(define group-port 5999) ;; make sure your firewall is open to UDP on this port +(define group-target (udp-remote-address group-address group-port)) + +(define *assertion-lifetime* 5000) +(define *assertion-refresh* (* 0.9 *assertion-lifetime*)) + +(define (send-packet! h packet) + (send! (udp-packet h group-target (string->bytes/utf-8 (format "~s" packet))))) + +(spawn (during (observe (mcds-inbound _)) (assert (mcds-demand))) + (during (mcds-outbound _) (assert (mcds-demand))) + + (during/spawn (mcds-demand) + (define me (bytes->hex-string (crypto-random-bytes 8))) + (define h (udp-listener group-port)) + (during h + (assert (udp-multicast-group-member h group-address #f)) + (assert (udp-multicast-loopback h #t)) + + (define report-period 10000) + (field [packet-count 0] + [byte-count 0] + [reset-time (+ (current-inexact-milliseconds) report-period)]) + (on (asserted (later-than (reset-time))) + (reset-time (+ (reset-time) report-period)) + (log-mcds-info "~a packets, ~a bytes received in ~a ms = ~a Hz, ~a bytes/s" + (packet-count) + (byte-count) + report-period + (/ (packet-count) (/ report-period 1000.0)) + (/ (byte-count) (/ report-period 1000.0))) + (packet-count 0) + (byte-count 0)) + + (on (message (udp-packet _ h $body)) + (packet-count (+ (packet-count) 1)) + (byte-count (+ (byte-count) (bytes-length body))) + (spawn* (match (read (open-input-string (bytes->string/utf-8 body))) + [(list peer type assertion) + (send! (mcds-change peer type assertion))]))) + + (on (message (mcds-change $peer 'asserted $assertion)) + (spawn (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*)) + (assert (mcds-inbound assertion)) + (stop-when (message (mcds-change peer 'retracted assertion))) + (stop-when (asserted (later-than expiry))) + (stop-when (retracted (mcds-demand))))) + + (during (observe (mcds-inbound $pattern)) + (assert (mcds-outbound (observe pattern)))) + + (during (mcds-outbound $assertion) + (on-start (send-packet! h (list me 'asserted assertion))) + (on-stop (send-packet! h (list me 'retracted assertion))) + (when (observe? assertion) + (field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)]) + (on (asserted (later-than (deadline))) + (send-packet! h (list me 'asserted assertion)) + (deadline (+ (deadline) *assertion-refresh*)))) + (on (message (mcds-change _ 'asserted (observe $pattern))) + (term-intersect assertion pattern + (lambda (intersection) + (send-packet! h (list me 'asserted assertion))) + void))) + + (on (message (mcds-change $peer 'message $body)) + (send! (mcds-inbound body))) + + (on (message (mcds-outbound $body)) + (send-packet! h (list me 'message body))))))