Split out yet another layer, a combination of code from ssh-session and repl-server.
This commit is contained in:
parent
7ad6291124
commit
128ef2d43c
|
@ -8,14 +8,10 @@
|
|||
(require racket/port)
|
||||
(require racket/sandbox)
|
||||
|
||||
(require "conversation.rkt")
|
||||
(require "ssh-numbers.rkt")
|
||||
(require "ssh-session.rkt")
|
||||
(require "ssh-service.rkt")
|
||||
(require "standard-thread.rkt")
|
||||
|
||||
(require "cook-port.rkt")
|
||||
|
||||
(define (t-client)
|
||||
#;(define (t-client)
|
||||
(let-values (((i o) (tcp-connect "localhost"
|
||||
2323
|
||||
;;22
|
||||
|
@ -24,76 +20,26 @@
|
|||
(printf "Got API ~v\n" api)
|
||||
(semaphore-wait (make-semaphore 0)))))
|
||||
|
||||
(define (make-repl-channel-main username)
|
||||
(lambda (oob-ch in out)
|
||||
(define (run-shell in out)
|
||||
(define reader-thread
|
||||
(thread
|
||||
(lambda ()
|
||||
(fprintf out "Hello, ~a.\n" username)
|
||||
(parameterize ((current-input-port in)
|
||||
(current-output-port out)
|
||||
(current-error-port out)
|
||||
(sandbox-input in)
|
||||
(sandbox-output out)
|
||||
(sandbox-error-output out)
|
||||
(current-namespace (make-empty-namespace)))
|
||||
(parameterize ((current-eval (make-evaluator 'racket/base)))
|
||||
(read-eval-print-loop))
|
||||
(fprintf out "\nGoodbye!\n")
|
||||
(close-input-port in)
|
||||
(close-output-port out)))))
|
||||
(let loop ()
|
||||
(sync (handle-evt oob-ch
|
||||
(match-lambda
|
||||
(`(notify ,type ,data)
|
||||
(log-info (format "repl-channel: notify ~v ~v" type data))
|
||||
(loop))
|
||||
(`(request ,other ,k)
|
||||
(log-info (format "repl-channel: request ~v" other))
|
||||
(k 'error)
|
||||
(loop))))
|
||||
(handle-evt reader-thread void))))
|
||||
|
||||
(let update-channels ((in in) (out out))
|
||||
(let loop ()
|
||||
(sync (handle-evt oob-ch
|
||||
(match-lambda
|
||||
(`(notify ,type ,data)
|
||||
(log-info (format "repl-channel: notify ~v ~v" type data))
|
||||
(loop))
|
||||
(`(request (#"pty-req" ,_) ,k)
|
||||
(k 'ok)
|
||||
(define-values (cooked-in cooked-out) (cook-io in out ""))
|
||||
(update-channels cooked-in cooked-out))
|
||||
(`(request (#"shell" ,_) ,k)
|
||||
(k 'ok)
|
||||
(run-shell in out))
|
||||
(`(request ,other ,k)
|
||||
(log-info (format "repl-channel: request ~v" other))
|
||||
(k 'error)
|
||||
(loop)))))))))
|
||||
(define (repl-shell username in out)
|
||||
(fprintf out "Hello, ~a.\n" username)
|
||||
(parameterize ((current-input-port in)
|
||||
(current-output-port out)
|
||||
(current-error-port out)
|
||||
(sandbox-input in)
|
||||
(sandbox-output out)
|
||||
(sandbox-error-output out)
|
||||
(current-namespace (make-empty-namespace)))
|
||||
(parameterize ((current-eval (make-evaluator 'racket/base)))
|
||||
(read-eval-print-loop))
|
||||
(fprintf out "\nGoodbye!\n")
|
||||
(close-input-port in)
|
||||
(close-output-port out)))
|
||||
|
||||
(define (t-server)
|
||||
(define s (tcp-listen 2322 4 #t "localhost"))
|
||||
(printf "Accepting...\n")
|
||||
(let loop ()
|
||||
(let-values (((i o) (tcp-accept s)))
|
||||
(standard-thread
|
||||
(lambda ()
|
||||
(simple-ssh-server (ssh-session 'server i o)
|
||||
(lambda (username channel-type extra-request-data state)
|
||||
(match channel-type
|
||||
(#"session"
|
||||
(values `(ok ,(make-repl-channel-main username) #"")
|
||||
state))
|
||||
(else
|
||||
(values `(error ,SSH_OPEN_UNKNOWN_CHANNEL_TYPE
|
||||
"Unknown channel type")
|
||||
state))))
|
||||
'no-state)))
|
||||
(loop))))
|
||||
(tcp-pty-ssh-server s repl-shell))
|
||||
|
||||
(if (getenv "clientmode")
|
||||
(t-client)
|
||||
(void) #;(t-client)
|
||||
(t-server))
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
#lang racket/base
|
||||
|
||||
(require (planet tonyg/bitsyntax))
|
||||
|
||||
(require racket/tcp)
|
||||
(require racket/match)
|
||||
(require racket/class)
|
||||
(require racket/port)
|
||||
|
||||
(require "conversation.rkt")
|
||||
(require "ssh-numbers.rkt")
|
||||
(require "ssh-session.rkt")
|
||||
(require "standard-thread.rkt")
|
||||
(require "functional-queue.rkt")
|
||||
|
||||
(require "cook-port.rkt")
|
||||
|
||||
(provide channel-io-transfer-buffer-size
|
||||
|
||||
raw-ssh-server-session
|
||||
raw-ssh-server-session/session
|
||||
|
||||
pty-ssh-server-session
|
||||
pty-ssh-server-session-callback
|
||||
|
||||
tcp-pty-ssh-server)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Parameters
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(define channel-io-transfer-buffer-size (make-parameter 4096))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Generic services
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(define (run-channel oob-ch app-out-port app-in-port in out handle)
|
||||
(define (close-in) (when (not (port-closed? in)) (close-input-port in)))
|
||||
(define (close-out) (when (not (port-closed? out)) (close-output-port out)))
|
||||
(define (close-ports)
|
||||
(close-in)
|
||||
(close-out)
|
||||
'closed)
|
||||
(let loop ((oob-queue (make-queue))
|
||||
(remaining-credit 0))
|
||||
(when (port-closed? app-in-port)
|
||||
;; The application has stopped listening. Ensure we stop sending, just as if an EOF
|
||||
;; was received from the remote.
|
||||
(close-out))
|
||||
(define finished-reading? (port-closed? in))
|
||||
(define finished-writing? (port-closed? out))
|
||||
(if (and finished-reading? finished-writing?)
|
||||
'closed
|
||||
(sync (if (queue-empty? oob-queue)
|
||||
never-evt
|
||||
(let-values (((first rest) (dequeue oob-queue)))
|
||||
(handle-evt (channel-put-evt oob-ch first)
|
||||
(lambda (dummy) (loop rest remaining-credit)))))
|
||||
(if finished-reading?
|
||||
never-evt
|
||||
(if (positive? remaining-credit)
|
||||
(let ((buffer (make-bytes (min (channel-io-transfer-buffer-size)
|
||||
remaining-credit))))
|
||||
(handle-evt (read-bytes-avail!-evt buffer in)
|
||||
(lambda (count)
|
||||
(if (eof-object? count)
|
||||
(begin (send handle say `(eof))
|
||||
(close-in)
|
||||
(loop oob-queue remaining-credit))
|
||||
(let ((data (sub-bit-string buffer 0 (* 8 count))))
|
||||
(begin (send handle say `(data ,data))
|
||||
(loop oob-queue (- remaining-credit count))))))))
|
||||
never-evt))
|
||||
(handle-evt (send handle listen-evt)
|
||||
(match-lambda
|
||||
((arrived _)
|
||||
(loop oob-queue remaining-credit))
|
||||
((and departure (departed who why))
|
||||
(send handle depart departure)
|
||||
(close-ports))
|
||||
((says _ (credit _ amount) _)
|
||||
(loop oob-queue (+ remaining-credit amount)))
|
||||
((says _ `(data ,data) _)
|
||||
(when (not finished-writing?) (write-bytes data out))
|
||||
;; TODO: propagate backpressure through pipes
|
||||
(send handle say (credit 'session (bytes-length data)))
|
||||
(loop oob-queue remaining-credit))
|
||||
((says _ `(eof) _)
|
||||
(close-out)
|
||||
(loop oob-queue remaining-credit))
|
||||
((says _ (and notification `(notify ,type ,data)) _)
|
||||
(loop (enqueue oob-queue notification) remaining-credit))
|
||||
((says _ (rpc-request reply-to id message) _)
|
||||
(loop (enqueue oob-queue
|
||||
`(request ,message
|
||||
,(lambda (answer)
|
||||
(send handle say
|
||||
(rpc-reply id answer)
|
||||
reply-to))))
|
||||
remaining-credit))))))))
|
||||
|
||||
(define (start-app-channel channel-main)
|
||||
(define channel-room (make-room 'channel))
|
||||
;;(spy-on channel-room)
|
||||
|
||||
(define oob-ch (make-channel))
|
||||
(define-values (session-a2s app-a2s) (make-pipe))
|
||||
(define-values (app-s2a session-s2a) (make-pipe))
|
||||
|
||||
(standard-thread (lambda ()
|
||||
(run-channel oob-ch
|
||||
app-a2s
|
||||
app-s2a
|
||||
session-a2s
|
||||
session-s2a
|
||||
(join-room channel-room 'app))))
|
||||
(wait-for-members channel-room '(app))
|
||||
|
||||
(standard-thread (lambda ()
|
||||
(channel-main oob-ch app-s2a app-a2s)))
|
||||
|
||||
channel-room)
|
||||
|
||||
(define (raw-ssh-server-session handle channel-open-callback state)
|
||||
(let loop ((state state))
|
||||
(match (send handle listen)
|
||||
((arrived _)
|
||||
(loop state))
|
||||
((and departure (departed _ _))
|
||||
(send handle depart departure))
|
||||
((says _ (rpc-request reply-to id message) _)
|
||||
(match message
|
||||
(`(open-channel ,username ,channel-type ,extra-request-data)
|
||||
(define-values (reply new-state)
|
||||
(channel-open-callback username channel-type extra-request-data state))
|
||||
(match reply
|
||||
(`(ok ,(? procedure? channel-main) ,(? bit-string? extra-reply-data))
|
||||
(send handle say
|
||||
(rpc-reply id `(ok ,(start-app-channel channel-main) ,extra-reply-data))
|
||||
reply-to))
|
||||
((and err `(error ,_ ,_))
|
||||
(send handle say (rpc-reply id err) reply-to)))
|
||||
(loop new-state)))))))
|
||||
|
||||
(define (raw-ssh-server-session/session handle session-callback)
|
||||
(raw-ssh-server-session handle
|
||||
(lambda (username channel-type extra-request-data state)
|
||||
(match channel-type
|
||||
(#"session"
|
||||
(define (start-session oob-ch in out)
|
||||
(session-callback username oob-ch in out))
|
||||
(values `(ok ,start-session #"") state))
|
||||
(else
|
||||
(values `(error ,SSH_OPEN_UNKNOWN_CHANNEL_TYPE
|
||||
"Unknown channel type")
|
||||
state))))
|
||||
'no-state))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; PTY-based/shell-like services
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(define (pty-ssh-server-session handle shell-callback)
|
||||
(raw-ssh-server-session/session handle (pty-ssh-server-session-callback shell-callback)))
|
||||
|
||||
(define (pty-ssh-server-session-callback shell-callback)
|
||||
(lambda (username oob-ch in out)
|
||||
(define (base-eh loop)
|
||||
(match-lambda
|
||||
(`(notify ,type ,data) ;; ignore notifications
|
||||
(log-debug (format "pty-ssh-server-session-callback: notification ~v ~v" type data))
|
||||
(loop))
|
||||
(`(request ,req ,k)
|
||||
(log-debug (format "pty-ssh-server-session-callback: ignored request ~v" req))
|
||||
(k 'error) ;; we don't support requests
|
||||
(loop))))
|
||||
|
||||
(define (start-shell in out)
|
||||
(define shell-thread (thread (lambda () (shell-callback username in out))))
|
||||
(let loop ()
|
||||
(sync (handle-evt oob-ch (base-eh loop))
|
||||
(handle-evt shell-thread void))))
|
||||
|
||||
(define (configure-shell in out)
|
||||
(let loop ()
|
||||
(sync (handle-evt oob-ch
|
||||
(match-lambda
|
||||
(`(request (#"pty-req" ,_) ,k)
|
||||
(k 'ok)
|
||||
(define-values (cooked-in cooked-out) (cook-io in out ""))
|
||||
(configure-shell cooked-in cooked-out))
|
||||
(`(request (#"shell" ,_) ,k)
|
||||
(k 'ok)
|
||||
(start-shell in out))
|
||||
(other ((base-eh loop) other)))))))
|
||||
|
||||
(configure-shell in out)))
|
||||
|
||||
(define (tcp-pty-ssh-server server-socket shell-callback)
|
||||
(let loop ()
|
||||
(define-values (i o) (tcp-accept server-socket))
|
||||
(standard-thread
|
||||
(lambda ()
|
||||
(pty-ssh-server-session (ssh-session 'server i o) shell-callback)))
|
||||
(loop)))
|
117
ssh-session.rkt
117
ssh-session.rkt
|
@ -28,8 +28,7 @@
|
|||
rekey-interval
|
||||
rekey-volume
|
||||
|
||||
ssh-session
|
||||
simple-ssh-server)
|
||||
ssh-session)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Data definitions
|
||||
|
@ -138,8 +137,6 @@
|
|||
(define rekey-interval (make-parameter 60)) ;;3600))
|
||||
(define rekey-volume (make-parameter 1000000000))
|
||||
|
||||
(define channel-io-transfer-buffer-size (make-parameter 4096))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Packet dispatch and handling
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
@ -942,115 +939,3 @@
|
|||
#f))))
|
||||
|
||||
(join-room session-room 'app))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Session API
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(define (run-channel oob-ch app-out-port app-in-port in out handle)
|
||||
(define (close-in) (when (not (port-closed? in)) (close-input-port in)))
|
||||
(define (close-out) (when (not (port-closed? out)) (close-output-port out)))
|
||||
(define (close-ports)
|
||||
(close-in)
|
||||
(close-out)
|
||||
'closed)
|
||||
(let loop ((oob-queue (make-queue))
|
||||
(remaining-credit 0))
|
||||
(when (port-closed? app-in-port)
|
||||
;; The application has stopped listening. Ensure we stop sending, just as if an EOF
|
||||
;; was received from the remote.
|
||||
(close-out))
|
||||
(define finished-reading? (port-closed? in))
|
||||
(define finished-writing? (port-closed? out))
|
||||
(if (and finished-reading? finished-writing?)
|
||||
'closed
|
||||
(sync (if (queue-empty? oob-queue)
|
||||
never-evt
|
||||
(let-values (((first rest) (dequeue oob-queue)))
|
||||
(handle-evt (channel-put-evt oob-ch first)
|
||||
(lambda (dummy) (loop rest remaining-credit)))))
|
||||
(if finished-reading?
|
||||
never-evt
|
||||
(if (positive? remaining-credit)
|
||||
(let ((buffer (make-bytes (min (channel-io-transfer-buffer-size)
|
||||
remaining-credit))))
|
||||
(handle-evt (read-bytes-avail!-evt buffer in)
|
||||
(lambda (count)
|
||||
(if (eof-object? count)
|
||||
(begin (send handle say `(eof))
|
||||
(close-in)
|
||||
(loop oob-queue remaining-credit))
|
||||
(let ((data (sub-bit-string buffer 0 (* 8 count))))
|
||||
(begin (send handle say `(data ,data))
|
||||
(loop oob-queue (- remaining-credit count))))))))
|
||||
never-evt))
|
||||
(handle-evt (send handle listen-evt)
|
||||
(match-lambda
|
||||
((arrived _)
|
||||
(loop oob-queue remaining-credit))
|
||||
((and departure (departed who why))
|
||||
(send handle depart departure)
|
||||
(close-ports))
|
||||
((says _ (credit _ amount) _)
|
||||
(loop oob-queue (+ remaining-credit amount)))
|
||||
((says _ `(data ,data) _)
|
||||
(when (not finished-writing?) (write-bytes data out))
|
||||
;; TODO: propagate backpressure through pipes
|
||||
(send handle say (credit 'session (bytes-length data)))
|
||||
(loop oob-queue remaining-credit))
|
||||
((says _ `(eof) _)
|
||||
(close-out)
|
||||
(loop oob-queue remaining-credit))
|
||||
((says _ (and notification `(notify ,type ,data)) _)
|
||||
(loop (enqueue oob-queue notification) remaining-credit))
|
||||
((says _ (rpc-request reply-to id message) _)
|
||||
(loop (enqueue oob-queue
|
||||
`(request ,message
|
||||
,(lambda (answer)
|
||||
(send handle say
|
||||
(rpc-reply id answer)
|
||||
reply-to))))
|
||||
remaining-credit))))))))
|
||||
|
||||
(define (start-app-channel channel-main)
|
||||
(define channel-room (make-room 'channel))
|
||||
;;(spy-on channel-room)
|
||||
|
||||
(define oob-ch (make-channel))
|
||||
(define-values (session-a2s app-a2s) (make-pipe))
|
||||
(define-values (app-s2a session-s2a) (make-pipe))
|
||||
|
||||
(standard-thread (lambda ()
|
||||
(run-channel oob-ch
|
||||
app-a2s
|
||||
app-s2a
|
||||
session-a2s
|
||||
session-s2a
|
||||
(join-room channel-room 'app))))
|
||||
(wait-for-members channel-room '(app))
|
||||
|
||||
(standard-thread (lambda ()
|
||||
(channel-main oob-ch app-s2a app-a2s)))
|
||||
|
||||
channel-room)
|
||||
|
||||
(define (simple-ssh-server handle channel-open-callback state)
|
||||
(let loop ((state state))
|
||||
(match (send handle listen)
|
||||
((arrived _)
|
||||
(loop state))
|
||||
((and departure (departed _ _))
|
||||
(send handle depart departure))
|
||||
((says _ (rpc-request reply-to id message) _)
|
||||
(match message
|
||||
(`(open-channel ,username ,channel-type ,extra-request-data)
|
||||
(define-values (reply new-state)
|
||||
(channel-open-callback username channel-type extra-request-data state))
|
||||
(match reply
|
||||
(`(ok ,(? procedure? channel-main) ,(? bit-string? extra-reply-data))
|
||||
(send handle say
|
||||
(rpc-reply id `(ok ,(start-app-channel channel-main) ,extra-reply-data))
|
||||
reply-to))
|
||||
((and err `(error ,_ ,_))
|
||||
(send handle say (rpc-reply id err) reply-to)))
|
||||
(loop new-state)))))))
|
||||
|
|
Loading…
Reference in New Issue