From 8dba9a66c6627fbc8592659267fec4809d4a0e0a Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 25 Jul 2016 21:30:14 -0400 Subject: [PATCH] Line reader for TCP --- racket/syndicate/drivers/line-reader.rkt | 28 ++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 racket/syndicate/drivers/line-reader.rkt diff --git a/racket/syndicate/drivers/line-reader.rkt b/racket/syndicate/drivers/line-reader.rkt new file mode 100644 index 0000000..0e6b8ee --- /dev/null +++ b/racket/syndicate/drivers/line-reader.rkt @@ -0,0 +1,28 @@ +#lang syndicate/actor + +(provide (struct-out tcp-channel-line)) + +(require "tcp.rkt") + +(struct tcp-channel-line (source destination bytes) #:prefab) + +;; This should probably be in the standard library. +(define (bytes-index bs b) + (define len (bytes-length bs)) + (let loop ((i 0)) + (cond [(= i len) #f] + [(eqv? (bytes-ref bs i) b) i] + [else (loop (+ i 1))]))) + +(actor + (react + (during/actor (observe (tcp-channel-line $src $dst _)) + (field [buffer #""]) + (on (message (tcp-channel src dst $bs)) + (buffer (bytes-append (buffer) bs))) + (begin/dataflow + (define newline-pos (bytes-index (buffer) (char->integer #\newline))) + (when newline-pos + (define line (subbytes (buffer) 0 newline-pos)) + (buffer (subbytes (buffer) (+ newline-pos 1))) + (send! (tcp-channel-line src dst line)))))))