More channel support; Lift event-handlers into a state-monad-like form.
This commit is contained in:
parent
204edd2679
commit
038be62f03
224
ssh-session.rkt
224
ssh-session.rkt
|
@ -79,6 +79,22 @@
|
|||
session-id) ;; starts off #f until initial keying
|
||||
#:transparent)
|
||||
|
||||
;; A CloseState is one of
|
||||
;; - 'neither, indicating that neither side has signalled closure
|
||||
;; - 'local, only the local end has signalled closure
|
||||
;; - 'remote, only the remote end has signalled closure
|
||||
;; - 'both, both ends have signalled closure.
|
||||
;; Represents local knowledge of the state of a shared shutdown state
|
||||
;; machine.
|
||||
;;
|
||||
;; 'neither
|
||||
;; / \
|
||||
;; \/ \/
|
||||
;; 'local 'remote
|
||||
;; \ /
|
||||
;; \/ \/
|
||||
;; 'both
|
||||
|
||||
;; A ChannelState is a (ssh-channel ...) TODO
|
||||
;; Named ssh-channel to avoid conflicts with Racket's built-in
|
||||
;; synchronous channels.
|
||||
|
@ -90,6 +106,8 @@
|
|||
outbound-window ;; Maybe<Natural>
|
||||
outbound-packet-size ;; Maybe<Natural>
|
||||
inbound-window ;; Natural
|
||||
eof-state ;; CloseState covering EOF signals
|
||||
close-state ;; CloseState covering CLOSE signals
|
||||
)
|
||||
#:transparent)
|
||||
|
||||
|
@ -520,11 +538,62 @@
|
|||
outbound-window
|
||||
outbound-packet-size
|
||||
1048576 ;; TODO: parameterize? Make configurable by app?
|
||||
'neither
|
||||
'neither
|
||||
))
|
||||
(values ch
|
||||
(struct-copy connection conn
|
||||
[channel-map (hash-set (connection-channel-map conn) my-ref ch)])))
|
||||
|
||||
(define (get-channel conn my-ref)
|
||||
(hash-ref (connection-channel-map conn) my-ref))
|
||||
|
||||
(define (update-channel conn ch)
|
||||
(struct-copy connection conn
|
||||
[channel-map (hash-set (connection-channel-map conn) (ssh-channel-my-ref ch) ch)]))
|
||||
|
||||
(define (discard-channel ch conn)
|
||||
(struct-copy connection conn
|
||||
[channel-map (hash-remove (connection-channel-map conn) (ssh-channel-my-ref ch))]))
|
||||
|
||||
;; CloseState Either<'local,'remote> -> CloseState
|
||||
(define (update-close-state old-state action)
|
||||
(define local? (case action ((local) #t) ((remote) #f)))
|
||||
(case old-state
|
||||
((neither) (if local? 'local 'remote))
|
||||
((local) (if local? 'local 'both))
|
||||
((remote) (if local? 'both 'remote))
|
||||
((both) 'both)))
|
||||
|
||||
(define (maybe-close-channel ch conn action)
|
||||
(define new-close-state (update-close-state (ssh-channel-close-state ch) action))
|
||||
(if (eq? new-close-state 'both)
|
||||
(discard-channel ch conn)
|
||||
(update-channel conn (struct-copy ssh-channel ch
|
||||
[close-state new-close-state]))))
|
||||
|
||||
|
||||
|
||||
(define (channel-request conn ch message k)
|
||||
(update-channel conn
|
||||
(struct-copy ssh-channel ch
|
||||
[continuations (room-rpc (ssh-channel-room-handle ch)
|
||||
(ssh-channel-continuations conn)
|
||||
message
|
||||
k)])))
|
||||
|
||||
(define (finish-channel-request ch conn txn message)
|
||||
(define-values (worklist new-continuations)
|
||||
(room-rpc-finish (ssh-channel-continuations ch) txn message))
|
||||
(let loop ((worklist worklist)
|
||||
(ch (struct-copy ssh-channel ch [continuations new-continuations]))
|
||||
(conn conn))
|
||||
(if (null? worklist)
|
||||
(update-channel conn ch)
|
||||
(let ((item (car worklist)))
|
||||
(define-values (new-ch new-conn) (item ch conn))
|
||||
(loop (cdr worklist) new-ch new-conn)))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Connection service
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
@ -579,15 +648,7 @@
|
|||
(string->bytes/utf-8 description)
|
||||
#"")
|
||||
conn)
|
||||
conn)))
|
||||
(lambda (reason conn)
|
||||
(write-message!/flush (ssh-msg-channel-open-failure
|
||||
sender-channel
|
||||
0
|
||||
#"Internal error"
|
||||
#"")
|
||||
conn)
|
||||
conn)))))
|
||||
conn)))))))
|
||||
|
||||
(define (handle-msg-window-adjust packet message conn)
|
||||
(log-error "TODO: Unimplemented: handle-msg-window-adjust")
|
||||
|
@ -631,22 +692,19 @@
|
|||
(send (connection-session-room-handle conn) say message)
|
||||
conn)
|
||||
|
||||
(define (app-request conn message k-ok k-error)
|
||||
(define-values (transaction new-continuations)
|
||||
(open-transaction (connection-continuations conn) (list k-ok k-error)))
|
||||
(send (connection-session-room-handle conn) say (rpc-request 'session transaction message))
|
||||
(struct-copy connection conn [continuations new-continuations]))
|
||||
(define (app-request conn message k)
|
||||
(struct-copy connection conn
|
||||
[continuations (room-rpc (connection-session-room-handle conn)
|
||||
(connection-continuations conn)
|
||||
message
|
||||
k)]))
|
||||
|
||||
(define (finish-app-request conn txn context-extractor message-or-reason)
|
||||
(close-transaction! txn (list context-extractor message-or-reason))
|
||||
(let loop ((conn conn))
|
||||
(if (transaction-available? (connection-continuations conn))
|
||||
(let-values (((txn rest) (dequeue-transaction (connection-continuations conn))))
|
||||
(match-define (list context-extractor message-or-reason) (transaction-value txn))
|
||||
(loop ((context-extractor (transaction-context txn))
|
||||
message-or-reason
|
||||
(struct-copy connection conn [continuations rest]))))
|
||||
conn)))
|
||||
(define (finish-app-request conn txn message)
|
||||
(define-values (worklist new-continuations)
|
||||
(room-rpc-finish (connection-continuations conn) txn message))
|
||||
(foldl (lambda (item conn) (item conn))
|
||||
(struct-copy connection conn [continuations new-continuations])
|
||||
worklist))
|
||||
|
||||
(define (maybe-send-disconnect-message! e conn)
|
||||
(when (not (exn:fail:contract:protocol-originated-at-peer? e))
|
||||
|
@ -658,6 +716,69 @@
|
|||
(define (bump-total amount conn)
|
||||
(struct-copy connection conn [total-transferred (+ (connection-total-transferred conn) amount)]))
|
||||
|
||||
(define io-room-message-handler
|
||||
(lambda (message)
|
||||
(lambda (conn)
|
||||
(match message
|
||||
((arrived 'read-thread)
|
||||
(send (connection-io-room-handle conn) say (credit 'read-thread 1))
|
||||
conn)
|
||||
((arrived _)
|
||||
conn)
|
||||
((and departure (departed who why))
|
||||
(disconnect-with-error/local-info
|
||||
departure
|
||||
SSH_DISCONNECT_CONNECTION_LOST
|
||||
"I/O error"))
|
||||
((says _ amount 'output-byte-count)
|
||||
;; writer reporting bytes transferred
|
||||
(bump-total amount conn))
|
||||
((says _ (received-packet seq packet message transferred-count) _)
|
||||
(send (connection-io-room-handle conn) say (credit 'read-thread 1))
|
||||
(bump-total
|
||||
transferred-count
|
||||
(if (connection-discard-next-packet? conn)
|
||||
(struct-copy connection conn [discard-next-packet? #f])
|
||||
(dispatch-packet seq packet message conn))))))))
|
||||
|
||||
(define session-room-message-handler
|
||||
(lambda (message)
|
||||
(lambda (conn)
|
||||
(match message
|
||||
((arrived _)
|
||||
conn)
|
||||
((and departure (departed who why))
|
||||
(disconnect-with-error/local-info
|
||||
departure
|
||||
SSH_DISCONNECT_BY_APPLICATION
|
||||
"Application disconnected"))
|
||||
((says _ (rpc-reply transaction message) _)
|
||||
;; TODO: not cap-secure. Introduce sealers, or indirect.
|
||||
(finish-app-request conn transaction message))))))
|
||||
|
||||
;; (K V A -> A) A Hash<K,V> -> A
|
||||
(define (hash-fold fn seed hash)
|
||||
(do ((pos (hash-iterate-first hash) (hash-iterate-next hash pos))
|
||||
(seed seed (fn (hash-iterate-key hash pos) (hash-iterate-value hash pos) seed)))
|
||||
((not pos) seed)))
|
||||
|
||||
(define (channel-events conn)
|
||||
(hash-fold (lambda (my-ref ch evt)
|
||||
(choice-evt evt
|
||||
(handle-evt (send (ssh-channel-room-handle ch) listen-evt)
|
||||
(channel-room-message-handler my-ref))))
|
||||
never-evt
|
||||
(connection-channel-map conn)))
|
||||
|
||||
(define (channel-room-message-handler my-ref)
|
||||
(lambda (message)
|
||||
(lambda (conn)
|
||||
(define ch (get-channel conn my-ref))
|
||||
(match message
|
||||
((arrived _) conn)
|
||||
((departed _ _) (maybe-close-channel ch conn 'local))
|
||||
((says _ (rpc-reply id m) _) (finish-channel-request ch conn id m))))))
|
||||
|
||||
(define (run-ssh-session conn)
|
||||
(with-handlers
|
||||
((exn:fail:contract:protocol? (lambda (e)
|
||||
|
@ -669,49 +790,18 @@
|
|||
(let ((algs ((local-algorithm-list))))
|
||||
(write-message!/flush algs conn)
|
||||
(loop (struct-copy connection conn [rekey-state (rekey-local algs)])))
|
||||
(sync (if (rekey-wait? rekey)
|
||||
(handle-evt (alarm-evt (* (rekey-wait-deadline rekey) 1000))
|
||||
(lambda (dummy) (loop conn)))
|
||||
never-evt)
|
||||
(handle-evt (send (connection-io-room-handle conn) listen-evt)
|
||||
(match-lambda
|
||||
((arrived 'read-thread)
|
||||
(send (connection-io-room-handle conn) say (credit 'read-thread 1))
|
||||
(loop conn))
|
||||
((arrived _)
|
||||
(loop conn))
|
||||
((and departure (departed who why))
|
||||
(disconnect-with-error/local-info
|
||||
departure
|
||||
SSH_DISCONNECT_CONNECTION_LOST
|
||||
"I/O error"))
|
||||
((says _ amount 'output-byte-count)
|
||||
;; writer reporting bytes transferred
|
||||
(loop (bump-total amount conn)))
|
||||
((says _ (received-packet seq packet message transferred-count) _)
|
||||
(send (connection-io-room-handle conn) say (credit 'read-thread 1))
|
||||
(loop
|
||||
(bump-total
|
||||
transferred-count
|
||||
(if (connection-discard-next-packet? conn)
|
||||
(struct-copy connection conn [discard-next-packet? #f])
|
||||
(dispatch-packet seq packet message conn)))))))
|
||||
(handle-evt (send (connection-session-room-handle conn) listen-evt)
|
||||
(match-lambda
|
||||
((arrived _)
|
||||
(loop conn))
|
||||
((and departure (departed who why))
|
||||
(disconnect-with-error/local-info
|
||||
departure
|
||||
SSH_DISCONNECT_BY_APPLICATION
|
||||
"Application disconnected"))
|
||||
((says _ (rpc-reply transaction message) _)
|
||||
;; TODO: not cap-secure. Introduce sealers, or indirect.
|
||||
(loop (finish-app-request conn transaction car message)))
|
||||
((says _ (rpc-error transaction reason) _)
|
||||
;; TODO: not cap-secure. Introduce sealers, or indirect.
|
||||
(loop (finish-app-request conn transaction cadr reason)))
|
||||
)))))))
|
||||
(let ((handler (sync (if (rekey-wait? rekey)
|
||||
(handle-evt (alarm-evt (* (rekey-wait-deadline rekey) 1000))
|
||||
(lambda (dummy)
|
||||
(lambda (conn)
|
||||
conn)))
|
||||
never-evt)
|
||||
(handle-evt (send (connection-io-room-handle conn) listen-evt)
|
||||
io-room-message-handler)
|
||||
(handle-evt (send (connection-session-room-handle conn) listen-evt)
|
||||
session-room-message-handler)
|
||||
(channel-events conn))))
|
||||
(loop (handler conn)))))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Session choreography
|
||||
|
|
Loading…
Reference in New Issue