#lang racket/base
;; Box-and-client throughput exercise running against a small dataspace
;; entity defined in this file.
;;
;; NOTE: this file must keep one form per line. The disabled `log-info` call
;; inside `client` is a `;;` line comment; if the file is collapsed onto a
;; single line, that comment swallows all code that follows it.

(require racket/match)
(require preserves)
(require "main.rkt")
(require "bag.rkt")
(require "schemas/gen/box-protocol.rkt")
(require "schemas/gen/dataspace.rkt")

;; ((box ds LIMIT REPORT_EVERY) turn)
;;
;; Curried boot procedure for the box actor.
;;   ds           - reference the box publishes to and subscribes through
;;   LIMIT        - the box stops its actor when a SetBox carries this value
;;   REPORT_EVERY - a throughput line is logged whenever the received value
;;                  is a multiple of this number (it is used as the divisor
;;                  of `remainder`, so it must not be zero)
;;
;; The box keeps its value in a field named 'box-value, initially 0, and
;; publishes it to `ds` as a BoxState record via `turn-assert/dataflow!`
;; (presumably republished whenever the field changes -- confirm against
;; main.rkt). It also asserts an Observe of 'SetBox whose entity handles
;; incoming SetBox messages.
(define ((box ds LIMIT REPORT_EVERY) turn)
  (define value (turn-field! turn 'box-value 0))
  (turn-assert/dataflow! turn
                         ds
                         (lambda (turn) (BoxState->preserves (BoxState (value)))))
  ;; Bookkeeping for the rate report: when the current reporting window
  ;; started, and the box value at that moment.
  (define start-time (current-inexact-milliseconds))
  (define prev-value 0)
  (turn-assert!
   turn
   ds
   (Observe->preserves
    (Observe
     'SetBox
     (turn-ref
      turn
      (entity
       #:message
       (lambda (turn new-value-rec)
         (define new-value (SetBox-value new-value-rec))
         (when (zero? (remainder new-value REPORT_EVERY))
           (define end-time (current-inexact-milliseconds))
           ;; Elapsed time of this window, converted from ms to seconds.
           (define delta (/ (- end-time start-time) 1000.0))
           (define count (- new-value prev-value))
           ;; Start the next reporting window from here.
           (set! prev-value new-value)
           (set! start-time end-time)
           (log-info "Box got ~a (~a Hz)" new-value (/ count delta)))
         (when (= new-value LIMIT)
           (turn-stop-actor! turn))
         ;; Store the new value in the field.
         (value new-value))))))))

;; ((client ds) turn)
;;
;; Curried boot procedure for the client actor. It places two independent
;; Observe assertions for 'BoxState on `ds`:
;;   1. for every BoxState assertion it receives, it sends a SetBox message
;;      to `ds` carrying the observed value plus one;
;;   2. it counts BoxState assertions currently held; when a retraction
;;      brings that count back to zero it logs the fact and stops its actor.
(define ((client ds) turn)
  (turn-assert!
   turn
   ds
   (Observe->preserves
    (Observe
     'BoxState
     (turn-ref
      turn
      (entity
       #:assert
       (lambda (turn current-value _handle)
         ;; (log-info "Client got ~a" current-value)
         (turn-message!
          turn
          ds
          (SetBox->preserves
           (SetBox (+ (BoxState-value current-value) 1))))))))))
  (let ((count 0))
    (turn-assert!
     turn
     ds
     (Observe->preserves
      (Observe
       'BoxState
       (turn-ref
        turn
        (entity
         #:assert
         (lambda (turn current-value _handle)
           (set! count (+ count 1)))
         #:retract
         (lambda (turn _handle)
           (set! count (- count 1))
           (when (zero? count)
             (log-info "Client detected box termination")
             (turn-stop-actor! turn))))))))))

;; (dataspace)
;;
;; Builds an entity that routes record assertions and messages to observers
;; by record label. Its private state is:
;;   handles       - incoming handle -> the record asserted under it
;;   assertions    - bag counting how many times each record is asserted
;;   subscriptions - label -> (observer -> `seen`), where `seen` maps each
;;                   record forwarded to that observer to the handle returned
;;                   by the forwarding `turn-assert!`
;; Values that are not records are ignored by the assert and message handlers.
(define (dataspace)
  (define handles (make-hash))
  (define assertions (make-bag))
  (define subscriptions (make-hash))
  (entity
   #:assert
   (lambda (turn rec handle)
     (when (record? rec)
       (hash-set! handles handle rec)
       ;; Only the first assertion of a given record triggers any routing.
       (when (eq? (bag-change! assertions rec +1) 'absent->present)
         (match (parse-Observe rec)
           [(? eof-object?) (void)]
           [(Observe label observer)
            ;; New subscription: register it, then replay every record
            ;; currently in the bag whose label matches.
            (define seen (make-hash))
            (hash-set! (hash-ref! subscriptions label make-hasheq) observer seen)
            (for [(existing (in-bag assertions))]
              (when (preserve=? (record-label existing) label)
                (hash-set! seen existing (turn-assert! turn observer existing))))])
         ;; Forward the new record to every observer of its label. The
         ;; `seen` check skips an observer that already received `rec`
         ;; during the replay loop above.
         (for [((observer seen) (in-hash (hash-ref subscriptions (record-label rec) '#hash())))]
           (unless (hash-has-key? seen rec)
             (hash-set! seen rec (turn-assert! turn observer rec)))))))
   #:retract
   (lambda (turn upstream-handle)
     ;; Handles that were never recorded (e.g. non-record assertions) are
     ;; ignored.
     (define rec (hash-ref handles upstream-handle #f))
     (when rec
       (hash-remove! handles upstream-handle)
       ;; Only the last retraction of a given record triggers any routing.
       (when (eq? (bag-change! assertions rec -1) 'present->absent)
         (for [(seen (in-hash-values (hash-ref subscriptions (record-label rec) '#hash())))]
           (turn-retract! turn (hash-ref seen rec))
           (hash-remove! seen rec))
         (match (parse-Observe rec)
           [(? eof-object?) (void)]
           [(Observe label observer)
            ;; NOTE(review): the subscription is dropped here, but the
            ;; handles still stored in its `seen` table are not retracted
            ;; from the observer -- confirm this is intended.
            (let ((subscribers (hash-ref subscriptions label)))
              (hash-remove! subscribers observer)
              ;; Drop the per-label table once its last observer is gone.
              (when (hash-empty? subscribers)
                (hash-remove! subscriptions label)))]))))
   #:message
   (lambda (turn message)
     (when (record? message)
       ;; Deliver the message to every observer subscribed to its label.
       (for [(peer (in-hash-keys (hash-ref subscriptions (record-label message) '#hash())))]
         (turn-message! turn peer message))))))

;; Entry point: starts an actor system whose boot turn creates the dataspace,
;; then spawns one box (LIMIT 500000, reporting every 100000 updates) and one
;; client. The whole run is wrapped in `time`.
(module+ main
  (time
   (actor-system
    (lambda (turn)
      (actor-daemon! (facet-actor (turn-active-facet turn)) #t)
      ;; Called for its effect only; the value it returns (presumably a
      ;; procedure that re-enables the check -- confirm in main.rkt) is not
      ;; needed here.
      (facet-prevent-inert-check! (turn-active-facet turn))
      (define ds (turn-ref turn (dataspace)))
      (turn-spawn! turn (box ds 500000 100000))
      (turn-spawn! turn (client ds))))))