From 038be62f03b2d4ed3ae1502b504c41b7ff96b12e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 25 Oct 2011 14:30:41 -0400 Subject: [PATCH] More channel support; Lift event-handlers into a state-monad-like form. --- ssh-session.rkt | 224 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 157 insertions(+), 67 deletions(-) diff --git a/ssh-session.rkt b/ssh-session.rkt index a0791ac..03db48b 100644 --- a/ssh-session.rkt +++ b/ssh-session.rkt @@ -79,6 +79,22 @@ session-id) ;; starts off #f until initial keying #:transparent) +;; A CloseState is one of +;; - 'neither, indicating that neither side has signalled closure +;; - 'local, only the local end has signalled closure +;; - 'remote, only the remote end has signalled closure +;; - 'both, both ends have signalled closure. +;; Represents local knowledge of the state of a shared shutdown state +;; machine. +;; +;; 'neither +;; / \ +;; \/ \/ +;; 'local 'remote +;; \ / +;; \/ \/ +;; 'both + ;; A ChannelState is a (ssh-channel ...) TODO ;; Named ssh-channel to avoid conflicts with Racket's built-in ;; synchronous channels. @@ -90,6 +106,8 @@ outbound-window ;; Maybe outbound-packet-size ;; Maybe inbound-window ;; Natural + eof-state ;; CloseState covering EOF signals + close-state ;; CloseState covering CLOSE signals ) #:transparent) @@ -520,11 +538,62 @@ outbound-window outbound-packet-size 1048576 ;; TODO: parameterize? Make configurable by app? + 'neither + 'neither )) (values ch (struct-copy connection conn [channel-map (hash-set (connection-channel-map conn) my-ref ch)]))) +(define (get-channel conn my-ref) + (hash-ref (connection-channel-map conn) my-ref)) + +(define (update-channel conn ch) + (struct-copy connection conn + [channel-map (hash-set (connection-channel-map conn) (ssh-channel-my-ref ch) ch)])) + +(define (discard-channel ch conn) + (struct-copy connection conn + [channel-map (hash-remove (connection-channel-map conn) (ssh-channel-my-ref ch))])) + +;; CloseState Either<'local,'remote> -> CloseState +(define (update-close-state old-state action) + (define local? (case action ((local) #t) ((remote) #f))) + (case old-state + ((neither) (if local? 'local 'remote)) + ((local) (if local? 'local 'both)) + ((remote) (if local? 'both 'remote)) + ((both) 'both))) + +(define (maybe-close-channel ch conn action) + (define new-close-state (update-close-state (ssh-channel-close-state ch) action)) + (if (eq? new-close-state 'both) + (discard-channel ch conn) + (update-channel conn (struct-copy ssh-channel ch + [close-state new-close-state])))) + + + +(define (channel-request conn ch message k) + (update-channel conn + (struct-copy ssh-channel ch + [continuations (room-rpc (ssh-channel-room-handle ch) + (ssh-channel-continuations conn) + message + k)]))) + +(define (finish-channel-request ch conn txn message) + (define-values (worklist new-continuations) + (room-rpc-finish (ssh-channel-continuations ch) txn message)) + (let loop ((worklist worklist) + (ch (struct-copy ssh-channel ch [continuations new-continuations])) + (conn conn)) + (if (null? worklist) + (update-channel conn ch) + (let ((item (car worklist))) + (define-values (new-ch new-conn) (item ch conn)) + (loop (cdr worklist) new-ch new-conn))))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Connection service ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -579,15 +648,7 @@ (string->bytes/utf-8 description) #"") conn) - conn))) - (lambda (reason conn) - (write-message!/flush (ssh-msg-channel-open-failure - sender-channel - 0 - #"Internal error" - #"") - conn) - conn))))) + conn))))))) (define (handle-msg-window-adjust packet message conn) (log-error "TODO: Unimplemented: handle-msg-window-adjust") @@ -631,22 +692,19 @@ (send (connection-session-room-handle conn) say message) conn) -(define (app-request conn message k-ok k-error) - (define-values (transaction new-continuations) - (open-transaction (connection-continuations conn) (list k-ok k-error))) - (send (connection-session-room-handle conn) say (rpc-request 'session transaction message)) - (struct-copy connection conn [continuations new-continuations])) +(define (app-request conn message k) + (struct-copy connection conn + [continuations (room-rpc (connection-session-room-handle conn) + (connection-continuations conn) + message + k)])) -(define (finish-app-request conn txn context-extractor message-or-reason) - (close-transaction! txn (list context-extractor message-or-reason)) - (let loop ((conn conn)) - (if (transaction-available? (connection-continuations conn)) - (let-values (((txn rest) (dequeue-transaction (connection-continuations conn)))) - (match-define (list context-extractor message-or-reason) (transaction-value txn)) - (loop ((context-extractor (transaction-context txn)) - message-or-reason - (struct-copy connection conn [continuations rest])))) - conn))) +(define (finish-app-request conn txn message) + (define-values (worklist new-continuations) + (room-rpc-finish (connection-continuations conn) txn message)) + (foldl (lambda (item conn) (item conn)) + (struct-copy connection conn [continuations new-continuations]) + worklist)) (define (maybe-send-disconnect-message! e conn) (when (not (exn:fail:contract:protocol-originated-at-peer? e)) @@ -658,6 +716,69 @@ (define (bump-total amount conn) (struct-copy connection conn [total-transferred (+ (connection-total-transferred conn) amount)])) +(define io-room-message-handler + (lambda (message) + (lambda (conn) + (match message + ((arrived 'read-thread) + (send (connection-io-room-handle conn) say (credit 'read-thread 1)) + conn) + ((arrived _) + conn) + ((and departure (departed who why)) + (disconnect-with-error/local-info + departure + SSH_DISCONNECT_CONNECTION_LOST + "I/O error")) + ((says _ amount 'output-byte-count) + ;; writer reporting bytes transferred + (bump-total amount conn)) + ((says _ (received-packet seq packet message transferred-count) _) + (send (connection-io-room-handle conn) say (credit 'read-thread 1)) + (bump-total + transferred-count + (if (connection-discard-next-packet? conn) + (struct-copy connection conn [discard-next-packet? #f]) + (dispatch-packet seq packet message conn)))))))) + +(define session-room-message-handler + (lambda (message) + (lambda (conn) + (match message + ((arrived _) + conn) + ((and departure (departed who why)) + (disconnect-with-error/local-info + departure + SSH_DISCONNECT_BY_APPLICATION + "Application disconnected")) + ((says _ (rpc-reply transaction message) _) + ;; TODO: not cap-secure. Introduce sealers, or indirect. + (finish-app-request conn transaction message)))))) + +;; (K V A -> A) A Hash -> A +(define (hash-fold fn seed hash) + (do ((pos (hash-iterate-first hash) (hash-iterate-next hash pos)) + (seed seed (fn (hash-iterate-key hash pos) (hash-iterate-value hash pos) seed))) + ((not pos) seed))) + +(define (channel-events conn) + (hash-fold (lambda (my-ref ch evt) + (choice-evt evt + (handle-evt (send (ssh-channel-room-handle ch) listen-evt) + (channel-room-message-handler my-ref)))) + never-evt + (connection-channel-map conn))) + +(define (channel-room-message-handler my-ref) + (lambda (message) + (lambda (conn) + (define ch (get-channel conn my-ref)) + (match message + ((arrived _) conn) + ((departed _ _) (maybe-close-channel ch conn 'local)) + ((says _ (rpc-reply id m) _) (finish-channel-request ch conn id m)))))) + (define (run-ssh-session conn) (with-handlers ((exn:fail:contract:protocol? (lambda (e) @@ -669,49 +790,18 @@ (let ((algs ((local-algorithm-list)))) (write-message!/flush algs conn) (loop (struct-copy connection conn [rekey-state (rekey-local algs)]))) - (sync (if (rekey-wait? rekey) - (handle-evt (alarm-evt (* (rekey-wait-deadline rekey) 1000)) - (lambda (dummy) (loop conn))) - never-evt) - (handle-evt (send (connection-io-room-handle conn) listen-evt) - (match-lambda - ((arrived 'read-thread) - (send (connection-io-room-handle conn) say (credit 'read-thread 1)) - (loop conn)) - ((arrived _) - (loop conn)) - ((and departure (departed who why)) - (disconnect-with-error/local-info - departure - SSH_DISCONNECT_CONNECTION_LOST - "I/O error")) - ((says _ amount 'output-byte-count) - ;; writer reporting bytes transferred - (loop (bump-total amount conn))) - ((says _ (received-packet seq packet message transferred-count) _) - (send (connection-io-room-handle conn) say (credit 'read-thread 1)) - (loop - (bump-total - transferred-count - (if (connection-discard-next-packet? conn) - (struct-copy connection conn [discard-next-packet? #f]) - (dispatch-packet seq packet message conn))))))) - (handle-evt (send (connection-session-room-handle conn) listen-evt) - (match-lambda - ((arrived _) - (loop conn)) - ((and departure (departed who why)) - (disconnect-with-error/local-info - departure - SSH_DISCONNECT_BY_APPLICATION - "Application disconnected")) - ((says _ (rpc-reply transaction message) _) - ;; TODO: not cap-secure. Introduce sealers, or indirect. - (loop (finish-app-request conn transaction car message))) - ((says _ (rpc-error transaction reason) _) - ;; TODO: not cap-secure. Introduce sealers, or indirect. - (loop (finish-app-request conn transaction cadr reason))) - ))))))) + (let ((handler (sync (if (rekey-wait? rekey) + (handle-evt (alarm-evt (* (rekey-wait-deadline rekey) 1000)) + (lambda (dummy) + (lambda (conn) + conn))) + never-evt) + (handle-evt (send (connection-io-room-handle conn) listen-evt) + io-room-message-handler) + (handle-evt (send (connection-session-room-handle conn) listen-evt) + session-room-message-handler) + (channel-events conn)))) + (loop (handler conn))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Session choreography