Most remaining channel functionality.

This commit is contained in:
Tony Garnock-Jones 2011-10-25 19:00:13 -04:00
parent f16e876f75
commit f6611fa671
1 changed files with 92 additions and 25 deletions

View File

@ -5,6 +5,7 @@
(require racket/match)
(require racket/class)
(require racket/port)
(require "safe-io.rkt")
(require "oakley-groups.rkt")
@ -137,6 +138,8 @@
(define rekey-interval (make-parameter 60)) ;;3600))
(define rekey-volume (make-parameter 1000000000))
(define channel-io-transfer-buffer-size (make-parameter 4096))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Packet dispatch and handling
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -235,7 +238,6 @@
;; PacketHandler for handling SSH_MSG_DEBUG.
(define (handle-msg-debug packet message conn)
;; TODO: use Racket log API.
(log-debug (format "Received SSHv2 SSH_MSG_DEBUG packet ~v" message))
conn)
@ -543,9 +545,16 @@
'neither
))
(values ch
(struct-copy connection conn
(struct-copy connection (send-initial-credit conn ch)
[channel-map (hash-set (connection-channel-map conn) my-ref ch)])))
(define (send-initial-credit conn ch)
(define remaining-window (ssh-channel-outbound-window ch))
(if (and remaining-window
(positive? remaining-window))
(channel-notify conn ch (credit 'app remaining-window))
conn))
(define (get-channel conn my-ref)
(hash-ref (connection-channel-map conn) my-ref))
@ -568,6 +577,10 @@
(define (maybe-close-channel ch conn action)
(define new-close-state (update-close-state (ssh-channel-close-state ch) action))
(case action
((local) (write-message!/flush (ssh-msg-channel-close (ssh-channel-your-ref ch))
conn))
((remote) (send (ssh-channel-room-handle ch) depart 'remote-closed)))
(if (eq? new-close-state 'both)
(discard-channel ch conn)
(update-channel conn (struct-copy ssh-channel ch
@ -654,30 +667,38 @@
conn)))))
(define (handle-msg-window-adjust packet message conn)
(log-error "TODO: Unimplemented: handle-msg-window-adjust")
conn)
(match-define (ssh-msg-channel-window-adjust recipient-channel count) message)
(define ch (get-channel conn recipient-channel))
(channel-notify conn ch (credit 'app count)))
(define (handle-msg-channel-data packet message conn)
(log-error "TODO: Unimplemented: handle-msg-channel-data")
conn)
(match-define (ssh-msg-channel-data recipient-channel data*) message)
(define data (bit-string->bytes data*))
(define ch (get-channel conn recipient-channel))
(channel-notify conn ch `(data ,data)))
(define (handle-msg-channel-extended-data packet message conn)
(log-error "TODO: Unimplemented: handle-msg-channel-extended-data")
conn)
(match-define (ssh-msg-channel-extended-data recipient-channel type-code data*) message)
(define data (bit-string->bytes data*))
(define ch (get-channel conn recipient-channel))
(channel-notify conn ch `(extended-data ,type-code ,data)))
(define (handle-msg-channel-eof packet message conn)
(log-error "TODO: Unimplemented: handle-msg-channel-eof")
conn)
(define ch (get-channel conn (ssh-msg-channel-eof-recipient-channel message)))
(update-channel (channel-notify conn ch `(eof))
(struct-copy ssh-channel ch
[eof-state (update-close-state (ssh-channel-eof-state ch)
'remote)])))
(define (handle-msg-channel-close packet message conn)
(log-error "TODO: Unimplemented: handle-msg-channel-close")
conn)
(define ch (get-channel conn (ssh-msg-channel-close-recipient-channel message)))
(maybe-close-channel ch conn 'remote))
(define (handle-msg-channel-request packet message conn)
(match-define (ssh-msg-channel-request recipient-channel type* want-reply? data*) message)
(define type (bit-string->bytes type*))
(define data (bit-string->bytes data*))
(define ch (get-channel conn (ssh-msg-channel-request-recipient-channel message)))
(define ch (get-channel conn recipient-channel))
(if (not want-reply?)
(channel-notify conn ch `(notify ,type ,data))
(channel-request conn ch `(,type ,data)
@ -796,10 +817,20 @@
(lambda (message)
(lambda (conn)
(define ch (get-channel conn my-ref))
(define your-ref (ssh-channel-your-ref ch))
(match message
((arrived _) conn)
((departed _ _) (maybe-close-channel ch conn 'local))
((says _ (rpc-reply id m) _) (finish-channel-request ch conn id m))))))
((arrived _)
conn)
((departed _ _)
(maybe-close-channel ch conn 'local))
((says _ (credit _ amount) _)
(write-message!/flush (ssh-msg-channel-window-adjust your-ref amount) conn)
conn)
((says _ `(data ,bits) _)
(write-message!/flush (ssh-msg-channel-data your-ref bits) conn)
conn)
((says _ (rpc-reply id m) _)
(finish-channel-request ch conn id m))))))
(define (run-ssh-session conn)
(with-handlers
@ -863,9 +894,9 @@
(define (ssh-session role in out)
(define io-room (make-room (gensym 'ssh-io-room)))
(spy-on io-room)
;;(spy-on io-room)
(define session-room (make-room (gensym 'ssh-session-room)))
(spy-on session-room)
;;(spy-on session-room)
(define local-identification-string (send-preamble-and-identification! out))
(define peer-identification-string (read-preamble-and-identification! in))
@ -906,26 +937,62 @@
;; Session API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (run-channel oob-ch a2s-port s2a-port handle channel-thread)
(let loop ((oob-queue (make-queue)))
(define (run-channel oob-ch in out handle channel-thread)
(define (close-in)
(when (not (port-closed? in))
(close-input-port in)))
(define (close-out)
(when (not (port-closed? out))
(close-output-port out)))
(define (close-ports)
(close-in)
(close-out))
(let loop ((oob-queue (make-queue))
(remaining-credit 0))
(sync (if (queue-empty? oob-queue)
never-evt
(let-values (((first rest) (dequeue oob-queue)))
(handle-evt (channel-put-evt oob-ch first)
(lambda (dummy) (loop rest)))))
(lambda (dummy) (loop rest remaining-credit)))))
(handle-evt (thread-dead-evt channel-thread)
(lambda (dummy)
(close-ports)
'done))
(if (and (not (port-closed? in)) (positive? remaining-credit))
(let ((buffer (make-bytes (min (channel-io-transfer-buffer-size) remaining-credit))))
(handle-evt (read-bytes-avail!-evt buffer in)
(lambda (count)
(if (eof-object? count)
(begin (close-in)
(loop oob-queue remaining-credit))
(begin (send handle say
`(data ,(sub-bit-string buffer 0 (* 8 count))))
(loop oob-queue (- remaining-credit count)))))))
never-evt)
(handle-evt (send handle listen-evt)
(match-lambda
((arrived _)
(loop oob-queue))
(loop oob-queue remaining-credit))
((departed _ _)
(close-ports)
'done)
((says _ (credit _ amount) _)
(loop oob-queue (+ remaining-credit amount)))
((says _ `(data ,data) _)
(write-bytes data out)
;; TODO: propagate backpressure through pipes
(send handle say (credit 'session (bytes-length data)))
(loop oob-queue remaining-credit))
((says _ (and notification `(notify ,type ,data)) _)
(loop (enqueue oob-queue notification)))
(loop (enqueue oob-queue notification) remaining-credit))
((says _ (rpc-request reply-to id message) _)
(loop (enqueue oob-queue
`(request ,message
,(lambda (answer)
(send handle say
(rpc-reply id answer)
reply-to)))))))))))
reply-to))))
remaining-credit)))))))
(define (start-app-channel channel-main)
(define channel-room (make-room 'channel))
@ -939,7 +1006,7 @@
(run-channel oob-ch
session-a2s
session-s2a
(join-room channel-room 'app #:break-on-departure? #t)
(join-room channel-room 'app)
(standard-thread (lambda ()
(channel-main oob-ch app-s2a app-a2s))))))
(wait-for-members channel-room '(app))