racket-matrix-2012/os2-udp.rkt

129 lines
5.0 KiB
Racket

#lang racket/base
;; UDP drivers for os2.rkt
(require racket/set)
(require racket/match)
(require racket/udp)
(require "os2.rkt")
(require "dump-bytes.rkt")
(provide (struct-out udp-address)
(struct-out udp-handle)
(struct-out udp-listener)
(struct-out udp-packet)
udp-driver
udp-spy)
;; A UdpAddress is one of
;; -- a (udp-address String Uint16), representing a remote socket
;; -- a (udp-handle Any), representing a local socket on a kernel-assigned port
;; -- a (udp-listener Uint16), representing a local socket on a user-assigned port
;;
;; Note that udp-handle ids must be chosen carefully: they are scoped
;; to the local VM, i.e. shared between all processes in that VM, so
;; processes must make sure they do not accidentally pick the same
;; handle id.
;;
;; All three are prefab structs.

;; Remote endpoint: host name and port number.
(struct udp-address (host port) #:prefab)
;; Local endpoint on a kernel-chosen port (bind-socket! binds these to
;; port 0); `id` only distinguishes one handle from another.
(struct udp-handle (id) #:prefab)
;; Local endpoint bound to the caller-chosen `port`.
(struct udp-listener (port) #:prefab)
;; A UdpPacket is a (udp-packet UdpAddress UdpAddress Bytes). It
;; represents a packet appearing on our local "subnet" of the full UDP
;; network: `source` is the sender, `destination` the receiver and
;; `body` the payload bytes.
(struct udp-packet (source destination body) #:prefab)
;; A HandleMapping is a (handle-mapping UdpAddress UdpSocket): a record
;; tying a local UdpAddress to its underlying UDP socket. It is private
;; to the implementation of the driver (it is not exported): each
;; socket manager publishes one, and udp-driver subscribes to them so it
;; notices when a manager goes away.
(struct handle-mapping (address socket) #:prefab)
;; TODO: BUG?: Routing packets between two local sockets won't work,
;; because every topic pattern in this file uses any-remote (a
;; udp-address) for the peer, so a udp-handle or udp-listener peer is
;; never recognised.
;; UdpAddress; a pattern standing for any remote address: a udp-address
;; whose host and port are both (wild). Built once and reused in every
;; topic pattern in this file.
(define any-remote (udp-address (wild) (wild)))
;; BootK
;; Process acting as a UDP socket factory.
;;
;; Its state is the set of local addresses (udp-handle / udp-listener
;; values) for which a socket-manager process has been spawned and not
;; yet seen to disappear.
(define udp-driver
  (let ()
    ;; Pattern for a packet travelling from any remote address to the
    ;; given local address, with any body.
    (define (from-remote-to local)
      (udp-packet any-remote local (wild)))
    (transition (set)
      ;; Watch for peers interested in remote->local traffic on either
      ;; kind of local address, and make sure each such local address
      ;; has a socket manager.
      (role
       (set (topic-publisher (from-remote-to (udp-handle (wild))) #:monitor? #t)
            (topic-publisher (from-remote-to (udp-listener (wild))) #:monitor? #t)
            (topic-subscriber (from-remote-to (udp-handle (wild))) #:monitor? #t)
            (topic-subscriber (from-remote-to (udp-listener (wild))) #:monitor? #t))
       #:state known-addrs
       #:topic arrived
       #:on-presence
       (match arrived
         ;; Only topics whose last field is #f are handled here
         ;; (presumably the monitor flag -- confirm against os2.rkt).
         [(topic _ (udp-packet _ addr _) #f)
          (if (set-member? known-addrs addr)
              ;; Already managed: nothing to do.
              (transition known-addrs)
              ;; First sighting: remember it and start a manager.
              (transition (set-add known-addrs addr)
                (spawn (udp-socket-manager addr)
                       #:debug-name (list 'udp-socket addr))))]))
      ;; When a manager's handle-mapping disappears, forget its address
      ;; so that a later request can create a fresh socket.
      (role (topic-subscriber (handle-mapping (wild) (wild)) #:monitor? #t)
        #:state known-addrs
        #:topic departed
        #:on-absence
        (match departed
          [(topic _ (handle-mapping addr _) _)
           (transition (set-remove known-addrs addr))])))))
;; UdpSocket UdpAddress -> Void
;; Binds socket `s` according to the kind of local address given:
;; a udp-listener binds to its own port, a udp-handle binds to port 0,
;; and anything else leaves the socket unbound.
(define (bind-socket! s local-addr)
  (match local-addr
    [(udp-listener port) (udp-bind! s #f port)]
    [(udp-handle _)      (udp-bind! s #f 0)]
    [_                   (void)]))
;; UdpAddress -> BootK
;; Curried process body: given a local address, returns a function of
;; the new process's pid that opens and binds a UDP socket for that
;; address and returns the initial transition for the process.
;;
;; The process state is 'socket-is-open until the peer goes away, at
;; which point it becomes 'socket-is-closed.
;;
;; Raises whatever exn:fail bind-socket! raises (e.g. when the
;; requested port cannot be bound); in that case the socket opened here
;; is closed before the exception propagates.
(define ((udp-socket-manager local-addr) self-pid)
  (define s (udp-open-socket #f #f))
  ;; If binding fails, close the socket we just opened instead of
  ;; leaving it open, then let the original exception continue.
  (with-handlers ([exn:fail? (lambda (e)
                               (udp-close s)
                               (raise e))])
    (bind-socket! s local-addr))
  ;; Single receive buffer, reused for every datagram; each handler
  ;; invocation copies out the bytes it needs with subbytes.
  (define buffer (make-bytes 65536)) ;; TODO: buffer size control
  (transition 'socket-is-open
    ;; Offers a handle-mapping on the local network so that the
    ;; driver/factory can clean up when this process dies.
    (role (topic-publisher (handle-mapping local-addr s)))
    ;; If our counterparty removes either of their roles as the
    ;; subscriber end of the remote-to-local stream or the publisher
    ;; end of the local-to-remote stream, shut ourselves down. Also,
    ;; relay messages published on the local-to-remote stream out on
    ;; the actual socket.
    (role (set (topic-publisher (udp-packet any-remote local-addr (wild))) ;; kind of dummy?
               (topic-subscriber (udp-packet local-addr any-remote (wild))))
      #:state state
      #:on-absence (transition 'socket-is-closed
                     (quit)
                     ;; Only close the socket the first time absence is
                     ;; seen; the close itself runs in a short-lived
                     ;; helper process that quits immediately afterwards.
                     (when (eq? state 'socket-is-open)
                       (spawn (lambda (dummy-pid)
                                (udp-close s)
                                (transition 'dummy (quit)))
                              #:debug-name (list 'udp-socket-closer local-addr))))
      ;; Outbound: a packet from our local address to a concrete remote
      ;; host/port is sent on the real socket.
      [(udp-packet (== local-addr) (udp-address remote-host remote-port) body)
       (udp-send-to s remote-host remote-port body)
       (transition state)])
    ;; Listen for messages arriving on the actual socket using a
    ;; ground event, and relay them at this level.
    (role (topic-subscriber (cons (udp-receive!-evt s buffer) (wild)))
      #:state state
      [(cons (? evt?) (list packet-length remote-host remote-port))
       ;; Copy the received bytes out of the shared buffer.
       (define packet (subbytes buffer 0 packet-length))
       (transition state
         (send-message (udp-packet (udp-address remote-host remote-port) local-addr packet)))])))
;; BootK
;; Debugging aid. Subscribes (as a monitor) to every message on this
;; network. UDP packets are printed as a `(UDP <source> --> <dest>)`
;; line followed by a dump of the body via dump-bytes!; any other
;; message is printed as `(UDP <message>)` with no special formatting.
(define udp-spy
  (transition 'no-state
    (role (topic-subscriber (wild) #:monitor? #t)
      [(udp-packet from to payload)
       ;; "~s" prints with `write`, so output matches write + newline.
       (printf "~s\n" `(UDP ,from --> ,to))
       (dump-bytes! payload (bytes-length payload))
       (void)]
      [anything-else
       (printf "~s\n" `(UDP ,anything-else))
       (void)])))