Incorporate port-cooker and change to being a sexp relay

This commit is contained in:
Tony Garnock-Jones 2012-07-09 16:24:51 -04:00
parent 9a87abc574
commit 59d783a897
1 changed files with 76 additions and 9 deletions

View File

@ -4,6 +4,8 @@
(require racket/set)
(require racket/match)
(require racket/contract)
(require (only-in racket/port peek-bytes-avail!-evt))
(require "cook-port.rkt")
(require "ssh-numbers.rkt")
(require "ssh-transport.rkt")
@ -51,6 +53,7 @@
(define (repl-boot self-pid)
(transition 'no-repl-state
(spawn event-relay #:debug-name (debug-name 'repl-event-relay))
(role 'spy (or (topic-subscriber (wild) #:monitor? #t)
(topic-publisher (wild) #:monitor? #t))
#:state state
@ -68,6 +71,32 @@
[(topic _ (channel-message (channel-stream-name _ cname) _) _)
(transition state (spawn (repl-instance cname) #:debug-name cname))])))))
(define (repl-evaluator in out)
(fprintf out "Welcome!~n")
(flush-output out)
(let loop ()
(define v (read in))
(write `(REPL ,v)) (newline) (flush-output)
(cond
[(eof-object? v)
(fprintf out "Goodbye!~n")
(flush-output out)
(sleep 0.1) ;; TODO: reliable transmission to avoid the sleep. (At least drain the pipe.)
]
[else
(fprintf out "You said ~v~n" v)
(flush-output out)
(loop)]))
(close-input-port in)
(close-output-port out))
;; (repl-instance InputPort OutputPort InputPort OutputPort)
(struct repl-instance-state (c2s-in ;; used by thread to read input from relay
c2s-out ;; used by relay to feed input from remote to the thread
s2c-in ;; used by relay to feed output from thread to remote
s2c-out ;; used by thread to write output to relay
) #:prefab)
(define (repl-instance cname)
(define inbound-stream (channel-stream-name #t cname))
(define outbound-stream (channel-stream-name #f cname))
@ -76,29 +105,66 @@
(define (handle-channel-message state body)
(match body
[(channel-stream-request #"pty-req" _)
(transition state (ch-do send-feedback inbound-stream (channel-stream-ok)))]
(match-define (repl-instance-state old-in _ _ old-out) state)
(define-values (cooked-in cooked-out) (cook-io old-in old-out "RacketSSH> "))
(transition (struct-copy repl-instance-state state
[c2s-in cooked-in]
[s2c-out cooked-out])
(ch-do send-feedback inbound-stream (channel-stream-ok)))]
[(channel-stream-notify #"env" _)
;; Don't care
state]
[(channel-stream-request #"shell" _)
(match-define (repl-instance-state c2s-in _ s2c-in s2c-out) state)
(define buffer-size 1024)
(define dummy-buffer (make-bytes buffer-size))
(define repl-thread (thread (lambda () (repl-evaluator c2s-in s2c-out))))
(transition state
(ch-do send-feedback inbound-stream (channel-stream-ok))
(ch-do send-message outbound-stream (channel-stream-data #"Welcome!\r\n")))]
(role 'thread-death-listener (topic-subscriber (cons (thread-dead-evt repl-thread) (wild)))
#:state state
[_
(transition state
(kill #:reason "REPL thread exited"))])
(role 'relay-out (topic-subscriber (cons (peek-bytes-avail!-evt dummy-buffer 0 #f s2c-in)
(wild)))
;; We're using peek-bytes-avail!-evt rather than
;; read-bytes-avail!-evt because of potential overwriting
;; of the buffer. The overwriting can happen when there's
;; any latency between handling the event and the next
;; firing of the event, since the peek-bytes-avail!-evt
;; will overwrite its buffer next time it's synced on.
#:state state
[(cons _ (? eof-object?))
(match-define (repl-instance-state c2s-in c2s-out s2c-in s2c-out) state)
(close-input-port c2s-in)
(close-output-port c2s-out)
(close-input-port s2c-in)
(close-output-port s2c-out)
(transition state (kill))]
[(cons _ (? number? count))
(transition state
(ch-do send-message outbound-stream (channel-stream-data
(read-bytes count s2c-in))))]))]
[(or (channel-stream-data #"\4") ;; C-d a.k.a EOT
(channel-stream-eof))
(transition state
(ch-do send-message outbound-stream (channel-stream-data #"Goodbye!\r\n"))
(kill))]
(close-output-port (repl-instance-state-c2s-out state))
;; ^ this signals the repl thread to exit.
;; Now, wait for it to do so.
state]
[(channel-stream-data bs)
(write-bytes bs (repl-instance-state-c2s-out state))
(flush-output (repl-instance-state-c2s-out state))
(transition state
(ch-do send-feedback inbound-stream (channel-stream-credit (bytes-length bs)))
(ch-do send-message outbound-stream (channel-stream-data bs)))]
(ch-do send-feedback inbound-stream (channel-stream-credit (bytes-length bs))))]
[m
(write `(channel inbound ,m)) (newline)
state]))
(match (channel-name-type cname)
[#"session"
(transition 'no-instance-state
(define-values (c2s-in c2s-out) (make-pipe))
(define-values (s2c-in s2c-out) (make-pipe))
(transition (repl-instance-state c2s-in c2s-out s2c-in s2c-out)
(at-meta-level
(role 'input (topic-subscriber (channel-message inbound-stream (wild)))
#:state state
@ -106,7 +172,7 @@
(ch-do send-feedback inbound-stream (channel-stream-config
(default-packet-limit)
#""))
(ch-do send-feedback inbound-stream (channel-stream-credit 64)))
(ch-do send-feedback inbound-stream (channel-stream-credit 1024)))
[(channel-message _ body)
(handle-channel-message state body)]))
(at-meta-level
@ -214,6 +280,7 @@
;; process in case of error.
(lambda (nested-boot-pid)
(transition 'no-state
(spawn event-relay #:debug-name (debug-name 'session-event-relay))
(spawn (timer-relay 'ssh-timer-relay) #:debug-name 'ssh-timer-relay)
(spy 'SSH)