diff --git a/syndicate/drivers/udp.rkt b/syndicate/drivers/udp.rkt new file mode 100644 index 0000000..28a1235 --- /dev/null +++ b/syndicate/drivers/udp.rkt @@ -0,0 +1,98 @@ +#lang imperative-syndicate + +(provide (struct-out udp-remote-address) + (struct-out udp-handle) + (struct-out udp-listener) + (struct-out udp-multicast-group-member) + (struct-out udp-multicast-loopback) + udp-address? + udp-local-address? + (struct-out udp-packet)) + +(require (prefix-in udp: racket/udp)) + +;; A UdpAddress is one of +;; -- a (udp-remote-address String Uint16), representing a remote socket +;; -- a (udp-handle Any), representing a local socket on a kernel-assigned port +;; -- a (udp-listener Uint16), representing a local socket on a user-assigned port +;; Note that udp-handle-ids must be chosen carefully: they are scoped +;; to the local dataspace, i.e. shared between processes in that +;; dataspace, so processes must make sure not to accidentally clash in +;; handle ID selection. +(struct udp-remote-address (host port) #:prefab) +(struct udp-handle (id) #:prefab) +(struct udp-listener (port) #:prefab) + +(define (udp-address? x) + (or (udp-remote-address? x) + (udp-local-address? x))) + +(define (udp-local-address? x) + (or (udp-handle? x) + (udp-listener? x))) + +;; A UdpMembership is a (udp-multicast-group-member UdpLocalAddress String String), +;; where the latter two arguments correspond to the last two arguments +;; of `udp-multicast-join-group!`. +(assertion-struct udp-multicast-group-member (local-address group-address interface)) + +;; A UdpLoopback is a (udp-multicast-loopback UdpLocalAddress Boolean). +(assertion-struct udp-multicast-loopback (local-address enabled?)) + +;; A UdpPacket is a (udp-packet UdpAddress UdpAddress Bytes), and +;; represents a packet appearing on our local "subnet" of the full UDP +;; network, complete with source, destination and contents. +(message-struct udp-packet (source destination body)) + +(spawn #:name 'udp-driver + (during/spawn (observe ($ local-addr (udp-listener _))) + #:name local-addr + (udp-main local-addr)) + (during/spawn (observe ($ local-addr (udp-handle _))) + #:name local-addr + (udp-main local-addr))) + +;; UdpLocalAddress -> Void +(define (udp-main local-addr) + (define socket (udp:udp-open-socket #f #f)) + + (match local-addr + [(udp-listener port) (udp:udp-bind! socket #f port #t)] + [(udp-handle _) (udp:udp-bind! socket #f 0)]) ;; kernel-allocated port number + + (define control-ch (make-channel)) + (thread (lambda () (udp-receiver-thread local-addr socket control-ch))) + (signal-background-activity! +1) + (on-stop (channel-put control-ch 'quit)) + + (assert local-addr) + (stop-when (retracted (observe local-addr))) + + (during (udp-multicast-group-member local-addr $group $interface) + (on-start (udp:udp-multicast-join-group! socket group interface)) + (on-stop (udp:udp-multicast-leave-group! socket group interface))) + + (on (asserted (udp-multicast-loopback local-addr $enabled)) + (udp:udp-multicast-set-loopback! socket enabled)) + + (on (message (inbound ($ p (udp-packet _ local-addr _)))) + (send! p)) + + (on (message (udp-packet local-addr (udp-remote-address $h $p) $body)) + (udp:udp-send-to* socket h p body))) + +;; UdpLocalAddress UdpSocket Channel -> Void +(define (udp-receiver-thread local-addr socket control-ch) + (define buffer (make-bytes 65536)) + (let loop () + (sync (handle-evt control-ch (match-lambda ['quit (void)])) + (handle-evt (udp:udp-receive!-evt socket buffer) + (lambda (receive-results) + (match-define (list len source-hostname source-port) receive-results) + (ground-send! + (udp-packet (udp-remote-address source-hostname source-port) + local-addr + (subbytes buffer 0 len))) + (loop))))) + (udp:udp-close socket) + (signal-background-activity! -1)) diff --git a/syndicate/examples/udp-echo.rkt b/syndicate/examples/udp-echo.rkt new file mode 100644 index 0000000..da3bb8d --- /dev/null +++ b/syndicate/examples/udp-echo.rkt @@ -0,0 +1,10 @@ +#lang imperative-syndicate + +(require/activate imperative-syndicate/drivers/udp) + +(spawn (define s (udp-listener 5999)) + (during s + (on (message (udp-packet $c s $body)) + (printf "~a: ~v\n" c body) + (define reply (string->bytes/utf-8 (format "You said: ~a" body))) + (send! (udp-packet s c reply))))) diff --git a/syndicate/examples/udp-multicast.rkt b/syndicate/examples/udp-multicast.rkt new file mode 100644 index 0000000..8173dc4 --- /dev/null +++ b/syndicate/examples/udp-multicast.rkt @@ -0,0 +1,38 @@ +#lang imperative-syndicate + +(require/activate imperative-syndicate/drivers/timer) +(require/activate imperative-syndicate/drivers/udp) +(require racket/random file/sha1) + +;; IANA offers guidelines for choosing multicast addresses [1]. +;; +;; Reasonable candidates for local experimentation include: +;; - 224.0.1.20, "any private experiment" +;; - 233.252.0.0 - 233.252.0.255, "MCAST-TEST-NET", for examples and documentation (only) +;; +;; For production and semi-production use, registering an address may +;; be an option; failing that, the Administratively Scoped Block +;; (239/8; see RFC 2365) may be used: +;; - 239.255.0.0 - 239.255.255.255, "IPv4 Local Scope" +;; - 239.192.0.0 - 239.195.255.255, "Organization Local Scope" +;; +;; [1] http://www.iana.org/assignments/multicast-addresses/ + +(define group-address "233.252.0.101") ;; falls within MCAST-TEST-NET +(define group-port 5999) ;; make sure your firewall is open to UDP on this port + +(spawn (define me (bytes->hex-string (crypto-random-bytes 8))) + (define h (udp-listener group-port)) + (during h + (assert (udp-multicast-group-member h group-address #f)) + (assert (udp-multicast-loopback h #t)) + + (field [deadline (current-inexact-milliseconds)]) + (on (asserted (later-than (deadline))) + (send! (udp-packet h + (udp-remote-address group-address group-port) + (string->bytes/utf-8 (format "~a ~a" me (deadline))))) + (deadline (+ (deadline) 1000))) + + (on (message (udp-packet $source h $body)) + (printf "~a: ~a\n" source body))))