#lang racket/base (require racket/match) (require preserves) (require (except-in "main.rkt" actor-system)) (require "bag.rkt") (require "schemas/gen/box-protocol.rkt") (require "schemas/gen/dataspace.rkt") (require "syntax.rkt") (define box (action (ds LIMIT REPORT_EVERY) (spawn (define root-facet this-facet) (define-field value 0) (define start-time (current-inexact-milliseconds)) (define prev-value 0) (at ds (assert (BoxState (value))) (when (message (SetBox $new-value)) (when (zero? (remainder new-value REPORT_EVERY)) (define end-time (current-inexact-milliseconds)) (define delta (/ (- end-time start-time) 1000.0)) (define count (- new-value prev-value)) (set! prev-value new-value) (set! start-time end-time) (log-info "Box got ~a (~a Hz)" new-value (/ count delta))) (when (= new-value LIMIT) (stop-facet root-facet)) (value new-value)))))) (define client (action (ds) (spawn (define root-facet this-facet) (define count 0) (at ds (when (asserted (BoxState $value)) (send! ds (SetBox->preserves (SetBox (+ value 1))))) ;; (during (BoxState _) ;; (on-start (set! count (+ count 1))) ;; (on-stop (set! count (- count 1)) ;; (when (zero? count) ;; (log-info "Client detected box termination") ;; (stop-facet root-facet)))) (assert (Observe 'BoxState (ref (entity #:assert (action (_v _h) (set! count (+ count 1))) #:retract (action (_h) (set! count (- count 1)) (when (zero? count) (log-info "Client detected box termination") (stop-facet root-facet))))))) ;; (during (BoxState _) ;; (on-stop (log-info "Client detected box termination") ;; (stop-facet root-facet))) )))) (define (dataspace) (define handles (make-hash)) (define assertions (make-bag)) (define subscriptions (make-hash)) (entity #:assert (action (rec handle) (when (record? rec) (hash-set! handles handle rec) (when (eq? (bag-change! assertions rec +1) 'absent->present) (match (parse-Observe rec) [(? eof-object?) (void)] [(Observe label observer) (define seen (make-hash)) (hash-set! (hash-ref! subscriptions label make-hasheq) observer seen) (for [(existing (in-bag assertions))] (when (preserve=? (record-label existing) label) (hash-set! seen existing (turn-assert! this-turn observer existing))))]) (for [((observer seen) (in-hash (hash-ref subscriptions (record-label rec) '#hash())))] (unless (hash-has-key? seen rec) (hash-set! seen rec (turn-assert! this-turn observer rec))))))) #:retract (action (upstream-handle) (define rec (hash-ref handles upstream-handle #f)) (when rec (hash-remove! handles upstream-handle) (when (eq? (bag-change! assertions rec -1) 'present->absent) (for [(seen (in-hash-values (hash-ref subscriptions (record-label rec) '#hash())))] (turn-retract! this-turn (hash-ref seen rec)) (hash-remove! seen rec)) (match (parse-Observe rec) [(? eof-object?) (void)] [(Observe label observer) (let ((subscribers (hash-ref subscriptions label))) (hash-remove! subscribers observer) (when (hash-empty? subscribers) (hash-remove! subscriptions label)))])))) #:message (action (message) (when (record? message) (for [(peer (in-hash-keys (hash-ref subscriptions (record-label message) '#hash())))] (turn-message! this-turn peer message)))))) (module+ main (time (actor-system (define disarm (facet-prevent-inert-check! this-facet)) (define ds (ref (dataspace))) (box this-turn ds 500000 100000) (client this-turn ds))))