Fix bugs in channel shutdown handling. Add tty cooker.
This commit is contained in:
parent
887aada203
commit
d47f553e21
|
@ -0,0 +1,67 @@
|
|||
#lang racket/base
|
||||
|
||||
(require racket/port)
|
||||
|
||||
(provide cook-io)
|
||||
|
||||
(define clear-to-eol "\033[2K")
|
||||
(define kill-line (string-append "\r" clear-to-eol))
|
||||
|
||||
(struct buffer (chars consume-next-linefeed?) #:transparent)
|
||||
|
||||
(define (buffer-empty? b)
|
||||
(null? (buffer-chars b)))
|
||||
|
||||
(define (buffer-adjust b new-chars)
|
||||
(struct-copy buffer b
|
||||
[chars new-chars]
|
||||
[consume-next-linefeed? #f]))
|
||||
|
||||
(define (buffer-contents b)
|
||||
(list->string (reverse (buffer-chars b))))
|
||||
|
||||
(define (update-buffer b key prompt k-eof k-complete k-ongoing)
|
||||
(case key
|
||||
((#\backspace) (if (buffer-empty? b)
|
||||
(k-ongoing b "")
|
||||
(k-ongoing (buffer-adjust b (cdr (buffer-chars b))) "\b \b")))
|
||||
((#\return) (k-complete (buffer-contents b) (buffer '() #t)))
|
||||
((#\newline) (if (buffer-consume-next-linefeed? b)
|
||||
(k-ongoing (struct-copy buffer b [consume-next-linefeed? #f]) "")
|
||||
(k-complete (buffer-contents b) (buffer '() #f))))
|
||||
((#\page) (k-ongoing b (string-append kill-line prompt (buffer-contents b))))
|
||||
((#\004) ;; control-D, UNIX EOF
|
||||
(if (buffer-empty? b)
|
||||
(k-eof)
|
||||
(k-ongoing b "")))
|
||||
((#\033) ;; escape
|
||||
(k-ongoing (buffer '() #f) (string-append kill-line prompt)))
|
||||
(else (if (char-iso-control? key)
|
||||
(k-ongoing b "")
|
||||
(k-ongoing (buffer-adjust b (cons key (buffer-chars b))) (string key))))))
|
||||
|
||||
(define (cook-io raw-in raw-out prompt)
|
||||
(define-values (cooked-in cooked-out) (make-pipe))
|
||||
(thread
|
||||
(lambda ()
|
||||
(let loop ((b (buffer '() #f)))
|
||||
(if (port-closed? cooked-out)
|
||||
;; our job here is
|
||||
'done
|
||||
(let ((s (read-string 1 raw-in)))
|
||||
(if (eof-object? s)
|
||||
(begin (close-output-port cooked-out)
|
||||
(loop b))
|
||||
(begin (update-buffer b (string-ref s 0) prompt
|
||||
(lambda ()
|
||||
(close-output-port cooked-out)
|
||||
(close-input-port raw-in))
|
||||
(lambda (line new-b)
|
||||
(write-string "\r\n" raw-out)
|
||||
(write-string line cooked-out)
|
||||
(newline cooked-out)
|
||||
(loop new-b))
|
||||
(lambda (new-b feedback)
|
||||
(write-string feedback raw-out)
|
||||
(loop new-b))))))))))
|
||||
(values cooked-in raw-out))
|
|
@ -12,6 +12,8 @@
|
|||
(require "ssh-session.rkt")
|
||||
(require "standard-thread.rkt")
|
||||
|
||||
(require "cook-port.rkt")
|
||||
|
||||
(define (t-client)
|
||||
(let-values (((i o) (tcp-connect "localhost"
|
||||
2323
|
||||
|
@ -21,27 +23,42 @@
|
|||
(printf "Got API ~v\n" api)
|
||||
(semaphore-wait (make-semaphore 0)))))
|
||||
|
||||
(define (repl-channel-main oob-ch in out)
|
||||
(let loop ()
|
||||
(sync (handle-evt oob-ch
|
||||
(match-lambda
|
||||
(`(notify ,type ,data)
|
||||
(log-info (format "repl-channel: notify ~v ~v" type data))
|
||||
(loop))
|
||||
(`(request (#"shell" ,_) ,k)
|
||||
(k 'ok)
|
||||
(loop))
|
||||
(`(request ,other ,k)
|
||||
(log-info (format "repl-channel: request ~v" other))
|
||||
(k 'error)
|
||||
(loop))))
|
||||
(handle-evt (read-line-evt in 'any)
|
||||
(lambda (line)
|
||||
(if (eof-object? line)
|
||||
(begin (fprintf out "Goodbye\n" out)
|
||||
'done)
|
||||
(begin (fprintf out "You said ~s\n" line)
|
||||
(loop))))))))
|
||||
(define prompt "RacketSSH> ")
|
||||
|
||||
(define (make-repl-channel-main username)
|
||||
(lambda (oob-ch in out)
|
||||
(fprintf out "Hello, ~a.\r\n~a" username prompt)
|
||||
(let update-channels ((in in) (out out))
|
||||
(let loop ()
|
||||
(sync (handle-evt oob-ch
|
||||
(match-lambda
|
||||
(`(notify ,type ,data)
|
||||
(log-info (format "repl-channel: notify ~v ~v" type data))
|
||||
(loop))
|
||||
(`(request (#"shell" ,_) ,k)
|
||||
(k 'ok)
|
||||
(loop))
|
||||
(`(request (#"pty-req" ,_) ,k)
|
||||
(k 'ok)
|
||||
(define-values (cooked-in cooked-out) (cook-io in out prompt))
|
||||
(update-channels cooked-in cooked-out))
|
||||
(`(request ,other ,k)
|
||||
(log-info (format "repl-channel: request ~v" other))
|
||||
(k 'error)
|
||||
(loop))))
|
||||
#;(handle-evt (read-bytes-evt 10000 in)
|
||||
(lambda (buf)
|
||||
(write-bytes buf out)
|
||||
(loop)))
|
||||
(handle-evt (read-line-evt in 'any)
|
||||
(lambda (line)
|
||||
;;(log-info (format "received ~v" line))
|
||||
(if (eof-object? line)
|
||||
(begin (fprintf out "\r\nGoodbye\r\n")
|
||||
'done)
|
||||
(begin (fprintf out "You said ~s\r\n" line)
|
||||
(write-string prompt out)
|
||||
(loop))))))))))
|
||||
|
||||
(define (t-server)
|
||||
(define s (tcp-listen 2322 4 #t "localhost"))
|
||||
|
@ -54,7 +71,7 @@
|
|||
(lambda (username channel-type extra-request-data state)
|
||||
(match channel-type
|
||||
(#"session"
|
||||
(values `(ok ,repl-channel-main #"")
|
||||
(values `(ok ,(make-repl-channel-main username) #"")
|
||||
state))
|
||||
(else
|
||||
(values `(error ,SSH_OPEN_UNKNOWN_CHANNEL_TYPE
|
||||
|
|
|
@ -937,7 +937,7 @@
|
|||
;; Session API
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(define (run-channel oob-ch in out handle channel-thread)
|
||||
(define (run-channel oob-ch app-out-port in out handle channel-thread)
|
||||
(define (close-in)
|
||||
(when (not (port-closed? in))
|
||||
(close-input-port in)))
|
||||
|
@ -946,45 +946,62 @@
|
|||
(close-output-port out)))
|
||||
(define (close-ports)
|
||||
(close-in)
|
||||
(close-out))
|
||||
(close-out)
|
||||
'done)
|
||||
(let loop ((oob-queue (make-queue))
|
||||
(remaining-credit 0))
|
||||
(remaining-credit 0)
|
||||
(channel-thread-alive #t))
|
||||
(sync (if (queue-empty? oob-queue)
|
||||
never-evt
|
||||
(let-values (((first rest) (dequeue oob-queue)))
|
||||
(handle-evt (channel-put-evt oob-ch first)
|
||||
(lambda (dummy) (loop rest remaining-credit)))))
|
||||
(handle-evt (thread-dead-evt channel-thread)
|
||||
(lambda (dummy)
|
||||
(close-ports)
|
||||
'done))
|
||||
(if (and (not (port-closed? in)) (positive? remaining-credit))
|
||||
(let ((buffer (make-bytes (min (channel-io-transfer-buffer-size) remaining-credit))))
|
||||
(handle-evt (read-bytes-avail!-evt buffer in)
|
||||
(lambda (count)
|
||||
(if (eof-object? count)
|
||||
(begin (close-in)
|
||||
(loop oob-queue remaining-credit))
|
||||
(begin (send handle say
|
||||
`(data ,(sub-bit-string buffer 0 (* 8 count))))
|
||||
(loop oob-queue (- remaining-credit count)))))))
|
||||
(lambda (dummy) (loop rest remaining-credit channel-thread-alive)))))
|
||||
(if channel-thread-alive
|
||||
(handle-evt (thread-dead-evt channel-thread)
|
||||
(lambda (dummy)
|
||||
(when (not (port-closed? app-out-port))
|
||||
(close-output-port app-out-port))
|
||||
(loop oob-queue remaining-credit #f)))
|
||||
never-evt)
|
||||
(if (port-closed? in)
|
||||
(if channel-thread-alive
|
||||
never-evt
|
||||
(handle-evt always-evt (lambda (dummy) (close-ports))))
|
||||
(if (positive? remaining-credit)
|
||||
(let ((buffer (make-bytes (min (channel-io-transfer-buffer-size)
|
||||
remaining-credit))))
|
||||
(handle-evt (read-bytes-avail!-evt buffer in)
|
||||
(lambda (count)
|
||||
(if (eof-object? count)
|
||||
(begin (close-in)
|
||||
(if channel-thread-alive
|
||||
(loop oob-queue
|
||||
remaining-credit
|
||||
channel-thread-alive)
|
||||
(close-ports)))
|
||||
(begin (send handle say
|
||||
`(data ,(sub-bit-string buffer 0 (* 8 count))))
|
||||
(loop oob-queue
|
||||
(- remaining-credit count)
|
||||
channel-thread-alive))))))
|
||||
never-evt))
|
||||
(handle-evt (send handle listen-evt)
|
||||
(match-lambda
|
||||
((arrived _)
|
||||
(loop oob-queue remaining-credit))
|
||||
(loop oob-queue remaining-credit channel-thread-alive))
|
||||
((departed _ _)
|
||||
(close-ports)
|
||||
'done)
|
||||
(close-ports))
|
||||
((says _ (credit _ amount) _)
|
||||
(loop oob-queue (+ remaining-credit amount)))
|
||||
(loop oob-queue (+ remaining-credit amount) channel-thread-alive))
|
||||
((says _ `(data ,data) _)
|
||||
(write-bytes data out)
|
||||
;; TODO: propagate backpressure through pipes
|
||||
(send handle say (credit 'session (bytes-length data)))
|
||||
(loop oob-queue remaining-credit))
|
||||
(loop oob-queue remaining-credit channel-thread-alive))
|
||||
((says _ (and notification `(notify ,type ,data)) _)
|
||||
(loop (enqueue oob-queue notification) remaining-credit))
|
||||
(loop (enqueue oob-queue notification)
|
||||
remaining-credit
|
||||
channel-thread-alive))
|
||||
((says _ (rpc-request reply-to id message) _)
|
||||
(loop (enqueue oob-queue
|
||||
`(request ,message
|
||||
|
@ -992,11 +1009,12 @@
|
|||
(send handle say
|
||||
(rpc-reply id answer)
|
||||
reply-to))))
|
||||
remaining-credit)))))))
|
||||
remaining-credit
|
||||
channel-thread-alive)))))))
|
||||
|
||||
(define (start-app-channel channel-main)
|
||||
(define channel-room (make-room 'channel))
|
||||
(spy-on channel-room)
|
||||
;;(spy-on channel-room)
|
||||
|
||||
(define oob-ch (make-channel))
|
||||
(define-values (session-a2s app-a2s) (make-pipe))
|
||||
|
@ -1004,6 +1022,7 @@
|
|||
|
||||
(standard-thread (lambda ()
|
||||
(run-channel oob-ch
|
||||
app-a2s
|
||||
session-a2s
|
||||
session-s2a
|
||||
(join-room channel-room 'app)
|
||||
|
|
Loading…
Reference in New Issue