diff --git a/new-server.rkt b/new-server.rkt index a966b9d..3f6ca12 100644 --- a/new-server.rkt +++ b/new-server.rkt @@ -4,6 +4,8 @@ (require racket/set) (require racket/match) (require racket/contract) +(require (only-in racket/port peek-bytes-avail!-evt)) +(require "cook-port.rkt") (require "ssh-numbers.rkt") (require "ssh-transport.rkt") @@ -51,6 +53,7 @@ (define (repl-boot self-pid) (transition 'no-repl-state + (spawn event-relay #:debug-name (debug-name 'repl-event-relay)) (role 'spy (or (topic-subscriber (wild) #:monitor? #t) (topic-publisher (wild) #:monitor? #t)) #:state state @@ -68,6 +71,32 @@ [(topic _ (channel-message (channel-stream-name _ cname) _) _) (transition state (spawn (repl-instance cname) #:debug-name cname))]))))) +(define (repl-evaluator in out) + (fprintf out "Welcome!~n") + (flush-output out) + (let loop () + (define v (read in)) + (write `(REPL ,v)) (newline) (flush-output) + (cond + [(eof-object? v) + (fprintf out "Goodbye!~n") + (flush-output out) + (sleep 0.1) ;; TODO: reliable transmission to avoid the sleep. (At least drain the pipe.) + ] + [else + (fprintf out "You said ~v~n" v) + (flush-output out) + (loop)])) + (close-input-port in) + (close-output-port out)) + +;; (repl-instance InputPort OutputPort InputPort OutputPort) +(struct repl-instance-state (c2s-in ;; used by thread to read input from relay + c2s-out ;; used by relay to feed input from remote to the thread + s2c-in ;; used by relay to feed output from thread to remote + s2c-out ;; used by thread to write output to relay + ) #:prefab) + (define (repl-instance cname) (define inbound-stream (channel-stream-name #t cname)) (define outbound-stream (channel-stream-name #f cname)) @@ -76,29 +105,66 @@ (define (handle-channel-message state body) (match body [(channel-stream-request #"pty-req" _) - (transition state (ch-do send-feedback inbound-stream (channel-stream-ok)))] + (match-define (repl-instance-state old-in _ _ old-out) state) + (define-values (cooked-in cooked-out) (cook-io old-in old-out "RacketSSH> ")) + (transition (struct-copy repl-instance-state state + [c2s-in cooked-in] + [s2c-out cooked-out]) + (ch-do send-feedback inbound-stream (channel-stream-ok)))] [(channel-stream-notify #"env" _) ;; Don't care state] [(channel-stream-request #"shell" _) + (match-define (repl-instance-state c2s-in _ s2c-in s2c-out) state) + (define buffer-size 1024) + (define dummy-buffer (make-bytes buffer-size)) + (define repl-thread (thread (lambda () (repl-evaluator c2s-in s2c-out)))) (transition state (ch-do send-feedback inbound-stream (channel-stream-ok)) - (ch-do send-message outbound-stream (channel-stream-data #"Welcome!\r\n")))] + (role 'thread-death-listener (topic-subscriber (cons (thread-dead-evt repl-thread) (wild))) + #:state state + [_ + (transition state + (kill #:reason "REPL thread exited"))]) + (role 'relay-out (topic-subscriber (cons (peek-bytes-avail!-evt dummy-buffer 0 #f s2c-in) + (wild))) + ;; We're using peek-bytes-avail!-evt rather than + ;; read-bytes-avail!-evt because of potential overwriting + ;; of the buffer. The overwriting can happen when there's + ;; any latency between handling the event and the next + ;; firing of the event, since the peek-bytes-avail!-evt + ;; will overwrite its buffer next time it's synced on. + #:state state + [(cons _ (? eof-object?)) + (match-define (repl-instance-state c2s-in c2s-out s2c-in s2c-out) state) + (close-input-port c2s-in) + (close-output-port c2s-out) + (close-input-port s2c-in) + (close-output-port s2c-out) + (transition state (kill))] + [(cons _ (? number? count)) + (transition state + (ch-do send-message outbound-stream (channel-stream-data + (read-bytes count s2c-in))))]))] [(or (channel-stream-data #"\4") ;; C-d a.k.a EOT (channel-stream-eof)) - (transition state - (ch-do send-message outbound-stream (channel-stream-data #"Goodbye!\r\n")) - (kill))] + (close-output-port (repl-instance-state-c2s-out state)) + ;; ^ this signals the repl thread to exit. + ;; Now, wait for it to do so. + state] [(channel-stream-data bs) + (write-bytes bs (repl-instance-state-c2s-out state)) + (flush-output (repl-instance-state-c2s-out state)) (transition state - (ch-do send-feedback inbound-stream (channel-stream-credit (bytes-length bs))) - (ch-do send-message outbound-stream (channel-stream-data bs)))] + (ch-do send-feedback inbound-stream (channel-stream-credit (bytes-length bs))))] [m (write `(channel inbound ,m)) (newline) state])) (match (channel-name-type cname) [#"session" - (transition 'no-instance-state + (define-values (c2s-in c2s-out) (make-pipe)) + (define-values (s2c-in s2c-out) (make-pipe)) + (transition (repl-instance-state c2s-in c2s-out s2c-in s2c-out) (at-meta-level (role 'input (topic-subscriber (channel-message inbound-stream (wild))) #:state state @@ -106,7 +172,7 @@ (ch-do send-feedback inbound-stream (channel-stream-config (default-packet-limit) #"")) - (ch-do send-feedback inbound-stream (channel-stream-credit 64))) + (ch-do send-feedback inbound-stream (channel-stream-credit 1024))) [(channel-message _ body) (handle-channel-message state body)])) (at-meta-level @@ -214,6 +280,7 @@ ;; process in case of error. (lambda (nested-boot-pid) (transition 'no-state + (spawn event-relay #:debug-name (debug-name 'session-event-relay)) (spawn (timer-relay 'ssh-timer-relay) #:debug-name 'ssh-timer-relay) (spy 'SSH)