Demand-matcher; UDP driver; simple UDP example
This commit is contained in:
parent
356986134f
commit
5d2ec58ef0
|
@ -0,0 +1,144 @@
|
||||||
|
#lang racket/base
|
||||||
|
;; A structure (and process!) for matching supply to demand via observation of interests.
|
||||||
|
|
||||||
|
(require racket/set)
|
||||||
|
(require racket/match)
|
||||||
|
(require "core.rkt")
|
||||||
|
(require (only-in "route.rkt" matcher-project matcher-key-set))
|
||||||
|
(require "drivers/timer.rkt")
|
||||||
|
|
||||||
|
(provide (except-out (struct-out demand-matcher) demand-matcher)
|
||||||
|
(rename-out [make-demand-matcher demand-matcher])
|
||||||
|
demand-matcher-update
|
||||||
|
spawn-demand-matcher
|
||||||
|
on-claim)
|
||||||
|
|
||||||
|
;; A DemandMatcher keeps track of demand for services based on some
|
||||||
|
;; Projection over a Matcher, as well as a collection of functions
|
||||||
|
;; that can be used to increase supply in response to increased
|
||||||
|
;; demand, or handle a sudden drop in supply for which demand still
|
||||||
|
;; exists.
|
||||||
|
(struct demand-matcher (demand-spec ;; CompiledProjection
|
||||||
|
supply-spec ;; CompiledProjection
|
||||||
|
increase-handler ;; ChangeHandler
|
||||||
|
decrease-handler ;; ChangeHandler
|
||||||
|
current-demand ;; (Setof (Listof Any))
|
||||||
|
current-supply) ;; (Setof (Listof Any))
|
||||||
|
#:transparent)
|
||||||
|
|
||||||
|
;; A ChangeHandler is a ((Constreeof Action) Any* -> (Constreeof Action)).
|
||||||
|
;; It is called with an accumulator of actions so-far-computed as its
|
||||||
|
;; first argument, and with a value for each capture in the
|
||||||
|
;; DemandMatcher's projection as the remaining arguments.
|
||||||
|
|
||||||
|
;; ChangeHandler
|
||||||
|
;; Default handler of unexpected supply decrease.
|
||||||
|
(define (default-decrease-handler state . removed-captures)
|
||||||
|
state)
|
||||||
|
|
||||||
|
(define (make-demand-matcher demand-spec supply-spec increase-handler decrease-handler)
|
||||||
|
(demand-matcher demand-spec
|
||||||
|
supply-spec
|
||||||
|
increase-handler
|
||||||
|
decrease-handler
|
||||||
|
(set)
|
||||||
|
(set)))
|
||||||
|
|
||||||
|
;; DemandMatcher (Constreeof Action) Patch -> (Transition DemandMatcher)
|
||||||
|
;; Given a Patch from the environment, projects it into supply and
|
||||||
|
;; demand increase and decrease sets. Calls ChangeHandlers in response
|
||||||
|
;; to increased unsatisfied demand and decreased demanded supply.
|
||||||
|
(define (demand-matcher-update d s p)
|
||||||
|
(match-define (demand-matcher demand-spec supply-spec inc-h dec-h demand supply) d)
|
||||||
|
(define added-demand (matcher-key-set (matcher-project (patch-added p) demand-spec)))
|
||||||
|
(define removed-demand (matcher-key-set (matcher-project (patch-removed p) demand-spec)))
|
||||||
|
(define added-supply (matcher-key-set (matcher-project (patch-added p) supply-spec)))
|
||||||
|
(define removed-supply (matcher-key-set (matcher-project (patch-removed p) supply-spec)))
|
||||||
|
|
||||||
|
(when (not added-demand)
|
||||||
|
(error 'demand-matcher "Wildcard demand of ~v:\n~a"
|
||||||
|
demand-spec
|
||||||
|
(matcher->pretty-string (patch-added p))))
|
||||||
|
(when (not added-supply)
|
||||||
|
(error 'demand-matcher "Wildcard supply of ~v:\n~a"
|
||||||
|
supply-spec
|
||||||
|
(matcher->pretty-string (patch-added p))))
|
||||||
|
|
||||||
|
(set! supply (set-union supply added-supply))
|
||||||
|
(set! demand (set-subtract demand removed-demand))
|
||||||
|
|
||||||
|
(for [(captures (in-set removed-supply))]
|
||||||
|
(when (set-member? demand captures) (set! s (apply dec-h s captures))))
|
||||||
|
(for [(captures (in-set added-demand))]
|
||||||
|
(when (not (set-member? supply captures)) (set! s (apply inc-h s captures))))
|
||||||
|
|
||||||
|
(set! supply (set-subtract supply removed-supply))
|
||||||
|
(set! demand (set-union demand added-demand))
|
||||||
|
|
||||||
|
(transition (struct-copy demand-matcher d [current-demand demand] [current-supply supply]) s))
|
||||||
|
|
||||||
|
;; Behavior :> (Option Event) DemandMatcher -> (Transition DemandMatcher)
|
||||||
|
;; Handles events from the environment. Only cares about routing-updates.
|
||||||
|
(define (demand-matcher-handle-event e d)
|
||||||
|
(match e
|
||||||
|
[(? patch? p)
|
||||||
|
(demand-matcher-update d '() p)]
|
||||||
|
[_ #f]))
|
||||||
|
|
||||||
|
;; Any* -> (Constreeof Action)
|
||||||
|
;; Default handler of unexpected supply decrease.
|
||||||
|
;; Ignores the situation.
|
||||||
|
(define (unexpected-supply-decrease . removed-captures)
|
||||||
|
'())
|
||||||
|
|
||||||
|
;; Projection Projection (Any* -> (Constreeof Action)) [(Any* -> (Constreeof Action))] -> Action
|
||||||
|
;; Spawns a demand matcher actor.
|
||||||
|
(define (spawn-demand-matcher demand-spec
|
||||||
|
supply-spec
|
||||||
|
increase-handler
|
||||||
|
[decrease-handler unexpected-supply-decrease]
|
||||||
|
#:meta-level [meta-level 0])
|
||||||
|
(define d (make-demand-matcher (compile-projection (prepend-at-meta demand-spec meta-level))
|
||||||
|
(compile-projection (prepend-at-meta supply-spec meta-level))
|
||||||
|
(lambda (acs . rs) (cons (apply increase-handler rs) acs))
|
||||||
|
(lambda (acs . rs) (cons (apply decrease-handler rs) acs))))
|
||||||
|
(spawn demand-matcher-handle-event
|
||||||
|
d
|
||||||
|
(sub (projection->pattern demand-spec) #:meta-level meta-level)
|
||||||
|
(sub (projection->pattern supply-spec) #:meta-level meta-level)
|
||||||
|
(pub (projection->pattern supply-spec) #:meta-level meta-level)))
|
||||||
|
|
||||||
|
;; (Matcher (Option (Setof (Listof Value))) ... -> (Option (Constreeof Action)))
|
||||||
|
;; Matcher Projection ...
|
||||||
|
;; -> Action
|
||||||
|
;; Spawns a process that observes the given projections. Any time the
|
||||||
|
;; environment's interests change in a relevant way, calls
|
||||||
|
;; check-and-maybe-spawn-fn with the aggregate interests and the
|
||||||
|
;; projection results. If check-and-maybe-spawn-fn returns #f,
|
||||||
|
;; continues to wait; otherwise, takes the action(s) returned, and
|
||||||
|
;; quits.
|
||||||
|
(define (on-claim #:timeout-msec [timeout-msec #f]
|
||||||
|
#:on-timeout [timeout-handler (lambda () '())]
|
||||||
|
check-and-maybe-spawn-fn
|
||||||
|
base-interests
|
||||||
|
. projections)
|
||||||
|
(define timer-id (gensym 'on-claim))
|
||||||
|
(list
|
||||||
|
(when timeout-msec (message (set-timer timer-id timeout-msec 'relative)))
|
||||||
|
(spawn (lambda (e current-aggregate)
|
||||||
|
(match e
|
||||||
|
[(? patch? p)
|
||||||
|
(define new-aggregate (update-interests current-aggregate p))
|
||||||
|
(define projection-results
|
||||||
|
(map (lambda (p) (matcher-key-set (matcher-project new-aggregate p))) projections))
|
||||||
|
(define maybe-spawn (apply check-and-maybe-spawn-fn
|
||||||
|
new-aggregate
|
||||||
|
projection-results))
|
||||||
|
(transition new-aggregate (when maybe-spawn (list maybe-spawn (quit))))]
|
||||||
|
[(message (timer-expired (== timer-id) _))
|
||||||
|
(transition current-aggregate (list (timeout-handler) (quit)))]
|
||||||
|
[_ #f]))
|
||||||
|
(matcher-empty)
|
||||||
|
(patch base-interests (matcher-empty))
|
||||||
|
(patch-seq* (map projection->pattern projections))
|
||||||
|
(sub (timer-expired timer-id ?)))))
|
|
@ -0,0 +1,90 @@
|
||||||
|
#lang racket/base
|
||||||
|
|
||||||
|
(require racket/match)
|
||||||
|
(require (prefix-in udp: racket/udp))
|
||||||
|
(require "../main.rkt")
|
||||||
|
(require "../demand-matcher.rkt")
|
||||||
|
|
||||||
|
(provide (struct-out udp-remote-address)
|
||||||
|
(struct-out udp-handle)
|
||||||
|
(struct-out udp-listener)
|
||||||
|
udp-address?
|
||||||
|
udp-local-address?
|
||||||
|
(struct-out udp-packet)
|
||||||
|
spawn-udp-driver)
|
||||||
|
|
||||||
|
;; A UdpAddress is one of
|
||||||
|
;; -- a (udp-address String Uint16), representing a remote socket
|
||||||
|
;; -- a (udp-handle Any), representing a local socket on a kernel-assigned port
|
||||||
|
;; -- a (udp-listener Uint16), representing a local socket on a user-assigned port
|
||||||
|
;; Note that udp-handle-ids must be chosen carefully: they are scoped
|
||||||
|
;; to the local VM, i.e. shared between processes in that VM, so
|
||||||
|
;; processes must make sure not to accidentally clash in handle ID
|
||||||
|
;; selection.
|
||||||
|
(struct udp-remote-address (host port) #:prefab)
|
||||||
|
(struct udp-handle (id) #:prefab)
|
||||||
|
(struct udp-listener (port) #:prefab)
|
||||||
|
|
||||||
|
(define (udp-address? x)
|
||||||
|
(or (udp-remote-address? x)
|
||||||
|
(udp-local-address? x)))
|
||||||
|
|
||||||
|
(define (udp-local-address? x)
|
||||||
|
(or (udp-handle? x)
|
||||||
|
(udp-listener? x)))
|
||||||
|
|
||||||
|
;; A UdpPacket is a (udp-packet UdpAddress UdpAddress Bytes), and
|
||||||
|
;; represents a packet appearing on our local "subnet" of the full UDP
|
||||||
|
;; network, complete with source, destination and contents.
|
||||||
|
(struct udp-packet (source destination body) #:prefab)
|
||||||
|
|
||||||
|
;; -> Action
|
||||||
|
;; Spawns a process acting as a UDP socket factory.
|
||||||
|
(define (spawn-udp-driver)
|
||||||
|
(spawn-demand-matcher (observe (udp-packet ? (?! (udp-listener ?)) ?))
|
||||||
|
(advertise (udp-packet ? (?! (udp-listener ?)) ?))
|
||||||
|
spawn-udp-socket))
|
||||||
|
|
||||||
|
;; UdpLocalAddress -> Action
|
||||||
|
(define (spawn-udp-socket local-addr)
|
||||||
|
(define socket (udp:udp-open-socket #f #f))
|
||||||
|
|
||||||
|
(match local-addr
|
||||||
|
[(udp-listener port) (udp:udp-bind! socket #f port)]
|
||||||
|
[(udp-handle _) (udp:udp-bind! socket #f 0)]) ;; kernel-allocated port number
|
||||||
|
|
||||||
|
(define control-ch (make-channel))
|
||||||
|
(thread (lambda () (udp-receiver-thread local-addr socket control-ch)))
|
||||||
|
|
||||||
|
(spawn (lambda (e s)
|
||||||
|
(match e
|
||||||
|
[(? patch? p)
|
||||||
|
(cond [(matcher-empty? (patch-removed p)) #f] ;; peer hasn't quit yet: do nothing.
|
||||||
|
[else (channel-put control-ch 'quit)
|
||||||
|
(transition s (quit))])]
|
||||||
|
[(message (at-meta (? udp-packet? p)))
|
||||||
|
(transition s (message p))]
|
||||||
|
[(message (udp-packet _ (udp-remote-address host port) body))
|
||||||
|
(udp:udp-send-to socket host port body)
|
||||||
|
#f]
|
||||||
|
[_ #f]))
|
||||||
|
(void)
|
||||||
|
(sub (udp-packet ? local-addr ?) #:meta-level 1)
|
||||||
|
(sub (udp-packet local-addr (udp-remote-address ? ?) ?))
|
||||||
|
(pub (udp-packet (udp-remote-address ? ?) local-addr ?))
|
||||||
|
(sub (observe (udp-packet (udp-remote-address ? ?) local-addr ?)))))
|
||||||
|
|
||||||
|
;; UdpLocalAddress UdpSocket Channel -> Void
|
||||||
|
(define (udp-receiver-thread local-addr socket control-ch)
|
||||||
|
(define buffer (make-bytes 65536))
|
||||||
|
(let loop ()
|
||||||
|
(sync (handle-evt control-ch (match-lambda ['quit (void)]))
|
||||||
|
(handle-evt (udp:udp-receive!-evt socket buffer)
|
||||||
|
(lambda (receive-results)
|
||||||
|
(match-define (list len source-hostname source-port) receive-results)
|
||||||
|
(send-ground-message
|
||||||
|
(udp-packet (udp-remote-address source-hostname source-port)
|
||||||
|
local-addr
|
||||||
|
(subbytes buffer 0 len)))
|
||||||
|
(loop)))))
|
||||||
|
(udp:udp-close socket))
|
|
@ -0,0 +1,18 @@
|
||||||
|
#lang prospect
|
||||||
|
|
||||||
|
(require "../drivers/udp.rkt")
|
||||||
|
|
||||||
|
(spawn-udp-driver)
|
||||||
|
|
||||||
|
(spawn (lambda (e s)
|
||||||
|
(match e
|
||||||
|
[(message (udp-packet src dst #"quit\n"))
|
||||||
|
(log-info "Got quit request")
|
||||||
|
(transition s (list (message (udp-packet dst src #"Goodbye!\n")) (quit)))]
|
||||||
|
[(message (udp-packet src dst body))
|
||||||
|
(log-info "Got packet from ~v: ~v" src body)
|
||||||
|
(define reply (string->bytes/utf-8 (format "You said: ~a" body)))
|
||||||
|
(transition s (message (udp-packet dst src reply)))]
|
||||||
|
[_ #f]))
|
||||||
|
(void)
|
||||||
|
(sub (udp-packet ? (udp-listener 5999) ?)))
|
Loading…
Reference in New Issue