Exploit thread-dead?, rather than tracking it ourselves.

This commit is contained in:
Tony Garnock-Jones 2011-10-26 10:47:21 -04:00
parent d47f553e21
commit 5e0d17e48f
1 changed files with 21 additions and 25 deletions

View File

@ -949,24 +949,27 @@
(close-out)
'done)
(let loop ((oob-queue (make-queue))
(remaining-credit 0)
(channel-thread-alive #t))
(remaining-credit 0))
(define channel-thread-dead (thread-dead? channel-thread))
(sync (if (queue-empty? oob-queue)
never-evt
(let-values (((first rest) (dequeue oob-queue)))
(handle-evt (channel-put-evt oob-ch first)
(lambda (dummy) (loop rest remaining-credit channel-thread-alive)))))
(if channel-thread-alive
(lambda (dummy) (loop rest remaining-credit)))))
(if channel-thread-dead
never-evt
(handle-evt (thread-dead-evt channel-thread)
(lambda (dummy)
(when (not (port-closed? app-out-port))
;; If the thread died without closing its output-port, do
;; that here. That way we get to drain the port before
;; terminating ourselves.
(close-output-port app-out-port))
(loop oob-queue remaining-credit #f)))
never-evt)
(loop oob-queue remaining-credit))))
(if (port-closed? in)
(if channel-thread-alive
never-evt
(handle-evt always-evt (lambda (dummy) (close-ports))))
(if channel-thread-dead
(handle-evt always-evt (lambda (dummy) (close-ports)))
never-evt)
(if (positive? remaining-credit)
(let ((buffer (make-bytes (min (channel-io-transfer-buffer-size)
remaining-credit))))
@ -974,34 +977,28 @@
(lambda (count)
(if (eof-object? count)
(begin (close-in)
(if channel-thread-alive
(loop oob-queue
remaining-credit
channel-thread-alive)
(close-ports)))
(if channel-thread-dead
(close-ports)
(loop oob-queue remaining-credit)))
(begin (send handle say
`(data ,(sub-bit-string buffer 0 (* 8 count))))
(loop oob-queue
(- remaining-credit count)
channel-thread-alive))))))
(loop oob-queue (- remaining-credit count)))))))
never-evt))
(handle-evt (send handle listen-evt)
(match-lambda
((arrived _)
(loop oob-queue remaining-credit channel-thread-alive))
(loop oob-queue remaining-credit))
((departed _ _)
(close-ports))
((says _ (credit _ amount) _)
(loop oob-queue (+ remaining-credit amount) channel-thread-alive))
(loop oob-queue (+ remaining-credit amount)))
((says _ `(data ,data) _)
(write-bytes data out)
;; TODO: propagate backpressure through pipes
(send handle say (credit 'session (bytes-length data)))
(loop oob-queue remaining-credit channel-thread-alive))
(loop oob-queue remaining-credit))
((says _ (and notification `(notify ,type ,data)) _)
(loop (enqueue oob-queue notification)
remaining-credit
channel-thread-alive))
(loop (enqueue oob-queue notification) remaining-credit))
((says _ (rpc-request reply-to id message) _)
(loop (enqueue oob-queue
`(request ,message
@ -1009,8 +1006,7 @@
(send handle say
(rpc-reply id answer)
reply-to))))
remaining-credit
channel-thread-alive)))))))
remaining-credit)))))))
(define (start-app-channel channel-main)
(define channel-room (make-room 'channel))