Simulate topic unions via sets of topics at each endpoint.

This commit is contained in:
Tony Garnock-Jones 2012-04-12 14:05:25 -04:00
parent 4155f63c28
commit 97abcf91d2
1 changed files with 44 additions and 21 deletions

65
os2.rkt
View File

@ -14,6 +14,7 @@
topic-subscriber
co-roles
co-topics
topic-union
(struct-out handlers)
@ -53,7 +54,10 @@
;; process's PID and the second being an integer. (Except for the
;; ground-vm, where they're different because there aren't any PIDs.)
;; One endpoint, one topic.
;; One endpoint, one topic, with the caveat that as we are at present
;; unable to represent true topic unions, we actually store a *set* of
;; topics against each endpoint. The topic for the endpoint is to be
;; taken as the union of all the members in the set.
;; A Flow is a Topic that comes from the intersection of two dual
;; topics.
@ -66,10 +70,10 @@
pending-actions ;; QuasiQueue<(cons PID Action)>
) #:transparent)
;; An Endpoint is an (endpoint EID Topic Handlers), representing a
;; An Endpoint is an (endpoint EID Set<Topic> Handlers), representing a
;; facet of a process responsible for playing a particular role (the
;; topic) in a conversation.
(struct endpoint (id topic handlers) #:transparent)
(struct endpoint (id topics handlers) #:transparent)
;; A Process is an Exists State . (process PID State
;; NonnegativeInteger Set<EID>), representing a VM process and its
@ -94,7 +98,7 @@
;; Preactions.
;; Ks are various TrapKs or #f, signifying lack of interest.
(struct add-role (topic handlers k) #:prefab)
(struct add-role (topics handlers k) #:prefab)
(struct delete-role (eid reason) #:prefab)
(struct send-message (topic body) #:prefab)
(struct spawn (thunk k) #:prefab)
@ -111,7 +115,7 @@
(define-syntax role
(lambda (stx)
(syntax-parse stx
[(_ topic-expr
[(_ topics-expr
#:state state-pattern
(~or (~optional (~seq #:on-presence presence) #:name "#:on-presence handler")
(~optional (~seq #:on-absence absence) #:name "#:on-absence handler")
@ -137,7 +141,7 @@
[message-pattern clause-body ...]
...
[_ state])])))])
#'(add-role topic-expr
#'(add-role topics-expr
(handlers presence-handler absence-handler message-handler)
ready-handler))])))
@ -145,7 +149,7 @@
;; Smarter constructors for transitions and preactions.
(define (make-transition state . actions) (transition state actions))
(define (make-add-role topic handlers [k #f]) (add-role topic handlers k))
(define (make-add-role topics handlers [k #f]) (add-role topics handlers k))
(define (make-delete-role eid [reason #f]) (delete-role eid reason))
(define (make-spawn thunk [k #f]) (spawn thunk k))
(define (make-kill [pid #f] [reason #f]) (kill pid reason))
@ -174,6 +178,11 @@
(for/list ([co-role (co-roles (topic-role t))])
(struct-copy topic t [role co-role])))
(define (topic-union . ts)
(unless (andmap topic? ts)
(error 'topic-union "Expects topics as arguments, but given ~v" ts))
(list->set ts))
(define (refine-topic remote-topic new-pattern)
(struct-copy topic remote-topic [pattern new-pattern]))
@ -228,15 +237,23 @@
(with-handlers ([exn:fail? failure-proc])
(apply f args)))
(define (ensure-topic-union t)
(cond [(topic? t) (set t)]
[(set? t) t]
[else
(error 'ensure-topic-union
"Expected either a single topic or a set of topics; got ~v"
t)]))
(define (perform-action pid preaction state)
(match preaction
[(add-role topic hs k) (do-subscribe pid topic hs k state)]
[(add-role topics hs k) (do-subscribe pid (ensure-topic-union topics) hs k state)]
[(delete-role eid reason) (do-unsubscribe pid eid reason state)]
[(send-message topic body) (route-and-deliver topic body state)]
[(spawn thunk k) (do-spawn pid thunk k state)]
[(kill pid-to-kill reason) (do-kill (or pid-to-kill pid) reason state)]))
(define (do-subscribe pid topic hs k state)
(define (do-subscribe pid topics hs k state)
(cond
[(hash-has-key? (vm-processes state) pid)
(define old-process (hash-ref (vm-processes state) pid))
@ -246,7 +263,9 @@
([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))]
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
[matching-topic (in-value (endpoint-topic e))]
[matching-topics (in-value (endpoint-topics e))]
[matching-topic (in-set matching-topics)]
[topic (in-set topics)]
[flow-pattern (in-value (topic-intersection topic matching-topic))]
#:when flow-pattern)
(define inbound-flow (refine-topic matching-topic flow-pattern))
@ -276,7 +295,7 @@
[endpoints (hash-set (vm-endpoints state)
new-eid
(endpoint new-eid
topic
topics
hs))])]
[else state]))
@ -284,7 +303,7 @@
(cond
[(hash-has-key? (vm-endpoints state) eid)
(define endpoint-to-remove (hash-ref (vm-endpoints state) eid))
(define removed-topic (endpoint-topic endpoint-to-remove))
(define removed-topics (endpoint-topics endpoint-to-remove))
(define old-process (hash-ref (vm-processes state) pid))
(define new-process (struct-copy process old-process
[endpoints (set-remove (process-endpoints old-process) eid)]))
@ -295,7 +314,9 @@
([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))]
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
[matching-topic (in-value (endpoint-topic e))]
[matching-topics (in-value (endpoint-topics e))]
[matching-topic (in-set matching-topics)]
[removed-topic (in-set removed-topics)]
[flow-pattern (in-value (topic-intersection removed-topic matching-topic))]
#:when flow-pattern)
(define outbound-flow (refine-topic removed-topic flow-pattern))
@ -312,7 +333,8 @@
(for*/set ([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))]
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
[matching-topic (in-value (endpoint-topic e))]
[matching-topics (in-value (endpoint-topics e))]
[matching-topic (in-set matching-topics)]
[flow-pattern (in-value (topic-intersection message-topic matching-topic))]
#:when flow-pattern)
(cons matching-pid e)))
@ -384,8 +406,8 @@
(define (transform-meta-action pid preaction)
(match preaction
[(add-role topic hs k)
(add-role topic
[(add-role topics hs k)
(add-role topics
(handlers (wrap-trapk pid (handlers-presence hs))
(wrap-trapk pid (handlers-absence hs))
(wrap-trapk pid (handlers-message hs)))
@ -406,11 +428,12 @@
(when (not (null? actions))
(error 'ground-vm "No meta-actions available in ground-vm: ~v" actions))
(define waiting? (null? (vm-pending-actions state)))
(define active-events (for/list ([(eid e) (in-hash (vm-endpoints state))]
#:when (and (evt? (topic-pattern (endpoint-topic e)))
(eq? (topic-role (endpoint-topic e))
'subscriber)))
(define evt (topic-pattern (endpoint-topic e)))
(define active-events (for*/list ([(eid e) (in-hash (vm-endpoints state))]
[topic (in-set (endpoint-topics e))]
#:when (and (evt? (topic-pattern topic))
(eq? (topic-role topic)
'subscriber)))
(define evt (topic-pattern topic))
(wrap-evt evt (lambda (message)
(lambda (state)
(route-and-deliver (topic-publisher evt)