From 4d3e668ed4a7ee2e267bc535f0ad12b9174e06a9 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 21 Aug 2015 17:41:59 -0400 Subject: [PATCH] Support deletion --- prospect/examples/key-value-store.rkt | 113 ++++++++++++++++---------- 1 file changed, 71 insertions(+), 42 deletions(-) diff --git a/prospect/examples/key-value-store.rkt b/prospect/examples/key-value-store.rkt index d77941e..7afa040 100644 --- a/prospect/examples/key-value-store.rkt +++ b/prospect/examples/key-value-store.rkt @@ -2,47 +2,62 @@ (require racket/set) +;; Bindings are versioned with a pair of an epoch and a version. +;; Within an epoch, versions increase monotonically. +;; Epochs increase monotonically. At an epoch boundary, versions reset to 0. + +;; Indicates a nonexistent binding when a binding has this as its value. +;; (I guess bindings with `(absent)` as their value are more properly pseudo-bindings.) +(struct absent () #:transparent) + ;; `binding` tuples associate keys with values at a certain version. ;; versions start at 0 and increase by 1 with every successful update. -(struct binding (key version value) #:transparent) +(struct binding (key epoch version value) #:transparent) ;; `update` tuples request a binding update. -(struct update (binding) #:transparent) +;; The epoch and version describe the *current* version of the binding. +(struct update (key base-epoch base-version value) #:transparent) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(define observation-projector (compile-projection (observe (binding (?!) ? ?)))) -(define update-projector (compile-projection (update (?! (binding ? ? ?))))) +(define observation-projector (compile-projection (observe (binding (?!) ? ? ?)))) +(define update-projector (compile-projection (?! (update ? ? ? ?)))) -(struct db-state (bindings observed-keys) #:transparent) +(struct db-state (epoch bindings observed-keys) #:transparent) -(define (lookup-binding bindings key) - (hash-ref bindings key (lambda () (binding key -1 (void))))) +(define (lookup-binding epoch bindings key) + (hash-ref bindings key (lambda () (binding key epoch 0 (absent))))) (define ((process-suggestion suggestion) state) - (match-define (db-state bindings observed-keys) state) - (match-define (binding key suggested-version _) suggestion) - (define old-binding (lookup-binding bindings key)) - (match-define (binding _ current-version _) old-binding) - (if (= suggested-version (+ current-version 1)) - (transition (struct-copy db-state state - [bindings (hash-set bindings key suggestion)]) - (when (set-member? observed-keys key) - (patch-seq (retract old-binding) - (assert suggestion)))) + (match-define (db-state epoch bindings observed-keys) state) + (match-define (update key base-epoch base-version new-value) suggestion) + (define old-binding (lookup-binding epoch bindings key)) + (match-define (binding _ current-epoch current-version _) old-binding) + (if (and (= current-epoch base-epoch) + (= current-version base-version)) + (let ((new-binding (cond + [(absent? new-value) (binding key (+ epoch 1) 0 new-value)] + [(> epoch current-epoch) (binding key epoch 0 new-value)] + [else (binding key base-epoch (+ base-version 1) new-value)]))) + (transition (struct-copy db-state state + [epoch (binding-epoch new-binding)] + [bindings (if (absent? new-value) + (hash-remove bindings key) + (hash-set bindings key new-binding))]) + (when (set-member? observed-keys key) + (patch-seq (retract old-binding) (assert new-binding))))) (transition state '()))) (define ((adjust-observations added-observations removed-observations) state) - (match-define (db-state bindings observed-keys) state) + (match-define (db-state epoch bindings observed-keys) state) (transition (struct-copy db-state state [observed-keys (set-union added-observations (set-subtract observed-keys removed-observations))]) (list (for/list [(key (in-set removed-observations))] - (retract (binding key ? ?))) + (retract (binding key ? ? ?))) (for/list [(key (in-set added-observations))] - (when (hash-has-key? bindings key) - (assert (lookup-binding bindings key))))))) + (assert (lookup-binding epoch bindings key)))))) (define (db) (spawn (lambda (e old-state) @@ -56,30 +71,33 @@ [(suggestion (in-set added-updates))] (transition-bind (process-suggestion suggestion) t)))] [_ #f])) - (db-state (hash) (set)) - (sub (observe (binding ? ? ?))) - (sub (update (binding ? ? ?))))) + (db-state 0 (hash) (set)) + (sub (observe (binding ? ? ? ?))) + (sub (update ? ? ? ?)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(define binding-projector (compile-projection (?! (binding ? ? ?)))) +(define binding-projector (compile-projection (?! (binding ? ? ? ?)))) -(define (async-update key version value on-complete on-conflict) +(define (async-update key epoch version value on-complete on-conflict) (spawn (lambda (e s) (match e [(? patch? p) (match (set->list (matcher-project/set/single (patch-added p) binding-projector)) ['() #f] - [(list (binding _ (== version) (== value))) + [(list (binding _ (== epoch) (== version) _)) #f] + [(list (binding _ (== epoch) (== (+ version 1)) (== value))) (quit (on-complete))] + [(list (binding _ new-epoch 0 (== value))) + #:when (> new-epoch epoch) (quit (on-complete))] - [(list (binding _ (== (- version 1)) _)) - #f] - [(list (binding _ other-version other-value)) - (quit (on-conflict key version value other-version other-value))])] + [(list (binding _ other-epoch other-version other-value)) + (quit (on-conflict key + epoch version value + other-epoch other-version other-value))])] [_ #f])) (void) - (assert (update (binding key version value))) - (sub (binding key ? ?)))) + (assert (update key epoch version value)) + (sub (binding key ? ? ?)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -94,18 +112,29 @@ #f] [_ #f])) (void) - (sub (binding key ? ?)))) + (sub (binding key ? ? ?)))) (monitor 'a) (monitor 'b) -(async-update 'a 0 0 - (lambda () (async-update 'a 1 1 +(define (now-update-a-again) + (async-update 'a 0 2 2 + (lambda () '()) + (on-conflict "a/2"))) + +(define ((on-conflict where) . args) + (error 'conflict "at ~v" where)) + +(async-update 'a 0 0 0 + (lambda () (async-update 'a 0 1 1 (lambda () '()) - (lambda args (error 'conflict "at a/1")))) - (lambda args (error 'conflict "at a/0"))) -(async-update 'b 0 0 - (lambda () (async-update 'b 0 1 + (on-conflict "a/1"))) + (on-conflict "a/0")) +(async-update 'b 0 0 0 + (lambda () (async-update 'b 0 0 1 (lambda () '()) - (lambda args '()))) - (lambda args (error 'conflict "at b/0"))) + (lambda args + (async-update 'b 0 1 (absent) + now-update-a-again + (on-conflict "b/2"))))) + (on-conflict "b/0"))