Remove poorly-thought-out distinction between message topic and message body

This commit is contained in:
Tony Garnock-Jones 2012-04-26 13:05:19 -04:00
parent bb24f19317
commit 062aa9f1e3
2 changed files with 27 additions and 18 deletions

View File

@ -102,9 +102,10 @@
(transition state (transition state
(delete-role 'time-listener) (delete-role 'time-listener)
(and next (and next
(role 'time-listener (topic-subscriber (timer-evt (pending-timer-deadline next))) (role 'time-listener (topic-subscriber (cons (timer-evt (pending-timer-deadline next))
(wild)))
#:state state #:state state
[now [(cons (? evt?) now)
(define to-send (fire-timers! (driver-state-heap state) now)) (define to-send (fire-timers! (driver-state-heap state) now))
;; Note: compute to-send before recursing, because of side-effects on heap ;; Note: compute to-send before recursing, because of side-effects on heap
(extend-transition (update-time-listener! state) to-send)])))) (extend-transition (update-time-listener! state) to-send)]))))

40
os2.rkt
View File

@ -71,6 +71,11 @@
;; A Flow is a Topic that comes from the intersection of two dual ;; A Flow is a Topic that comes from the intersection of two dual
;; topics. ;; topics.
;; A sent message includes a "body" and a "role", and is equivalent to
;; a non-virtual topic with that role and with the given "body" as a
;; pattern. In a sense, topics quite literally are patterns over
;; entire messages.
;; A QuasiQueue<X> is a list of Xs in *reversed* order. ;; A QuasiQueue<X> is a list of Xs in *reversed* order.
(struct vm (processes ;; Hash<PID, Process> (struct vm (processes ;; Hash<PID, Process>
@ -114,8 +119,8 @@
;; (delete-role PreEID Any) ;; (delete-role PreEID Any)
(struct delete-role (pre-eid reason) #:prefab) (struct delete-role (pre-eid reason) #:prefab)
;; ;;
;; (send-message Any Topic) ;; (send-message Any Role)
(struct send-message (body topic) #:prefab) (struct send-message (body role) #:prefab)
;; ;;
;; (spawn BootK Maybe<TrapK<PID>>) ;; (spawn BootK Maybe<TrapK<PID>>)
(struct spawn (main k) #:prefab) (struct spawn (main k) #:prefab)
@ -167,7 +172,7 @@
(define (make-transition state . actions) (transition state actions)) (define (make-transition state . actions) (transition state actions))
(define make-add-role add-role) ;; no special treatment required at present (define make-add-role add-role) ;; no special treatment required at present
(define (make-delete-role pre-eid [reason #f]) (delete-role pre-eid reason)) (define (make-delete-role pre-eid [reason #f]) (delete-role pre-eid reason))
(define (make-send-message body [topic (topic-publisher body)]) (send-message body topic)) (define (make-send-message body [role 'publisher]) (send-message body role))
(define (make-spawn main [k #f]) (spawn main k)) (define (make-spawn main [k #f]) (spawn main k))
(define (make-kill [pid #f] [reason #f]) (kill pid reason)) (define (make-kill [pid #f] [reason #f]) (kill pid reason))
@ -267,7 +272,7 @@
(match preaction (match preaction
[(add-role pre-eid topics hs) (do-subscribe pid pre-eid (ensure-topic-union topics) hs state)] [(add-role pre-eid topics hs) (do-subscribe pid pre-eid (ensure-topic-union topics) hs state)]
[(delete-role pre-eid reason) (do-unsubscribe pid pre-eid reason state)] [(delete-role pre-eid reason) (do-unsubscribe pid pre-eid reason state)]
[(send-message body topic) (route-and-deliver topic body state)] [(send-message body role) (route-and-deliver role body state)]
[(spawn main k) (do-spawn pid main k state)] [(spawn main k) (do-spawn pid main k state)]
[(kill pid-to-kill reason) (do-kill (or pid-to-kill pid) reason state)])) [(kill pid-to-kill reason) (do-kill (or pid-to-kill pid) reason state)]))
@ -345,7 +350,8 @@
(define absence-handler (handlers-absence (endpoint-handlers e))) (define absence-handler (handlers-absence (endpoint-handlers e)))
(run-trapk state matching-pid absence-handler outbound-flow reason))) (run-trapk state matching-pid absence-handler outbound-flow reason)))
(define (route-and-deliver message-topic body state) (define (route-and-deliver role body state)
(define message-topic (topic role body #f))
(define endpoints (define endpoints
(for*/set ([(matching-pid p) (in-hash (vm-processes state))] (for*/set ([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))] [matching-eid (in-set (process-endpoints p))]
@ -454,17 +460,19 @@
(when (not (null? actions)) (when (not (null? actions))
(error 'ground-vm "No meta-actions available in ground-vm: ~v" actions)) (error 'ground-vm "No meta-actions available in ground-vm: ~v" actions))
(define waiting? (null? (vm-pending-actions state))) (define waiting? (null? (vm-pending-actions state)))
(define active-events (for*/list ([(eid e) (in-hash (vm-endpoints state))] (define active-events
[topic (in-set (endpoint-topics e))] (for*/fold ([acc '()])
#:when (and (evt? (topic-pattern topic)) ([(eid e) (in-hash (vm-endpoints state))]
(eq? (topic-role topic) [active-topic (in-set (endpoint-topics e))])
'subscriber))) (match active-topic
(define evt (topic-pattern topic)) [(topic 'subscriber (cons (? evt? evt) _) #f)
(wrap-evt evt (lambda (message) (cons (wrap-evt evt (lambda (message)
(lambda (state) (lambda (state)
(route-and-deliver (topic-publisher evt) (route-and-deliver 'publisher
message (cons evt message)
state)))))) state))))
acc)]
[_ acc])))
(if (and waiting? (null? active-events)) (if (and waiting? (null? active-events))
'done ;; About to block, and nothing can wake us 'done ;; About to block, and nothing can wake us
(let ((interruptk (apply sync (let ((interruptk (apply sync