diff --git a/prospect/demand-matcher.rkt b/prospect/demand-matcher.rkt new file mode 100644 index 0000000..d040366 --- /dev/null +++ b/prospect/demand-matcher.rkt @@ -0,0 +1,144 @@ +#lang racket/base +;; A structure (and process!) for matching supply to demand via observation of interests. + +(require racket/set) +(require racket/match) +(require "core.rkt") +(require (only-in "route.rkt" matcher-project matcher-key-set)) +(require "drivers/timer.rkt") + +(provide (except-out (struct-out demand-matcher) demand-matcher) + (rename-out [make-demand-matcher demand-matcher]) + demand-matcher-update + spawn-demand-matcher + on-claim) + +;; A DemandMatcher keeps track of demand for services based on some +;; Projection over a Matcher, as well as a collection of functions +;; that can be used to increase supply in response to increased +;; demand, or handle a sudden drop in supply for which demand still +;; exists. +(struct demand-matcher (demand-spec ;; CompiledProjection + supply-spec ;; CompiledProjection + increase-handler ;; ChangeHandler + decrease-handler ;; ChangeHandler + current-demand ;; (Setof (Listof Any)) + current-supply) ;; (Setof (Listof Any)) + #:transparent) + +;; A ChangeHandler is a ((Constreeof Action) Any* -> (Constreeof Action)). +;; It is called with an accumulator of actions so-far-computed as its +;; first argument, and with a value for each capture in the +;; DemandMatcher's projection as the remaining arguments. + +;; ChangeHandler +;; Default handler of unexpected supply decrease. +(define (default-decrease-handler state . removed-captures) + state) + +(define (make-demand-matcher demand-spec supply-spec increase-handler decrease-handler) + (demand-matcher demand-spec + supply-spec + increase-handler + decrease-handler + (set) + (set))) + +;; DemandMatcher (Constreeof Action) Patch -> (Transition DemandMatcher) +;; Given a Patch from the environment, projects it into supply and +;; demand increase and decrease sets. Calls ChangeHandlers in response +;; to increased unsatisfied demand and decreased demanded supply. +(define (demand-matcher-update d s p) + (match-define (demand-matcher demand-spec supply-spec inc-h dec-h demand supply) d) + (define added-demand (matcher-key-set (matcher-project (patch-added p) demand-spec))) + (define removed-demand (matcher-key-set (matcher-project (patch-removed p) demand-spec))) + (define added-supply (matcher-key-set (matcher-project (patch-added p) supply-spec))) + (define removed-supply (matcher-key-set (matcher-project (patch-removed p) supply-spec))) + + (when (not added-demand) + (error 'demand-matcher "Wildcard demand of ~v:\n~a" + demand-spec + (matcher->pretty-string (patch-added p)))) + (when (not added-supply) + (error 'demand-matcher "Wildcard supply of ~v:\n~a" + supply-spec + (matcher->pretty-string (patch-added p)))) + + (set! supply (set-union supply added-supply)) + (set! demand (set-subtract demand removed-demand)) + + (for [(captures (in-set removed-supply))] + (when (set-member? demand captures) (set! s (apply dec-h s captures)))) + (for [(captures (in-set added-demand))] + (when (not (set-member? supply captures)) (set! s (apply inc-h s captures)))) + + (set! supply (set-subtract supply removed-supply)) + (set! demand (set-union demand added-demand)) + + (transition (struct-copy demand-matcher d [current-demand demand] [current-supply supply]) s)) + +;; Behavior :> (Option Event) DemandMatcher -> (Transition DemandMatcher) +;; Handles events from the environment. Only cares about routing-updates. +(define (demand-matcher-handle-event e d) + (match e + [(? patch? p) + (demand-matcher-update d '() p)] + [_ #f])) + +;; Any* -> (Constreeof Action) +;; Default handler of unexpected supply decrease. +;; Ignores the situation. +(define (unexpected-supply-decrease . removed-captures) + '()) + +;; Projection Projection (Any* -> (Constreeof Action)) [(Any* -> (Constreeof Action))] -> Action +;; Spawns a demand matcher actor. +(define (spawn-demand-matcher demand-spec + supply-spec + increase-handler + [decrease-handler unexpected-supply-decrease] + #:meta-level [meta-level 0]) + (define d (make-demand-matcher (compile-projection (prepend-at-meta demand-spec meta-level)) + (compile-projection (prepend-at-meta supply-spec meta-level)) + (lambda (acs . rs) (cons (apply increase-handler rs) acs)) + (lambda (acs . rs) (cons (apply decrease-handler rs) acs)))) + (spawn demand-matcher-handle-event + d + (sub (projection->pattern demand-spec) #:meta-level meta-level) + (sub (projection->pattern supply-spec) #:meta-level meta-level) + (pub (projection->pattern supply-spec) #:meta-level meta-level))) + +;; (Matcher (Option (Setof (Listof Value))) ... -> (Option (Constreeof Action))) +;; Matcher Projection ... +;; -> Action +;; Spawns a process that observes the given projections. Any time the +;; environment's interests change in a relevant way, calls +;; check-and-maybe-spawn-fn with the aggregate interests and the +;; projection results. If check-and-maybe-spawn-fn returns #f, +;; continues to wait; otherwise, takes the action(s) returned, and +;; quits. +(define (on-claim #:timeout-msec [timeout-msec #f] + #:on-timeout [timeout-handler (lambda () '())] + check-and-maybe-spawn-fn + base-interests + . projections) + (define timer-id (gensym 'on-claim)) + (list + (when timeout-msec (message (set-timer timer-id timeout-msec 'relative))) + (spawn (lambda (e current-aggregate) + (match e + [(? patch? p) + (define new-aggregate (update-interests current-aggregate p)) + (define projection-results + (map (lambda (p) (matcher-key-set (matcher-project new-aggregate p))) projections)) + (define maybe-spawn (apply check-and-maybe-spawn-fn + new-aggregate + projection-results)) + (transition new-aggregate (when maybe-spawn (list maybe-spawn (quit))))] + [(message (timer-expired (== timer-id) _)) + (transition current-aggregate (list (timeout-handler) (quit)))] + [_ #f])) + (matcher-empty) + (patch base-interests (matcher-empty)) + (patch-seq* (map projection->pattern projections)) + (sub (timer-expired timer-id ?))))) diff --git a/prospect/drivers/udp.rkt b/prospect/drivers/udp.rkt new file mode 100644 index 0000000..6c2dcbd --- /dev/null +++ b/prospect/drivers/udp.rkt @@ -0,0 +1,90 @@ +#lang racket/base + +(require racket/match) +(require (prefix-in udp: racket/udp)) +(require "../main.rkt") +(require "../demand-matcher.rkt") + +(provide (struct-out udp-remote-address) + (struct-out udp-handle) + (struct-out udp-listener) + udp-address? + udp-local-address? + (struct-out udp-packet) + spawn-udp-driver) + +;; A UdpAddress is one of +;; -- a (udp-address String Uint16), representing a remote socket +;; -- a (udp-handle Any), representing a local socket on a kernel-assigned port +;; -- a (udp-listener Uint16), representing a local socket on a user-assigned port +;; Note that udp-handle-ids must be chosen carefully: they are scoped +;; to the local VM, i.e. shared between processes in that VM, so +;; processes must make sure not to accidentally clash in handle ID +;; selection. +(struct udp-remote-address (host port) #:prefab) +(struct udp-handle (id) #:prefab) +(struct udp-listener (port) #:prefab) + +(define (udp-address? x) + (or (udp-remote-address? x) + (udp-local-address? x))) + +(define (udp-local-address? x) + (or (udp-handle? x) + (udp-listener? x))) + +;; A UdpPacket is a (udp-packet UdpAddress UdpAddress Bytes), and +;; represents a packet appearing on our local "subnet" of the full UDP +;; network, complete with source, destination and contents. +(struct udp-packet (source destination body) #:prefab) + +;; -> Action +;; Spawns a process acting as a UDP socket factory. +(define (spawn-udp-driver) + (spawn-demand-matcher (observe (udp-packet ? (?! (udp-listener ?)) ?)) + (advertise (udp-packet ? (?! (udp-listener ?)) ?)) + spawn-udp-socket)) + +;; UdpLocalAddress -> Action +(define (spawn-udp-socket local-addr) + (define socket (udp:udp-open-socket #f #f)) + + (match local-addr + [(udp-listener port) (udp:udp-bind! socket #f port)] + [(udp-handle _) (udp:udp-bind! socket #f 0)]) ;; kernel-allocated port number + + (define control-ch (make-channel)) + (thread (lambda () (udp-receiver-thread local-addr socket control-ch))) + + (spawn (lambda (e s) + (match e + [(? patch? p) + (cond [(matcher-empty? (patch-removed p)) #f] ;; peer hasn't quit yet: do nothing. + [else (channel-put control-ch 'quit) + (transition s (quit))])] + [(message (at-meta (? udp-packet? p))) + (transition s (message p))] + [(message (udp-packet _ (udp-remote-address host port) body)) + (udp:udp-send-to socket host port body) + #f] + [_ #f])) + (void) + (sub (udp-packet ? local-addr ?) #:meta-level 1) + (sub (udp-packet local-addr (udp-remote-address ? ?) ?)) + (pub (udp-packet (udp-remote-address ? ?) local-addr ?)) + (sub (observe (udp-packet (udp-remote-address ? ?) local-addr ?))))) + +;; UdpLocalAddress UdpSocket Channel -> Void +(define (udp-receiver-thread local-addr socket control-ch) + (define buffer (make-bytes 65536)) + (let loop () + (sync (handle-evt control-ch (match-lambda ['quit (void)])) + (handle-evt (udp:udp-receive!-evt socket buffer) + (lambda (receive-results) + (match-define (list len source-hostname source-port) receive-results) + (send-ground-message + (udp-packet (udp-remote-address source-hostname source-port) + local-addr + (subbytes buffer 0 len))) + (loop))))) + (udp:udp-close socket)) diff --git a/prospect/examples/udp-hello-plain.rkt b/prospect/examples/udp-hello-plain.rkt new file mode 100644 index 0000000..3f5186a --- /dev/null +++ b/prospect/examples/udp-hello-plain.rkt @@ -0,0 +1,18 @@ +#lang prospect + +(require "../drivers/udp.rkt") + +(spawn-udp-driver) + +(spawn (lambda (e s) + (match e + [(message (udp-packet src dst #"quit\n")) + (log-info "Got quit request") + (transition s (list (message (udp-packet dst src #"Goodbye!\n")) (quit)))] + [(message (udp-packet src dst body)) + (log-info "Got packet from ~v: ~v" src body) + (define reply (string->bytes/utf-8 (format "You said: ~a" body))) + (transition s (message (udp-packet dst src reply)))] + [_ #f])) + (void) + (sub (udp-packet ? (udp-listener 5999) ?)))