From f6611fa67118f4e76af3ed7cf1b6392f02036758 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 25 Oct 2011 19:00:13 -0400 Subject: [PATCH] Most remaining channel functionality. --- ssh-session.rkt | 117 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 92 insertions(+), 25 deletions(-) diff --git a/ssh-session.rkt b/ssh-session.rkt index 3388b15..3cfd4fc 100644 --- a/ssh-session.rkt +++ b/ssh-session.rkt @@ -5,6 +5,7 @@ (require racket/match) (require racket/class) +(require racket/port) (require "safe-io.rkt") (require "oakley-groups.rkt") @@ -137,6 +138,8 @@ (define rekey-interval (make-parameter 60)) ;;3600)) (define rekey-volume (make-parameter 1000000000)) +(define channel-io-transfer-buffer-size (make-parameter 4096)) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Packet dispatch and handling ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -235,7 +238,6 @@ ;; PacketHandler for handling SSH_MSG_DEBUG. (define (handle-msg-debug packet message conn) - ;; TODO: use Racket log API. (log-debug (format "Received SSHv2 SSH_MSG_DEBUG packet ~v" message)) conn) @@ -543,9 +545,16 @@ 'neither )) (values ch - (struct-copy connection conn + (struct-copy connection (send-initial-credit conn ch) [channel-map (hash-set (connection-channel-map conn) my-ref ch)]))) +(define (send-initial-credit conn ch) + (define remaining-window (ssh-channel-outbound-window ch)) + (if (and remaining-window + (positive? remaining-window)) + (channel-notify conn ch (credit 'app remaining-window)) + conn)) + (define (get-channel conn my-ref) (hash-ref (connection-channel-map conn) my-ref)) @@ -568,6 +577,10 @@ (define (maybe-close-channel ch conn action) (define new-close-state (update-close-state (ssh-channel-close-state ch) action)) + (case action + ((local) (write-message!/flush (ssh-msg-channel-close (ssh-channel-your-ref ch)) + conn)) + ((remote) (send (ssh-channel-room-handle ch) depart 'remote-closed))) (if (eq? new-close-state 'both) (discard-channel ch conn) (update-channel conn (struct-copy ssh-channel ch @@ -654,30 +667,38 @@ conn))))) (define (handle-msg-window-adjust packet message conn) - (log-error "TODO: Unimplemented: handle-msg-window-adjust") - conn) + (match-define (ssh-msg-channel-window-adjust recipient-channel count) message) + (define ch (get-channel conn recipient-channel)) + (channel-notify conn ch (credit 'app count))) (define (handle-msg-channel-data packet message conn) - (log-error "TODO: Unimplemented: handle-msg-channel-data") - conn) + (match-define (ssh-msg-channel-data recipient-channel data*) message) + (define data (bit-string->bytes data*)) + (define ch (get-channel conn recipient-channel)) + (channel-notify conn ch `(data ,data))) (define (handle-msg-channel-extended-data packet message conn) - (log-error "TODO: Unimplemented: handle-msg-channel-extended-data") - conn) + (match-define (ssh-msg-channel-extended-data recipient-channel type-code data*) message) + (define data (bit-string->bytes data*)) + (define ch (get-channel conn recipient-channel)) + (channel-notify conn ch `(extended-data ,type-code ,data))) (define (handle-msg-channel-eof packet message conn) - (log-error "TODO: Unimplemented: handle-msg-channel-eof") - conn) + (define ch (get-channel conn (ssh-msg-channel-eof-recipient-channel message))) + (update-channel (channel-notify conn ch `(eof)) + (struct-copy ssh-channel ch + [eof-state (update-close-state (ssh-channel-eof-state ch) + 'remote)]))) (define (handle-msg-channel-close packet message conn) - (log-error "TODO: Unimplemented: handle-msg-channel-close") - conn) + (define ch (get-channel conn (ssh-msg-channel-close-recipient-channel message))) + (maybe-close-channel ch conn 'remote)) (define (handle-msg-channel-request packet message conn) (match-define (ssh-msg-channel-request recipient-channel type* want-reply? data*) message) (define type (bit-string->bytes type*)) (define data (bit-string->bytes data*)) - (define ch (get-channel conn (ssh-msg-channel-request-recipient-channel message))) + (define ch (get-channel conn recipient-channel)) (if (not want-reply?) (channel-notify conn ch `(notify ,type ,data)) (channel-request conn ch `(,type ,data) @@ -796,10 +817,20 @@ (lambda (message) (lambda (conn) (define ch (get-channel conn my-ref)) + (define your-ref (ssh-channel-your-ref ch)) (match message - ((arrived _) conn) - ((departed _ _) (maybe-close-channel ch conn 'local)) - ((says _ (rpc-reply id m) _) (finish-channel-request ch conn id m)))))) + ((arrived _) + conn) + ((departed _ _) + (maybe-close-channel ch conn 'local)) + ((says _ (credit _ amount) _) + (write-message!/flush (ssh-msg-channel-window-adjust your-ref amount) conn) + conn) + ((says _ `(data ,bits) _) + (write-message!/flush (ssh-msg-channel-data your-ref bits) conn) + conn) + ((says _ (rpc-reply id m) _) + (finish-channel-request ch conn id m)))))) (define (run-ssh-session conn) (with-handlers @@ -863,9 +894,9 @@ (define (ssh-session role in out) (define io-room (make-room (gensym 'ssh-io-room))) - (spy-on io-room) + ;;(spy-on io-room) (define session-room (make-room (gensym 'ssh-session-room))) - (spy-on session-room) + ;;(spy-on session-room) (define local-identification-string (send-preamble-and-identification! out)) (define peer-identification-string (read-preamble-and-identification! in)) @@ -906,26 +937,62 @@ ;; Session API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(define (run-channel oob-ch a2s-port s2a-port handle channel-thread) - (let loop ((oob-queue (make-queue))) +(define (run-channel oob-ch in out handle channel-thread) + (define (close-in) + (when (not (port-closed? in)) + (close-input-port in))) + (define (close-out) + (when (not (port-closed? out)) + (close-output-port out))) + (define (close-ports) + (close-in) + (close-out)) + (let loop ((oob-queue (make-queue)) + (remaining-credit 0)) (sync (if (queue-empty? oob-queue) never-evt (let-values (((first rest) (dequeue oob-queue))) (handle-evt (channel-put-evt oob-ch first) - (lambda (dummy) (loop rest))))) + (lambda (dummy) (loop rest remaining-credit))))) + (handle-evt (thread-dead-evt channel-thread) + (lambda (dummy) + (close-ports) + 'done)) + (if (and (not (port-closed? in)) (positive? remaining-credit)) + (let ((buffer (make-bytes (min (channel-io-transfer-buffer-size) remaining-credit)))) + (handle-evt (read-bytes-avail!-evt buffer in) + (lambda (count) + (if (eof-object? count) + (begin (close-in) + (loop oob-queue remaining-credit)) + (begin (send handle say + `(data ,(sub-bit-string buffer 0 (* 8 count)))) + (loop oob-queue (- remaining-credit count))))))) + never-evt) (handle-evt (send handle listen-evt) (match-lambda ((arrived _) - (loop oob-queue)) + (loop oob-queue remaining-credit)) + ((departed _ _) + (close-ports) + 'done) + ((says _ (credit _ amount) _) + (loop oob-queue (+ remaining-credit amount))) + ((says _ `(data ,data) _) + (write-bytes data out) + ;; TODO: propagate backpressure through pipes + (send handle say (credit 'session (bytes-length data))) + (loop oob-queue remaining-credit)) ((says _ (and notification `(notify ,type ,data)) _) - (loop (enqueue oob-queue notification))) + (loop (enqueue oob-queue notification) remaining-credit)) ((says _ (rpc-request reply-to id message) _) (loop (enqueue oob-queue `(request ,message ,(lambda (answer) (send handle say (rpc-reply id answer) - reply-to))))))))))) + reply-to)))) + remaining-credit))))))) (define (start-app-channel channel-main) (define channel-room (make-room 'channel)) @@ -939,7 +1006,7 @@ (run-channel oob-ch session-a2s session-s2a - (join-room channel-room 'app #:break-on-departure? #t) + (join-room channel-room 'app) (standard-thread (lambda () (channel-main oob-ch app-s2a app-a2s)))))) (wait-for-members channel-room '(app))