From a890a7147be96d2f40df09bc935135424ef13eb1 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 10 Aug 2016 19:04:08 -0400 Subject: [PATCH] Queue implementations without credit tracking --- .../examples/actor/queue-no-credit.rkt | 106 ++++++++++++++++++ .../examples/actor/queue-no-credit2.rkt | 105 +++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 racket/syndicate/examples/actor/queue-no-credit.rkt create mode 100644 racket/syndicate/examples/actor/queue-no-credit2.rkt diff --git a/racket/syndicate/examples/actor/queue-no-credit.rkt b/racket/syndicate/examples/actor/queue-no-credit.rkt new file mode 100644 index 0000000..db72b5d --- /dev/null +++ b/racket/syndicate/examples/actor/queue-no-credit.rkt @@ -0,0 +1,106 @@ +#lang syndicate/actor +;; A Queue with no flow control. + +(require racket/set) +(require syndicate/functional-queue) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Queue Protocol + +;; Assertion. Scopes flow from source to target. +(struct subscription (source target) #:prefab) + +;; Message. Delivery from source to target. +(struct delivery (source target body) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Metrics Protocol + +;; Assertion. Describes some attribute of monitoringish interest. +(struct metric (key value) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Utilities + +;; (Fieldof (Queueof X)) -> X +;; EFFECT: Changes f. +;; EFFECT: Error if f contains the empty queue. +(define (deq! f) + (define-values (item remainder) (dequeue (f))) + (f remainder) + item) + +;; (Fieldof (Queueof X)) X -> Void +;; EFFECT: Changes f. +(define (enq! f v) + (f (enqueue (f) v))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Queue Implementation + +;; EFFECT: Spawn a queue process named `queue-id`. +(define (spawn-queue queue-id) + (actor #:name (list 'queue queue-id) + (react (field [waiters (make-queue)]) + (field [messages (make-queue)]) + + (define/query-set subscribers (subscription queue-id $who) who + #:on-add (enq! waiters who)) + + (on (message (delivery $who queue-id $body)) + (enq! messages body)) + + (begin/dataflow + (when (and (not (queue-empty? (waiters))) + (not (queue-empty? (messages)))) + (define who (deq! waiters)) + (when (set-member? (subscribers) who) ;; lazily remove entries from waiters + (enq! waiters who) + (define msg (deq! messages)) + (log-info "~a: sending ~a message ~a" queue-id who msg) + (send! (delivery queue-id who msg))))) + + (assert (metric (list 'subscriber-count queue-id) (set-count (subscribers)))) + (assert (metric (list 'backlog queue-id) (queue-length (messages)))) + + ;;------------------------------------------------------------ + + (local-require (submod syndicate/actor priorities)) + (begin/dataflow #:priority *idle-priority* ;; Check invariants + (define has-waiters? (not (queue-empty? (waiters)))) + (define has-messages? (not (queue-empty? (messages)))) + (unless (and (or (not has-waiters?) (not has-messages?)) + (or (not has-messages?) (not has-waiters?))) + (error 'queue + "~a: invariant violated: ~v" + queue-id + `((has-waiters? ,has-waiters?) + (has-messages? ,has-messages?) + (waiters ,(queue->list (waiters))) + (messages ,(queue->list (messages)))))))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Example + +(define (spawn-consumer consumer-id #:variant [variant 'normal]) + (actor #:name (list 'consumer consumer-id) + (react (assert (subscription 'q consumer-id)) + (on (message (delivery 'q consumer-id $body)) + (log-info "Consumer ~a got: ~a" consumer-id body) + (when (eq? variant 'crashy) + (error consumer-id + "Hark, canst thou hear me? I will play the swan / and die in music.")))))) + +(actor (react (define/query-hash metrics (metric $k $v) k v) + (begin/dataflow (log-info " ~a" (hash->list (metrics)))))) + +(spawn-queue 'q) +(spawn-consumer 'c1) +(spawn-consumer 'c2 #:variant 'crashy) +(spawn-consumer 'c3) + +(actor (until (asserted (observe (delivery _ 'q _)))) + (for ((n (in-range 10))) + (send! (delivery #f 'q n)) + ;; (flush!) + )) diff --git a/racket/syndicate/examples/actor/queue-no-credit2.rkt b/racket/syndicate/examples/actor/queue-no-credit2.rkt new file mode 100644 index 0000000..d837a3f --- /dev/null +++ b/racket/syndicate/examples/actor/queue-no-credit2.rkt @@ -0,0 +1,105 @@ +#lang syndicate/actor +;; A Queue with no flow control. + +(require racket/set) +(require syndicate/functional-queue) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Queue Protocol + +;; Assertion. Scopes flow from source to target. +(struct subscription (source target) #:prefab) + +;; Message. Delivery from source to target. +(struct delivery (source target body) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Metrics Protocol + +;; Assertion. Describes some attribute of monitoringish interest. +(struct metric (key value) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Utilities + +;; (Fieldof (Queueof X)) -> X +;; EFFECT: Changes f. +;; EFFECT: Error if f contains the empty queue. +(define (deq! f) + (define-values (item remainder) (dequeue (f))) + (f remainder) + item) + +;; (Fieldof (Queueof X)) X -> Void +;; EFFECT: Changes f. +(define (enq! f v) + (f (enqueue (f) v))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Queue Implementation + +;; EFFECT: Spawn a queue process named `queue-id`. +(define (spawn-queue queue-id) + (actor #:name (list 'queue queue-id) + (react (field [waiters (make-queue)]) + (field [messages (make-queue)]) + + (on (asserted (subscription queue-id $who)) (enq! waiters who)) + (on (retracted (subscription queue-id $who)) (waiters (queue-remove who (waiters)))) + + (on (message (delivery $who queue-id $body)) + (enq! messages body)) + + (begin/dataflow + (when (and (not (queue-empty? (waiters))) + (not (queue-empty? (messages)))) + (define who (deq! waiters)) + (define msg (deq! messages)) + (log-info "~a: sending ~a message ~a" queue-id who msg) + (send! (delivery queue-id who msg)) + (enq! waiters who))) + + (assert (metric (list 'subscriber-count queue-id) (queue-length (waiters)))) + (assert (metric (list 'backlog queue-id) (queue-length (messages)))) + + ;;------------------------------------------------------------ + + (local-require (submod syndicate/actor priorities)) + (begin/dataflow #:priority *idle-priority* ;; Check invariants + (define has-waiters? (not (queue-empty? (waiters)))) + (define has-messages? (not (queue-empty? (messages)))) + (unless (and (or (not has-waiters?) (not has-messages?)) + (or (not has-messages?) (not has-waiters?))) + (error 'queue + "~a: invariant violated: ~v" + queue-id + `((has-waiters? ,has-waiters?) + (has-messages? ,has-messages?) + (waiters ,(queue->list (waiters))) + (messages ,(queue->list (messages)))))))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Example + +(define (spawn-consumer consumer-id #:variant [variant 'normal]) + (actor #:name (list 'consumer consumer-id) + (react (assert (subscription 'q consumer-id)) + (on (message (delivery 'q consumer-id $body)) + (log-info "Consumer ~a got: ~a" consumer-id body) + (when (eq? variant 'crashy) + (error consumer-id + "Hark, canst thou hear me? I will play the swan / and die in music.")))))) + +(actor (react (define/query-hash metrics (metric $k $v) k v) + (begin/dataflow (log-info " ~a" (hash->list (metrics)))))) + +(spawn-queue 'q) +(spawn-consumer 'c1) +(spawn-consumer 'c2 #:variant 'crashy) +(spawn-consumer 'c3) + +(actor (until (asserted (observe (delivery _ 'q _)))) + (for ((n (in-range 10))) + (send! (delivery #f 'q n)) + (when (odd? n) (flush!)) + ))