WIP heading toward explicit-endpoint model

This commit is contained in:
Tony Garnock-Jones 2012-03-22 11:56:03 -04:00
parent 7cf4c79757
commit 2e9d8b060e
2 changed files with 256 additions and 21 deletions

50
os2.rkt
View File

@ -11,12 +11,18 @@
;;---------------------------------------------------------------------------
;; Data definitions
;; A PID is an (arbitrary) VM-unique process identifier.
;; A SID is an (arbitrary) process-unique subscription identifier.
;; A PID is an (arbitrary) VM-unique process identifier. Concretely,
;; it's an integer.
;; A EID is an (arbitrary) VM-unique endpoint identifier. Concretely,
;; it's a list of two elements, the first being the endpoint's
;; process's PID and the second being an integer.
;; A QuasiQueue<X> is a list of Xs in *reversed* order.
(struct vm (processes ;; Hash<PID, Process>
endpoints ;; Hash<EID, Endpoint>
topic-flows ;; Relation<Topic, Flow>
flow-topics ;; Relation<Flow, Topic>
active-handlers ;; Relation<Topic, Endpoint>
@ -24,13 +30,14 @@
pending-actions ;; QuasiQueue<(cons PID Action)>
) #:transparent)
;; (endpoint PID SID Handlers)
(struct endpoint (process-id sid handlers) #:transparent)
(struct endpoint (id ;; EID
handlers ;; Handlers
) #:transparent)
(struct process (id ;; PID
state
interests ;; Relation<SID, Topic>
meta-interests ;; Relation<SID, Topic>
next-endpoint-id-number ;; NonnegativeInteger
endpoints ;; Set<EID>
) #:transparent)
(struct topic (role pattern virtual?) #:prefab)
@ -38,19 +45,20 @@
;; A Flow is a Topic that comes from the intersection of two dual
;; topics.
;; PresenceHandler = Topic -> State -> Transition
;; AbsenceHandler = Topic * Reason -> State -> Transition
;; MessageHandler = Topic * Message -> State -> Transition
;; InterruptK = State -> Transition
;; PresenceHandler = EID * Topic -> InterruptK
;; AbsenceHandler = EID * Topic * Reason -> InterruptK
;; MessageHandler = EID * Topic * Message -> InterruptK
(struct handlers (presence absence message) #:transparent)
;; actions is a plain old List<Action>, not a QuasiQueue.
(struct transition (state actions) #:transparent)
;; Preactions
(struct add-role (sid topic handlers) #:prefab)
(struct delete-roles (sid) #:prefab)
;; Preactions.
(struct add-role (topic handlers k) #:prefab)
(struct delete-role (eid) #:prefab)
(struct send-message (topic body) #:prefab)
(struct spawn (thunk) #:prefab)
(struct spawn (thunk k) #:prefab)
;; An Action is either a Preaction or an (at-meta-level Preaction).
(struct at-meta-level (preaction) #:prefab)
@ -144,8 +152,8 @@
(define (perform-action pid action state)
(match action
[(add-role sid topic handlers) (do-subscribe pid sid topic handlers state)]
[(delete-roles sid) (do-unsubscribe pid sid state)]
[(add-role eid topic handlers) (do-subscribe pid eid topic handlers state)]
[(delete-roles eid) (do-unsubscribe pid eid state)]
[(send-message topic body) (route-and-deliver topic body state)]
[(spawn thunk) (do-spawn thunk state)]))
@ -165,18 +173,18 @@
((handlers-presence (endpoint-handlers e)) source-flow)))
state))
(define ((add-interest sid topic) p)
(struct-copy process p [interests (relation-add (process-interests p) sid topic)]))
(define ((add-interest eid topic) p)
(struct-copy process p [interests (relation-add (process-interests p) eid topic)]))
(define (do-subscribe pid sid topic handlers state)
(define e (endpoint pid sid handlers))
(define (do-subscribe pid eid topic handlers state)
(define e (endpoint pid eid handlers))
(define topic-previously-known? (relation-domain-member? (vm-active-handlers state)))
;; Install the handler.
;; Update the process.
(let ((state
(struct-copy vm state
[active-handlers (relation-add (vm-active-handlers state) topic e)]
[processes (hash-update (vm-processes state) pid (add-interest sid topic))])))
[processes (hash-update (vm-processes state) pid (add-interest eid topic))])))
;; Add topic <--> flow mappings and fire the appropriate presence handlers.
(if topic-previously-known?
;; Just tell the local end. The other ends have already heard about this topic.
@ -192,7 +200,7 @@
(state (install-flow state (refine-topic matching-topic flow-pattern) topic)))
state)))))
(define (do-unsubscribe pid sid state)
(define (do-unsubscribe pid eid state)
;; For each topic in the process's interests,
;; - for each appropriate endpoint in active-handlers,
;; - fire the absence handler

227
topicset.rkt Normal file
View File

@ -0,0 +1,227 @@
#lang racket/base
;; (Possibly infinite) sets of (individually finite) message topics.
(require racket/set)
(require racket/match)
(require "struct-map.rkt")
(provide __ ;; the wildcard value
wildcard?
// ;; finite set of alternatives
;; topicset->set - requires equivalent of amb
topicset-empty?
topicset-member?
topicset-finite?
topicset-intersection
topicset-union
topicset-subtract
topicset-subset?)
(struct wildcard ()
#:property prop:custom-write
(lambda (v port mode) (display "__" port)))
(define __ (wildcard))
(struct alt (values)
#:transparent
#:property prop:custom-write
(lambda (v port mode)
((if mode write display) (cons '#:// (set->list (alt-values v))) port)))
(define (// . alternatives)
(match alternatives
['() (alt (set))]
[(list v) v]
[(list* (wildcard) rest) __] ;; short-circuit
[(list* (alt (? set-empty?)) rest) (apply // rest)] ;; identity
[(list* v1 v2 rest) (apply // (topicset-union v1 v2) rest)]))
(define (struct-types-equal? a b)
;; ugh
(define-values (ta skipped-a) (struct-info a))
(define-values (tb skipped-b) (struct-info b))
(eq? ta tb))
(define (struct-double-map f a b)
;; This is gross.
(define i 0) ;; we increment this before using it, so zero is the right choice
(define b-vector (struct->vector b #f))
(struct-map (lambda (v)
(set! i (+ i 1)) ;; ugh
(f v (vector-ref b-vector i)))
a))
;; Any -> Boolean
(define (topicset-empty? x)
(and (alt? x) (set-empty? (alt-values x))))
;; Any * Any -> Boolean
(define (topicset-member? a b)
;; Values are treated as singleton sets.
(topicset-subset? a b))
;; Any * Any -> Boolean
(define (topicset-finite? x)
(let walk ((x x))
(cond
[(wildcard? x) #f]
[(alt? x) (for/and ([xx (alt-values x)]) (walk xx))]
[(pair? x) (and (walk (car x)) (walk (cdr x)))]
[(vector? x) (for/and ([xx x]) (walk xx))]
[(struct? x) (walk (struct->vector x #f))]
[else #t])))
;; Any * Any -> Any
(define (topicset-intersection a b)
(let/ec escape
(let walk ((a a) (b b))
(cond
[(wildcard? a) b]
[(wildcard? b) a]
[(alt? a) (for/fold ([acc (//)]) ([aa (alt-values a)])
(// acc (topicset-intersection aa b)))]
[(alt? b) (for/fold ([acc (//)]) ([bb (alt-values b)])
(// acc (topicset-intersection a bb)))]
[(and (pair? a) (pair? b))
(cons (walk (car a) (car b)) (walk (cdr a) (cdr b)))]
[(and (vector? a) (vector? b) (= (vector-length a) (vector-length b)))
(for/vector ([aa a] [bb b]) (walk aa bb))]
[(and (struct? a) (struct? b) (struct-types-equal? a b))
(struct-double-map walk a b)]
[(equal? a b) a]
[else (escape (//))]))))
;; Any * Any -> Boolean
(define (common-structure? a b)
(when (or (alt? a) (alt? b) (wildcard? a) (wildcard? b))
(error 'common-structure? "Must not receive alt or wildcard"))
(or (and (pair? a) (pair? b))
(and (vector? a) (vector? b) (= (vector-length a) (vector-length b)))
(and (struct? a) (struct? b) (struct-types-equal? a b))
(equal? a b)))
;; Alt * Any -> Alt
(define (add* a b)
;; If there's at least one layer of structure common to b and some
;; element of a, then union b with that element. Otherwise simply
;; place b in a.
(let loop ((as (set->list (alt-values a))) (acc (set)))
(cond
[(null? as) (alt (set-add acc b))]
[(common-structure? (car as) b) (alt (set-union (list->set (cdr as))
(set-add acc
(topicset-union (car as) b))))]
[else (loop (cdr as) (set-add acc (car as)))])))
;; Any * Any -> Any
(define (topicset-union a b)
(let walk ((a a) (b b))
(cond
[(wildcard? a) a]
[(wildcard? b) b]
[(and (alt? a) (alt? b)) (for/fold ([a a]) ([bb (alt-values b)]) (// a bb))]
[(alt? a) (add* a b)]
[(alt? b) (add* b a)]
[(and (pair? a) (pair? b))
(cons (walk (car a) (car b)) (walk (cdr a) (cdr b)))]
[(and (vector? a) (vector? b) (= (vector-length a) (vector-length b)))
(for/vector ([aa a] [bb b]) (walk aa bb))]
[(and (struct? a) (struct? b) (struct-types-equal? a b))
(struct-double-map walk a b)]
[(equal? a b) a]
[else (alt (set a b))])))
;; Any * Any -> Any
(define (topicset-subtract a b)
(let/ec escape
(let walk ((a a) (b b))
(cond
[(wildcard? b) (escape (//))]
[(wildcard? a) (error 'topicset-subtract "Cannot subtract finity from infinity")]
[(alt? a) (for/fold ([acc (//)]) ([aa (alt-values a)]) (// acc (topicset-subtract aa b)))]
[(alt? b) (for/fold ([a a]) ([bb (alt-values b)]) (walk a bb))]
[(and (pair? a) (pair? b))
(cons (walk (car a) (car b)) (walk (cdr a) (cdr b)))]
[(and (vector? a) (vector? b) (= (vector-length a) (vector-length b)))
(for/vector ([aa a] [bb b]) (walk aa bb))]
[(and (struct? a) (struct? b) (struct-types-equal? a b))
(struct-double-map walk a b)]
[(equal? a b) (escape (//))]
[else a]))))
;; Any * Any -> Boolean
(define (topicset-subset? a b)
(let walk ((a a) (b b))
(cond
[(wildcard? b) #t]
[(wildcard? a) #f]
[(alt? a) (for/and ([aa (alt-values a)]) (topicset-member? aa b))]
[(alt? b) (for/or ([bb (alt-values b)]) (topicset-member? a bb))]
[(and (pair? a) (pair? b))
(and (walk (car a) (car b)) (walk (cdr a) (cdr b)))]
[(and (vector? a) (vector? b) (= (vector-length a) (vector-length b)))
(for/and ([aa a] [bb b]) (walk aa bb))]
[(and (struct? a) (struct? b))
(walk (struct->vector a #f) (struct->vector b #f))]
[else (equal? a b)])))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(require rackunit)
(check-equal? (//) (alt (set)))
(check-equal? (// (//) (//)) (//))
(check-equal? (// (//) 'x) 'x)
(check-equal? (// 'x) 'x)
(check-equal? (// (//) __) __)
(check-equal? (// __ 'x) __)
(check-equal? (// 'x __) __)
(check-equal? (// 'x 'y 'z) (alt (set 'x 'y 'z)))
(check-equal? (// (list 'x) (list 'x)) (list 'x))
(check-equal? (// (list 'x) (list 'y)) (list (// 'x 'y)))
(check-equal? (// (list 'x) (list 'y)) (list (// 'y 'x)))
(check-equal? (topicset-union (list 'x) (list (// 'x 'y))) (list (// 'x 'y)))
(check-equal? (topicset-union (list 'y) (list (// 'x 'y))) (list (// 'x 'y)))
(check-equal? (topicset-union (list 'z) (list (// 'x 'y))) (list (// 'x 'y 'z)))
(check-equal? (// (// (list 'y)) (// (list 'y) (list 'x))) (list (// 'x 'y)))
(check-equal? (// (// (list 'x)) (// (list 'y) (list 'x))) (list (// 'x 'y)))
(check-equal? (// (// (list 'x) (list 'y)) (// (list 'x))) (list (// 'x 'y)))
(check-equal? (// (// (list 'x) (list 'y)) (// (list 'y))) (list (// 'x 'y)))
(check-equal? (// (// (list 'x) (list 'y)) (// (list 'x) (list 'y))) (list (// 'x 'y)))
(check-equal? (// (// (list 'x) (list 'y)) (// (list 'y) (list 'x))) (list (// 'x 'y)))
(check-equal? (// (// (list 'x) (vector 'y)) (list 'z)) (// (vector 'y) (list (// 'x 'z))))
(check-equal? (// (// (list 'x) (vector 'y)) (vector 'z)) (// (vector (// 'y 'z)) (list 'x)))
(check-equal? (// (// (list 'x) (vector 1 2)) (vector 'z))
(// (vector 'z) (vector 1 2) (list 'x)))
(check-equal? (// '(((x))) '(((y)))) (list (list (list (// 'x 'y)))))
(check-equal? (// (list 'x 'y) (list 'x __)) (list 'x __))
(check-equal? (// (list __ 'y) (list 'x __)) (list __ __))
(check-equal? (topicset-empty? 'x) #f)
(check-equal? (topicset-empty? __) #f)
(check-equal? (topicset-empty? (list 'x 'y)) #f)
(check-equal? (topicset-empty? (//)) #t)
(check-equal? (topicset-member? 'x 'x) #t)
(check-equal? (topicset-member? 'x 'y) #f)
(check-equal? (topicset-member? 'x (// 'x 'y)) #t)
(check-equal? (topicset-member? 'x __) #t)
(check-equal? (topicset-member? (list 'x) (list 'x)) #t)
(check-equal? (topicset-member? (list 'x) (cons 'x __)) #t)
(check-equal? (topicset-finite? __) #f)
(check-equal? (topicset-finite? (list 'a __)) #f)
(check-equal? (topicset-finite? (list 'a 'b)) #t)
(check-equal? (topicset-finite? (list 'a (// 'b 'c))) #t)
(check-equal? (topicset-finite? (// (list 'a (// 'b 'c)) 'd)) #t)
(check-equal? (topicset-finite? (// (vector 'a (// 'b 'c)) (cons __ __))) #f)
(check-equal? (topicset-finite? (// (list 'a (// 'b 'c)) __)) #f)