diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt new file mode 100644 index 0000000..3f5d8ed --- /dev/null +++ b/syndicate/dataspace.rkt @@ -0,0 +1,60 @@ +#lang racket/base + +(provide (all-from-out "schemas/gen/dataspace.rkt") + (all-from-out "schemas/gen/dataspace-patterns.rkt") + (all-from-out "schemas/gen/dataspace-patterns.meta.rkt") + + dataspace) + +(require racket/match) + +(require preserves) + +(require "bag.rkt") +(require "main.rkt") + +(require "schemas/gen/dataspace.rkt") +(require "schemas/gen/dataspace-patterns.rkt") +(require "schemas/gen/dataspace-patterns.meta.rkt") + +(define (dataspace) + (define handles (make-hash)) + (define assertions (make-bag)) + (define subscriptions (make-hash)) + (entity #:assert (action (rec handle) + (log-info "+ ~v ~v" handle rec) + (when (record? rec) + (hash-set! handles handle rec) + (when (eq? (bag-change! assertions rec +1) 'absent->present) + (match (parse-Observe rec) + [(? eof-object?) (void)] + [(Observe label observer) + (define seen (make-hash)) + (hash-set! (hash-ref! subscriptions label make-hasheq) observer seen) + (for [(existing (in-bag assertions))] + (when (preserve=? (record-label existing) label) + (hash-set! seen existing (turn-assert! this-turn observer existing))))]) + (for [((observer seen) (in-hash (hash-ref subscriptions (record-label rec) '#hash())))] + (unless (hash-has-key? seen rec) + (hash-set! seen rec (turn-assert! this-turn observer rec))))))) + #:retract (action (upstream-handle) + (define rec (hash-ref handles upstream-handle #f)) + (log-info "- ~v ~v" upstream-handle rec) + (when rec + (hash-remove! handles upstream-handle) + (when (eq? (bag-change! assertions rec -1) 'present->absent) + (for [(seen (in-hash-values (hash-ref subscriptions (record-label rec) '#hash())))] + (turn-retract! this-turn (hash-ref seen rec)) + (hash-remove! seen rec)) + (match (parse-Observe rec) + [(? eof-object?) (void)] + [(Observe label observer) + (let ((subscribers (hash-ref subscriptions label))) + (hash-remove! subscribers observer) + (when (hash-empty? subscribers) + (hash-remove! subscriptions label)))])))) + #:message (action (message) + (log-info "! ~v" message) + (when (record? message) + (for [(peer (in-hash-keys (hash-ref subscriptions (record-label message) '#hash())))] + (turn-message! this-turn peer message)))))) diff --git a/syndicate/go.rkt b/syndicate/go.rkt index 92df533..8f96205 100644 --- a/syndicate/go.rkt +++ b/syndicate/go.rkt @@ -1,14 +1,12 @@ #lang racket/base (require racket/match) -(require preserves) -(require (except-in "main.rkt" actor-system)) -(require "bag.rkt") +(require "main.rkt") +(require "dataspace.rkt") + (require "schemas/gen/box-protocol.rkt") -(require "schemas/gen/dataspace.rkt") -(require "syntax.rkt") (require (only-in "pattern.rkt" :pattern)) (define box @@ -59,48 +57,6 @@ ;; (stop-facet root-facet))) )))) -(define (dataspace) - (define handles (make-hash)) - (define assertions (make-bag)) - (define subscriptions (make-hash)) - (entity #:assert (action (rec handle) - (log-info "+ ~v ~v" handle rec) - (when (record? rec) - (hash-set! handles handle rec) - (when (eq? (bag-change! assertions rec +1) 'absent->present) - (match (parse-Observe rec) - [(? eof-object?) (void)] - [(Observe label observer) - (define seen (make-hash)) - (hash-set! (hash-ref! subscriptions label make-hasheq) observer seen) - (for [(existing (in-bag assertions))] - (when (preserve=? (record-label existing) label) - (hash-set! seen existing (turn-assert! this-turn observer existing))))]) - (for [((observer seen) (in-hash (hash-ref subscriptions (record-label rec) '#hash())))] - (unless (hash-has-key? seen rec) - (hash-set! seen rec (turn-assert! this-turn observer rec))))))) - #:retract (action (upstream-handle) - (define rec (hash-ref handles upstream-handle #f)) - (log-info "- ~v ~v" upstream-handle rec) - (when rec - (hash-remove! handles upstream-handle) - (when (eq? (bag-change! assertions rec -1) 'present->absent) - (for [(seen (in-hash-values (hash-ref subscriptions (record-label rec) '#hash())))] - (turn-retract! this-turn (hash-ref seen rec)) - (hash-remove! seen rec)) - (match (parse-Observe rec) - [(? eof-object?) (void)] - [(Observe label observer) - (let ((subscribers (hash-ref subscriptions label))) - (hash-remove! subscribers observer) - (when (hash-empty? subscribers) - (hash-remove! subscriptions label)))])))) - #:message (action (message) - (log-info "! ~v" message) - (when (record? message) - (for [(peer (in-hash-keys (hash-ref subscriptions (record-label message) '#hash())))] - (turn-message! this-turn peer message)))))) - (module+ main (time (actor-system diff --git a/syndicate/main.rkt b/syndicate/main.rkt index 3f05dab..2c03168 100644 --- a/syndicate/main.rkt +++ b/syndicate/main.rkt @@ -1,19 +1,11 @@ #lang racket/base -(provide (all-from-out "actor.rkt")) +(provide (all-from-out "actor.rkt") + (all-from-out "syntax.rkt") + (all-from-out preserves)) -(require "actor.rkt") - -;; (provide (all-from-out "dataspace.rkt") -;; (all-from-out "assertions.rkt") -;; (all-from-out "syntax.rkt") -;; (all-from-out "ground.rkt") -;; (all-from-out "relay.rkt")) +(require (except-in "actor.rkt" actor-system)) +(require "syntax.rkt") +(require preserves) ;; (module reader syntax/module-reader syndicate/lang) - -;; (require "dataspace.rkt") -;; (require "assertions.rkt") -;; (require "syntax.rkt") -;; (require "ground.rkt") -;; (require "relay.rkt")