From 4e33429b194c5a64defa94c316cc0c9963a4ace7 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 18 Mar 2019 23:27:59 +0000 Subject: [PATCH] tcp-rejected; reassert-on --- syndicate/drivers/tcp.rkt | 25 ++++++++--------- syndicate/examples/chat-client.rkt | 32 ++++++++++++++-------- syndicate/examples/netstack/fetchurl.rkt | 6 +++- syndicate/examples/netstack/main.rkt | 14 ++++++++++ syndicate/examples/netstack/tcp.rkt | 22 +++++++++++++-- syndicate/reassert.rkt | 35 ++++++++++++++++++++++++ 6 files changed, 105 insertions(+), 29 deletions(-) create mode 100644 syndicate/reassert.rkt diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index 824dbf6..1589b7b 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -17,6 +17,7 @@ (provide (struct-out tcp-connection) (struct-out tcp-connection-peer) (struct-out tcp-accepted) + (struct-out tcp-rejected) (struct-out tcp-out) (struct-out tcp-in) (struct-out tcp-in-line) @@ -42,6 +43,7 @@ (assertion-struct tcp-connection (id spec)) (assertion-struct tcp-connection-peer (id addr)) (assertion-struct tcp-accepted (id)) +(assertion-struct tcp-rejected (id exn)) (message-struct tcp-out (id bytes)) (message-struct tcp-in (id bytes)) (message-struct tcp-in-line (id bytes)) @@ -65,20 +67,14 @@ (during/spawn (tcp-connection $id (tcp-address $host $port)) #:name (list 'drivers/tcp 'outbound id host port) - (define-values (cin cout) - (with-handlers ([exn:fail? (lambda (e) - ;; TODO: it'd be nice to somehow - ;; communicate the actual error to - ;; the local peer. - (log-syndicate/tcp-error "~a" (exn->string e)) - (define o (open-output-string)) - (close-output-port o) - (values (open-input-string "") - o))]) - (tcp:tcp-connect host port))) - (assert (tcp-accepted id)) - (define unblock! (run-connection id cin cout)) - (unblock!)) + (match (with-handlers ([exn:fail? (lambda (e) (list e))]) + (define-values (cin cout) (tcp:tcp-connect host port)) + (list cin cout)) + [(list e) (assert (tcp-rejected id e))] + [(list cin cout) + (assert (tcp-accepted id)) + (define unblock! (run-connection id cin cout)) + (unblock!)])) (during/spawn (observe (tcp-in-line $id _)) #:name (list 'drivers/tcp 'line-reader id) @@ -134,6 +130,7 @@ (assert (tcp-connection-peer id remote-addr)) (define unblock! (run-connection id cin cout)) (on (asserted (tcp-accepted id)) (unblock!)) + (stop-when (asserted (tcp-rejected id _))) (stop-when (retracted (tcp-accepted id)))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/syndicate/examples/chat-client.rkt b/syndicate/examples/chat-client.rkt index 371476c..54b687e 100644 --- a/syndicate/examples/chat-client.rkt +++ b/syndicate/examples/chat-client.rkt @@ -2,20 +2,30 @@ (require/activate imperative-syndicate/drivers/tcp) (require/activate imperative-syndicate/drivers/external-event) +(require/activate imperative-syndicate/reassert) (require (only-in racket/port read-bytes-line-evt)) (spawn (define id 'chat) + (define root-facet (current-facet)) - (assert (tcp-connection id (tcp-address "localhost" 5999))) - (on (asserted (tcp-accepted id)) (printf "*** Connected.\n")) - (stop-when (retracted (tcp-accepted id)) (printf "*** Remote EOF. Terminating.\n")) + (reassert-on (tcp-connection id (tcp-address "localhost" 5999)) + (retracted (tcp-accepted id)) + (asserted (tcp-rejected id _))) - (on (message (tcp-in id $bs)) - (write-bytes bs) - (flush-output)) + (on (asserted (tcp-rejected id $reason)) + (printf "*** ~a\n" (exn-message reason))) - (define stdin-evt (read-bytes-line-evt (current-input-port) 'any)) - (on (message (inbound (external-event stdin-evt (list $line)))) - (if (eof-object? line) - (stop-current-facet (printf "*** Local EOF. Terminating.\n")) - (send! (tcp-out id (bytes-append line #"\n")))))) + (during (tcp-accepted id) + (on-start (printf "*** Connected.\n")) + (on (retracted (tcp-accepted id)) (printf "*** Remote EOF.\n")) + ;; ^ Not on-stop, because the facet is stopped by local EOF too! + + (on (message (tcp-in id $bs)) + (write-bytes bs) + (flush-output)) + + (define stdin-evt (read-bytes-line-evt (current-input-port) 'any)) + (on (message (inbound (external-event stdin-evt (list $line)))) + (if (eof-object? line) + (stop-facet root-facet (printf "*** Local EOF. Terminating.\n")) + (send! (tcp-out id (bytes-append line #"\n"))))))) diff --git a/syndicate/examples/netstack/fetchurl.rkt b/syndicate/examples/netstack/fetchurl.rkt index b2948cc..72ed953 100644 --- a/syndicate/examples/netstack/fetchurl.rkt +++ b/syndicate/examples/netstack/fetchurl.rkt @@ -12,10 +12,14 @@ (let () (define host "www.w3.org") + (define port 80) (define connection-id 'httpclient) - (define remote-handle (tcp-address (dns-get-address (dns-find-nameserver) "www.w3.org") 80)) + (define remote-handle (tcp-address (dns-get-address (dns-find-nameserver) host) port)) (spawn (assert (tcp-connection connection-id remote-handle)) + (stop-when (asserted (tcp-rejected connection-id $reason)) + (local-require racket/exn) + (printf "Connection failed:\n ~a" (exn->string reason))) (on (asserted (tcp-accepted connection-id)) (send! (tcp-out connection-id (bytes-append #"GET / HTTP/1.0\r\nHost: " diff --git a/syndicate/examples/netstack/main.rkt b/syndicate/examples/netstack/main.rkt index 0f32bb8..6119466 100644 --- a/syndicate/examples/netstack/main.rkt +++ b/syndicate/examples/netstack/main.rkt @@ -34,6 +34,20 @@ (send! (outbound (tcp-out id (string->bytes/utf-8 (~a user " says '" text "'\n"))))))))))) +(let () + (dataspace #:name 'connection-rejection-test + (spawn #:name 'connection-rejection-main + (local-require racket/exn) + (define peer-host "192.168.1.1") + ;; TODO: ^ this will only reliably "fail" the way we want on my own network... + (define peer-port 9999) + (assert (outbound (tcp-connection 'x (tcp-address peer-host peer-port)))) + (stop-when (asserted (inbound (tcp-rejected 'x $reason))) + (log-info "Connection to ~a:~a rejected:\n~a" peer-host peer-port (exn->string reason))) + (on (asserted (inbound (tcp-accepted 'x))) + (error 'connection-rejection-main + "Unexpected accepted connection???"))))) + (let ((dst (udp-listener 6667))) (dataspace #:name 'udp-echo-program-app (spawn #:name 'udp-echo-program diff --git a/syndicate/examples/netstack/tcp.rkt b/syndicate/examples/netstack/tcp.rkt index a44e76c..b7fbb84 100644 --- a/syndicate/examples/netstack/tcp.rkt +++ b/syndicate/examples/netstack/tcp.rkt @@ -2,6 +2,7 @@ (provide (struct-out tcp-connection) (struct-out tcp-accepted) + (struct-out tcp-rejected) (struct-out tcp-out) (struct-out tcp-in) (struct-out tcp-in-line) @@ -30,6 +31,7 @@ (assertion-struct tcp-connection (id spec)) (assertion-struct tcp-accepted (id)) +(assertion-struct tcp-rejected (id exn)) (message-struct tcp-out (id bytes)) (message-struct tcp-in (id bytes)) (message-struct tcp-in-line (id bytes)) @@ -671,8 +673,8 @@ (send! reset-packet)) (assert q) ;; Declare that this state vector exists - (on-start (log-netstack/tcp-info "Starting ~a" (tcp-quad->string #t q))) - (on-stop (log-netstack/tcp-info "Stopping ~a" (tcp-quad->string #t q))) + (on-start (log-netstack/tcp-info "Starting ~a" (tcp-quad->string (not outbound?) q))) + (on-stop (log-netstack/tcp-info "Stopping ~a" (tcp-quad->string (not outbound?) q))) (stop-when #:when (and (buffer-finished? (outbound)) (buffer-finished? (inbound)) @@ -701,7 +703,15 @@ (define is-syn? (set-member? flags 'syn)) (define is-fin? (set-member? flags 'fin)) (cond - [(set-member? flags 'rst) (stop-facet root-facet)] + [(set-member? flags 'rst) + (stop-facet root-facet + (when (not (connected?)) ;; --> rejected! + (define e (exn:fail:network + (format "~a: Connection rejected" (tcp-quad->string #f q)) + (current-continuation-marks))) + (react (assert (tcp-rejected connection-id e)) + (on-start (sleep 5) + (stop-current-facet)))))] [(and (not expected) ;; no syn yet (or (not is-syn?) ;; and this isn't it (and (not (listener-listening?)) ;; or it is, but no listener... @@ -743,6 +753,12 @@ (close-outbound-stream!))) (begin (assert #:when (connected?) (tcp-connection connection-id (tcp-listener local-port))) + (on (asserted (tcp-rejected connection-id _)) + ;; In principle, we have the flexibility to delay + ;; replying to SYN until userland decides whether or not + ;; to accept an incoming connection! We don't do that yet + ;; though. + (close-outbound-stream!)) (on (retracted (tcp-accepted connection-id)) (close-outbound-stream!)) (on-start (sleep 5) diff --git a/syndicate/reassert.rkt b/syndicate/reassert.rkt new file mode 100644 index 0000000..700eaf2 --- /dev/null +++ b/syndicate/reassert.rkt @@ -0,0 +1,35 @@ +#lang imperative-syndicate +;; Re-assert an assertion when one of a set of triggering events is seen, after a delay. +;; Building block for building reconnection strategies. + +(provide reassert-on + (struct-out fixed-retry)) + +(require/activate imperative-syndicate/drivers/timer) + +(struct fixed-retry (delay-ms) #:transparent + #:property prop:procedure + (lambda (f) (values (fixed-retry-delay-ms f) f))) + +(define-logger syndicate/reassert) + +(define-syntax reassert-on + (syntax-rules () + [(_ assertion #:strategy strategy reset-event ...) + (reassert-on* assertion + #:strategy strategy + (list (lambda (k) (stop-when reset-event (k))) ...))] + [(_ assertion reset-event ...) + (reassert-on assertion #:strategy (fixed-retry 1000) reset-event ...)])) + +(begin-for-declarations + (define (reassert-on* assertion #:strategy strategy event-fns) + (on-start (let reassert ((strategy strategy)) + (react (log-syndicate/reassert-debug "~v: Asserting" assertion) + (assert assertion) + (define (reset) + (log-syndicate/reassert-debug "~v: Resetting with ~v" assertion strategy) + (define-values (delay-ms next-strategy) (strategy)) + (sleep (/ delay-ms 1000.0)) + (reassert next-strategy)) + (for-each (lambda (f) (f reset)) event-fns))))))