syndicate-rkt/syndicate/dataspace.rkt

61 lines
2.9 KiB
Racket
Raw Normal View History

2021-06-02 13:00:25 +00:00
#lang racket/base
(provide (all-from-out "schemas/gen/dataspace.rkt")
(all-from-out "schemas/gen/dataspace-patterns.rkt")
(all-from-out "schemas/gen/dataspace-patterns.meta.rkt")
dataspace)
(require racket/match)
(require preserves)
(require "bag.rkt")
(require "main.rkt")
(require "schemas/gen/dataspace.rkt")
(require "schemas/gen/dataspace-patterns.rkt")
(require "schemas/gen/dataspace-patterns.meta.rkt")
;; dataspace : -> entity
;;
;; Builds a new entity that relays record-shaped assertions and messages to
;; observers that have registered interest via `Observe` records.  Matching is
;; done purely on record label.  Values that are not records are logged and
;; otherwise ignored.
(define (dataspace)
  ;; handle -> record that was asserted under that handle
  (define records-by-handle (make-hash))
  ;; bag of currently asserted records; `bag-change!` reports presence transitions
  (define live-records (make-bag))
  ;; label -> (observer -> (record -> value returned by `turn-assert!`))
  (define observers-by-label (make-hash))

  ;; The observer table registered for `label`, or an empty immutable table.
  (define (observers-of label)
    (hash-ref observers-by-label label '#hash()))

  ;; Runs when `rec` has just gone from absent to present in `live-records`.
  ;; If `rec` is an `Observe`, its observer is registered and brought up to
  ;; date with every live record carrying the requested label; afterwards
  ;; `rec` itself is relayed to every observer of its own label that has not
  ;; yet been sent it.
  (define (record-appeared! turn rec)
    (match (parse-Observe rec)
      [(Observe label observer)
       (define relayed (make-hash))
       (hash-set! (hash-ref! observers-by-label label make-hasheq) observer relayed)
       ;; `rec` is already in the bag at this point, so an observer of its
       ;; own label is sent `rec` here rather than in the loop below.
       (for ([candidate (in-bag live-records)]
             #:when (preserve=? (record-label candidate) label))
         (hash-set! relayed candidate (turn-assert! turn observer candidate)))]
      [(? eof-object?) (void)])
    (for ([(observer relayed) (in-hash (observers-of (record-label rec)))]
          #:unless (hash-has-key? relayed rec))
      (hash-set! relayed rec (turn-assert! turn observer rec))))

  ;; Runs when `rec` has just gone from present to absent in `live-records`.
  ;; Retracts `rec` from every observer of its label; if `rec` is an
  ;; `Observe`, the observer's registration is then dropped (and the label's
  ;; table too once it is empty).  Nothing further is sent to that observer.
  (define (record-vanished! turn rec)
    (for ([relayed (in-hash-values (observers-of (record-label rec)))])
      (turn-retract! turn (hash-ref relayed rec))
      (hash-remove! relayed rec))
    (match (parse-Observe rec)
      [(Observe label observer)
       (define peers (hash-ref observers-by-label label))
       (hash-remove! peers observer)
       (when (hash-empty? peers)
         (hash-remove! observers-by-label label))]
      [(? eof-object?) (void)]))

  (entity #:assert
          (action (rec handle)
            (log-info "+ ~v ~v" handle rec)
            (when (record? rec)
              (hash-set! records-by-handle handle rec)
              (define transition (bag-change! live-records rec +1))
              (when (eq? transition 'absent->present)
                (record-appeared! this-turn rec))))
          #:retract
          (action (upstream-handle)
            (define rec (hash-ref records-by-handle upstream-handle #f))
            (log-info "- ~v ~v" upstream-handle rec)
            ;; Unknown handles (including those of ignored non-record
            ;; assertions) have no entry and are skipped.
            (when rec
              (hash-remove! records-by-handle upstream-handle)
              (define transition (bag-change! live-records rec -1))
              (when (eq? transition 'present->absent)
                (record-vanished! this-turn rec))))
          #:message
          (action (message)
            (log-info "! ~v" message)
            (when (record? message)
              (for ([peer (in-hash-keys (observers-of (record-label message)))])
                (turn-message! this-turn peer message))))))