From a06b9d188a24588e3e51d43a90830424950d71aa Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 31 May 2021 13:05:37 +0200 Subject: [PATCH] Integrate dataflow --- syndicate/actor.rkt | 32 +++++++++++++++++++++++++++++--- syndicate/field.rkt | 35 +++++++++++++++++++++++++++++++++++ syndicate/go.rkt | 8 +++----- 3 files changed, 67 insertions(+), 8 deletions(-) create mode 100644 syndicate/field.rkt diff --git a/syndicate/actor.rkt b/syndicate/actor.rkt index 9f570a5..02f3f20 100644 --- a/syndicate/actor.rkt +++ b/syndicate/actor.rkt @@ -36,6 +36,9 @@ turn-spawn! turn-stop-actor! turn-crash! + turn-field! + turn-dataflow! + turn-assert/dataflow! turn-assert! turn-assert!* turn-retract! @@ -45,12 +48,15 @@ turn-sync!* turn-message!) +(require (only-in preserves preserve=?)) (require racket/match) (require (only-in racket/exn exn->string)) (require struct-defaults) (require "rewrite.rkt") (require "engine.rkt") +(require "dataflow.rkt") +(require "field.rkt") (require "support/counter.rkt") (struct entity (assert retract message sync)) @@ -69,6 +75,7 @@ (struct actor (id engine [daemon? #:mutable] + dataflow [root #:mutable] [exit-reason #:mutable] ;; #f -> running, #t -> terminated OK, exn -> error [exit-hooks #:mutable]) @@ -119,6 +126,7 @@ (define ac (actor (generate-actor-id) engine daemon? + (make-dataflow-graph) 'uninitialized #f '())) @@ -213,13 +221,15 @@ ;;--------------------------------------------------------------------------- (define (turn! f action [zombie-turn? #f]) - (when (or zombie-turn? (and (not (actor-exit-reason (facet-actor f))) (facet-live? f))) + (define ac (facet-actor f)) + (when (or zombie-turn? (and (not (actor-exit-reason ac)) (facet-live? f))) (let ((turn (turn (generate-turn-id) f (make-hasheq)))) (with-handlers ([exn? (lambda (e) - (turn! (actor-root (facet-actor f)) + (turn! (actor-root ac) (lambda (turn) - (actor-terminate! turn (facet-actor f) e))))]) + (actor-terminate! turn ac e))))]) (action turn) + (dataflow-repair-damage! (actor-dataflow ac) (lambda (action) (action turn))) (for [((ff qq) (in-hash (turn-queues turn)))] (queue-task! (actor-engine (facet-actor ff)) (lambda () @@ -273,6 +283,22 @@ (define ac (facet-actor (turn-active-facet turn))) (turn-enqueue! turn (actor-root ac) (lambda (turn) (actor-terminate! turn ac exn)))) +(define (turn-field! turn name initial-value) + (field (actor-dataflow (facet-actor (turn-active-facet turn))) name initial-value)) + +(define (turn-dataflow! turn action) + (parameterize ((current-dataflow-subject-id action)) + (action turn))) + +(define (turn-assert/dataflow! turn peer assertion-action) + (define handle #f) + (define assertion (void)) + (turn-dataflow! turn + (lambda (turn) + (define new-assertion (assertion-action turn)) + (when (not (preserve=? assertion new-assertion)) + (set! handle (turn-replace! turn peer handle new-assertion)))))) + (define (turn-assert! turn peer assertion) (define handle (generate-handle)) (turn-assert!* turn peer assertion handle) diff --git a/syndicate/field.rkt b/syndicate/field.rkt new file mode 100644 index 0000000..b5a30c1 --- /dev/null +++ b/syndicate/field.rkt @@ -0,0 +1,35 @@ +#lang racket/base + +(provide field? + (rename-out [make-field field]) + field-name + field-id) + +(require "dataflow.rkt") +(require "support/counter.rkt") + +(struct field (name ;; Symbol + id ;; Nat + dataflow ;; Dataflow + [value #:mutable] ;; Any + ) + #:methods gen:custom-write + [(define (write-proc f port mode) + (fprintf f "#" (field-id f) (field-name f)))] + #:property prop:procedure + (case-lambda + [(f) + (dataflow-record-observation! (field-dataflow f) f) + (field-value f)] + [(f new-value) + (when (not (equal? (field-value f) new-value)) + (dataflow-record-damage! (field-dataflow f) f) + (set-field-value! f new-value))])) + +(define generate-field-id (make-counter)) + +(define (make-field dataflow name initial-value) + (field name + (generate-field-id) + dataflow + initial-value)) diff --git a/syndicate/go.rkt b/syndicate/go.rkt index e2ade15..0d7e5a0 100644 --- a/syndicate/go.rkt +++ b/syndicate/go.rkt @@ -9,10 +9,8 @@ (require "schemas/gen/dataspace.rkt") (define ((box ds LIMIT REPORT_EVERY) turn) - (define value-handle #f) - (define (set-value turn value) - (set! value-handle (turn-replace! turn ds value-handle (BoxState->preserves (BoxState value))))) - (set-value turn 0) + (define value (turn-field! turn 'box-value 0)) + (turn-assert/dataflow! turn ds (lambda (turn) (BoxState->preserves (BoxState (value))))) (define start-time (current-inexact-milliseconds)) (define prev-value 0) (turn-assert! turn ds @@ -33,7 +31,7 @@ (/ count delta))) (when (= new-value LIMIT) (turn-stop-actor! turn)) - (set-value turn new-value)))))))) + (value new-value)))))))) (define ((client ds) turn) (turn-assert! turn ds