Channel support, and all the way up to the REPL

This commit is contained in:
Tony Garnock-Jones 2021-06-19 00:01:45 +02:00
parent 1b5006189b
commit 3daae80a25
6 changed files with 335 additions and 438 deletions

View File

@ -2,9 +2,10 @@
;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2011-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com> ;;; SPDX-FileCopyrightText: Copyright © 2011-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(require racket/match)
(require racket/port) (require racket/port)
(provide cook-io) (provide cook-io cook-output)
(define clear-to-eol "\033[2K") (define clear-to-eol "\033[2K")
(define kill-line (string-append "\r" clear-to-eol)) (define kill-line (string-append "\r" clear-to-eol))
@ -52,33 +53,30 @@
(lambda () (lambda ()
(define input-buffer (make-bytes 4096)) (define input-buffer (make-bytes 4096))
(let loop ((b (buffer '() #f))) (let loop ((b (buffer '() #f)))
(if (port-closed? cooked-in) (sync (handle-evt
;; The ultimate reader of our cooked output has closed (read-bytes-avail!-evt input-buffer raw-in)
;; their input port. We are therefore done. (match-lambda
(close-ports) [(? eof-object?) ;; end-of-file on input
;; TODO: remove polling for port-closed when we get port-closed-evt (close-ports)]
(let ((count (sync/timeout 0.5 (read-bytes-avail!-evt input-buffer raw-in)))) [(? number? count)
(cond (let process-bytes ((i 0) (b b))
((eof-object? count) ;; end-of-file on input (if (>= i count)
(close-ports)) (loop b)
((eq? count #f) ;; timeout - poll to see if cooked-out has been closed (update-buffer b (integer->char (bytes-ref input-buffer i)) prompt
(loop b)) close-ports
(else ;; a number - count of bytes read (lambda (line new-b)
(let process-bytes ((i 0) (b b)) (with-handlers ((exn:fail? void)) ;; ignore write errors
(if (>= i count) (write-string "\r\n" raw-out))
(loop b) (write-string line cooked-out)
(update-buffer b (integer->char (bytes-ref input-buffer i)) prompt (newline cooked-out)
close-ports (process-bytes (+ i 1) new-b))
(lambda (line new-b) (lambda (new-b feedback)
(with-handlers ((exn:fail? void)) ;; ignore write errors (with-handlers ((exn:fail? void)) ;; ignore write errors
(write-string "\r\n" raw-out)) (write-string feedback raw-out))
(write-string line cooked-out) (process-bytes (+ i 1) new-b)))))]))
(newline cooked-out) (handle-evt
(process-bytes (+ i 1) new-b)) (port-closed-evt cooked-in)
(lambda (new-b feedback) (lambda (dummy) (close-ports)))))))
(with-handlers ((exn:fail? void)) ;; ignore write errors
(write-string feedback raw-out))
(process-bytes (+ i 1) new-b))))))))))))
(values cooked-in (cook-output raw-out))) (values cooked-in (cook-output raw-out)))
(define (cook-output raw-out) (define (cook-output raw-out)

View File

@ -4,9 +4,12 @@
;;; (Temporary) example client and server ;;; (Temporary) example client and server
(require syndicate/drivers/racket-event)
(require syndicate/drivers/timer) (require syndicate/drivers/timer)
(require syndicate/drivers/tcp) (require syndicate/drivers/tcp)
(require syndicate/driver-support)
(require syndicate/dataspace) (require syndicate/dataspace)
(require syndicate/pattern)
(require (only-in racket/port peek-bytes-avail!-evt)) (require (only-in racket/port peek-bytes-avail!-evt))
(require "cook-port.rkt") (require "cook-port.rkt")
@ -18,15 +21,18 @@
(require "ssh-channel.rkt") (require "ssh-channel.rkt")
(require "ssh-message-types.rkt") (require "ssh-message-types.rkt")
(require "ssh-exceptions.rkt") (require "ssh-exceptions.rkt")
(require "schemas/gen/channel.rkt")
(require "schemas/gen/auth.rkt")
(module+ main (module+ main
(standard-actor-system (ds) (standard-actor-system (ds)
(define spec (TcpLocal "0.0.0.0" 29418)) (with-services [syndicate/drivers/racket-event]
(at ds (define spec (TcpLocal "0.0.0.0" 29418))
(stop-on (asserted (TcpListenError spec $message))) (at ds
(during/spawn (StreamConnection $source $sink spec) (stop-on (asserted (StreamListenerError spec $message)))
#:name (list 'ssh source) (during/spawn (StreamConnection $source $sink spec)
(session ds source sink))))) #:name (list 'ssh source)
(session ds source sink))))))
;;--------------------------------------------------------------------------- ;;---------------------------------------------------------------------------
@ -86,18 +92,12 @@
ground-ds ground-ds
local-identification local-identification
remote-identification remote-identification
(lambda (user-name)
(error 'repl-boot "Would start session with ~a" user-name))
'server))))) 'server)))))
;; (at conn-ds
;; ;; (during $m
;; ;; (on-start (log-info "++ ~v" m))
;; ;; (on-stop (log-info "-- ~v" m)))
;; (on (message $m)
;; (log-info ">> ~v" m)))
(at conn-ds (at conn-ds
(during (SshAuthenticatedUser $user-name #"ssh-connection")
(run-repl-instance conn-ds user-name))
(on (asserted (protocol-error $reason-code $message _ $originated-at-peer?)) (on (asserted (protocol-error $reason-code $message _ $originated-at-peer?))
(when (not originated-at-peer?) (when (not originated-at-peer?)
(send! (outbound-packet (ssh-msg-disconnect reason-code (send! (outbound-packet (ssh-msg-disconnect reason-code
@ -109,100 +109,80 @@
;;--------------------------------------------------------------------------- ;;---------------------------------------------------------------------------
;; ;; (repl-instance InputPort OutputPort InputPort OutputPort)
;; (struct repl-instance-state (c2s-in ;; used by thread to read input from relay
;; c2s-out ;; used by relay to feed input from remote to the thread
;; s2c-in ;; used by relay to feed output from thread to remote
;; s2c-out ;; used by thread to write output to relay
;; ) #:prefab)
;; (define (repl-instance user-name cname) (define (run-repl-instance conn-ds user-name)
;; (define inbound-stream (channel-stream-name #t cname)) (on-start (log-info "~s connected" user-name))
;; (define outbound-stream (channel-stream-name #f cname)) (on-stop (log-info "~s disconnected" user-name))
;; (define (ch-do action-ctor stream body)
;; (at-meta-level (action-ctor (channel-message stream body)))) (at conn-ds
;; (define (handle-channel-message state body) (assert (SshChannelTypeAvailable #"session"))
;; (match body (during (StreamConnection $source $sink (SshChannelLocal #"session" _))
;; [(channel-stream-request #"pty-req" _) ;; c2s-in used by repl to read input from channel
;; (match-define (repl-instance-state old-in _ _ old-out) state) ;; c2s-out used by channel to feed input from remote to the repl
;; (define-values (cooked-in cooked-out) (cook-io old-in old-out "> ")) ;; s2c-in used by channel to feed output from repl to remote
;; (transition (struct-copy repl-instance-state state ;; s2c-out used by repl to write output to channel
;; [c2s-in cooked-in] (define-values (c2s-in c2s-out) (make-pipe))
;; [s2c-out cooked-out]) (define-values (s2c-in s2c-out) (make-pipe))
;; (ch-do send-feedback inbound-stream (channel-stream-ok)))] (define-values (s2c-err-in s2c-err-out) (make-pipe))
;; [(channel-stream-notify #"env" _) (on-stop (close-input-port c2s-in)
;; ;; Don't care (close-output-port c2s-out)
;; (transition state)] (close-input-port s2c-in)
;; [(channel-stream-request #"shell" _) (close-output-port s2c-out)
;; (match-define (repl-instance-state c2s-in _ s2c-in s2c-out) state) (close-input-port s2c-err-in)
;; (define buffer-size 1024) (close-output-port s2c-err-out))
;; (define dummy-buffer (make-bytes buffer-size))
;; (define repl-thread (thread (lambda () (repl-shell user-name c2s-in s2c-out)))) (define (handle-data data mode)
;; (transition state (match mode
;; (ch-do send-feedback inbound-stream (channel-stream-ok)) [(Mode-bytes)
;; (subscriber (cons (thread-dead-evt repl-thread) (wild)) (write-bytes data c2s-out)
;; (on-message [_ (quit #f "REPL thread exited")])) (flush-output c2s-out)
;; (subscriber (cons (peek-bytes-avail!-evt dummy-buffer 0 #f s2c-in) (wild)) (send-bytes-credit source (bytes-length data))]
;; ;; We're using peek-bytes-avail!-evt rather than [(Mode-object (:parse (SshChannelObject-extended-data type-code)))
;; ;; read-bytes-avail!-evt because of potential overwriting (match type-code
;; ;; of the buffer. The overwriting can happen when there's [SSH_EXTENDED_DATA_STDERR
;; ;; any latency between handling the event and the next (log-info "2> ~s" data)]
;; ;; firing of the event, since the peek-bytes-avail!-evt [_
;; ;; will overwrite its buffer next time it's synced on. (log-warning "Ignoring extended data type-code ~s: ~s" type-code data)])
;; (match-state state (send-bytes-credit source (bytes-length data))]
;; (on-message [(Mode-object (:parse (SshChannelObject-request type want-reply)))
;; [(cons _ (? eof-object?)) (define ok? (handle-request type))
;; (let () (when want-reply
;; (match-define (repl-instance-state c2s-in c2s-out s2c-in s2c-out) state) (define reply (if ok? (SshChannelObject-success) (SshChannelObject-failure)))
;; (close-input-port c2s-in) (send-data sink #"" (Mode-object reply)))]))
;; (close-output-port c2s-out)
;; (close-input-port s2c-in) (define (handle-eof)
;; (close-output-port s2c-out) (close-output-port c2s-out))
;; (transition state (quit)))]
;; [(cons _ (? number? count)) (define (handle-request type)
;; (transition state (match type
;; (ch-do send-message outbound-stream (channel-stream-data [#"pty-req"
;; (read-bytes count s2c-in))))]))))] (define-values (cooked-c2s-in cooked-s2c-out) (cook-io c2s-in s2c-out "> "))
;; [(or (channel-stream-data #"\4") ;; C-d a.k.a EOT (set! c2s-in cooked-c2s-in)
;; (channel-stream-eof)) (set! s2c-out cooked-s2c-out)
;; (let () (set! s2c-err-out (cook-output s2c-err-out))
;; (close-output-port (repl-instance-state-c2s-out state)) #t]
;; ;; ^ this signals the repl thread to exit. [#"env"
;; ;; Now, wait for it to do so. ;; Don't care
;; (transition state))] ;; TODO: care?
;; [(channel-stream-data bs) #t]
;; (write-bytes bs (repl-instance-state-c2s-out state)) [#"shell"
;; (flush-output (repl-instance-state-c2s-out state)) (make-sink #:initial-source (port-source s2c-in)
;; (transition state #:on-connect (lambda (s) (send-credit s (CreditAmount-unbounded) (Mode-bytes)))
;; (ch-do send-feedback inbound-stream (channel-stream-credit (bytes-length bs))))] #:on-data (lambda (data _mode) (send-data sink data))
;; [m #:on-eof (lambda () (stop-current-facet)))
;; (write `(channel inbound ,m)) (newline) (make-sink #:initial-source (port-source s2c-err-in)
;; (transition state)])) #:on-connect (lambda (s) (send-credit s (CreditAmount-unbounded) (Mode-bytes)))
;; (match (channel-name-type cname) #:on-data (lambda (data _mode)
;; [#"session" (send-data sink data
;; (define-values (c2s-in c2s-out) (make-pipe)) (Mode-object (SshChannelObject-extended-data
;; (define-values (s2c-in s2c-out) (make-pipe)) SSH_EXTENDED_DATA_STDERR)))))
;; (transition (repl-instance-state c2s-in c2s-out s2c-in s2c-out) (linked-thread #:name 'repl
;; (at-meta-level (lambda (_facet)
;; (subscriber (channel-message inbound-stream (wild)) (repl-shell user-name c2s-in s2c-out s2c-err-out)))
;; (match-state state #t]
;; (on-presence (transition state [_
;; (ch-do send-feedback inbound-stream (channel-stream-config (log-warning "Unsupported channel request type ~s" type)
;; (default-packet-limit) #f]))
;; #""))
;; (ch-do send-feedback inbound-stream (channel-stream-credit 1024)))) (handle-connection source sink #:initial-credit #f #:on-data handle-data #:on-eof handle-eof)
;; (on-message (assert (SshChannelOpenResponse-ok sink #"")))))
;; [(channel-message _ body)
;; (handle-channel-message state body)]))))
;; (at-meta-level
;; (publisher (channel-message outbound-stream (wild))
;; (on-message [m (begin
;; (write `(channel outbound ,cname ,m)) (newline)
;; (void))]))))]
;; [type
;; (transition/no-state
;; (at-meta-level (send-message
;; (channel-message outbound-stream
;; (channel-stream-open-failure
;; SSH_OPEN_UNKNOWN_CHANNEL_TYPE
;; (bytes-append #"Unknown channel type " type))))))]))

View File

@ -6,6 +6,7 @@
(require racket/match) (require racket/match)
(require racket/sandbox) (require racket/sandbox)
(require (only-in racket/exn exn->string))
(provide repl-shell) (provide repl-shell)
@ -23,14 +24,14 @@
ns)))) ns))))
(hash-ref *user-states* username)) (hash-ref *user-states* username))
(define (repl-shell username in out) (define (repl-shell username in out [err out])
(match-define (user-state _ primary-sandbox primary-namespace) (get-user-state username)) (match-define (user-state _ primary-sandbox primary-namespace) (get-user-state username))
(parameterize ((current-input-port in) (parameterize ((current-input-port in)
(current-output-port out) (current-output-port out)
(current-error-port out) (current-error-port err)
(sandbox-input in) (sandbox-input in)
(sandbox-output out) (sandbox-output out)
(sandbox-error-output out) (sandbox-error-output err)
(sandbox-memory-limit 2) ;; megabytes (sandbox-memory-limit 2) ;; megabytes
(sandbox-eval-limits #f) (sandbox-eval-limits #f)
(sandbox-namespace-specs (list (lambda () primary-namespace)))) (sandbox-namespace-specs (list (lambda () primary-namespace))))
@ -39,8 +40,15 @@
;; ^^ uses primary-namespace via sandbox-namespace-specs ;; ^^ uses primary-namespace via sandbox-namespace-specs
(parameterize ((current-namespace primary-namespace) (parameterize ((current-namespace primary-namespace)
(current-eval secondary-sandbox)) (current-eval secondary-sandbox))
(read-eval-print-loop)) (let restart ()
(with-handlers ([exn?
(lambda (e)
(fprintf err "~a" (exn->string e))
(flush-output err)
(restart))])
(read-eval-print-loop))))
(fprintf out "\nGoodbye!\n") (fprintf out "\nGoodbye!\n")
(kill-evaluator secondary-sandbox) (kill-evaluator secondary-sandbox)
(close-input-port in) (close-input-port in)
(close-output-port out))) (close-output-port out)
(close-output-port err)))

View File

@ -0,0 +1,4 @@
version 1 .
embeddedType EntityRef.Ref .
SshAuthenticatedUser = <authenticated @username bytes @service bytes>.

View File

@ -1,3 +1,19 @@
version 1 . version 1 .
embeddedType EntityRef.Ref . embeddedType EntityRef.Ref .
SshChannelTypeAvailable = <channel-type-available @type bytes>.
SshChannelRemote = <channel-remote @type bytes @extra-data bytes>.
SshChannelLocal = <channel-local @type bytes @extra-data bytes>.
SshChannelOpenResponse =
/ @ok <channel-open-confirmation @sink #!stream.Sink @extra-data bytes>
/ @fail <channel-open-failure @sink #!stream.Sink @reason int @description bytes>
.
SshChannelObject =
/ @extended-data <channel-extended-data @type-code int>
/ @request <channel-request @type bytes @want-reply bool>
/ @success <channel-reply #t>
/ @failure <channel-reply #f>
.

View File

@ -4,6 +4,8 @@
(require bitsyntax) (require bitsyntax)
(require syndicate/drivers/timer) (require syndicate/drivers/timer)
(require syndicate/drivers/stream)
(require syndicate/pattern)
(require "crypto.rkt") (require "crypto.rkt")
(require "oakley-groups.rkt") (require "oakley-groups.rkt")
@ -13,6 +15,8 @@
(require "ssh-exceptions.rkt") (require "ssh-exceptions.rkt")
(require "ssh-transport.rkt") (require "ssh-transport.rkt")
(require "ssh-channel.rkt") (require "ssh-channel.rkt")
(require "schemas/gen/channel.rkt")
(require "schemas/gen/auth.rkt")
(provide rekey-interval (provide rekey-interval
rekey-volume rekey-volume
@ -36,12 +40,11 @@
;; An AuthenticationState is one of ;; An AuthenticationState is one of
;; - #f, for not-yet-authenticated ;; - #f, for not-yet-authenticated
;; - an (authenticated String String), recording successful completion ;; - an (SshAuthenticatedUser Bytes Bytes), recording successful completion
;; of the authentication protocol after a request to be identified ;; of the authentication protocol after a request to be identified
;; as the given username for the given service. ;; as the given username for the given service.
;; TODO: When authentication is properly implemented, we will need ;; TODO: When authentication is properly implemented, we will need
;; intermediate states here too. ;; intermediate states here too.
(struct authenticated (username service) #:prefab)
;; Generic inputs into the exchange-hash part of key ;; Generic inputs into the exchange-hash part of key
;; exchange. Diffie-Hellman uses these fields along with the host key, ;; exchange. Diffie-Hellman uses these fields along with the host key,
@ -71,18 +74,18 @@
(define-syntax with-incoming-task (define-syntax with-incoming-task
(syntax-rules () (syntax-rules ()
[(_ (seq-id type-byte packet-pattern message-pattern) body ...) [(_ (type-byte packet-pattern message-pattern) body ...)
(with-incoming-task* on (seq-id type-byte packet-pattern message-pattern) body ...)])) (with-incoming-task* on (type-byte packet-pattern message-pattern) body ...)]))
(define-syntax-rule (define-syntax-rule
(with-incoming-task/react (seq-id type-byte packet-pattern message-pattern) body ...) (with-incoming-task/react (type-byte packet-pattern message-pattern) body ...)
(react (react
(with-incoming-task* stop-on (seq-id type-byte packet-pattern message-pattern) (with-incoming-task* stop-on (type-byte packet-pattern message-pattern)
body ...))) body ...)))
(define-syntax with-incoming-task* (define-syntax with-incoming-task*
(syntax-rules () (syntax-rules ()
[(_ on-stx (seq-id type-byte packet-pattern message-pattern) body ...) [(_ on-stx (type-byte packet-pattern message-pattern) body ...)
(on-stx (message (task ($ seq-id _) type-byte packet-pattern message-pattern)) (on-stx (message (task ($ seq-id _) type-byte packet-pattern message-pattern))
body ... body ...
(send! (task-complete seq-id)))])) (send! (task-complete seq-id)))]))
@ -150,7 +153,7 @@
(match-define (list 'dh 'public p g public-key-as-integer) (match-define (list 'dh 'public p g public-key-as-integer)
(pk-key->datum private-key 'rkt-public)) (pk-key->datum private-key 'rkt-public))
(at conn-ds (at conn-ds
(with-incoming-task/react (seq SSH_MSG_KEXDH_INIT _ (ssh-msg-kexdh-init $e)) (with-incoming-task/react (SSH_MSG_KEXDH_INIT _ (ssh-msg-kexdh-init $e))
(define peer-key (datum->pk-key (list 'dh 'public p g e) 'rkt-public)) (define peer-key (datum->pk-key (list 'dh 'public p g e) 'rkt-public))
(define shared-secret (pk-derive-secret private-key peer-key)) (define shared-secret (pk-derive-secret private-key peer-key))
(define hash-alg sha256) (define hash-alg sha256)
@ -182,8 +185,8 @@
(pk-key->datum private-key 'rkt-public)) (pk-key->datum private-key 'rkt-public))
(at conn-ds (at conn-ds
(send! (outbound-packet (ssh-msg-kexdh-init public-key-as-integer))) (send! (outbound-packet (ssh-msg-kexdh-init public-key-as-integer)))
(with-incoming-task/react (seq SSH_MSG_KEXDH_REPLY _ (with-incoming-task/react
(ssh-msg-kexdh-reply $host-key-bytes $f $h-signature)) (SSH_MSG_KEXDH_REPLY _ (ssh-msg-kexdh-reply $host-key-bytes $f $h-signature))
(define peer-key (datum->pk-key (list 'dh 'public p g f) 'rkt-public)) (define peer-key (datum->pk-key (list 'dh 'public p g f) 'rkt-public))
(define shared-secret (pk-derive-secret private-key peer-key)) (define shared-secret (pk-derive-secret private-key peer-key))
(define hash-alg sha256) (define hash-alg sha256)
@ -290,7 +293,7 @@
(bit-string (k-h-prefix :: binary) (bit-string (k-h-prefix :: binary)
(key :: binary)))))))))) (key :: binary))))))))))
(at conn-ds (at conn-ds
(with-incoming-task/react (seq SSH_MSG_NEWKEYS _ (ssh-msg-newkeys)) (with-incoming-task/react (SSH_MSG_NEWKEYS _ (ssh-msg-newkeys))
;; First, send our SSH_MSG_NEWKEYS, incrementing the ;; First, send our SSH_MSG_NEWKEYS, incrementing the
;; various counters, and then apply the new algorithms. ;; various counters, and then apply the new algorithms.
;; Also arm our rekey timer. ;; Also arm our rekey timer.
@ -314,11 +317,12 @@
(define (service-request-handler conn-ds) (define (service-request-handler conn-ds)
(define-field authentication-state #f) (define-field authentication-state #f)
(begin/dataflow (log-info "authentication-state ~s" (authentication-state)))
(at conn-ds (at conn-ds
(assert #:when (authentication-state) (authentication-state)) (assert #:when (authentication-state) (authentication-state))
(with-incoming-task (seq SSH_MSG_SERVICE_REQUEST _ (ssh-msg-service-request $service)) (with-incoming-task (SSH_MSG_SERVICE_REQUEST _ (ssh-msg-service-request $service))
(match service (match service
[#"ssh-userauth" [#"ssh-userauth"
(cond (cond
@ -329,20 +333,18 @@
(at conn-ds (at conn-ds
(send! (outbound-packet (ssh-msg-service-accept service))) (send! (outbound-packet (ssh-msg-service-accept service)))
(with-incoming-task/react (with-incoming-task/react
(seq SSH_MSG_USERAUTH_REQUEST _ (SSH_MSG_USERAUTH_REQUEST _ (ssh-msg-userauth-request $user-name $service-name _ _))
(ssh-msg-userauth-request $user-name $service-name _ _))
(cond (cond
[(and (positive? (bytes-length user-name)) [(and (positive? (bytes-length user-name))
(equal? service-name #"ssh-connection")) (equal? service-name #"ssh-connection"))
;; TODO: Actually implement client authentication ;; TODO: Actually implement client authentication
(send! (outbound-packet (ssh-msg-userauth-success))) (send! (outbound-packet (ssh-msg-userauth-success)))
(authentication-state (authenticated user-name service-name)) (authentication-state (SshAuthenticatedUser user-name service-name))
(react (react
(with-incoming-task (seq SSH_MSG_USERAUTH_REQUEST _ _) (with-incoming-task (SSH_MSG_USERAUTH_REQUEST _ _)
;; RFC4252 section 5.1 page 6 ;; RFC4252 section 5.1 page 6
)) ))
(let ((a (authentication-state))) (spawn #:name 'channel-manager (run-channel-manager conn-ds))]
(spawn #:name 'connection-service (start-connection-service conn-ds a)))]
[else [else
(send! (outbound-packet (ssh-msg-userauth-failure '(none) #f)))])))])] (send! (outbound-packet (ssh-msg-userauth-failure '(none) #f)))])))])]
[_ [_
@ -354,206 +356,172 @@
;; Channel management ;; Channel management
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; (define (unused-local-channel-ref conn) (define (run-inbound-channel conn-ds
;; (define (bump-candidate candidate) #:type channel-type
;; (modulo (+ candidate 1) #x100000000)) #:remote-ref remote-ref
;; (define first-candidate (match (connection-channels conn) #:local-ref local-ref
;; ['() 0] #:initial-window-size initial-window-size
;; [(cons ch _) (bump-candidate (ssh-channel-local-ref ch))])) #:maximum-packet-size maximum-packet-size
;; (let examine-candidate ((candidate first-candidate)) #:extra-request-data extra-request-data)
;; (let loop ((chs (connection-channels conn))) (define (! message) (send! conn-ds (outbound-packet message)))
;; (cond
;; [(null? chs) candidate]
;; [(= (ssh-channel-local-ref (car chs)) candidate)
;; (examine-candidate (bump-candidate candidate))]
;; [else (loop (cdr chs))]))))
;; (define (replacef proc updater creator lst) (log-info "Starting channel, type ~s" channel-type)
;; (let loop ((lst lst)) (on-stop (log-info "Stopping channel, type ~s" channel-type))
;; (cond [(null? lst) (list (creator))]
;; [(proc (car lst)) (cons (updater (car lst)) (cdr lst))]
;; [else (cons (car lst) (loop (cdr lst)))])))
;; (define (remf proc lst) (define (on-connect source sink)
;; (cond [(null? lst) '()] (at conn-ds
;; [(proc (car lst)) (cdr lst)] (stop-on (message (task _ SSH_MSG_CHANNEL_CLOSE _ (ssh-msg-channel-close local-ref))))
;; [else (cons (car lst) (remf proc (cdr lst)))]))
;; ;; ChannelName -> ChannelState -> Boolean (with-incoming-task
;; (define ((ssh-channel-name=? cname) c) (SSH_MSG_CHANNEL_WINDOW_ADJUST _ (ssh-msg-channel-window-adjust local-ref $n))
;; (equal? (ssh-channel-name c) cname)) (send-bytes-credit source n))
;; ;; Connection Uint32 -> ChannelState (with-incoming-task (SSH_MSG_CHANNEL_DATA _ (ssh-msg-channel-data local-ref $data))
;; (define (get-channel conn local-ref) (send-data sink data))
;; (define ch (findf (lambda (c) (equal? (ssh-channel-local-ref c) local-ref))
;; (connection-channels conn)))
;; (when (not ch)
;; (disconnect-with-error SSH_DISCONNECT_PROTOCOL_ERROR
;; "Attempt to use known channel local-ref ~v"
;; local-ref))
;; ch)
;; ;; ChannelName Maybe<Uint32> Connection -> Connection (with-incoming-task
;; (define (update-channel cname updater conn) (SSH_MSG_CHANNEL_EXTENDED_DATA _ (ssh-msg-channel-extended-data local-ref $type-code $data))
;; (struct-copy connection conn (send-data sink data (Mode-object (SshChannelObject-extended-data type-code))))
;; [channels
;; (replacef (ssh-channel-name=? cname)
;; updater
;; (lambda () (updater (ssh-channel cname
;; (unused-local-channel-ref conn)
;; #f
;; #f
;; 'neither)))
;; (connection-channels conn))]))
;; ;; ChannelName Connection -> Connection (with-incoming-task (SSH_MSG_CHANNEL_EOF _ (ssh-msg-channel-eof local-ref))
;; (define (discard-channel cname conn) (send-eof sink))
;; (struct-copy connection conn
;; [channels
;; (remf (ssh-channel-name=? cname) (connection-channels conn))]))
;; ;; CloseState Either<'local,'remote> -> CloseState (with-incoming-task
;; (define (update-close-state old-state action) (SSH_MSG_CHANNEL_REQUEST _ (ssh-msg-channel-request local-ref $type $want-reply? $data))
;; (define local? (case action ((local) #t) ((remote) #f))) (send-data sink data (Mode-object (SshChannelObject-request type want-reply?))))
;; (case old-state
;; ((neither) (if local? 'local 'remote))
;; ((local) (if local? 'local 'both))
;; ((remote) (if local? 'both 'remote))
;; ((both) 'both)))
;; (define (maybe-close-channel cname conn action) (with-incoming-task
;; (cond (SSH_MSG_CHANNEL_SUCCESS _ (ssh-msg-channel-success local-ref))
;; [(findf (ssh-channel-name=? cname) (connection-channels conn)) => (send-data sink #"" (Mode-object (SshChannelObject-success))))
;; (lambda (ch)
;; (define old-close-state (ssh-channel-close-state ch))
;; (define new-close-state (update-close-state old-close-state action))
;; (transition (if (eq? new-close-state 'both)
;; (discard-channel ch conn)
;; (update-channel cname
;; (lambda (ch)
;; (struct-copy ssh-channel ch
;; [close-state new-close-state]))
;; conn))
;; (case action
;; [(local)
;; (case old-close-state
;; [(neither remote)
;; (list (send-message (outbound-packet
;; (ssh-msg-channel-close (ssh-channel-remote-ref ch)))))]
;; [else (list)])]
;; [(remote)
;; (case old-close-state
;; [(neither local)
;; (list (delete-endpoint (list cname 'outbound))
;; (delete-endpoint (list cname 'inbound)))]
;; [else (list)])])))]
;; [else (transition conn)]))
;; (define (channel-endpoints cname initial-message-producer) (with-incoming-task
;; (define inbound-stream-name (channel-stream-name #t cname)) (SSH_MSG_CHANNEL_FAILURE _ (ssh-msg-channel-failure local-ref))
;; (define outbound-stream-name (channel-stream-name #f cname)) (send-data sink #"" (Mode-object (SshChannelObject-failure))))
;; (define (! conn message)
;; (transition conn (send-message (outbound-packet message))))
;; (list
;; (name-endpoint (list cname 'outbound)
;; (subscriber (channel-message outbound-stream-name (wild))
;; (match-state conn
;; (on-presence (transition conn
;; (initial-message-producer inbound-stream-name outbound-stream-name)))
;; (on-absence (maybe-close-channel cname conn 'local))
;; (on-message
;; [(channel-message _ body)
;; (let ()
;; (define ch (findf (ssh-channel-name=? cname) (connection-channels conn)))
;; (define remote-ref (ssh-channel-remote-ref ch))
;; (match body
;; [(channel-stream-data data-bytes)
;; ;; TODO: split data-bytes into packets if longer than max packet size
;; (! conn (ssh-msg-channel-data remote-ref data-bytes))]
;; [(channel-stream-extended-data type data-bytes)
;; (! conn (ssh-msg-channel-extended-data remote-ref type data-bytes))]
;; [(channel-stream-eof)
;; (! conn (ssh-msg-channel-eof remote-ref))]
;; [(channel-stream-notify type data-bytes)
;; (! conn (ssh-msg-channel-request remote-ref type #f data-bytes))]
;; [(channel-stream-request type data-bytes)
;; (! conn (ssh-msg-channel-request remote-ref type #t data-bytes))]
;; [(channel-stream-open-failure reason description)
;; (! (discard-channel cname conn)
;; (ssh-msg-channel-open-failure remote-ref reason description #""))]))]))))
;; (name-endpoint (list cname 'inbound)
;; (publisher (channel-message inbound-stream-name (wild))
;; (match-state conn
;; (on-message
;; [(channel-message _ body)
;; (let ()
;; (define ch (findf (ssh-channel-name=? cname) (connection-channels conn)))
;; (define remote-ref (ssh-channel-remote-ref ch))
;; (match body
;; [(channel-stream-config maximum-packet-size extra-data)
;; (if (channel-name-locally-originated? cname)
;; ;; This must be intended to form the SSH_MSG_CHANNEL_OPEN.
;; (! conn (ssh-msg-channel-open (channel-name-type cname)
;; (ssh-channel-local-ref ch)
;; 0
;; maximum-packet-size
;; extra-data))
;; ;; This must be intended to form the SSH_MSG_CHANNEL_OPEN_CONFIRMATION.
;; (! conn (ssh-msg-channel-open-confirmation remote-ref
;; (ssh-channel-local-ref ch)
;; 0
;; maximum-packet-size
;; extra-data)))]
;; [(channel-stream-credit count)
;; (! conn (ssh-msg-channel-window-adjust remote-ref count))]
;; [(channel-stream-ok)
;; (! conn (ssh-msg-channel-success remote-ref))]
;; [(channel-stream-fail)
;; (! conn (ssh-msg-channel-failure remote-ref))]))]))))))
;; (define (channel-notify conn ch inbound? body) (once
;; (transition conn [(asserted (SshChannelOpenResponse-ok remote-sink $extra-data))
;; (send-message (channel-message (channel-stream-name inbound? (ssh-channel-name ch)) (! (ssh-msg-channel-open-confirmation remote-ref
;; body) local-ref
;; (if inbound? 'publisher 'subscriber)))) 1048576 ;; TODO
16384 ;; TODO
extra-data))]
[(asserted (SshChannelOpenResponse-fail remote-sink $reason $description))
(! (ssh-msg-channel-open-failure remote-ref
reason
description
#""))
(stop-current-facet)])))
(match-define
(list remote-source remote-sink)
(establish-connection conn-ds (SshChannelLocal channel-type extra-request-data)
#:name (list 'R remote-ref 'L local-ref)
#:on-connect on-connect
#:on-rejected
(lambda (message)
(! (ssh-msg-channel-open-failure remote-ref
SSH_OPEN_ADMINISTRATIVELY_PROHIBITED
(string->bytes/utf-8 message)
#""))
(stop-current-facet))
#:on-disconnect (lambda () (stop-current-facet))
#:on-error (lambda (message) (stop-current-facet))
#:on-credit
(lambda (amount mode)
(match-define (Mode-bytes) mode)
(match-define (CreditAmount-count n) amount)
(! (ssh-msg-channel-window-adjust remote-ref n)))
#:initial-credit (CreditAmount-count initial-window-size)
#:initial-mode (Mode-bytes)
#:on-data
(lambda (data mode)
(match mode
[(Mode-bytes)
(! (ssh-msg-channel-data remote-ref data))]
[(Mode-lines (LineMode-lf))
(! (ssh-msg-channel-data remote-ref (bytes-append data "\n")))]
[(Mode-lines (LineMode-crlf))
(! (ssh-msg-channel-data remote-ref (bytes-append data "\r\n")))]
[(Mode-object (:parse (SshChannelObject-extended-data type-code)))
(! (ssh-msg-channel-extended-data remote-ref type-code data))]
[(Mode-object (:parse (SshChannelObject-request type want-reply)))
(! (ssh-msg-channel-request remote-ref type want-reply data))]
[(Mode-object (:parse (SshChannelObject-success)))
(! (ssh-msg-channel-success remote-ref))]
[(Mode-object (:parse (SshChannelObject-failure)))
(! (ssh-msg-channel-failure remote-ref))]))
#:on-eof (lambda ()
(! (ssh-msg-channel-eof remote-ref)))))
(void))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Connection service ;; Channel manager
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; (define (respond-to-opened-outbound-channel conn cname) (define (run-channel-manager conn-ds)
;; (if (and (ground? cname) (define local-refs-by-remote-ref (make-hash))
;; (not (memf (ssh-channel-name=? cname) (connection-channels conn)))) (define remote-refs-by-local-ref (make-hash))
;; (transition (update-channel cname values conn)
;; (channel-endpoints cname (lambda (inbound-stream outbound-stream)
;; '())))
;; (transition conn)))
(define (start-connection-service conn-ds authentication) (define (allocate-local-ref remote-ref)
(match-define (authenticated user-name _service-name) authentication) (when (hash-has-key? local-refs-by-remote-ref remote-ref)
(disconnect-with-error conn-ds SSH_DISCONNECT_PROTOCOL_ERROR
"Attempt to reuse remote-ref ~a"
remote-ref))
(for/or ([i (in-range 0 32)]) ;; TODO: this is an arbitrary limit
(if (hash-has-key? remote-refs-by-local-ref i)
#f
(begin (hash-set! remote-refs-by-local-ref i remote-ref)
(hash-set! local-refs-by-remote-ref remote-ref i)
i))))
(handle-msg-channel-open conn-ds) (at conn-ds
;; (set-handlers conn (with-incoming-task (SSH_MSG_CHANNEL_CLOSE _ (ssh-msg-channel-close $local-ref))
;; ;; TODO: SSH_MSG_GLOBAL_REQUEST handle-msg-global-request (when (not (hash-has-key? remote-refs-by-local-ref local-ref))
;; SSH_MSG_CHANNEL_OPEN handle-msg-channel-open (disconnect-with-error conn-ds
;; SSH_MSG_CHANNEL_OPEN_CONFIRMATION handle-msg-channel-open-confirmation SSH_DISCONNECT_PROTOCOL_ERROR
;; SSH_MSG_CHANNEL_OPEN_FAILURE handle-msg-channel-open-failure "Received channel close for non-open channel ~a"
;; SSH_MSG_CHANNEL_WINDOW_ADJUST handle-msg-channel-window-adjust local-ref))
;; SSH_MSG_CHANNEL_DATA handle-msg-channel-data (hash-remove! remote-refs-by-local-ref local-ref))
;; SSH_MSG_CHANNEL_EXTENDED_DATA handle-msg-channel-extended-data
;; SSH_MSG_CHANNEL_EOF handle-msg-channel-eof (with-incoming-task (SSH_MSG_CHANNEL_OPEN _ (ssh-msg-channel-open $channel-type
;; SSH_MSG_CHANNEL_CLOSE handle-msg-channel-close $remote-ref
;; SSH_MSG_CHANNEL_REQUEST handle-msg-channel-request $initial-window-size
;; SSH_MSG_CHANNEL_SUCCESS handle-msg-channel-success $maximum-packet-size
;; SSH_MSG_CHANNEL_FAILURE handle-msg-channel-failure)) $extra-request-data))
(log-info "open ~s" (list channel-type remote-ref initial-window-size maximum-packet-size extra-request-data))
(with-assertion-presence conn-ds (SshChannelTypeAvailable channel-type)
#:on-present [(define local-ref (allocate-local-ref remote-ref))
(if (not local-ref)
(send! (outbound-packet
(ssh-msg-channel-open-failure remote-ref
SSH_OPEN_RESOURCE_SHORTAGE
#"Too many open channels"
#"")))
(react
(on-stop (log-info "Releasing channel assignment ~s"
(list 'R remote-ref 'L local-ref))
(send! (outbound-packet (ssh-msg-channel-close remote-ref)))
(hash-remove! local-refs-by-remote-ref remote-ref))
(spawn/link
#:name (list 'R remote-ref 'L local-ref)
(run-inbound-channel conn-ds
#:type channel-type
#:remote-ref remote-ref
#:local-ref local-ref
#:initial-window-size initial-window-size
#:maximum-packet-size maximum-packet-size
#:extra-request-data extra-request-data))))]
#:on-absent [(send! (outbound-packet
(ssh-msg-channel-open-failure remote-ref
SSH_OPEN_UNKNOWN_CHANNEL_TYPE
#"Unknown channel type"
#"")))])))
;; Start responding to channel interest coming from the application. We are responding to
;; channels appearing from the remote peer by virtue of our installation of the handler for
;; SSH_MSG_CHANNEL_OPEN above.
;; (at conn-ds
;; (during ...))
;; ;; Start responding to channel interest coming from the
;; ;; application. We are responding to channels appearing from the
;; ;; remote peer by virtue of our installation of the handler for
;; ;; SSH_MSG_CHANNEL_OPEN above.
;; (observe-subscribers (channel-message (channel-stream-name ? (channel-name #t ? ?)) ?) ;; (observe-subscribers (channel-message (channel-stream-name ? (channel-name #t ? ?)) ?)
;; (match-state conn ;; (match-state conn
;; (match-conversation (channel-message (channel-stream-name #t cname) _) ;; (match-conversation (channel-message (channel-stream-name #t cname) _)
@ -562,45 +530,10 @@
;; (match-state conn ;; (match-state conn
;; (match-conversation (channel-message (channel-stream-name #f cname) _) ;; (match-conversation (channel-message (channel-stream-name #f cname) _)
;; (on-presence (respond-to-opened-outbound-channel conn cname))))) ;; (on-presence (respond-to-opened-outbound-channel conn cname)))))
(void)
) )
(define (handle-msg-channel-open conn-ds)
(void)
;; (at conn-ds
;; (with-incoming-task (seq SSH_MSG_CHANNEL_OPEN _ (ssh-msg-channel-open $channel-type
;; $remote-ref
;; $initial-window-size
;; $maximum-packet-size
;; $extra-request-data))
;; (
;; (react
;; (on (asserted (Observe (:pattern (
;; (sync! conn-ds
;; (
;; (when (memf (lambda (c) (equal? (ssh-channel-remote-ref c) remote-ref))
;; (connection-channels conn))
;; (disconnect-with-error SSH_DISCONNECT_PROTOCOL_ERROR
;; "Attempt to open already-open channel ~v"
;; remote-ref))
;; (define channel-type (bit-string->bytes channel-type*))
;; (define extra-request-data (bit-string->bytes extra-request-data*))
;; (define cname (channel-name #f channel-type remote-ref))
;; (transition (update-channel cname
;; (lambda (e) (struct-copy ssh-channel e [remote-ref remote-ref]))
;; conn)
;; (channel-endpoints cname
;; (lambda (inbound-stream outbound-stream)
;; (list (send-feedback
;; (channel-message outbound-stream
;; (channel-stream-config maximum-packet-size
;; extra-request-data)))
;; (send-feedback
;; (channel-message outbound-stream
;; (channel-stream-credit initial-window-size))))))))
)
;; (define (handle-msg-channel-open-confirmation packet message conn) ;; (define (handle-msg-channel-open-confirmation packet message conn)
;; (match-define (ssh-msg-channel-open-confirmation local-ref ;; (match-define (ssh-msg-channel-open-confirmation local-ref
@ -638,50 +571,6 @@
;; (channel-stream-open-failure reason description))) ;; (channel-stream-open-failure reason description)))
;; (lambda (conn) (maybe-close-channel (ssh-channel-name ch) conn 'remote)))) ;; (lambda (conn) (maybe-close-channel (ssh-channel-name ch) conn 'remote))))
;; (define (handle-msg-channel-window-adjust packet message conn)
;; (match-define (ssh-msg-channel-window-adjust local-ref count) message)
;; (define ch (get-channel conn local-ref))
;; (channel-notify conn ch #f (channel-stream-credit count)))
;; (define (handle-msg-channel-data packet message conn)
;; (match-define (ssh-msg-channel-data local-ref data*) message)
;; (define data (bit-string->bytes data*))
;; (define ch (get-channel conn local-ref))
;; (channel-notify conn ch #t (channel-stream-data data)))
;; (define (handle-msg-channel-extended-data packet message conn)
;; (match-define (ssh-msg-channel-extended-data local-ref type-code data*) message)
;; (define data (bit-string->bytes data*))
;; (define ch (get-channel conn local-ref))
;; (channel-notify conn ch #t (channel-stream-extended-data type-code data)))
;; (define (handle-msg-channel-eof packet message conn)
;; (define ch (get-channel conn (ssh-msg-channel-eof-recipient-channel message)))
;; (channel-notify conn ch #t (channel-stream-eof)))
;; (define (handle-msg-channel-close packet message conn)
;; (define ch (get-channel conn (ssh-msg-channel-close-recipient-channel message)))
;; (maybe-close-channel (ssh-channel-name ch) conn 'remote))
;; (define (handle-msg-channel-request packet message conn)
;; (match-define (ssh-msg-channel-request local-ref type* want-reply? data*) message)
;; (define type (bit-string->bytes type*))
;; (define data (bit-string->bytes data*))
;; (define ch (get-channel conn local-ref))
;; (channel-notify conn ch #t
;; (if want-reply?
;; (channel-stream-request type data)
;; (channel-stream-notify type data))))
;; (define (handle-msg-channel-success packet message conn)
;; (match-define (ssh-msg-channel-success local-ref) message)
;; (define ch (get-channel conn local-ref))
;; (channel-notify conn ch #f (channel-stream-ok)))
;; (define (handle-msg-channel-failure packet message conn)
;; (match-define (ssh-msg-channel-failure local-ref) message)
;; (define ch (get-channel conn local-ref))
;; (channel-notify conn ch #f (channel-stream-fail)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Session main process ;; Session main process
@ -691,7 +580,6 @@
ground-ds ground-ds
local-identification-string local-identification-string
peer-identification-string peer-identification-string
application-boot
session-role) session-role)
(define-field rekey-state (rekey-in-seconds-or-bytes -1 -1 0)) (define-field rekey-state (rekey-in-seconds-or-bytes -1 -1 0))
(define-field session-id #f) (define-field session-id #f)
@ -701,29 +589,32 @@
(define channels '()) (define channels '())
(at conn-ds (at conn-ds
(with-incoming-task (seq SSH_MSG_DISCONNECT _ (with-incoming-task
(ssh-msg-disconnect $reason-code $description $language-tag)) (SSH_MSG_DISCONNECT _ (ssh-msg-disconnect $reason-code $description $language-tag))
(disconnect-with-error* conn-ds #t (if (= reason-code SSH_DISCONNECT_BY_APPLICATION)
'() (begin (log-debug "Received SSH_DISCONNECT_BY_APPLICATION")
reason-code (assert (protocol-error reason-code description '() #t)))
"Received SSH_MSG_DISCONNECT with reason code ~a and message ~s" (disconnect-with-error* conn-ds #t
reason-code '()
(bytes->string/utf-8 (bit-string->bytes description)))) reason-code
"Received SSH_MSG_DISCONNECT with reason code ~a and message ~s"
reason-code
(bytes->string/utf-8 (bit-string->bytes description)))))
(with-incoming-task (seq SSH_MSG_IGNORE _ (ssh-msg-ignore _))) (with-incoming-task (SSH_MSG_IGNORE _ (ssh-msg-ignore _)))
(with-incoming-task (seq SSH_MSG_UNIMPLEMENTED _ (ssh-msg-unimplemented $peer-seq)) (with-incoming-task (SSH_MSG_UNIMPLEMENTED _ (ssh-msg-unimplemented $peer-seq))
(disconnect-with-error/local-info (disconnect-with-error/local-info
conn-ds conn-ds
`((offending-sequence-number ,peer-seq)) `((offending-sequence-number ,peer-seq))
SSH_DISCONNECT_PROTOCOL_ERROR SSH_DISCONNECT_PROTOCOL_ERROR
"Disconnecting because of received SSH_MSG_UNIMPLEMENTED.")) "Disconnecting because of received SSH_MSG_UNIMPLEMENTED."))
(with-incoming-task (seq SSH_MSG_DEBUG _ ($ message (ssh-msg-debug _ _ _))) (with-incoming-task (SSH_MSG_DEBUG _ ($ message (ssh-msg-debug _ _ _)))
(log-info (format "Received SSHv2 SSH_MSG_DEBUG packet ~v" message))) (log-info (format "Received SSHv2 SSH_MSG_DEBUG packet ~v" message)))
(with-incoming-task (seq SSH_MSG_KEXINIT $packet (with-incoming-task
($ message (ssh-msg-kexinit _ _ _ _ _ _ _ _ _ _ _ _ _))) (SSH_MSG_KEXINIT $packet ($ message (ssh-msg-kexinit _ _ _ _ _ _ _ _ _ _ _ _ _)))
(do-kexinit conn-ds (do-kexinit conn-ds
ground-ds ground-ds
#:packet packet #:packet packet