diff --git a/os2.rkt b/os2.rkt index ae74429..2c7efbd 100644 --- a/os2.rkt +++ b/os2.rkt @@ -14,6 +14,7 @@ topic-subscriber co-roles co-topics + topic-union (struct-out handlers) @@ -53,7 +54,10 @@ ;; process's PID and the second being an integer. (Except for the ;; ground-vm, where they're different because there aren't any PIDs.) -;; One endpoint, one topic. +;; One endpoint, one topic, with the caveat that as we are at present +;; unable to represent true topic unions, we actually store a *set* of +;; topics against each endpoint. The topic for the endpoint is to be +;; taken as the union of all the members in the set. ;; A Flow is a Topic that comes from the intersection of two dual ;; topics. @@ -66,10 +70,10 @@ pending-actions ;; QuasiQueue<(cons PID Action)> ) #:transparent) -;; An Endpoint is an (endpoint EID Topic Handlers), representing a +;; An Endpoint is an (endpoint EID Set Handlers), representing a ;; facet of a process responsible for playing a particular role (the ;; topic) in a conversation. -(struct endpoint (id topic handlers) #:transparent) +(struct endpoint (id topics handlers) #:transparent) ;; A Process is an Exists State . (process PID State ;; NonnegativeInteger Set), representing a VM process and its @@ -94,7 +98,7 @@ ;; Preactions. ;; Ks are various TrapKs or #f, signifying lack of interest. -(struct add-role (topic handlers k) #:prefab) +(struct add-role (topics handlers k) #:prefab) (struct delete-role (eid reason) #:prefab) (struct send-message (topic body) #:prefab) (struct spawn (thunk k) #:prefab) @@ -111,7 +115,7 @@ (define-syntax role (lambda (stx) (syntax-parse stx - [(_ topic-expr + [(_ topics-expr #:state state-pattern (~or (~optional (~seq #:on-presence presence) #:name "#:on-presence handler") (~optional (~seq #:on-absence absence) #:name "#:on-absence handler") @@ -137,7 +141,7 @@ [message-pattern clause-body ...] ... [_ state])])))]) - #'(add-role topic-expr + #'(add-role topics-expr (handlers presence-handler absence-handler message-handler) ready-handler))]))) @@ -145,7 +149,7 @@ ;; Smarter constructors for transitions and preactions. (define (make-transition state . actions) (transition state actions)) -(define (make-add-role topic handlers [k #f]) (add-role topic handlers k)) +(define (make-add-role topics handlers [k #f]) (add-role topics handlers k)) (define (make-delete-role eid [reason #f]) (delete-role eid reason)) (define (make-spawn thunk [k #f]) (spawn thunk k)) (define (make-kill [pid #f] [reason #f]) (kill pid reason)) @@ -174,6 +178,11 @@ (for/list ([co-role (co-roles (topic-role t))]) (struct-copy topic t [role co-role]))) +(define (topic-union . ts) + (unless (andmap topic? ts) + (error 'topic-union "Expects topics as arguments, but given ~v" ts)) + (list->set ts)) + (define (refine-topic remote-topic new-pattern) (struct-copy topic remote-topic [pattern new-pattern])) @@ -228,15 +237,23 @@ (with-handlers ([exn:fail? failure-proc]) (apply f args))) +(define (ensure-topic-union t) + (cond [(topic? t) (set t)] + [(set? t) t] + [else + (error 'ensure-topic-union + "Expected either a single topic or a set of topics; got ~v" + t)])) + (define (perform-action pid preaction state) (match preaction - [(add-role topic hs k) (do-subscribe pid topic hs k state)] + [(add-role topics hs k) (do-subscribe pid (ensure-topic-union topics) hs k state)] [(delete-role eid reason) (do-unsubscribe pid eid reason state)] [(send-message topic body) (route-and-deliver topic body state)] [(spawn thunk k) (do-spawn pid thunk k state)] [(kill pid-to-kill reason) (do-kill (or pid-to-kill pid) reason state)])) -(define (do-subscribe pid topic hs k state) +(define (do-subscribe pid topics hs k state) (cond [(hash-has-key? (vm-processes state) pid) (define old-process (hash-ref (vm-processes state) pid)) @@ -246,7 +263,9 @@ ([(matching-pid p) (in-hash (vm-processes state))] [matching-eid (in-set (process-endpoints p))] [e (in-value (hash-ref (vm-endpoints state) matching-eid))] - [matching-topic (in-value (endpoint-topic e))] + [matching-topics (in-value (endpoint-topics e))] + [matching-topic (in-set matching-topics)] + [topic (in-set topics)] [flow-pattern (in-value (topic-intersection topic matching-topic))] #:when flow-pattern) (define inbound-flow (refine-topic matching-topic flow-pattern)) @@ -276,7 +295,7 @@ [endpoints (hash-set (vm-endpoints state) new-eid (endpoint new-eid - topic + topics hs))])] [else state])) @@ -284,7 +303,7 @@ (cond [(hash-has-key? (vm-endpoints state) eid) (define endpoint-to-remove (hash-ref (vm-endpoints state) eid)) - (define removed-topic (endpoint-topic endpoint-to-remove)) + (define removed-topics (endpoint-topics endpoint-to-remove)) (define old-process (hash-ref (vm-processes state) pid)) (define new-process (struct-copy process old-process [endpoints (set-remove (process-endpoints old-process) eid)])) @@ -295,7 +314,9 @@ ([(matching-pid p) (in-hash (vm-processes state))] [matching-eid (in-set (process-endpoints p))] [e (in-value (hash-ref (vm-endpoints state) matching-eid))] - [matching-topic (in-value (endpoint-topic e))] + [matching-topics (in-value (endpoint-topics e))] + [matching-topic (in-set matching-topics)] + [removed-topic (in-set removed-topics)] [flow-pattern (in-value (topic-intersection removed-topic matching-topic))] #:when flow-pattern) (define outbound-flow (refine-topic removed-topic flow-pattern)) @@ -312,7 +333,8 @@ (for*/set ([(matching-pid p) (in-hash (vm-processes state))] [matching-eid (in-set (process-endpoints p))] [e (in-value (hash-ref (vm-endpoints state) matching-eid))] - [matching-topic (in-value (endpoint-topic e))] + [matching-topics (in-value (endpoint-topics e))] + [matching-topic (in-set matching-topics)] [flow-pattern (in-value (topic-intersection message-topic matching-topic))] #:when flow-pattern) (cons matching-pid e))) @@ -384,8 +406,8 @@ (define (transform-meta-action pid preaction) (match preaction - [(add-role topic hs k) - (add-role topic + [(add-role topics hs k) + (add-role topics (handlers (wrap-trapk pid (handlers-presence hs)) (wrap-trapk pid (handlers-absence hs)) (wrap-trapk pid (handlers-message hs))) @@ -406,11 +428,12 @@ (when (not (null? actions)) (error 'ground-vm "No meta-actions available in ground-vm: ~v" actions)) (define waiting? (null? (vm-pending-actions state))) - (define active-events (for/list ([(eid e) (in-hash (vm-endpoints state))] - #:when (and (evt? (topic-pattern (endpoint-topic e))) - (eq? (topic-role (endpoint-topic e)) - 'subscriber))) - (define evt (topic-pattern (endpoint-topic e))) + (define active-events (for*/list ([(eid e) (in-hash (vm-endpoints state))] + [topic (in-set (endpoint-topics e))] + #:when (and (evt? (topic-pattern topic)) + (eq? (topic-role topic) + 'subscriber))) + (define evt (topic-pattern topic)) (wrap-evt evt (lambda (message) (lambda (state) (route-and-deliver (topic-publisher evt)