Cosmetic.

This commit is contained in:
Tony Garnock-Jones 2012-07-18 11:34:21 -04:00
parent 7636862e31
commit 4bad2698bb
1 changed files with 118 additions and 121 deletions

View File

@ -44,127 +44,6 @@
"Invalid peer identification string ~v"
peer-identification-string)))
(define ((repl-boot user-name) self-pid)
(transition 'no-repl-state
(spawn event-relay #:debug-name (debug-name 'repl-event-relay))
(role 'spy (or (topic-subscriber (wild) #:monitor? #t)
(topic-publisher (wild) #:monitor? #t))
#:state state
[message
(write `(APP ,message))
(newline)
(flush-output)
state])
(at-meta-level
(role 'channel-listener (topic-subscriber (channel-message (channel-stream-name #t (wild))
(wild)))
#:state state
#:topic t
#:on-presence (match t
[(topic _ (channel-message (channel-stream-name _ cname) _) _)
(transition state (spawn (repl-instance user-name cname)
#:debug-name cname))])))))
;; (repl-instance InputPort OutputPort InputPort OutputPort)
(struct repl-instance-state (c2s-in ;; used by thread to read input from relay
c2s-out ;; used by relay to feed input from remote to the thread
s2c-in ;; used by relay to feed output from thread to remote
s2c-out ;; used by thread to write output to relay
) #:prefab)
(define (repl-instance user-name cname)
(define inbound-stream (channel-stream-name #t cname))
(define outbound-stream (channel-stream-name #f cname))
(define (ch-do action-ctor stream body)
(at-meta-level (action-ctor (channel-message stream body))))
(define (handle-channel-message state body)
(match body
[(channel-stream-request #"pty-req" _)
(match-define (repl-instance-state old-in _ _ old-out) state)
(define-values (cooked-in cooked-out) (cook-io old-in old-out "> "))
(transition (struct-copy repl-instance-state state
[c2s-in cooked-in]
[s2c-out cooked-out])
(ch-do send-feedback inbound-stream (channel-stream-ok)))]
[(channel-stream-notify #"env" _)
;; Don't care
state]
[(channel-stream-request #"shell" _)
(match-define (repl-instance-state c2s-in _ s2c-in s2c-out) state)
(define buffer-size 1024)
(define dummy-buffer (make-bytes buffer-size))
(define repl-thread (thread (lambda () (repl-shell user-name c2s-in s2c-out))))
(transition state
(ch-do send-feedback inbound-stream (channel-stream-ok))
(role 'thread-death-listener (topic-subscriber (cons (thread-dead-evt repl-thread) (wild)))
#:state state
[_
(transition state
(kill #:reason "REPL thread exited"))])
(role 'relay-out (topic-subscriber (cons (peek-bytes-avail!-evt dummy-buffer 0 #f s2c-in)
(wild)))
;; We're using peek-bytes-avail!-evt rather than
;; read-bytes-avail!-evt because of potential overwriting
;; of the buffer. The overwriting can happen when there's
;; any latency between handling the event and the next
;; firing of the event, since the peek-bytes-avail!-evt
;; will overwrite its buffer next time it's synced on.
#:state state
[(cons _ (? eof-object?))
(match-define (repl-instance-state c2s-in c2s-out s2c-in s2c-out) state)
(close-input-port c2s-in)
(close-output-port c2s-out)
(close-input-port s2c-in)
(close-output-port s2c-out)
(transition state (kill))]
[(cons _ (? number? count))
(transition state
(ch-do send-message outbound-stream (channel-stream-data
(read-bytes count s2c-in))))]))]
[(or (channel-stream-data #"\4") ;; C-d a.k.a EOT
(channel-stream-eof))
(close-output-port (repl-instance-state-c2s-out state))
;; ^ this signals the repl thread to exit.
;; Now, wait for it to do so.
state]
[(channel-stream-data bs)
(write-bytes bs (repl-instance-state-c2s-out state))
(flush-output (repl-instance-state-c2s-out state))
(transition state
(ch-do send-feedback inbound-stream (channel-stream-credit (bytes-length bs))))]
[m
(write `(channel inbound ,m)) (newline)
state]))
(match (channel-name-type cname)
[#"session"
(define-values (c2s-in c2s-out) (make-pipe))
(define-values (s2c-in s2c-out) (make-pipe))
(transition (repl-instance-state c2s-in c2s-out s2c-in s2c-out)
(at-meta-level
(role 'input (topic-subscriber (channel-message inbound-stream (wild)))
#:state state
#:on-presence (transition state
(ch-do send-feedback inbound-stream (channel-stream-config
(default-packet-limit)
#""))
(ch-do send-feedback inbound-stream (channel-stream-credit 1024)))
[(channel-message _ body)
(handle-channel-message state body)]))
(at-meta-level
(role 'output (topic-publisher (channel-message outbound-stream (wild)))
#:state state
[m
(write `(channel outbound ,cname ,m)) (newline)
state])))]
[type
(transition 'no-instance-state
(at-meta-level (send-message
(channel-message outbound-stream
(channel-stream-open-failure
SSH_OPEN_UNKNOWN_CHANNEL_TYPE
(bytes-append #"Unknown channel type " type))))))]))
(define (spy marker)
(role 'spy (or (topic-subscriber (wild) #:monitor? #t)
(topic-publisher (wild) #:monitor? #t))
@ -274,5 +153,123 @@
#:reason reason
#:on-absence (current-handler reason))))))))
;;---------------------------------------------------------------------------
(define ((repl-boot user-name) self-pid)
(transition 'no-repl-state
(spawn event-relay #:debug-name (debug-name 'repl-event-relay))
(spy 'APP)
(at-meta-level
(role 'channel-listener (topic-subscriber (channel-message (channel-stream-name #t (wild))
(wild)))
#:state state
#:topic t
#:on-presence (match t
[(topic _ (channel-message (channel-stream-name _ cname) _) _)
(transition state (spawn (repl-instance user-name cname)
#:debug-name cname))])))))
;; (repl-instance InputPort OutputPort InputPort OutputPort)
(struct repl-instance-state (c2s-in ;; used by thread to read input from relay
c2s-out ;; used by relay to feed input from remote to the thread
s2c-in ;; used by relay to feed output from thread to remote
s2c-out ;; used by thread to write output to relay
) #:prefab)
(define (repl-instance user-name cname)
(define inbound-stream (channel-stream-name #t cname))
(define outbound-stream (channel-stream-name #f cname))
(define (ch-do action-ctor stream body)
(at-meta-level (action-ctor (channel-message stream body))))
(define (handle-channel-message state body)
(match body
[(channel-stream-request #"pty-req" _)
(match-define (repl-instance-state old-in _ _ old-out) state)
(define-values (cooked-in cooked-out) (cook-io old-in old-out "> "))
(transition (struct-copy repl-instance-state state
[c2s-in cooked-in]
[s2c-out cooked-out])
(ch-do send-feedback inbound-stream (channel-stream-ok)))]
[(channel-stream-notify #"env" _)
;; Don't care
state]
[(channel-stream-request #"shell" _)
(match-define (repl-instance-state c2s-in _ s2c-in s2c-out) state)
(define buffer-size 1024)
(define dummy-buffer (make-bytes buffer-size))
(define repl-thread (thread (lambda () (repl-shell user-name c2s-in s2c-out))))
(transition state
(ch-do send-feedback inbound-stream (channel-stream-ok))
(role 'thread-death-listener (topic-subscriber (cons (thread-dead-evt repl-thread) (wild)))
#:state state
[_
(transition state
(kill #:reason "REPL thread exited"))])
(role 'relay-out (topic-subscriber (cons (peek-bytes-avail!-evt dummy-buffer 0 #f s2c-in)
(wild)))
;; We're using peek-bytes-avail!-evt rather than
;; read-bytes-avail!-evt because of potential overwriting
;; of the buffer. The overwriting can happen when there's
;; any latency between handling the event and the next
;; firing of the event, since the peek-bytes-avail!-evt
;; will overwrite its buffer next time it's synced on.
#:state state
[(cons _ (? eof-object?))
(match-define (repl-instance-state c2s-in c2s-out s2c-in s2c-out) state)
(close-input-port c2s-in)
(close-output-port c2s-out)
(close-input-port s2c-in)
(close-output-port s2c-out)
(transition state (kill))]
[(cons _ (? number? count))
(transition state
(ch-do send-message outbound-stream (channel-stream-data
(read-bytes count s2c-in))))]))]
[(or (channel-stream-data #"\4") ;; C-d a.k.a EOT
(channel-stream-eof))
(close-output-port (repl-instance-state-c2s-out state))
;; ^ this signals the repl thread to exit.
;; Now, wait for it to do so.
state]
[(channel-stream-data bs)
(write-bytes bs (repl-instance-state-c2s-out state))
(flush-output (repl-instance-state-c2s-out state))
(transition state
(ch-do send-feedback inbound-stream (channel-stream-credit (bytes-length bs))))]
[m
(write `(channel inbound ,m)) (newline)
state]))
(match (channel-name-type cname)
[#"session"
(define-values (c2s-in c2s-out) (make-pipe))
(define-values (s2c-in s2c-out) (make-pipe))
(transition (repl-instance-state c2s-in c2s-out s2c-in s2c-out)
(at-meta-level
(role 'input (topic-subscriber (channel-message inbound-stream (wild)))
#:state state
#:on-presence (transition state
(ch-do send-feedback inbound-stream (channel-stream-config
(default-packet-limit)
#""))
(ch-do send-feedback inbound-stream (channel-stream-credit 1024)))
[(channel-message _ body)
(handle-channel-message state body)]))
(at-meta-level
(role 'output (topic-publisher (channel-message outbound-stream (wild)))
#:state state
[m
(write `(channel outbound ,cname ,m)) (newline)
state])))]
[type
(transition 'no-instance-state
(at-meta-level (send-message
(channel-message outbound-stream
(channel-stream-open-failure
SSH_OPEN_UNKNOWN_CHANNEL_TYPE
(bytes-append #"Unknown channel type " type))))))]))
;;---------------------------------------------------------------------------
;; TODO: module+
(main)