From 7ad6291124957112099669d67310fd8c90e9fcf6 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 26 Oct 2011 19:16:16 -0400 Subject: [PATCH] Fix EOF- and close-negotiation; switch API for closing; connect sandbox repl. --- repl-server.rkt | 56 ++++++++++++-------- ssh-session.rkt | 138 ++++++++++++++++++++++++------------------------ 2 files changed, 104 insertions(+), 90 deletions(-) diff --git a/repl-server.rkt b/repl-server.rkt index 80312c3..ab13279 100644 --- a/repl-server.rkt +++ b/repl-server.rkt @@ -6,6 +6,7 @@ (require racket/match) (require racket/class) (require racket/port) +(require racket/sandbox) (require "conversation.rkt") (require "ssh-numbers.rkt") @@ -23,11 +24,37 @@ (printf "Got API ~v\n" api) (semaphore-wait (make-semaphore 0))))) -(define prompt "RacketSSH> ") - (define (make-repl-channel-main username) (lambda (oob-ch in out) - (fprintf out "Hello, ~a.\r\n~a" username prompt) + (define (run-shell in out) + (define reader-thread + (thread + (lambda () + (fprintf out "Hello, ~a.\n" username) + (parameterize ((current-input-port in) + (current-output-port out) + (current-error-port out) + (sandbox-input in) + (sandbox-output out) + (sandbox-error-output out) + (current-namespace (make-empty-namespace))) + (parameterize ((current-eval (make-evaluator 'racket/base))) + (read-eval-print-loop)) + (fprintf out "\nGoodbye!\n") + (close-input-port in) + (close-output-port out))))) + (let loop () + (sync (handle-evt oob-ch + (match-lambda + (`(notify ,type ,data) + (log-info (format "repl-channel: notify ~v ~v" type data)) + (loop)) + (`(request ,other ,k) + (log-info (format "repl-channel: request ~v" other)) + (k 'error) + (loop)))) + (handle-evt reader-thread void)))) + (let update-channels ((in in) (out out)) (let loop () (sync (handle-evt oob-ch @@ -35,30 +62,17 @@ (`(notify ,type ,data) (log-info (format "repl-channel: notify ~v ~v" type data)) (loop)) - (`(request (#"shell" ,_) ,k) - (k 'ok) - (loop)) (`(request (#"pty-req" ,_) ,k) (k 'ok) - (define-values (cooked-in cooked-out) (cook-io in out prompt)) + (define-values (cooked-in cooked-out) (cook-io in out "")) (update-channels cooked-in cooked-out)) + (`(request (#"shell" ,_) ,k) + (k 'ok) + (run-shell in out)) (`(request ,other ,k) (log-info (format "repl-channel: request ~v" other)) (k 'error) - (loop)))) - #;(handle-evt (read-bytes-evt 10000 in) - (lambda (buf) - (write-bytes buf out) - (loop))) - (handle-evt (read-line-evt in 'any) - (lambda (line) - ;;(log-info (format "received ~v" line)) - (if (eof-object? line) - (begin (fprintf out "\r\nGoodbye\r\n") - 'done) - (begin (fprintf out "You said ~s\r\n" line) - (write-string prompt out) - (loop)))))))))) + (loop))))))))) (define (t-server) (define s (tcp-listen 2322 4 #t "localhost")) diff --git a/ssh-session.rkt b/ssh-session.rkt index 26a8a3c..34ba6da 100644 --- a/ssh-session.rkt +++ b/ssh-session.rkt @@ -793,7 +793,7 @@ (match message ((arrived _) conn) - ((and departure (departed who why)) + ((and departure (departed _ _)) (disconnect-with-error/local-info departure SSH_DISCONNECT_BY_APPLICATION @@ -832,6 +832,9 @@ ((says _ `(data ,bits) _) (write-message!/flush (ssh-msg-channel-data your-ref bits) conn) conn) + ((says _ `(eof) _) + (write-message!/flush (ssh-msg-channel-eof your-ref) conn) + conn) ((says _ (rpc-reply id m) _) (finish-channel-request ch conn id m)))))) @@ -944,76 +947,70 @@ ;; Session API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(define (run-channel oob-ch app-out-port in out handle channel-thread) - (define (close-in) - (when (not (port-closed? in)) - (close-input-port in))) - (define (close-out) - (when (not (port-closed? out)) - (close-output-port out))) +(define (run-channel oob-ch app-out-port app-in-port in out handle) + (define (close-in) (when (not (port-closed? in)) (close-input-port in))) + (define (close-out) (when (not (port-closed? out)) (close-output-port out))) (define (close-ports) (close-in) (close-out) - 'done) + 'closed) (let loop ((oob-queue (make-queue)) (remaining-credit 0)) - (define channel-thread-dead (thread-dead? channel-thread)) - (sync (if (queue-empty? oob-queue) - never-evt - (let-values (((first rest) (dequeue oob-queue))) - (handle-evt (channel-put-evt oob-ch first) - (lambda (dummy) (loop rest remaining-credit))))) - (if channel-thread-dead - never-evt - (handle-evt (thread-dead-evt channel-thread) - (lambda (dummy) - (when (not (port-closed? app-out-port)) - ;; If the thread died without closing its output-port, do - ;; that here. That way we get to drain the port before - ;; terminating ourselves. - (close-output-port app-out-port)) - (loop oob-queue remaining-credit)))) - (if (port-closed? in) - (if channel-thread-dead - (handle-evt always-evt (lambda (dummy) (close-ports))) - never-evt) - (if (positive? remaining-credit) - (let ((buffer (make-bytes (min (channel-io-transfer-buffer-size) - remaining-credit)))) - (handle-evt (read-bytes-avail!-evt buffer in) - (lambda (count) - (if (eof-object? count) - (begin (close-in) - (if channel-thread-dead - (close-ports) - (loop oob-queue remaining-credit))) - (begin (send handle say - `(data ,(sub-bit-string buffer 0 (* 8 count)))) - (loop oob-queue (- remaining-credit count))))))) - never-evt)) - (handle-evt (send handle listen-evt) - (match-lambda - ((arrived _) - (loop oob-queue remaining-credit)) - ((departed _ _) - (close-ports)) - ((says _ (credit _ amount) _) - (loop oob-queue (+ remaining-credit amount))) - ((says _ `(data ,data) _) - (write-bytes data out) - ;; TODO: propagate backpressure through pipes - (send handle say (credit 'session (bytes-length data))) - (loop oob-queue remaining-credit)) - ((says _ (and notification `(notify ,type ,data)) _) - (loop (enqueue oob-queue notification) remaining-credit)) - ((says _ (rpc-request reply-to id message) _) - (loop (enqueue oob-queue - `(request ,message - ,(lambda (answer) - (send handle say - (rpc-reply id answer) - reply-to)))) - remaining-credit))))))) + (when (port-closed? app-in-port) + ;; The application has stopped listening. Ensure we stop sending, just as if an EOF + ;; was received from the remote. + (close-out)) + (define finished-reading? (port-closed? in)) + (define finished-writing? (port-closed? out)) + (if (and finished-reading? finished-writing?) + 'closed + (sync (if (queue-empty? oob-queue) + never-evt + (let-values (((first rest) (dequeue oob-queue))) + (handle-evt (channel-put-evt oob-ch first) + (lambda (dummy) (loop rest remaining-credit))))) + (if finished-reading? + never-evt + (if (positive? remaining-credit) + (let ((buffer (make-bytes (min (channel-io-transfer-buffer-size) + remaining-credit)))) + (handle-evt (read-bytes-avail!-evt buffer in) + (lambda (count) + (if (eof-object? count) + (begin (send handle say `(eof)) + (close-in) + (loop oob-queue remaining-credit)) + (let ((data (sub-bit-string buffer 0 (* 8 count)))) + (begin (send handle say `(data ,data)) + (loop oob-queue (- remaining-credit count)))))))) + never-evt)) + (handle-evt (send handle listen-evt) + (match-lambda + ((arrived _) + (loop oob-queue remaining-credit)) + ((and departure (departed who why)) + (send handle depart departure) + (close-ports)) + ((says _ (credit _ amount) _) + (loop oob-queue (+ remaining-credit amount))) + ((says _ `(data ,data) _) + (when (not finished-writing?) (write-bytes data out)) + ;; TODO: propagate backpressure through pipes + (send handle say (credit 'session (bytes-length data))) + (loop oob-queue remaining-credit)) + ((says _ `(eof) _) + (close-out) + (loop oob-queue remaining-credit)) + ((says _ (and notification `(notify ,type ,data)) _) + (loop (enqueue oob-queue notification) remaining-credit)) + ((says _ (rpc-request reply-to id message) _) + (loop (enqueue oob-queue + `(request ,message + ,(lambda (answer) + (send handle say + (rpc-reply id answer) + reply-to)))) + remaining-credit)))))))) (define (start-app-channel channel-main) (define channel-room (make-room 'channel)) @@ -1026,12 +1023,15 @@ (standard-thread (lambda () (run-channel oob-ch app-a2s + app-s2a session-a2s session-s2a - (join-room channel-room 'app) - (standard-thread (lambda () - (channel-main oob-ch app-s2a app-a2s)))))) + (join-room channel-room 'app)))) (wait-for-members channel-room '(app)) + + (standard-thread (lambda () + (channel-main oob-ch app-s2a app-a2s))) + channel-room) (define (simple-ssh-server handle channel-open-callback state)