tcp-rejected; reassert-on
This commit is contained in:
parent
1d51d1d014
commit
3bfef265a5
|
@ -17,6 +17,7 @@
|
|||
(provide (struct-out tcp-connection)
|
||||
(struct-out tcp-connection-peer)
|
||||
(struct-out tcp-accepted)
|
||||
(struct-out tcp-rejected)
|
||||
(struct-out tcp-out)
|
||||
(struct-out tcp-in)
|
||||
(struct-out tcp-in-line)
|
||||
|
@ -42,6 +43,7 @@
|
|||
(assertion-struct tcp-connection (id spec))
|
||||
(assertion-struct tcp-connection-peer (id addr))
|
||||
(assertion-struct tcp-accepted (id))
|
||||
(assertion-struct tcp-rejected (id exn))
|
||||
(message-struct tcp-out (id bytes))
|
||||
(message-struct tcp-in (id bytes))
|
||||
(message-struct tcp-in-line (id bytes))
|
||||
|
@ -65,20 +67,14 @@
|
|||
|
||||
(during/spawn (tcp-connection $id (tcp-address $host $port))
|
||||
#:name (list 'drivers/tcp 'outbound id host port)
|
||||
(define-values (cin cout)
|
||||
(with-handlers ([exn:fail? (lambda (e)
|
||||
;; TODO: it'd be nice to somehow
|
||||
;; communicate the actual error to
|
||||
;; the local peer.
|
||||
(log-syndicate/tcp-error "~a" (exn->string e))
|
||||
(define o (open-output-string))
|
||||
(close-output-port o)
|
||||
(values (open-input-string "")
|
||||
o))])
|
||||
(tcp:tcp-connect host port)))
|
||||
(assert (tcp-accepted id))
|
||||
(define unblock! (run-connection id cin cout))
|
||||
(unblock!))
|
||||
(match (with-handlers ([exn:fail? (lambda (e) (list e))])
|
||||
(define-values (cin cout) (tcp:tcp-connect host port))
|
||||
(list cin cout))
|
||||
[(list e) (assert (tcp-rejected id e))]
|
||||
[(list cin cout)
|
||||
(assert (tcp-accepted id))
|
||||
(define unblock! (run-connection id cin cout))
|
||||
(unblock!)]))
|
||||
|
||||
(during/spawn (observe (tcp-in-line $id _))
|
||||
#:name (list 'drivers/tcp 'line-reader id)
|
||||
|
@ -134,6 +130,7 @@
|
|||
(assert (tcp-connection-peer id remote-addr))
|
||||
(define unblock! (run-connection id cin cout))
|
||||
(on (asserted (tcp-accepted id)) (unblock!))
|
||||
(stop-when (asserted (tcp-rejected id _)))
|
||||
(stop-when (retracted (tcp-accepted id))))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
|
|
@ -2,20 +2,30 @@
|
|||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
(require/activate imperative-syndicate/drivers/external-event)
|
||||
(require/activate imperative-syndicate/reassert)
|
||||
(require (only-in racket/port read-bytes-line-evt))
|
||||
|
||||
(spawn (define id 'chat)
|
||||
(define root-facet (current-facet))
|
||||
|
||||
(assert (tcp-connection id (tcp-address "localhost" 5999)))
|
||||
(on (asserted (tcp-accepted id)) (printf "*** Connected.\n"))
|
||||
(stop-when (retracted (tcp-accepted id)) (printf "*** Remote EOF. Terminating.\n"))
|
||||
(reassert-on (tcp-connection id (tcp-address "localhost" 5999))
|
||||
(retracted (tcp-accepted id))
|
||||
(asserted (tcp-rejected id _)))
|
||||
|
||||
(on (message (tcp-in id $bs))
|
||||
(write-bytes bs)
|
||||
(flush-output))
|
||||
(on (asserted (tcp-rejected id $reason))
|
||||
(printf "*** ~a\n" (exn-message reason)))
|
||||
|
||||
(define stdin-evt (read-bytes-line-evt (current-input-port) 'any))
|
||||
(on (message (inbound (external-event stdin-evt (list $line))))
|
||||
(if (eof-object? line)
|
||||
(stop-current-facet (printf "*** Local EOF. Terminating.\n"))
|
||||
(send! (tcp-out id (bytes-append line #"\n"))))))
|
||||
(during (tcp-accepted id)
|
||||
(on-start (printf "*** Connected.\n"))
|
||||
(on (retracted (tcp-accepted id)) (printf "*** Remote EOF.\n"))
|
||||
;; ^ Not on-stop, because the facet is stopped by local EOF too!
|
||||
|
||||
(on (message (tcp-in id $bs))
|
||||
(write-bytes bs)
|
||||
(flush-output))
|
||||
|
||||
(define stdin-evt (read-bytes-line-evt (current-input-port) 'any))
|
||||
(on (message (inbound (external-event stdin-evt (list $line))))
|
||||
(if (eof-object? line)
|
||||
(stop-facet root-facet (printf "*** Local EOF. Terminating.\n"))
|
||||
(send! (tcp-out id (bytes-append line #"\n")))))))
|
||||
|
|
|
@ -12,10 +12,14 @@
|
|||
|
||||
(let ()
|
||||
(define host "www.w3.org")
|
||||
(define port 80)
|
||||
(define connection-id 'httpclient)
|
||||
(define remote-handle (tcp-address (dns-get-address (dns-find-nameserver) "www.w3.org") 80))
|
||||
(define remote-handle (tcp-address (dns-get-address (dns-find-nameserver) host) port))
|
||||
|
||||
(spawn (assert (tcp-connection connection-id remote-handle))
|
||||
(stop-when (asserted (tcp-rejected connection-id $reason))
|
||||
(local-require racket/exn)
|
||||
(printf "Connection failed:\n ~a" (exn->string reason)))
|
||||
(on (asserted (tcp-accepted connection-id))
|
||||
(send! (tcp-out connection-id
|
||||
(bytes-append #"GET / HTTP/1.0\r\nHost: "
|
||||
|
|
|
@ -34,6 +34,20 @@
|
|||
(send! (outbound
|
||||
(tcp-out id (string->bytes/utf-8 (~a user " says '" text "'\n")))))))))))
|
||||
|
||||
(let ()
|
||||
(dataspace #:name 'connection-rejection-test
|
||||
(spawn #:name 'connection-rejection-main
|
||||
(local-require racket/exn)
|
||||
(define peer-host "192.168.1.1")
|
||||
;; TODO: ^ this will only reliably "fail" the way we want on my own network...
|
||||
(define peer-port 9999)
|
||||
(assert (outbound (tcp-connection 'x (tcp-address peer-host peer-port))))
|
||||
(stop-when (asserted (inbound (tcp-rejected 'x $reason)))
|
||||
(log-info "Connection to ~a:~a rejected:\n~a" peer-host peer-port (exn->string reason)))
|
||||
(on (asserted (inbound (tcp-accepted 'x)))
|
||||
(error 'connection-rejection-main
|
||||
"Unexpected accepted connection???")))))
|
||||
|
||||
(let ((dst (udp-listener 6667)))
|
||||
(dataspace #:name 'udp-echo-program-app
|
||||
(spawn #:name 'udp-echo-program
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
(provide (struct-out tcp-connection)
|
||||
(struct-out tcp-accepted)
|
||||
(struct-out tcp-rejected)
|
||||
(struct-out tcp-out)
|
||||
(struct-out tcp-in)
|
||||
(struct-out tcp-in-line)
|
||||
|
@ -30,6 +31,7 @@
|
|||
|
||||
(assertion-struct tcp-connection (id spec))
|
||||
(assertion-struct tcp-accepted (id))
|
||||
(assertion-struct tcp-rejected (id exn))
|
||||
(message-struct tcp-out (id bytes))
|
||||
(message-struct tcp-in (id bytes))
|
||||
(message-struct tcp-in-line (id bytes))
|
||||
|
@ -671,8 +673,8 @@
|
|||
(send! reset-packet))
|
||||
|
||||
(assert q) ;; Declare that this state vector exists
|
||||
(on-start (log-netstack/tcp-info "Starting ~a" (tcp-quad->string #t q)))
|
||||
(on-stop (log-netstack/tcp-info "Stopping ~a" (tcp-quad->string #t q)))
|
||||
(on-start (log-netstack/tcp-info "Starting ~a" (tcp-quad->string (not outbound?) q)))
|
||||
(on-stop (log-netstack/tcp-info "Stopping ~a" (tcp-quad->string (not outbound?) q)))
|
||||
|
||||
(stop-when #:when (and (buffer-finished? (outbound))
|
||||
(buffer-finished? (inbound))
|
||||
|
@ -701,7 +703,15 @@
|
|||
(define is-syn? (set-member? flags 'syn))
|
||||
(define is-fin? (set-member? flags 'fin))
|
||||
(cond
|
||||
[(set-member? flags 'rst) (stop-facet root-facet)]
|
||||
[(set-member? flags 'rst)
|
||||
(stop-facet root-facet
|
||||
(when (not (connected?)) ;; --> rejected!
|
||||
(define e (exn:fail:network
|
||||
(format "~a: Connection rejected" (tcp-quad->string #f q))
|
||||
(current-continuation-marks)))
|
||||
(react (assert (tcp-rejected connection-id e))
|
||||
(on-start (sleep 5)
|
||||
(stop-current-facet)))))]
|
||||
[(and (not expected) ;; no syn yet
|
||||
(or (not is-syn?) ;; and this isn't it
|
||||
(and (not (listener-listening?)) ;; or it is, but no listener...
|
||||
|
@ -743,6 +753,12 @@
|
|||
(close-outbound-stream!)))
|
||||
(begin
|
||||
(assert #:when (connected?) (tcp-connection connection-id (tcp-listener local-port)))
|
||||
(on (asserted (tcp-rejected connection-id _))
|
||||
;; In principle, we have the flexibility to delay
|
||||
;; replying to SYN until userland decides whether or not
|
||||
;; to accept an incoming connection! We don't do that yet
|
||||
;; though.
|
||||
(close-outbound-stream!))
|
||||
(on (retracted (tcp-accepted connection-id))
|
||||
(close-outbound-stream!))
|
||||
(on-start (sleep 5)
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
#lang imperative-syndicate
|
||||
;; Re-assert an assertion when one of a set of triggering events is seen, after a delay.
|
||||
;; Building block for building reconnection strategies.
|
||||
|
||||
(provide reassert-on
|
||||
(struct-out fixed-retry))
|
||||
|
||||
(require/activate imperative-syndicate/drivers/timer)
|
||||
|
||||
(struct fixed-retry (delay-ms) #:transparent
|
||||
#:property prop:procedure
|
||||
(lambda (f) (values (fixed-retry-delay-ms f) f)))
|
||||
|
||||
(define-logger syndicate/reassert)
|
||||
|
||||
(define-syntax reassert-on
|
||||
(syntax-rules ()
|
||||
[(_ assertion #:strategy strategy reset-event ...)
|
||||
(reassert-on* assertion
|
||||
#:strategy strategy
|
||||
(list (lambda (k) (stop-when reset-event (k))) ...))]
|
||||
[(_ assertion reset-event ...)
|
||||
(reassert-on assertion #:strategy (fixed-retry 1000) reset-event ...)]))
|
||||
|
||||
(begin-for-declarations
|
||||
(define (reassert-on* assertion #:strategy strategy event-fns)
|
||||
(on-start (let reassert ((strategy strategy))
|
||||
(react (log-syndicate/reassert-debug "~v: Asserting" assertion)
|
||||
(assert assertion)
|
||||
(define (reset)
|
||||
(log-syndicate/reassert-debug "~v: Resetting with ~v" assertion strategy)
|
||||
(define-values (delay-ms next-strategy) (strategy))
|
||||
(sleep (/ delay-ms 1000.0))
|
||||
(reassert next-strategy))
|
||||
(for-each (lambda (f) (f reset)) event-fns))))))
|
Loading…
Reference in New Issue