Low-hanging fruit: nested-vm -> spawn-vm, at-meta-level:, examples
This commit is contained in:
parent
640f395bec
commit
f671ac3bef
|
@ -13,7 +13,7 @@
|
||||||
|
|
||||||
@defproc[(event-relay [self-id Symbol]) Spawn]{
|
@defproc[(event-relay [self-id Symbol]) Spawn]{
|
||||||
|
|
||||||
Lets processes in some @racket[nested-vm] interact with the outside
|
Lets processes in some nested VM interact with the outside
|
||||||
world using @racket[ground-vm]-level event-based subscriptions.
|
world using @racket[ground-vm]-level event-based subscriptions.
|
||||||
|
|
||||||
Returns a @racket[spawn] which starts an event-relay process with
|
Returns a @racket[spawn] which starts an event-relay process with
|
||||||
|
|
|
@ -10,78 +10,83 @@ Here is a complete Marketplace program:
|
||||||
|
|
||||||
@#reader scribble/comment-reader (racketmod #:file "examples/echo-paper.rkt" marketplace
|
@#reader scribble/comment-reader (racketmod #:file "examples/echo-paper.rkt" marketplace
|
||||||
|
|
||||||
(endpoint #:subscriber (tcp-channel ? (tcp-listener 5999) ?)
|
(observe-publishers (tcp-channel ? (tcp-listener 5999) ?)
|
||||||
#:conversation (tcp-channel from to _)
|
(match-conversation (tcp-channel from to _)
|
||||||
#:on-presence (spawn #:child (echoer from to)))
|
(on-presence (spawn (echoer from to)))))
|
||||||
|
|
||||||
(define (echoer from to)
|
(define (echoer from to)
|
||||||
(transition stateless
|
(transition stateless
|
||||||
(endpoint #:subscriber (tcp-channel from to ?)
|
(publisher (tcp-channel to from ?))
|
||||||
#:on-absence (quit)
|
(subscriber (tcp-channel from to ?)
|
||||||
[(tcp-channel _ _ data)
|
(on-absence (quit))
|
||||||
(send-message (tcp-channel to from data))])))
|
(on-message
|
||||||
|
[(tcp-channel _ _ data)
|
||||||
|
(send-message (tcp-channel to from data))]))))
|
||||||
)
|
)
|
||||||
|
|
||||||
The top-level @racket[endpoint] action subscribes to TCP connections
|
The top-level @racket[observe-publishers] monitors TCP connections
|
||||||
arriving on port 5999, and @racket[spawn]s a fresh process in response to
|
arriving on port 5999 and @racket[spawn]s a fresh process in response
|
||||||
each (@racket[#:on-presence]). The topic of
|
to each with the help of the auxiliary @racket[echoer] function. The
|
||||||
conversation (@racket[#:conversation]) associated with the newly-present
|
topic of conversation associated with the each new connection is
|
||||||
subscription is analyzed to give the remote
|
parsed (with @racket[match-conversation]) to name the remote
|
||||||
(@racket[from]) and local (@racket[to]) TCP addresses, which are
|
(@racket[from]) and local (@racket[to]) TCP addresses, which are
|
||||||
passed to the @racket[echoer] function to give the initial actions for
|
passed to @racket[echoer] to create the initial state and actions for
|
||||||
the corresponding process. Here, the process is stateless, using the
|
the corresponding process. In this case, the process is stateless,
|
||||||
special constant @racket[stateless] as its state.
|
indicated by the special constant @racket[stateless].
|
||||||
|
|
||||||
Each connection's process creates an endpoint subscribing to data
|
Each connection's process watches for incoming data, using
|
||||||
arriving on its particular connection, using @racket[from] and @racket[to]
|
@racket[from] and @racket[to] to configure a @racket[subscriber]. It
|
||||||
passed in from the top-level @racket[endpoint]. When data arrives, it is
|
also declares its intent to produce outbound TCP data, using
|
||||||
echoed back to the remote peer using @racket[send-message]. Presence
|
@racket[publisher]. When data arrives, it is echoed back to the remote
|
||||||
manages disconnection; when the remote peer closes the TCP connection,
|
peer using the @racket[send-message] operation. Absence notifications
|
||||||
the @racket[#:on-absence] handler in @racket[echoer] issues a @racket[quit]
|
signal disconnection; when the remote peer closes the TCP connection,
|
||||||
action, terminating the connection's process. The heart of our system
|
the @racket[on-absence] handler issues a @racket[quit] action, which
|
||||||
is the interface between a process and its containing VM. Our
|
terminates the connection's process.
|
||||||
implementation instantiates this interface as a collection of Typed
|
|
||||||
Racket programs.
|
|
||||||
|
|
||||||
@section[#:tag "chat-server-example"]{TCP chat server}
|
@section[#:tag "chat-server-example"]{TCP chat server}
|
||||||
|
|
||||||
@#reader scribble/comment-reader (racketmod #:file "examples/chat-paper.rkt" marketplace
|
@#reader scribble/comment-reader (racketmod #:file "examples/chat-paper.rkt" marketplace
|
||||||
|
|
||||||
(nested-vm
|
(spawn-vm
|
||||||
(at-meta-level
|
(at-meta-level
|
||||||
(endpoint #:subscriber (tcp-channel ? (tcp-listener 5999) ?) #:observer
|
(observe-publishers (tcp-channel ? (tcp-listener 5999) ?)
|
||||||
#:conversation (tcp-channel them us _)
|
(match-conversation (tcp-channel them us _)
|
||||||
#:on-presence (spawn #:child (chat-session them us)))))
|
(on-presence (spawn (chat-session them us)))))))
|
||||||
|
|
||||||
(define (chat-session them us)
|
(define (chat-session them us)
|
||||||
(define user (gensym 'user))
|
(define user (gensym 'user))
|
||||||
(transition stateless
|
(transition stateless
|
||||||
(listen-to-user user them us)
|
(listen-to-user user them us)
|
||||||
(speak-to-user user them us)))
|
(speak-to-user user them us)))
|
||||||
|
|
||||||
(define (listen-to-user user them us)
|
(define (listen-to-user user them us)
|
||||||
(list
|
(list
|
||||||
(endpoint #:publisher `(,user says ,?))
|
|
||||||
(at-meta-level
|
(at-meta-level
|
||||||
(endpoint #:subscriber (tcp-channel them us ?)
|
(subscriber (tcp-channel them us ?)
|
||||||
#:on-absence (quit)
|
(on-absence (quit))
|
||||||
[(tcp-channel _ _ (? bytes? text))
|
(on-message
|
||||||
(send-message `(,user says ,text))]))))
|
[(tcp-channel _ _ (? bytes? text))
|
||||||
|
(send-message `(,user says ,text))])))
|
||||||
|
(publisher `(,user says ,?))))
|
||||||
|
|
||||||
(define (speak-to-user user them us)
|
(define (speak-to-user user them us)
|
||||||
(define (say fmt . args)
|
(define (say fmt . args)
|
||||||
(at-meta-level (send-message (tcp-channel us them (apply format fmt args)))))
|
(at-meta-level
|
||||||
|
(send-message
|
||||||
|
(tcp-channel us them (apply format fmt args)))))
|
||||||
(define (announce who did-what)
|
(define (announce who did-what)
|
||||||
(unless (equal? who user) (say "~s ~s.~n" who did-what)))
|
(unless (equal? who user)
|
||||||
|
(say "~s ~s.~n" who did-what)))
|
||||||
(list
|
(list
|
||||||
(say "You are ~s.~n" user)
|
(say "You are ~s.~n" user)
|
||||||
(at-meta-level
|
(at-meta-level
|
||||||
(endpoint #:publisher (tcp-channel us them ?)))
|
(publisher (tcp-channel us them ?)))
|
||||||
(endpoint #:subscriber `(,? says ,?)
|
(subscriber `(,? says ,?)
|
||||||
#:conversation `(,who says ,_)
|
(match-conversation `(,who says ,_)
|
||||||
#:on-presence (announce who 'arrived)
|
(on-presence (announce who 'arrived))
|
||||||
#:on-absence (announce who 'departed)
|
(on-absence (announce who 'departed))
|
||||||
[`(,who says ,what) (say "~a: ~a" who what)])))
|
(on-message [`(,who says ,what)
|
||||||
|
(say "~a: ~a" who what)])))))
|
||||||
)
|
)
|
||||||
|
|
||||||
@section[#:tag "chat-client-example"]{TCP chat client}
|
@section[#:tag "chat-client-example"]{TCP chat client}
|
||||||
|
@ -89,45 +94,46 @@ Racket programs.
|
||||||
@#reader scribble/comment-reader (racketmod #:file "examples/chat-client.rkt" marketplace
|
@#reader scribble/comment-reader (racketmod #:file "examples/chat-client.rkt" marketplace
|
||||||
(require racket/port)
|
(require racket/port)
|
||||||
|
|
||||||
(spawn #:debug-name 'console-output-driver
|
;; Usually it's OK to just use display and friends directly.
|
||||||
#:child
|
;; Here we have a console output driver just to show how it's done.
|
||||||
(transition/no-state
|
(name-process 'console-output-driver
|
||||||
(endpoint #:subscriber (list 'console-output ?)
|
(spawn (transition/no-state
|
||||||
[(list 'console-output item)
|
(subscriber (list 'console-output ?)
|
||||||
(begin (printf "~a" item)
|
(on-message [(list 'console-output item)
|
||||||
(void))])))
|
(printf "~a" item)
|
||||||
|
(void)])))))
|
||||||
|
|
||||||
(spawn #:debug-name 'console-input-driver
|
(name-process 'console-input-driver
|
||||||
#:child
|
(spawn (transition/no-state
|
||||||
(transition/no-state
|
(name-endpoint 'input-relay
|
||||||
(endpoint #:publisher (list 'console-input ?)
|
(publisher (list 'console-input ?)
|
||||||
#:name 'input-relay
|
(on-absence
|
||||||
#:on-absence
|
(send-message (list 'console-output "Connection terminated.\n"))
|
||||||
(list (send-message (list 'console-output "Connection terminated.\n"))
|
(quit))))
|
||||||
(quit)))
|
(subscriber (cons (read-line-evt (current-input-port) 'any) ?)
|
||||||
(endpoint #:subscriber (cons (read-line-evt (current-input-port) 'any) ?)
|
(on-message
|
||||||
[(cons _ (? eof-object?))
|
[(cons _ (? eof-object?))
|
||||||
(list (send-message (list 'console-output "Terminating on local EOF.\n"))
|
(send-message (list 'console-output "Terminating on local EOF.\n"))
|
||||||
(delete-endpoint 'input-relay))]
|
(delete-endpoint 'input-relay)]
|
||||||
[(cons _ (? string? line))
|
[(cons _ (? string? line))
|
||||||
(send-message (list 'console-input line))])))
|
(send-message (list 'console-input line))])))))
|
||||||
|
|
||||||
(spawn #:debug-name 'outbound-connection
|
(name-process 'outbound-connection
|
||||||
#:child
|
(spawn (let ((local (tcp-handle 'outbound))
|
||||||
(let ((local (tcp-handle 'outbound))
|
(remote (tcp-address "localhost" 5999)))
|
||||||
(remote (tcp-address "localhost" 5999)))
|
(transition/no-state
|
||||||
(transition/no-state
|
(subscriber (list 'console-input ?)
|
||||||
(endpoint #:subscriber (list 'console-input ?)
|
(on-absence (quit))
|
||||||
#:on-absence (quit)
|
(on-message
|
||||||
[(list 'console-input line)
|
[(list 'console-input line)
|
||||||
(list (send-message (list 'console-output (format "> ~a \n" line)))
|
(send-message (list 'console-output (format "> ~a \n" line)))
|
||||||
(send-message (tcp-channel local remote (string-append line "\n"))))])
|
(send-message (tcp-channel local remote (string-append line "\n")))]))
|
||||||
(endpoint #:publisher (tcp-channel local remote ?))
|
(publisher (tcp-channel local remote ?))
|
||||||
(endpoint #:subscriber (tcp-channel remote local ?)
|
(subscriber (tcp-channel remote local ?)
|
||||||
#:on-absence (quit)
|
(on-absence (quit))
|
||||||
[(tcp-channel _ _ (? eof-object?))
|
(on-message
|
||||||
(quit)]
|
[(tcp-channel _ _ (? eof-object?))
|
||||||
[(tcp-channel _ _ data)
|
(quit)]
|
||||||
(list (send-message (list 'console-output (format "< ~a" data)))
|
[(tcp-channel _ _ data)
|
||||||
(void))]))))
|
(send-message (list 'console-output (format "< ~a" data)))]))))))
|
||||||
)
|
)
|
||||||
|
|
|
@ -611,16 +611,16 @@ If @racket[pattern] is supplied, @racket[k-expr] should evaluate to a
|
||||||
@section{Creating nested VMs}
|
@section{Creating nested VMs}
|
||||||
|
|
||||||
@deftogether[(
|
@deftogether[(
|
||||||
@defform[(nested-vm maybe-vm-pid-binding maybe-boot-pid-binding
|
@defform[(spawn-vm maybe-vm-pid-binding maybe-boot-pid-binding
|
||||||
maybe-initial-state
|
maybe-initial-state
|
||||||
maybe-debug-name
|
maybe-debug-name
|
||||||
boot-action-expr ...)]
|
boot-action-expr ...)]
|
||||||
@defform[#:literals (:)
|
@defform[#:literals (:)
|
||||||
(nested-vm: : ParentStateType
|
(spawn-vm: : ParentStateType
|
||||||
maybe-vm-pid-binding maybe-boot-pid-binding
|
maybe-vm-pid-binding maybe-boot-pid-binding
|
||||||
maybe-typed-initial-state
|
maybe-typed-initial-state
|
||||||
maybe-debug-name
|
maybe-debug-name
|
||||||
boot-action-expr ...)
|
boot-action-expr ...)
|
||||||
#:grammar
|
#:grammar
|
||||||
[(maybe-vm-pid-binding (code:line)
|
[(maybe-vm-pid-binding (code:line)
|
||||||
(code:line #:vm-pid identifier))
|
(code:line #:vm-pid identifier))
|
||||||
|
@ -651,7 +651,7 @@ primordial process in the new VM.
|
||||||
@section{Relaying across layers}
|
@section{Relaying across layers}
|
||||||
|
|
||||||
@deftogether[(
|
@deftogether[(
|
||||||
@defform[#:literals (:) (at-meta-level: : StateType preaction ...)]
|
@defform[(at-meta-level: StateType preaction ...)]
|
||||||
@defproc[(at-meta-level [preaction (PreAction State)] ...) (Action StateType)]
|
@defproc[(at-meta-level [preaction (PreAction State)] ...) (Action StateType)]
|
||||||
)]{
|
)]{
|
||||||
|
|
||||||
|
@ -670,29 +670,29 @@ For example, wrapping an @racket[endpoint] in @racket[at-meta-level]
|
||||||
adds a subscription to the VM's container's network. Instead of
|
adds a subscription to the VM's container's network. Instead of
|
||||||
listening to sibling processes of the acting process, the new endpoint
|
listening to sibling processes of the acting process, the new endpoint
|
||||||
will listen to sibling processes of the acting process's VM. In this
|
will listen to sibling processes of the acting process's VM. In this
|
||||||
example, the primordial process in the @racket[nested-vm] creates an
|
example, the primordial process in the nested VM creates an
|
||||||
endpoint in the VM's own network, the ground VM:
|
endpoint in the VM's own network, the ground VM:
|
||||||
|
|
||||||
@racketblock[
|
@racketblock[
|
||||||
(nested-vm
|
(spawn-vm
|
||||||
(at-meta-level
|
(at-meta-level
|
||||||
(endpoint #:subscriber (tcp-channel ? (tcp-listener 5999) ?) ...)))
|
(endpoint #:subscriber (tcp-channel ? (tcp-listener 5999) ?) ...)))
|
||||||
]
|
]
|
||||||
|
|
||||||
In this example, a new process is spawned as a sibling of the
|
In this example, a new process is spawned as a sibling of the
|
||||||
@racket[nested-vm] rather than as a sibling of its primordial process:
|
nested VM rather than as a sibling of its primordial process:
|
||||||
|
|
||||||
@racketblock[
|
@racketblock[
|
||||||
(nested-vm
|
(spawn-vm
|
||||||
(at-meta-level
|
(at-meta-level
|
||||||
(spawn #:child (transition/no-state (send-message 'hello-world)))))
|
(spawn #:child (transition/no-state (send-message 'hello-world)))))
|
||||||
]
|
]
|
||||||
|
|
||||||
Compare to this example, which spawns a sibling of the
|
Compare to this example, which spawns a sibling of the
|
||||||
@racket[nested-vm]'s primordial process:
|
nested VM's primordial process:
|
||||||
|
|
||||||
@racketblock[
|
@racketblock[
|
||||||
(nested-vm
|
(spawn-vm
|
||||||
(spawn #:child (transition/no-state (send-message 'hello-world))))
|
(spawn #:child (transition/no-state (send-message 'hello-world))))
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue