Multicast-UDP-based dataspace sketch, from last night
This commit is contained in:
parent
e9457af8c2
commit
76674c77b3
|
@ -0,0 +1,25 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(require/activate "udp-dataspace.rkt")
|
||||
(require/activate imperative-syndicate/drivers/external-event)
|
||||
(require (only-in racket/port read-bytes-line-evt))
|
||||
(require racket/random file/sha1)
|
||||
|
||||
(message-struct speak (who what))
|
||||
(assertion-struct present (who))
|
||||
|
||||
(spawn (define me (bytes->hex-string (crypto-random-bytes 8)))
|
||||
(define stdin-evt (read-bytes-line-evt (current-input-port) 'any))
|
||||
|
||||
(assert (mcds-outbound (present me)))
|
||||
|
||||
(on (message (inbound (external-event stdin-evt (list $line))))
|
||||
(if (eof-object? line)
|
||||
(stop-current-facet)
|
||||
(send! (mcds-outbound (speak me line)))))
|
||||
|
||||
(during (mcds-inbound (present $user))
|
||||
(on-start (printf "~a arrived\n" user))
|
||||
(on-stop (printf "~a left\n" user))
|
||||
(on (message (mcds-inbound (speak user $text)))
|
||||
(printf "~a says '~a'\n" user text))))
|
|
@ -0,0 +1,89 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (struct-out mcds-inbound)
|
||||
(struct-out mcds-outbound))
|
||||
|
||||
(require/activate imperative-syndicate/drivers/timer)
|
||||
(require/activate imperative-syndicate/drivers/udp)
|
||||
(require racket/random file/sha1)
|
||||
(require imperative-syndicate/term)
|
||||
|
||||
(define-logger mcds)
|
||||
|
||||
(struct mcds-inbound (assertion) #:prefab)
|
||||
(struct mcds-outbound (assertion) #:prefab)
|
||||
|
||||
(struct mcds-change (peer type assertion) #:transparent)
|
||||
(struct mcds-demand () #:transparent)
|
||||
|
||||
(define group-address "239.192.57.49") ;; falls within Organization Local Scope (see RFC 2365)
|
||||
(define group-port 5999) ;; make sure your firewall is open to UDP on this port
|
||||
(define group-target (udp-remote-address group-address group-port))
|
||||
|
||||
(define *assertion-lifetime* 5000)
|
||||
(define *assertion-refresh* (* 0.9 *assertion-lifetime*))
|
||||
|
||||
(define (send-packet! h packet)
|
||||
(send! (udp-packet h group-target (string->bytes/utf-8 (format "~s" packet)))))
|
||||
|
||||
(spawn (during (observe (mcds-inbound _)) (assert (mcds-demand)))
|
||||
(during (mcds-outbound _) (assert (mcds-demand)))
|
||||
|
||||
(during/spawn (mcds-demand)
|
||||
(define me (bytes->hex-string (crypto-random-bytes 8)))
|
||||
(define h (udp-listener group-port))
|
||||
(during h
|
||||
(assert (udp-multicast-group-member h group-address #f))
|
||||
(assert (udp-multicast-loopback h #t))
|
||||
|
||||
(define report-period 10000)
|
||||
(field [packet-count 0]
|
||||
[byte-count 0]
|
||||
[reset-time (+ (current-inexact-milliseconds) report-period)])
|
||||
(on (asserted (later-than (reset-time)))
|
||||
(reset-time (+ (reset-time) report-period))
|
||||
(log-mcds-info "~a packets, ~a bytes received in ~a ms = ~a Hz, ~a bytes/s"
|
||||
(packet-count)
|
||||
(byte-count)
|
||||
report-period
|
||||
(/ (packet-count) (/ report-period 1000.0))
|
||||
(/ (byte-count) (/ report-period 1000.0)))
|
||||
(packet-count 0)
|
||||
(byte-count 0))
|
||||
|
||||
(on (message (udp-packet _ h $body))
|
||||
(packet-count (+ (packet-count) 1))
|
||||
(byte-count (+ (byte-count) (bytes-length body)))
|
||||
(spawn* (match (read (open-input-string (bytes->string/utf-8 body)))
|
||||
[(list peer type assertion)
|
||||
(send! (mcds-change peer type assertion))])))
|
||||
|
||||
(on (message (mcds-change $peer 'asserted $assertion))
|
||||
(spawn (define expiry (+ (current-inexact-milliseconds) *assertion-lifetime*))
|
||||
(assert (mcds-inbound assertion))
|
||||
(stop-when (message (mcds-change peer 'retracted assertion)))
|
||||
(stop-when (asserted (later-than expiry)))
|
||||
(stop-when (retracted (mcds-demand)))))
|
||||
|
||||
(during (observe (mcds-inbound $pattern))
|
||||
(assert (mcds-outbound (observe pattern))))
|
||||
|
||||
(during (mcds-outbound $assertion)
|
||||
(on-start (send-packet! h (list me 'asserted assertion)))
|
||||
(on-stop (send-packet! h (list me 'retracted assertion)))
|
||||
(when (observe? assertion)
|
||||
(field [deadline (+ (current-inexact-milliseconds) *assertion-refresh*)])
|
||||
(on (asserted (later-than (deadline)))
|
||||
(send-packet! h (list me 'asserted assertion))
|
||||
(deadline (+ (deadline) *assertion-refresh*))))
|
||||
(on (message (mcds-change _ 'asserted (observe $pattern)))
|
||||
(term-intersect assertion pattern
|
||||
(lambda (intersection)
|
||||
(send-packet! h (list me 'asserted assertion)))
|
||||
void)))
|
||||
|
||||
(on (message (mcds-change $peer 'message $body))
|
||||
(send! (mcds-inbound body)))
|
||||
|
||||
(on (message (mcds-outbound $body))
|
||||
(send-packet! h (list me 'message body))))))
|
Loading…
Reference in New Issue