External-event and TCP drivers; chat server and stdin echo programs
This commit is contained in:
parent
f45323a097
commit
91596b066f
|
@ -7,6 +7,7 @@
|
|||
message-struct
|
||||
assertion-struct
|
||||
(struct-out observe)
|
||||
(struct-out seal)
|
||||
|
||||
dataspace?
|
||||
dataspace-assertions ;; TODO: shouldn't be provided - needed by test.rkt
|
||||
|
@ -70,6 +71,13 @@
|
|||
|
||||
(assertion-struct observe (specification))
|
||||
|
||||
;; Seals are used by protocols to prevent routing from examining
|
||||
;; internal structure of values.
|
||||
(struct seal (contents) ;; NB. Neither transparent nor prefab
|
||||
#:methods gen:custom-write
|
||||
[(define (write-proc s port mode)
|
||||
(fprintf port "#{~v}" (seal-contents s)))])
|
||||
|
||||
;; An `ActorID` uniquely identifies an actor in a `Dataspace`.
|
||||
;; A `FID` is a Facet ID, uniquely identifying a facet in a `Dataspace`.
|
||||
|
||||
|
@ -166,8 +174,9 @@
|
|||
[(f v)
|
||||
(define ac (current-actor))
|
||||
(when (not (eq? (field-handle-owner f) (current-actor))) (field-scope-error 'field-set! f))
|
||||
(when (not (equal? (field-handle-value f) v))
|
||||
(dataflow-record-damage! (dataspace-dataflow (actor-dataspace ac)) f)
|
||||
(set-field-handle-value! f v)]))
|
||||
(set-field-handle-value! f v))]))
|
||||
|
||||
(define (field-scope-error who f)
|
||||
(error who "Field ~a used out-of-scope; owner = ~a, current = ~a"
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (struct-out external-event))
|
||||
|
||||
(message-struct external-event (descriptor values))
|
||||
|
||||
(spawn #:name 'external-event-relay
|
||||
(during/spawn (observe (inbound (external-event $desc _)))
|
||||
(define ch (make-channel))
|
||||
(thread (lambda ()
|
||||
(let loop ()
|
||||
(sync ch
|
||||
(handle-evt desc
|
||||
(lambda results
|
||||
(ground-send! (inbound (external-event desc results)))
|
||||
(loop)))))))
|
||||
(signal-background-activity! +1)
|
||||
(on-stop (channel-put ch 'quit)
|
||||
(signal-background-activity! -1))))
|
|
@ -0,0 +1,198 @@
|
|||
#lang imperative-syndicate
|
||||
;; TCP/IP driver interface.
|
||||
;;
|
||||
;; A nice refinement would be to introduce something like a
|
||||
;; `(tcp-error id _)` assertion, for when something goes wrong
|
||||
;; listening or connecting. At present, for example, if connecting to
|
||||
;; some other host that isn't listening, the driver pretends the
|
||||
;; connection is open for an infinitesimal instant before closing.
|
||||
;; This would be nicer if it never signalled "open" at all, instead
|
||||
;; asserting something like `tcp-error` until interest in the
|
||||
;; connection goes away.
|
||||
|
||||
(provide (struct-out tcp-connection)
|
||||
(struct-out tcp-accepted)
|
||||
(struct-out tcp-out)
|
||||
(struct-out tcp-in)
|
||||
(struct-out tcp-in-line)
|
||||
|
||||
(struct-out tcp-address)
|
||||
(struct-out tcp-listener))
|
||||
|
||||
(define-logger syndicate/tcp)
|
||||
|
||||
(require racket/exn)
|
||||
(require (prefix-in tcp: racket/tcp))
|
||||
(require (only-in racket/port read-bytes-avail!-evt))
|
||||
|
||||
(require racket/unit)
|
||||
(require net/tcp-sig)
|
||||
(require net/tcp-unit)
|
||||
|
||||
(require syndicate/support/bytes)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Protocol messages
|
||||
|
||||
(assertion-struct tcp-connection (id spec))
|
||||
(assertion-struct tcp-accepted (id))
|
||||
(message-struct tcp-out (id bytes))
|
||||
(message-struct tcp-in (id bytes))
|
||||
(message-struct tcp-in-line (id bytes))
|
||||
|
||||
(assertion-struct tcp-address (host port))
|
||||
(assertion-struct tcp-listener (port))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Ground-level communication messages
|
||||
|
||||
(message-struct raw-tcp-accepted (local-addr remote-addr cin cout))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Driver
|
||||
|
||||
(spawn #:name 'drivers/tcp
|
||||
|
||||
(during/spawn (observe (tcp-connection _ (tcp-listener $port)))
|
||||
#:name (list 'drivers/tcp 'listener port)
|
||||
(run-listener port))
|
||||
|
||||
(during/spawn (tcp-connection $id (tcp-address $host $port))
|
||||
#:name (list 'drivers/tcp 'outbound id host port)
|
||||
(define-values (cin cout)
|
||||
(with-handlers ([exn:fail? (lambda (e)
|
||||
;; TODO: it'd be nice to somehow
|
||||
;; communicate the actual error to
|
||||
;; the local peer.
|
||||
(log-syndicate/tcp-error "~a" (exn->string e))
|
||||
(define o (open-output-string))
|
||||
(close-output-port o)
|
||||
(values (open-input-string "")
|
||||
o))])
|
||||
(tcp:tcp-connect host port)))
|
||||
(define unblock! (run-connection id cin cout))
|
||||
(unblock!))
|
||||
|
||||
(during/spawn (observe (tcp-in-line $id _))
|
||||
#:name (list 'drivers/tcp 'line-reader id)
|
||||
(field [buffer #""])
|
||||
(on (message (tcp-in id $bs)) (buffer (bytes-append (buffer) bs)))
|
||||
(begin/dataflow
|
||||
(define newline-pos (bytes-index (buffer) (char->integer #\newline)))
|
||||
(when newline-pos
|
||||
(define line (subbytes (buffer) 0 newline-pos))
|
||||
(buffer (subbytes (buffer) (+ newline-pos 1)))
|
||||
(send! (tcp-in-line id line))))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Listener
|
||||
|
||||
(define (run-listener port)
|
||||
(define server-addr (tcp-listener port))
|
||||
(define listener (tcp:tcp-listen port 128 #t))
|
||||
(define control-ch (make-channel))
|
||||
|
||||
(thread (lambda ()
|
||||
(let loop ((blocked? #t))
|
||||
(sync (handle-evt control-ch
|
||||
(match-lambda
|
||||
['unblock (loop #f)]
|
||||
['quit (void)]))
|
||||
(if blocked?
|
||||
never-evt
|
||||
(handle-evt (tcp:tcp-accept-evt listener)
|
||||
(lambda (cin+cout)
|
||||
(match-define (list cin cout) cin+cout)
|
||||
(define-values
|
||||
(local-hostname local-port remote-hostname remote-port)
|
||||
(tcp:tcp-addresses cin #t))
|
||||
(ground-send!
|
||||
(inbound
|
||||
(raw-tcp-accepted server-addr
|
||||
(tcp-address remote-hostname remote-port)
|
||||
cin
|
||||
cout)))
|
||||
(loop blocked?))))))
|
||||
(tcp:tcp-close listener)
|
||||
(signal-background-activity! -1)))
|
||||
(signal-background-activity! +1)
|
||||
|
||||
(on-start (channel-put control-ch 'unblock))
|
||||
(on-stop (channel-put control-ch 'quit))
|
||||
|
||||
(on (message (inbound (raw-tcp-accepted server-addr $remote-addr $cin $cout)))
|
||||
(define id (seal (list port remote-addr)))
|
||||
(spawn #:name (list 'drivers/tcp 'inbound id)
|
||||
(assert (tcp-connection id server-addr))
|
||||
(define unblock! (run-connection id cin cout))
|
||||
(on (asserted (tcp-accepted id)) (unblock!))
|
||||
(stop-when (retracted (tcp-accepted id))))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Connection
|
||||
|
||||
(define (run-connection id cin cout)
|
||||
(define control-ch (make-channel))
|
||||
(thread (lambda () (connection-thread control-ch id cin)))
|
||||
(signal-background-activity! +1)
|
||||
|
||||
(define (shutdown-connection!)
|
||||
(when control-ch
|
||||
(channel-put control-ch 'quit)
|
||||
(set! control-ch #f))
|
||||
(when cout
|
||||
(close-output-port cout)
|
||||
(set! cout #f)))
|
||||
|
||||
(on-stop (shutdown-connection!))
|
||||
|
||||
(on (message (inbound (tcp-in id $eof-or-bs)))
|
||||
(if (eof-object? eof-or-bs)
|
||||
(stop-current-facet)
|
||||
(send! (tcp-in id eof-or-bs))))
|
||||
|
||||
(define-syntax-rule (trap-exns body ...)
|
||||
(with-handlers ([(lambda (e) (not (exn:break? e)))
|
||||
(lambda (e)
|
||||
(shutdown-connection!)
|
||||
(raise e))])
|
||||
body ...))
|
||||
|
||||
(on (message (tcp-out id $bs))
|
||||
(trap-exns
|
||||
(if (string? bs)
|
||||
(write-string bs cout)
|
||||
(write-bytes bs cout))
|
||||
(flush-output cout)))
|
||||
|
||||
(let ((unblocked? #f))
|
||||
(lambda ()
|
||||
(when (not unblocked?)
|
||||
(set! unblocked? #t)
|
||||
(when control-ch (channel-put control-ch 'unblock))))))
|
||||
|
||||
(define (connection-thread control-ch id cin)
|
||||
(let loop ((blocked? #t))
|
||||
(sync (handle-evt control-ch
|
||||
(match-lambda
|
||||
['unblock (loop #f)]
|
||||
['quit (void)]))
|
||||
(if blocked?
|
||||
never-evt
|
||||
(handle-evt (read-bytes-avail-evt 32768 cin)
|
||||
(lambda (eof-or-bs)
|
||||
(ground-send! (inbound (tcp-in id eof-or-bs)))
|
||||
(loop (or blocked? (eof-object? eof-or-bs))))))))
|
||||
(close-input-port cin)
|
||||
(signal-background-activity! -1))
|
||||
|
||||
(define (read-bytes-avail-evt len input-port)
|
||||
(guard-evt
|
||||
(lambda ()
|
||||
(let ([bstr (make-bytes len)])
|
||||
(handle-evt
|
||||
(read-bytes-avail!-evt bstr input-port)
|
||||
(lambda (v)
|
||||
(if (number? v)
|
||||
(if (= v len) bstr (subbytes bstr 0 v))
|
||||
v)))))))
|
|
@ -0,0 +1,27 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
(require racket/format)
|
||||
|
||||
(message-struct speak (who what))
|
||||
(assertion-struct present (who))
|
||||
|
||||
(message-struct stop-server ())
|
||||
|
||||
(spawn #:name 'chat-server
|
||||
(stop-when (message (stop-server)))
|
||||
(during/spawn (tcp-connection $id (tcp-listener 5999))
|
||||
#:name (list 'chat-connection id)
|
||||
(assert (tcp-accepted id))
|
||||
(let ((me (gensym 'user)))
|
||||
(assert (present me))
|
||||
(on (message (tcp-in-line id $bs))
|
||||
(match bs
|
||||
[#"/quit" (stop-current-facet)]
|
||||
[#"/stop-server" (send! (stop-server))]
|
||||
[_ (send! (speak me (bytes->string/utf-8 bs)))])))
|
||||
(during (present $user)
|
||||
(on-start (send! (tcp-out id (string->bytes/utf-8 (~a user " arrived\n")))))
|
||||
(on-stop (send! (tcp-out id (string->bytes/utf-8 (~a user " left\n")))))
|
||||
(on (message (speak user $text))
|
||||
(send! (tcp-out id (string->bytes/utf-8 (~a user " says '" text "'\n"))))))))
|
|
@ -0,0 +1,10 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(require/activate imperative-syndicate/drivers/external-event)
|
||||
(require (only-in racket/port read-bytes-line-evt))
|
||||
|
||||
(spawn (define stdin-evt (read-bytes-line-evt (current-input-port) 'any))
|
||||
(on (message (inbound (external-event stdin-evt (list $line))))
|
||||
(if (eof-object? line)
|
||||
(stop-current-facet)
|
||||
(printf "~a\n" line))))
|
|
@ -1,9 +1,76 @@
|
|||
#lang racket/base
|
||||
;; Breaking the infinite tower of nested dataspaces, connecting to Racket at the fracture line.
|
||||
|
||||
(provide run-ground)
|
||||
(provide current-ground-event-async-channel
|
||||
ground-send!
|
||||
ground-assert!
|
||||
ground-retract!
|
||||
signal-background-activity!
|
||||
run-ground)
|
||||
|
||||
(define-logger syndicate/ground)
|
||||
|
||||
(require racket/async-channel)
|
||||
(require racket/set)
|
||||
(require racket/match)
|
||||
(require racket/list)
|
||||
(require "dataspace.rkt")
|
||||
(require "syntax.rkt")
|
||||
|
||||
(define current-ground-event-async-channel (make-parameter #f))
|
||||
|
||||
(define (ground-enqueue! item)
|
||||
(async-channel-put (current-ground-event-async-channel) item))
|
||||
|
||||
(define (ground-send! body)
|
||||
(ground-enqueue! (lambda (ac) (enqueue-send! ac body))))
|
||||
|
||||
(define (ground-assert! assertion)
|
||||
(ground-enqueue! (lambda (ac) (adhoc-assert! ac assertion))))
|
||||
|
||||
(define (ground-retract! assertion)
|
||||
(ground-enqueue! (lambda (ac) (adhoc-retract! ac assertion))))
|
||||
|
||||
(define (signal-background-activity! delta)
|
||||
(ground-enqueue! delta))
|
||||
|
||||
(define (run-ground boot-proc)
|
||||
(define ds (make-dataspace (lambda () (schedule-script! (current-actor) boot-proc))))
|
||||
(let loop () (when (run-scripts! ds) (loop))))
|
||||
(define ch (make-async-channel))
|
||||
(parameterize ((current-ground-event-async-channel ch))
|
||||
(define ground-event-relay-actor #f)
|
||||
(define background-activity-count 0)
|
||||
|
||||
(define ground-event-relay-evt
|
||||
(handle-evt ch (lambda (item)
|
||||
(match item
|
||||
[(? procedure? proc)
|
||||
(push-script! ground-event-relay-actor
|
||||
(lambda () (proc ground-event-relay-actor)))]
|
||||
[(? number? delta)
|
||||
(set! background-activity-count (+ background-activity-count delta))]))))
|
||||
|
||||
(define ds (make-dataspace
|
||||
(lambda ()
|
||||
(schedule-script! (current-actor)
|
||||
(lambda ()
|
||||
(spawn #:name 'ground-event-relay
|
||||
(set! ground-event-relay-actor (current-actor))
|
||||
;; v Adds a dummy endpoint to keep this actor alive
|
||||
(begin/dataflow (void)))))
|
||||
(schedule-script! (current-actor) boot-proc))))
|
||||
|
||||
(let loop ()
|
||||
(define work-remaining? (run-scripts! ds))
|
||||
(define events-expected? (positive? background-activity-count))
|
||||
(log-info "GROUND: ~a; ~a background activities"
|
||||
(if work-remaining? "busy" "idle")
|
||||
background-activity-count)
|
||||
(cond
|
||||
[events-expected?
|
||||
(sync ground-event-relay-evt (if work-remaining? (system-idle-evt) never-evt))
|
||||
(loop)]
|
||||
[work-remaining?
|
||||
(sync ground-event-relay-evt (system-idle-evt))
|
||||
(loop)]
|
||||
[else
|
||||
(sync (handle-evt ground-event-relay-evt (lambda _ (loop))) (system-idle-evt))]))))
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
(struct inbound (assertion) #:prefab)
|
||||
(struct outbound (assertion) #:prefab)
|
||||
|
||||
;; TODO: inbound^n, outbound^n -- protocol/standard-relay, iow
|
||||
|
||||
(define-syntax (dataspace stx)
|
||||
(syntax-parse stx
|
||||
[(_ name:name form ...)
|
||||
|
|
Loading…
Reference in New Issue