diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index 27667cf..cd27500 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -7,6 +7,7 @@ message-struct assertion-struct (struct-out observe) + (struct-out seal) dataspace? dataspace-assertions ;; TODO: shouldn't be provided - needed by test.rkt @@ -70,6 +71,13 @@ (assertion-struct observe (specification)) +;; Seals are used by protocols to prevent routing from examining +;; internal structure of values. +(struct seal (contents) ;; NB. Neither transparent nor prefab + #:methods gen:custom-write + [(define (write-proc s port mode) + (fprintf port "#{~v}" (seal-contents s)))]) + ;; An `ActorID` uniquely identifies an actor in a `Dataspace`. ;; A `FID` is a Facet ID, uniquely identifying a facet in a `Dataspace`. @@ -166,8 +174,9 @@ [(f v) (define ac (current-actor)) (when (not (eq? (field-handle-owner f) (current-actor))) (field-scope-error 'field-set! f)) - (dataflow-record-damage! (dataspace-dataflow (actor-dataspace ac)) f) - (set-field-handle-value! f v)])) + (when (not (equal? (field-handle-value f) v)) + (dataflow-record-damage! (dataspace-dataflow (actor-dataspace ac)) f) + (set-field-handle-value! f v))])) (define (field-scope-error who f) (error who "Field ~a used out-of-scope; owner = ~a, current = ~a" diff --git a/syndicate/drivers/external-event.rkt b/syndicate/drivers/external-event.rkt new file mode 100644 index 0000000..98b4d06 --- /dev/null +++ b/syndicate/drivers/external-event.rkt @@ -0,0 +1,19 @@ +#lang imperative-syndicate + +(provide (struct-out external-event)) + +(message-struct external-event (descriptor values)) + +(spawn #:name 'external-event-relay + (during/spawn (observe (inbound (external-event $desc _))) + (define ch (make-channel)) + (thread (lambda () + (let loop () + (sync ch + (handle-evt desc + (lambda results + (ground-send! (inbound (external-event desc results))) + (loop))))))) + (signal-background-activity! +1) + (on-stop (channel-put ch 'quit) + (signal-background-activity! -1)))) diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt new file mode 100644 index 0000000..9cf65d0 --- /dev/null +++ b/syndicate/drivers/tcp.rkt @@ -0,0 +1,198 @@ +#lang imperative-syndicate +;; TCP/IP driver interface. +;; +;; A nice refinement would be to introduce something like a +;; `(tcp-error id _)` assertion, for when something goes wrong +;; listening or connecting. At present, for example, if connecting to +;; some other host that isn't listening, the driver pretends the +;; connection is open for an infinitesimal instant before closing. +;; This would be nicer if it never signalled "open" at all, instead +;; asserting something like `tcp-error` until interest in the +;; connection goes away. + +(provide (struct-out tcp-connection) + (struct-out tcp-accepted) + (struct-out tcp-out) + (struct-out tcp-in) + (struct-out tcp-in-line) + + (struct-out tcp-address) + (struct-out tcp-listener)) + +(define-logger syndicate/tcp) + +(require racket/exn) +(require (prefix-in tcp: racket/tcp)) +(require (only-in racket/port read-bytes-avail!-evt)) + +(require racket/unit) +(require net/tcp-sig) +(require net/tcp-unit) + +(require syndicate/support/bytes) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(assertion-struct tcp-connection (id spec)) +(assertion-struct tcp-accepted (id)) +(message-struct tcp-out (id bytes)) +(message-struct tcp-in (id bytes)) +(message-struct tcp-in-line (id bytes)) + +(assertion-struct tcp-address (host port)) +(assertion-struct tcp-listener (port)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Ground-level communication messages + +(message-struct raw-tcp-accepted (local-addr remote-addr cin cout)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Driver + +(spawn #:name 'drivers/tcp + + (during/spawn (observe (tcp-connection _ (tcp-listener $port))) + #:name (list 'drivers/tcp 'listener port) + (run-listener port)) + + (during/spawn (tcp-connection $id (tcp-address $host $port)) + #:name (list 'drivers/tcp 'outbound id host port) + (define-values (cin cout) + (with-handlers ([exn:fail? (lambda (e) + ;; TODO: it'd be nice to somehow + ;; communicate the actual error to + ;; the local peer. + (log-syndicate/tcp-error "~a" (exn->string e)) + (define o (open-output-string)) + (close-output-port o) + (values (open-input-string "") + o))]) + (tcp:tcp-connect host port))) + (define unblock! (run-connection id cin cout)) + (unblock!)) + + (during/spawn (observe (tcp-in-line $id _)) + #:name (list 'drivers/tcp 'line-reader id) + (field [buffer #""]) + (on (message (tcp-in id $bs)) (buffer (bytes-append (buffer) bs))) + (begin/dataflow + (define newline-pos (bytes-index (buffer) (char->integer #\newline))) + (when newline-pos + (define line (subbytes (buffer) 0 newline-pos)) + (buffer (subbytes (buffer) (+ newline-pos 1))) + (send! (tcp-in-line id line)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Listener + +(define (run-listener port) + (define server-addr (tcp-listener port)) + (define listener (tcp:tcp-listen port 128 #t)) + (define control-ch (make-channel)) + + (thread (lambda () + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt (tcp:tcp-accept-evt listener) + (lambda (cin+cout) + (match-define (list cin cout) cin+cout) + (define-values + (local-hostname local-port remote-hostname remote-port) + (tcp:tcp-addresses cin #t)) + (ground-send! + (inbound + (raw-tcp-accepted server-addr + (tcp-address remote-hostname remote-port) + cin + cout))) + (loop blocked?)))))) + (tcp:tcp-close listener) + (signal-background-activity! -1))) + (signal-background-activity! +1) + + (on-start (channel-put control-ch 'unblock)) + (on-stop (channel-put control-ch 'quit)) + + (on (message (inbound (raw-tcp-accepted server-addr $remote-addr $cin $cout))) + (define id (seal (list port remote-addr))) + (spawn #:name (list 'drivers/tcp 'inbound id) + (assert (tcp-connection id server-addr)) + (define unblock! (run-connection id cin cout)) + (on (asserted (tcp-accepted id)) (unblock!)) + (stop-when (retracted (tcp-accepted id)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Connection + +(define (run-connection id cin cout) + (define control-ch (make-channel)) + (thread (lambda () (connection-thread control-ch id cin))) + (signal-background-activity! +1) + + (define (shutdown-connection!) + (when control-ch + (channel-put control-ch 'quit) + (set! control-ch #f)) + (when cout + (close-output-port cout) + (set! cout #f))) + + (on-stop (shutdown-connection!)) + + (on (message (inbound (tcp-in id $eof-or-bs))) + (if (eof-object? eof-or-bs) + (stop-current-facet) + (send! (tcp-in id eof-or-bs)))) + + (define-syntax-rule (trap-exns body ...) + (with-handlers ([(lambda (e) (not (exn:break? e))) + (lambda (e) + (shutdown-connection!) + (raise e))]) + body ...)) + + (on (message (tcp-out id $bs)) + (trap-exns + (if (string? bs) + (write-string bs cout) + (write-bytes bs cout)) + (flush-output cout))) + + (let ((unblocked? #f)) + (lambda () + (when (not unblocked?) + (set! unblocked? #t) + (when control-ch (channel-put control-ch 'unblock)))))) + +(define (connection-thread control-ch id cin) + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt (read-bytes-avail-evt 32768 cin) + (lambda (eof-or-bs) + (ground-send! (inbound (tcp-in id eof-or-bs))) + (loop (or blocked? (eof-object? eof-or-bs)))))))) + (close-input-port cin) + (signal-background-activity! -1)) + +(define (read-bytes-avail-evt len input-port) + (guard-evt + (lambda () + (let ([bstr (make-bytes len)]) + (handle-evt + (read-bytes-avail!-evt bstr input-port) + (lambda (v) + (if (number? v) + (if (= v len) bstr (subbytes bstr 0 v)) + v))))))) diff --git a/syndicate/examples/chat-server.rkt b/syndicate/examples/chat-server.rkt new file mode 100644 index 0000000..e374dfe --- /dev/null +++ b/syndicate/examples/chat-server.rkt @@ -0,0 +1,27 @@ +#lang imperative-syndicate + +(require/activate imperative-syndicate/drivers/tcp) +(require racket/format) + +(message-struct speak (who what)) +(assertion-struct present (who)) + +(message-struct stop-server ()) + +(spawn #:name 'chat-server + (stop-when (message (stop-server))) + (during/spawn (tcp-connection $id (tcp-listener 5999)) + #:name (list 'chat-connection id) + (assert (tcp-accepted id)) + (let ((me (gensym 'user))) + (assert (present me)) + (on (message (tcp-in-line id $bs)) + (match bs + [#"/quit" (stop-current-facet)] + [#"/stop-server" (send! (stop-server))] + [_ (send! (speak me (bytes->string/utf-8 bs)))]))) + (during (present $user) + (on-start (send! (tcp-out id (string->bytes/utf-8 (~a user " arrived\n"))))) + (on-stop (send! (tcp-out id (string->bytes/utf-8 (~a user " left\n"))))) + (on (message (speak user $text)) + (send! (tcp-out id (string->bytes/utf-8 (~a user " says '" text "'\n")))))))) diff --git a/syndicate/examples/stdin-echo.rkt b/syndicate/examples/stdin-echo.rkt new file mode 100644 index 0000000..ab74ffc --- /dev/null +++ b/syndicate/examples/stdin-echo.rkt @@ -0,0 +1,10 @@ +#lang imperative-syndicate + +(require/activate imperative-syndicate/drivers/external-event) +(require (only-in racket/port read-bytes-line-evt)) + +(spawn (define stdin-evt (read-bytes-line-evt (current-input-port) 'any)) + (on (message (inbound (external-event stdin-evt (list $line)))) + (if (eof-object? line) + (stop-current-facet) + (printf "~a\n" line)))) diff --git a/syndicate/ground.rkt b/syndicate/ground.rkt index 1cef01c..812b193 100644 --- a/syndicate/ground.rkt +++ b/syndicate/ground.rkt @@ -1,9 +1,76 @@ #lang racket/base +;; Breaking the infinite tower of nested dataspaces, connecting to Racket at the fracture line. -(provide run-ground) +(provide current-ground-event-async-channel + ground-send! + ground-assert! + ground-retract! + signal-background-activity! + run-ground) +(define-logger syndicate/ground) + +(require racket/async-channel) +(require racket/set) +(require racket/match) +(require racket/list) (require "dataspace.rkt") +(require "syntax.rkt") + +(define current-ground-event-async-channel (make-parameter #f)) + +(define (ground-enqueue! item) + (async-channel-put (current-ground-event-async-channel) item)) + +(define (ground-send! body) + (ground-enqueue! (lambda (ac) (enqueue-send! ac body)))) + +(define (ground-assert! assertion) + (ground-enqueue! (lambda (ac) (adhoc-assert! ac assertion)))) + +(define (ground-retract! assertion) + (ground-enqueue! (lambda (ac) (adhoc-retract! ac assertion)))) + +(define (signal-background-activity! delta) + (ground-enqueue! delta)) (define (run-ground boot-proc) - (define ds (make-dataspace (lambda () (schedule-script! (current-actor) boot-proc)))) - (let loop () (when (run-scripts! ds) (loop)))) + (define ch (make-async-channel)) + (parameterize ((current-ground-event-async-channel ch)) + (define ground-event-relay-actor #f) + (define background-activity-count 0) + + (define ground-event-relay-evt + (handle-evt ch (lambda (item) + (match item + [(? procedure? proc) + (push-script! ground-event-relay-actor + (lambda () (proc ground-event-relay-actor)))] + [(? number? delta) + (set! background-activity-count (+ background-activity-count delta))])))) + + (define ds (make-dataspace + (lambda () + (schedule-script! (current-actor) + (lambda () + (spawn #:name 'ground-event-relay + (set! ground-event-relay-actor (current-actor)) + ;; v Adds a dummy endpoint to keep this actor alive + (begin/dataflow (void))))) + (schedule-script! (current-actor) boot-proc)))) + + (let loop () + (define work-remaining? (run-scripts! ds)) + (define events-expected? (positive? background-activity-count)) + (log-info "GROUND: ~a; ~a background activities" + (if work-remaining? "busy" "idle") + background-activity-count) + (cond + [events-expected? + (sync ground-event-relay-evt (if work-remaining? (system-idle-evt) never-evt)) + (loop)] + [work-remaining? + (sync ground-event-relay-evt (system-idle-evt)) + (loop)] + [else + (sync (handle-evt ground-event-relay-evt (lambda _ (loop))) (system-idle-evt))])))) diff --git a/syndicate/relay.rkt b/syndicate/relay.rkt index 9d68d57..8ffc27c 100644 --- a/syndicate/relay.rkt +++ b/syndicate/relay.rkt @@ -22,6 +22,8 @@ (struct inbound (assertion) #:prefab) (struct outbound (assertion) #:prefab) +;; TODO: inbound^n, outbound^n -- protocol/standard-relay, iow + (define-syntax (dataspace stx) (syntax-parse stx [(_ name:name form ...)