From d47f553e21fb18be059c5748640b9276f6ac13a4 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 26 Oct 2011 00:07:38 -0400 Subject: [PATCH] Fix bugs in channel shutdown handling. Add tty cooker. --- cook-port.rkt | 67 ++++++++++++++++++++++++++++++++++++++++++++++ repl-server.rkt | 61 +++++++++++++++++++++++++++--------------- ssh-session.rkt | 71 +++++++++++++++++++++++++++++++------------------ 3 files changed, 151 insertions(+), 48 deletions(-) create mode 100644 cook-port.rkt diff --git a/cook-port.rkt b/cook-port.rkt new file mode 100644 index 0000000..d7246eb --- /dev/null +++ b/cook-port.rkt @@ -0,0 +1,67 @@ +#lang racket/base + +(require racket/port) + +(provide cook-io) + +(define clear-to-eol "\033[2K") +(define kill-line (string-append "\r" clear-to-eol)) + +(struct buffer (chars consume-next-linefeed?) #:transparent) + +(define (buffer-empty? b) + (null? (buffer-chars b))) + +(define (buffer-adjust b new-chars) + (struct-copy buffer b + [chars new-chars] + [consume-next-linefeed? #f])) + +(define (buffer-contents b) + (list->string (reverse (buffer-chars b)))) + +(define (update-buffer b key prompt k-eof k-complete k-ongoing) + (case key + ((#\backspace) (if (buffer-empty? b) + (k-ongoing b "") + (k-ongoing (buffer-adjust b (cdr (buffer-chars b))) "\b \b"))) + ((#\return) (k-complete (buffer-contents b) (buffer '() #t))) + ((#\newline) (if (buffer-consume-next-linefeed? b) + (k-ongoing (struct-copy buffer b [consume-next-linefeed? #f]) "") + (k-complete (buffer-contents b) (buffer '() #f)))) + ((#\page) (k-ongoing b (string-append kill-line prompt (buffer-contents b)))) + ((#\004) ;; control-D, UNIX EOF + (if (buffer-empty? b) + (k-eof) + (k-ongoing b ""))) + ((#\033) ;; escape + (k-ongoing (buffer '() #f) (string-append kill-line prompt))) + (else (if (char-iso-control? key) + (k-ongoing b "") + (k-ongoing (buffer-adjust b (cons key (buffer-chars b))) (string key)))))) + +(define (cook-io raw-in raw-out prompt) + (define-values (cooked-in cooked-out) (make-pipe)) + (thread + (lambda () + (let loop ((b (buffer '() #f))) + (if (port-closed? cooked-out) + ;; our job here is + 'done + (let ((s (read-string 1 raw-in))) + (if (eof-object? s) + (begin (close-output-port cooked-out) + (loop b)) + (begin (update-buffer b (string-ref s 0) prompt + (lambda () + (close-output-port cooked-out) + (close-input-port raw-in)) + (lambda (line new-b) + (write-string "\r\n" raw-out) + (write-string line cooked-out) + (newline cooked-out) + (loop new-b)) + (lambda (new-b feedback) + (write-string feedback raw-out) + (loop new-b)))))))))) + (values cooked-in raw-out)) diff --git a/repl-server.rkt b/repl-server.rkt index 3292280..80312c3 100644 --- a/repl-server.rkt +++ b/repl-server.rkt @@ -12,6 +12,8 @@ (require "ssh-session.rkt") (require "standard-thread.rkt") +(require "cook-port.rkt") + (define (t-client) (let-values (((i o) (tcp-connect "localhost" 2323 @@ -21,27 +23,42 @@ (printf "Got API ~v\n" api) (semaphore-wait (make-semaphore 0))))) -(define (repl-channel-main oob-ch in out) - (let loop () - (sync (handle-evt oob-ch - (match-lambda - (`(notify ,type ,data) - (log-info (format "repl-channel: notify ~v ~v" type data)) - (loop)) - (`(request (#"shell" ,_) ,k) - (k 'ok) - (loop)) - (`(request ,other ,k) - (log-info (format "repl-channel: request ~v" other)) - (k 'error) - (loop)))) - (handle-evt (read-line-evt in 'any) - (lambda (line) - (if (eof-object? line) - (begin (fprintf out "Goodbye\n" out) - 'done) - (begin (fprintf out "You said ~s\n" line) - (loop)))))))) +(define prompt "RacketSSH> ") + +(define (make-repl-channel-main username) + (lambda (oob-ch in out) + (fprintf out "Hello, ~a.\r\n~a" username prompt) + (let update-channels ((in in) (out out)) + (let loop () + (sync (handle-evt oob-ch + (match-lambda + (`(notify ,type ,data) + (log-info (format "repl-channel: notify ~v ~v" type data)) + (loop)) + (`(request (#"shell" ,_) ,k) + (k 'ok) + (loop)) + (`(request (#"pty-req" ,_) ,k) + (k 'ok) + (define-values (cooked-in cooked-out) (cook-io in out prompt)) + (update-channels cooked-in cooked-out)) + (`(request ,other ,k) + (log-info (format "repl-channel: request ~v" other)) + (k 'error) + (loop)))) + #;(handle-evt (read-bytes-evt 10000 in) + (lambda (buf) + (write-bytes buf out) + (loop))) + (handle-evt (read-line-evt in 'any) + (lambda (line) + ;;(log-info (format "received ~v" line)) + (if (eof-object? line) + (begin (fprintf out "\r\nGoodbye\r\n") + 'done) + (begin (fprintf out "You said ~s\r\n" line) + (write-string prompt out) + (loop)))))))))) (define (t-server) (define s (tcp-listen 2322 4 #t "localhost")) @@ -54,7 +71,7 @@ (lambda (username channel-type extra-request-data state) (match channel-type (#"session" - (values `(ok ,repl-channel-main #"") + (values `(ok ,(make-repl-channel-main username) #"") state)) (else (values `(error ,SSH_OPEN_UNKNOWN_CHANNEL_TYPE diff --git a/ssh-session.rkt b/ssh-session.rkt index fbd4b96..71a3260 100644 --- a/ssh-session.rkt +++ b/ssh-session.rkt @@ -937,7 +937,7 @@ ;; Session API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(define (run-channel oob-ch in out handle channel-thread) +(define (run-channel oob-ch app-out-port in out handle channel-thread) (define (close-in) (when (not (port-closed? in)) (close-input-port in))) @@ -946,45 +946,62 @@ (close-output-port out))) (define (close-ports) (close-in) - (close-out)) + (close-out) + 'done) (let loop ((oob-queue (make-queue)) - (remaining-credit 0)) + (remaining-credit 0) + (channel-thread-alive #t)) (sync (if (queue-empty? oob-queue) never-evt (let-values (((first rest) (dequeue oob-queue))) (handle-evt (channel-put-evt oob-ch first) - (lambda (dummy) (loop rest remaining-credit))))) - (handle-evt (thread-dead-evt channel-thread) - (lambda (dummy) - (close-ports) - 'done)) - (if (and (not (port-closed? in)) (positive? remaining-credit)) - (let ((buffer (make-bytes (min (channel-io-transfer-buffer-size) remaining-credit)))) - (handle-evt (read-bytes-avail!-evt buffer in) - (lambda (count) - (if (eof-object? count) - (begin (close-in) - (loop oob-queue remaining-credit)) - (begin (send handle say - `(data ,(sub-bit-string buffer 0 (* 8 count)))) - (loop oob-queue (- remaining-credit count))))))) + (lambda (dummy) (loop rest remaining-credit channel-thread-alive))))) + (if channel-thread-alive + (handle-evt (thread-dead-evt channel-thread) + (lambda (dummy) + (when (not (port-closed? app-out-port)) + (close-output-port app-out-port)) + (loop oob-queue remaining-credit #f))) never-evt) + (if (port-closed? in) + (if channel-thread-alive + never-evt + (handle-evt always-evt (lambda (dummy) (close-ports)))) + (if (positive? remaining-credit) + (let ((buffer (make-bytes (min (channel-io-transfer-buffer-size) + remaining-credit)))) + (handle-evt (read-bytes-avail!-evt buffer in) + (lambda (count) + (if (eof-object? count) + (begin (close-in) + (if channel-thread-alive + (loop oob-queue + remaining-credit + channel-thread-alive) + (close-ports))) + (begin (send handle say + `(data ,(sub-bit-string buffer 0 (* 8 count)))) + (loop oob-queue + (- remaining-credit count) + channel-thread-alive)))))) + never-evt)) (handle-evt (send handle listen-evt) (match-lambda ((arrived _) - (loop oob-queue remaining-credit)) + (loop oob-queue remaining-credit channel-thread-alive)) ((departed _ _) - (close-ports) - 'done) + (close-ports)) ((says _ (credit _ amount) _) - (loop oob-queue (+ remaining-credit amount))) + (loop oob-queue (+ remaining-credit amount) channel-thread-alive)) ((says _ `(data ,data) _) (write-bytes data out) ;; TODO: propagate backpressure through pipes (send handle say (credit 'session (bytes-length data))) - (loop oob-queue remaining-credit)) + (loop oob-queue remaining-credit channel-thread-alive)) ((says _ (and notification `(notify ,type ,data)) _) - (loop (enqueue oob-queue notification) remaining-credit)) + (loop (enqueue oob-queue notification) + remaining-credit + channel-thread-alive)) ((says _ (rpc-request reply-to id message) _) (loop (enqueue oob-queue `(request ,message @@ -992,11 +1009,12 @@ (send handle say (rpc-reply id answer) reply-to)))) - remaining-credit))))))) + remaining-credit + channel-thread-alive))))))) (define (start-app-channel channel-main) (define channel-room (make-room 'channel)) - (spy-on channel-room) + ;;(spy-on channel-room) (define oob-ch (make-channel)) (define-values (session-a2s app-a2s) (make-pipe)) @@ -1004,6 +1022,7 @@ (standard-thread (lambda () (run-channel oob-ch + app-a2s session-a2s session-s2a (join-room channel-room 'app)