From 235b0f2969b3479b55329654f09c89e997c8c0d0 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 28 Oct 2013 19:08:24 +0000 Subject: [PATCH] Functioning demand-matcher --- minimart/demand-matcher.rkt | 121 +++++++++++++++++++++++++++++---- minimart/presence-detector.rkt | 10 ++- 2 files changed, 112 insertions(+), 19 deletions(-) diff --git a/minimart/demand-matcher.rkt b/minimart/demand-matcher.rkt index 8ded7be..6153e20 100644 --- a/minimart/demand-matcher.rkt +++ b/minimart/demand-matcher.rkt @@ -1,29 +1,124 @@ #lang racket/base +(require racket/match) (require "core.rkt") (require "presence-detector.rkt") (provide (except-out (struct-out demand-matcher) demand-matcher) (rename-out [make-demand-matcher demand-matcher]) - demand-matcher-update) + demand-matcher-update + spawn-demand-matcher) -(struct demand-matcher (subscription-is-demand? increase-handler decrease-handler state) +(struct demand-matcher (demand-is-subscription? + pattern + meta-level + demand-level + supply-level + increase-handler + decrease-handler + state) #:transparent) -(define (default-decrease-handler removed state) - (error 'demand-matcher "Unexpected decrease in supply for route ~a" removed)) +(define (unexpected-supply-decrease r) + (error 'demand-matcher "Unexpected decrease in supply for route ~a" r)) -(define (make-demand-matcher increase-handler - [decrease-handler default-decrease-handler] - #:subscription-is-demand? [subscription-is-demand? #t]) - (demand-matcher subscription-is-demand? +(define (default-decrease-handler removed state) + (unexpected-supply-decrease removed)) + +(define (make-demand-matcher demand-is-subscription? + pattern + meta-level + demand-level + supply-level + increase-handler + [decrease-handler default-decrease-handler]) + (demand-matcher demand-is-subscription? + pattern + meta-level + demand-level + supply-level increase-handler decrease-handler (presence-detector))) +(define (compute-detector demand? d) + (route (if (demand-matcher-demand-is-subscription? d) (not demand?) demand?) + (demand-matcher-pattern d) + (demand-matcher-meta-level d) + (+ 1 (max (demand-matcher-demand-level d) + (demand-matcher-supply-level d))))) + +;; For each route "changed" in routes, if changed is one of our +;; monitored entities (a demand, if arrivals? is #t, or a supply +;; otherwise), including both a pattern, meta-level, and level match, +;; then search for matching peers (including level matching). If +;; arrivals? is #t, then if there are no matching peers (i.e. supplies +;; are not allocated), signel an increas in demand; otherwise, if +;; there are any matching peers (i.e. demand remains), signal a +;; decrease in supply. +(define (incorporate-delta arrivals? routes d state) + (define relevant-change-detector (compute-detector arrivals? d)) + (define expected-change-level + (if arrivals? (demand-matcher-demand-level d) (demand-matcher-supply-level d))) + (define expected-peer-level + (if arrivals? (demand-matcher-supply-level d) (demand-matcher-demand-level d))) + (for/fold ([s state]) ([changed routes]) + (if (= (route-level changed) expected-change-level) + (match (intersect-routes (list changed) (list relevant-change-detector)) + ['() s] + [(list relevant-changed-route) ;; narrowed to relevancy by intersect-routes + ;; (log-info "incorporate-delta ~v ~v <--> ~v /// ~v" + ;; arrivals? + ;; relevant-changed-route + ;; relevant-change-detector + ;; (demand-matcher-state d)) + (define peer-detector + (struct-copy route relevant-changed-route [level (+ 1 expected-peer-level)])) + (define peer-exists? + (ormap (lambda (r) (= (route-level r) expected-peer-level)) + (intersect-routes (presence-detector-routes (demand-matcher-state d)) + (list peer-detector)))) + ;; (log-info "peer-exists? == ~v, peer-detector == ~v" + ;; peer-exists? + ;; peer-detector) + (cond + [(and arrivals? (not peer-exists?)) + ((demand-matcher-increase-handler d) relevant-changed-route s)] + [(and (not arrivals?) peer-exists?) + ((demand-matcher-decrease-handler d) relevant-changed-route s)] + [else + s])]) + s))) + (define (demand-matcher-update d state0 rs) - (define-values (new-state added removed) (presence-detector-update (demand-matcher-state d))) - (define state1 (for/fold ([s state0]) ([r added]) ((demand-matcher-increase-handler d) r s))) - (define state2 (for/fold ([s state1]) ([r removed]) ((demand-matcher-decrease-handler d) r s))) - (values (struct-copy demand-matcher d [state new-state]) - state2)) + (define-values (new-state added removed) (presence-detector-update (demand-matcher-state d) rs)) + (define new-d (struct-copy demand-matcher d [state new-state])) + (define state1 (incorporate-delta #t added new-d state0)) + (define state2 (incorporate-delta #f removed new-d state1)) + (values new-d state2)) + +(define (demand-matcher-handle-event e d) + (match e + [(routing-update routes) + (define-values (new-d actions) (demand-matcher-update d '() routes)) + (transition new-d actions)] + [_ #f])) + +(define (spawn-demand-matcher pattern + increase-handler + [decrease-handler unexpected-supply-decrease] + #:demand-is-subscription? [demand-is-subscription? #t] + #:meta-level [meta-level 0] + #:demand-level demand-level + #:supply-level supply-level) + (define d (make-demand-matcher demand-is-subscription? + pattern + meta-level + demand-level + supply-level + (lambda (r actions) (cons (increase-handler r) actions)) + (lambda (r actions) (cons (decrease-handler r) actions)))) + (spawn demand-matcher-handle-event + d + (list (compute-detector #t d) + (compute-detector #f d)))) diff --git a/minimart/presence-detector.rkt b/minimart/presence-detector.rkt index eba6bc2..c5a6c66 100644 --- a/minimart/presence-detector.rkt +++ b/minimart/presence-detector.rkt @@ -1,13 +1,14 @@ #lang racket/base (require racket/set) +(require racket/match) (require "core.rkt") (require "pattern.rkt") (provide (except-out (struct-out presence-detector) presence-detector) (rename-out [make-presence-detector presence-detector]) presence-detector-update - presence-exists-for?) + presence-detector-routes) (struct presence-detector (route-set) #:transparent) @@ -21,8 +22,5 @@ (set-subtract new-route-set old-route-set) (set-subtract old-route-set new-route-set))) -(define (presence-exists-for? p probe-route) - (for/or ((existing-route (in-set (presence-detector-route-set p)))) - (and (equal? (route-subscription? probe-route) (route-subscription? existing-route)) - (equal? (route-meta-level probe-route) (route-meta-level existing-route)) - (intersect? (route-pattern probe-route) (route-pattern existing-route))))) +(define (presence-detector-routes p) + (set->list (presence-detector-route-set p)))