110 lines
5.9 KiB
Racket
110 lines
5.9 KiB
Racket
#lang racket/base
|
|
|
|
(require racket/match)
|
|
(require preserves)
|
|
|
|
(require "main.rkt")
|
|
(require "bag.rkt")
|
|
(require "schemas/gen/box-protocol.rkt")
|
|
(require "schemas/gen/dataspace.rkt")
|
|
|
|
(define ((box ds LIMIT REPORT_EVERY) turn)
|
|
(define value (turn-field! turn 'box-value 0))
|
|
(turn-assert/dataflow! turn ds (lambda (turn) (BoxState->preserves (BoxState (value)))))
|
|
(define start-time (current-inexact-milliseconds))
|
|
(define prev-value 0)
|
|
(turn-assert! turn ds
|
|
(Observe->preserves
|
|
(Observe 'SetBox
|
|
(turn-ref turn
|
|
(entity #:message
|
|
(lambda (turn new-value-rec)
|
|
(define new-value (SetBox-value new-value-rec))
|
|
(when (zero? (remainder new-value REPORT_EVERY))
|
|
(define end-time (current-inexact-milliseconds))
|
|
(define delta (/ (- end-time start-time) 1000.0))
|
|
(define count (- new-value prev-value))
|
|
(set! prev-value new-value)
|
|
(set! start-time end-time)
|
|
(log-info "Box got ~a (~a Hz)"
|
|
new-value
|
|
(/ count delta)))
|
|
(when (= new-value LIMIT)
|
|
(turn-stop-actor! turn))
|
|
(value new-value))))))))
|
|
|
|
(define ((client ds) turn)
|
|
(turn-assert! turn ds
|
|
(Observe->preserves
|
|
(Observe 'BoxState
|
|
(turn-ref turn
|
|
(entity #:assert
|
|
(lambda (turn current-value _handle)
|
|
;; (log-info "Client got ~a" current-value)
|
|
(turn-message! turn ds
|
|
(SetBox->preserves
|
|
(SetBox
|
|
(+ (BoxState-value current-value)
|
|
1))))))))))
|
|
(let ((count 0))
|
|
(turn-assert! turn ds
|
|
(Observe->preserves
|
|
(Observe 'BoxState
|
|
(turn-ref turn
|
|
(entity #:assert
|
|
(lambda (turn current-value _handle)
|
|
(set! count (+ count 1)))
|
|
#:retract
|
|
(lambda (turn _handle)
|
|
(set! count (- count 1))
|
|
(when (zero? count)
|
|
(log-info "Client detected box termination")
|
|
(turn-stop-actor! turn))))))))))
|
|
|
|
(define (dataspace)
|
|
(define handles (make-hash))
|
|
(define assertions (make-bag))
|
|
(define subscriptions (make-hash))
|
|
(entity #:assert (lambda (turn rec handle)
|
|
(when (record? rec)
|
|
(hash-set! handles handle rec)
|
|
(when (eq? (bag-change! assertions rec +1) 'absent->present)
|
|
(match (parse-Observe rec)
|
|
[(? eof-object?) (void)]
|
|
[(Observe label observer)
|
|
(define seen (make-hash))
|
|
(hash-set! (hash-ref! subscriptions label make-hasheq) observer seen)
|
|
(for [(existing (in-bag assertions))]
|
|
(when (preserve=? (record-label existing) label)
|
|
(hash-set! seen existing (turn-assert! turn observer existing))))])
|
|
(for [((observer seen) (in-hash (hash-ref subscriptions (record-label rec) '#hash())))]
|
|
(unless (hash-has-key? seen rec)
|
|
(hash-set! seen rec (turn-assert! turn observer rec)))))))
|
|
#:retract (lambda (turn upstream-handle)
|
|
(define rec (hash-ref handles upstream-handle #f))
|
|
(when rec
|
|
(hash-remove! handles upstream-handle)
|
|
(when (eq? (bag-change! assertions rec -1) 'present->absent)
|
|
(for [(seen (in-hash-values (hash-ref subscriptions (record-label rec) '#hash())))]
|
|
(turn-retract! turn (hash-ref seen rec))
|
|
(hash-remove! seen rec))
|
|
(match (parse-Observe rec)
|
|
[(? eof-object?) (void)]
|
|
[(Observe label observer)
|
|
(let ((subscribers (hash-ref subscriptions label)))
|
|
(hash-remove! subscribers observer)
|
|
(when (hash-empty? subscribers)
|
|
(hash-remove! subscriptions label)))]))))
|
|
#:message (lambda (turn message)
|
|
(when (record? message)
|
|
(for [(peer (in-hash-keys (hash-ref subscriptions (record-label message) '#hash())))]
|
|
(turn-message! turn peer message))))))
|
|
|
|
(actor-system
|
|
(lambda (turn)
|
|
(actor-daemon! (facet-actor (turn-active-facet turn)) #t)
|
|
(define disarm (facet-prevent-inert-check! (turn-active-facet turn)))
|
|
(define ds (turn-ref turn (dataspace)))
|
|
(turn-spawn! turn (box ds 500000 100000))
|
|
(turn-spawn! turn (client ds))))
|