Support deletion

This commit is contained in:
Tony Garnock-Jones 2015-08-21 17:41:59 -04:00
parent e54004f4e0
commit 4d3e668ed4
1 changed files with 71 additions and 42 deletions

View File

@ -2,47 +2,62 @@
(require racket/set) (require racket/set)
;; Bindings are versioned with a pair of an epoch and a version.
;; Within an epoch, versions increase monotonically.
;; Epochs increase monotonically. At an epoch boundary, versions reset to 0.
;; Indicates a nonexistent binding when a binding has this as its value.
;; (I guess bindings with `(absent)` as their value are more properly pseudo-bindings.)
(struct absent () #:transparent)
;; `binding` tuples associate keys with values at a certain version. ;; `binding` tuples associate keys with values at a certain version.
;; versions start at 0 and increase by 1 with every successful update. ;; versions start at 0 and increase by 1 with every successful update.
(struct binding (key version value) #:transparent) (struct binding (key epoch version value) #:transparent)
;; `update` tuples request a binding update. ;; `update` tuples request a binding update.
(struct update (binding) #:transparent) ;; The epoch and version describe the *current* version of the binding.
(struct update (key base-epoch base-version value) #:transparent)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define observation-projector (compile-projection (observe (binding (?!) ? ?)))) (define observation-projector (compile-projection (observe (binding (?!) ? ? ?))))
(define update-projector (compile-projection (update (?! (binding ? ? ?))))) (define update-projector (compile-projection (?! (update ? ? ? ?))))
(struct db-state (bindings observed-keys) #:transparent) (struct db-state (epoch bindings observed-keys) #:transparent)
(define (lookup-binding bindings key) (define (lookup-binding epoch bindings key)
(hash-ref bindings key (lambda () (binding key -1 (void))))) (hash-ref bindings key (lambda () (binding key epoch 0 (absent)))))
(define ((process-suggestion suggestion) state) (define ((process-suggestion suggestion) state)
(match-define (db-state bindings observed-keys) state) (match-define (db-state epoch bindings observed-keys) state)
(match-define (binding key suggested-version _) suggestion) (match-define (update key base-epoch base-version new-value) suggestion)
(define old-binding (lookup-binding bindings key)) (define old-binding (lookup-binding epoch bindings key))
(match-define (binding _ current-version _) old-binding) (match-define (binding _ current-epoch current-version _) old-binding)
(if (= suggested-version (+ current-version 1)) (if (and (= current-epoch base-epoch)
(transition (struct-copy db-state state (= current-version base-version))
[bindings (hash-set bindings key suggestion)]) (let ((new-binding (cond
(when (set-member? observed-keys key) [(absent? new-value) (binding key (+ epoch 1) 0 new-value)]
(patch-seq (retract old-binding) [(> epoch current-epoch) (binding key epoch 0 new-value)]
(assert suggestion)))) [else (binding key base-epoch (+ base-version 1) new-value)])))
(transition (struct-copy db-state state
[epoch (binding-epoch new-binding)]
[bindings (if (absent? new-value)
(hash-remove bindings key)
(hash-set bindings key new-binding))])
(when (set-member? observed-keys key)
(patch-seq (retract old-binding) (assert new-binding)))))
(transition state '()))) (transition state '())))
(define ((adjust-observations added-observations removed-observations) state) (define ((adjust-observations added-observations removed-observations) state)
(match-define (db-state bindings observed-keys) state) (match-define (db-state epoch bindings observed-keys) state)
(transition (struct-copy db-state state (transition (struct-copy db-state state
[observed-keys (set-union added-observations [observed-keys (set-union added-observations
(set-subtract observed-keys (set-subtract observed-keys
removed-observations))]) removed-observations))])
(list (for/list [(key (in-set removed-observations))] (list (for/list [(key (in-set removed-observations))]
(retract (binding key ? ?))) (retract (binding key ? ? ?)))
(for/list [(key (in-set added-observations))] (for/list [(key (in-set added-observations))]
(when (hash-has-key? bindings key) (assert (lookup-binding epoch bindings key))))))
(assert (lookup-binding bindings key)))))))
(define (db) (define (db)
(spawn (lambda (e old-state) (spawn (lambda (e old-state)
@ -56,30 +71,33 @@
[(suggestion (in-set added-updates))] [(suggestion (in-set added-updates))]
(transition-bind (process-suggestion suggestion) t)))] (transition-bind (process-suggestion suggestion) t)))]
[_ #f])) [_ #f]))
(db-state (hash) (set)) (db-state 0 (hash) (set))
(sub (observe (binding ? ? ?))) (sub (observe (binding ? ? ? ?)))
(sub (update (binding ? ? ?))))) (sub (update ? ? ? ?))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define binding-projector (compile-projection (?! (binding ? ? ?)))) (define binding-projector (compile-projection (?! (binding ? ? ? ?))))
(define (async-update key version value on-complete on-conflict) (define (async-update key epoch version value on-complete on-conflict)
(spawn (lambda (e s) (spawn (lambda (e s)
(match e (match e
[(? patch? p) [(? patch? p)
(match (set->list (matcher-project/set/single (patch-added p) binding-projector)) (match (set->list (matcher-project/set/single (patch-added p) binding-projector))
['() #f] ['() #f]
[(list (binding _ (== version) (== value))) [(list (binding _ (== epoch) (== version) _)) #f]
[(list (binding _ (== epoch) (== (+ version 1)) (== value))) (quit (on-complete))]
[(list (binding _ new-epoch 0 (== value)))
#:when (> new-epoch epoch)
(quit (on-complete))] (quit (on-complete))]
[(list (binding _ (== (- version 1)) _)) [(list (binding _ other-epoch other-version other-value))
#f] (quit (on-conflict key
[(list (binding _ other-version other-value)) epoch version value
(quit (on-conflict key version value other-version other-value))])] other-epoch other-version other-value))])]
[_ #f])) [_ #f]))
(void) (void)
(assert (update (binding key version value))) (assert (update key epoch version value))
(sub (binding key ? ?)))) (sub (binding key ? ? ?))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -94,18 +112,29 @@
#f] #f]
[_ #f])) [_ #f]))
(void) (void)
(sub (binding key ? ?)))) (sub (binding key ? ? ?))))
(monitor 'a) (monitor 'a)
(monitor 'b) (monitor 'b)
(async-update 'a 0 0 (define (now-update-a-again)
(lambda () (async-update 'a 1 1 (async-update 'a 0 2 2
(lambda () '())
(on-conflict "a/2")))
(define ((on-conflict where) . args)
(error 'conflict "at ~v" where))
(async-update 'a 0 0 0
(lambda () (async-update 'a 0 1 1
(lambda () '()) (lambda () '())
(lambda args (error 'conflict "at a/1")))) (on-conflict "a/1")))
(lambda args (error 'conflict "at a/0"))) (on-conflict "a/0"))
(async-update 'b 0 0 (async-update 'b 0 0 0
(lambda () (async-update 'b 0 1 (lambda () (async-update 'b 0 0 1
(lambda () '()) (lambda () '())
(lambda args '()))) (lambda args
(lambda args (error 'conflict "at b/0"))) (async-update 'b 0 1 (absent)
now-update-a-again
(on-conflict "b/2")))))
(on-conflict "b/0"))