;; syndicate-rkt/syndicate/go.rkt
;; (Racket source; 108 lines, 5.1 KiB; revision timestamp 2021-05-27 09:28:10 +00:00)
#lang racket/base
(require racket/match)
(require preserves)
;; revision timestamp: 2021-06-01 08:04:10 +00:00
(require (except-in "main.rkt" actor-system))
(require "bag.rkt")
(require "schemas/gen/box-protocol.rkt")
(require "schemas/gen/dataspace.rkt")
;; revision timestamp: 2021-06-01 08:04:10 +00:00
(require "syntax.rkt")
;; revision timestamp: 2021-06-01 08:04:10 +00:00
;; box : action taking (ds LIMIT REPORT_EVERY)
;;
;; Spawns an actor that holds an integer field `value` (initially 0) and
;; keeps `(BoxState (value))` asserted at `ds`.  For every `SetBox` message
;; seen at `ds` it:
;;   * logs the message rate whenever the new value is a multiple of
;;     REPORT_EVERY (rate = values advanced since the last report divided by
;;     the seconds elapsed since the last report),
;;   * stops the actor's root facet once the new value equals LIMIT,
;;   * stores the new value in the field.
(define box
  (action (ds LIMIT REPORT_EVERY)
    (spawn
      (define top-facet this-facet)
      (define-field value 0)
      ;; State of the current reporting window: when it began (in
      ;; milliseconds) and the box value at that moment.
      (define window-start (current-inexact-milliseconds))
      (define window-base 0)
      (at ds
        (assert (BoxState->preserves (BoxState (value))))
        (when (message (SetBox new-value))
          (when (zero? (remainder new-value REPORT_EVERY))
            (let* ((now (current-inexact-milliseconds))
                   (elapsed-seconds (/ (- now window-start) 1000.0))
                   (advanced (- new-value window-base)))
              ;; Open the next window before logging, as the original does.
              (set! window-base new-value)
              (set! window-start now)
              (log-info "Box got ~a (~a Hz)" new-value (/ advanced elapsed-seconds))))
          (when (= new-value LIMIT)
            (stop-facet top-facet))
          ;; The field is updated even on the message that reaches LIMIT.
          (value new-value))))))
;; client : action taking (ds)
;;
;; Spawns an actor that drives the box:
;;   * whenever a `(BoxState value)` assertion appears at `ds`, it sends a
;;     `SetBox` message carrying `value + 1` to `ds`;
;;   * it also asserts an `Observe` of label 'BoxState at `ds`, whose
;;     observer is an inline entity that counts BoxState assertions it is
;;     shown (+1 on assert, -1 on retract).  When the count returns to zero
;;     the client logs the box's termination and stops its own root facet.
;;
;; The commented-out `during` forms are retained from the original source;
;; they appear to be higher-level spellings of the same termination check
;; (NOTE(review): confirm before re-enabling).
(define client
  (action (ds)
    (spawn
      (define root-facet this-facet)
      ;; Number of BoxState assertions currently visible to the inline
      ;; observer entity below.
      (define count 0)
      (at ds
        (when (asserted (BoxState value))
          (send! ds (SetBox->preserves (SetBox (+ value 1)))))
        ;; (during (BoxState _)
        ;;   (on-start (set! count (+ count 1)))
        ;;   (on-stop (set! count (- count 1))
        ;;            (when (zero? count)
        ;;              (log-info "Client detected box termination")
        ;;              (stop-facet root-facet))))
        (assert (Observe->preserves
                 (Observe 'BoxState
                          (ref (entity #:assert
                                       (action (_v _h)
                                         (set! count (+ count 1)))
                                       #:retract
                                       (action (_h)
                                         (set! count (- count 1))
                                         (when (zero? count)
                                           (log-info "Client detected box termination")
                                           (stop-facet root-facet))))))))
        ;; (during (BoxState _)
        ;;   (on-stop (log-info "Client detected box termination")
        ;;            (stop-facet root-facet)))
        ))))
;; dataspace : -> entity
;;
;; Builds a minimal dataspace entity that routes records by their label.
;;
;; Internal state:
;;   handles       : upstream handle -> the record asserted under that handle
;;   assertions    : bag (multiset) of currently asserted records
;;   subscriptions : label -> (observer -> seen), where `seen` maps each
;;                   record shown to that observer to the value returned by
;;                   `turn-assert!` for it (passed to `turn-retract!` later)
;;
;; Only values satisfying `record?` are handled; anything else is ignored.
(define (dataspace)
  (define handles (make-hash))
  (define assertions (make-bag))
  (define subscriptions (make-hash))
  (entity
   ;; Assertion: remember the handle, and on the record's first appearance
   ;; (bag transition 'absent->present) register it as a subscription if it
   ;; parses as an Observe, then show it to every observer of its label.
   #:assert
   (action (rec handle)
     (when (record? rec)
       (hash-set! handles handle rec)
       (when (eq? (bag-change! assertions rec +1) 'absent->present)
         (match (parse-Observe rec)
           [(? eof-object?) (void)]
           [(Observe label observer)
            (define seen (make-hash))
            (hash-set! (hash-ref! subscriptions label make-hasheq) observer seen)
            ;; Replay every already-present record carrying the observed
            ;; label to the new observer.
            (for [(existing (in-bag assertions))]
              (when (preserve=? (record-label existing) label)
                (hash-set! seen existing (turn-assert! this-turn observer existing))))])
         ;; `rec` is already in the bag, so a new observer of rec's own
         ;; label has just been shown it by the replay above; the
         ;; `hash-has-key?` guard prevents asserting it a second time.
         (for [((observer seen) (in-hash (hash-ref subscriptions (record-label rec) '#hash())))]
           (unless (hash-has-key? seen rec)
             (hash-set! seen rec (turn-assert! this-turn observer rec)))))))
   ;; Retraction: look the record up by handle; once its last copy leaves
   ;; the bag ('present->absent), withdraw it from every observer of its
   ;; label and, if it was an Observe, drop that subscription.
   #:retract
   (action (upstream-handle)
     (define rec (hash-ref handles upstream-handle #f))
     (when rec
       (hash-remove! handles upstream-handle)
       (when (eq? (bag-change! assertions rec -1) 'present->absent)
         (for [(seen (in-hash-values (hash-ref subscriptions (record-label rec) '#hash())))]
           ;; No default on this lookup: it relies on every observer of
           ;; this label having been shown `rec` by the assert handler.
           (turn-retract! this-turn (hash-ref seen rec))
           (hash-remove! seen rec))
         (match (parse-Observe rec)
           [(? eof-object?) (void)]
           [(Observe label observer)
            ;; NOTE(review): records still held in this observer's `seen`
            ;; table are not retracted from it here -- confirm that is
            ;; the intended behaviour when interest is withdrawn.
            (let ((subscribers (hash-ref subscriptions label)))
              (hash-remove! subscribers observer)
              (when (hash-empty? subscribers)
                (hash-remove! subscriptions label)))]))))
   ;; Message: forward the record to every observer of its label.
   #:message
   (action (message)
     (when (record? message)
       (for [(peer (in-hash-keys (hash-ref subscriptions (record-label message) '#hash())))]
         (turn-message! this-turn peer message))))))
;; revision timestamps: 2021-05-27 09:28:10 +00:00, 2021-05-31 11:07:37 +00:00
;; Benchmark entry point: runs one box and one client against a fresh
;; dataspace inside an actor system, wrapped in `time` to report how long
;; the run takes.  The box is given LIMIT = 500000 and REPORT_EVERY = 100000.
(module+ main
  (time
   (actor-system
    ;; Called for its effect on the current facet; the returned value is
    ;; bound to `disarm` but not used in this block.
    ;; NOTE(review): presumably this keeps the otherwise-empty root facet
    ;; from being treated as inert and torn down -- confirm.
    (define disarm (facet-prevent-inert-check! this-facet))
    (define ds (ref (dataspace)))
    (box this-turn ds 500000 100000)
    (client this-turn ds))))