examples/flink: create tasks & jobs from an input string rather than
manually
This commit is contained in:
parent
5cb0462ec4
commit
fb778ab1ee
|
@ -1,6 +1,7 @@
|
|||
#lang syndicate
|
||||
|
||||
(require (only-in racket/set
|
||||
set
|
||||
set-count
|
||||
set-empty?
|
||||
set-first
|
||||
|
@ -11,6 +12,13 @@
|
|||
split-at))
|
||||
(require (only-in racket/hash
|
||||
hash-union))
|
||||
(require (only-in racket/string
|
||||
string-split))
|
||||
(require (only-in racket/sequence
|
||||
sequence->list))
|
||||
|
||||
(module+ test
|
||||
(require rackunit))
|
||||
|
||||
;; ---------------------------------------------------------------------------------------------------
|
||||
;; Logging
|
||||
|
@ -297,19 +305,129 @@
|
|||
(define (split-at/lenient lst n)
|
||||
(split-at lst (min n (length lst))))
|
||||
|
||||
;; ---------------------------------------------------------------------------------------------------
|
||||
;; Creating a Job
|
||||
|
||||
;; a WorkDesc is one of
|
||||
;; (map-task data)
|
||||
;; (reduce-task WorkDesc WorkDesc)
|
||||
|
||||
;; (Listof WordDesc) -> (Values (Listof WorkDesc) (Optionof WorkDesc))
|
||||
;; Pair up elements of the input list into a list of reduce tasks, and if the input list is odd also
|
||||
;; return the odd-one out
|
||||
(define (pair-up ls)
|
||||
(let loop ([ls ls]
|
||||
[reductions '()])
|
||||
(match ls
|
||||
['()
|
||||
(values reductions #f)]
|
||||
[(list x)
|
||||
(values reductions x)]
|
||||
[(list-rest x y more)
|
||||
(loop more (cons (reduce-task x y) reductions))])))
|
||||
|
||||
|
||||
;; a TaskTree is one of
|
||||
;; (map-task data)
|
||||
;; (reduce-task TaskTree TaskTree)
|
||||
|
||||
;; (Listof String) -> TaskTree
|
||||
;; Create a tree structure of tasks
|
||||
(define (create-task-tree lines)
|
||||
(define map-tasks
|
||||
(for/list ([line (in-list lines)])
|
||||
;; it may be more realistic to have the task runner do the split,
|
||||
;; but this is how Jonathan's input looks
|
||||
(map-task (string-split line))))
|
||||
;; build the tree up from the leaves
|
||||
(let loop ([nodes map-tasks])
|
||||
(match nodes
|
||||
['()
|
||||
;; input was empty
|
||||
(map-task "")]
|
||||
[(list x)
|
||||
x]
|
||||
[_
|
||||
(define-values (reductions left-over?)
|
||||
(pair-up nodes))
|
||||
(loop (if left-over?
|
||||
(cons left-over? reductions)
|
||||
reductions))])))
|
||||
|
||||
;; TaskTree -> (Listof Task)
|
||||
;; flatten a task tree by assigning job-unique IDs
|
||||
(define (task-tree->list tt)
|
||||
(define-values (tasks _)
|
||||
;; TaskTree ID -> (Values (Listof Task) ID)
|
||||
;; the input id is for the current node of the tree
|
||||
;; returned id is the "next available" id, given ids are assigned in strict ascending order
|
||||
(let loop ([tt tt]
|
||||
[next-id 0])
|
||||
(match tt
|
||||
[(map-task _)
|
||||
(values (list (task next-id tt))
|
||||
(add1 next-id))]
|
||||
[(reduce-task left right)
|
||||
(define left-id (add1 next-id))
|
||||
(define-values (lefts right-id)
|
||||
(loop left left-id))
|
||||
(define-values (rights next)
|
||||
(loop right right-id))
|
||||
(values (cons (task next-id (reduce-task left-id right-id))
|
||||
(append lefts rights))
|
||||
next)])))
|
||||
tasks)
|
||||
|
||||
;; InputPort -> Job
|
||||
(define (create-job in)
|
||||
(define job-id (gensym 'job))
|
||||
(define input-lines (sequence->list (in-lines in)))
|
||||
(define tasks (task-tree->list (create-task-tree input-lines)))
|
||||
(job job-id tasks))
|
||||
|
||||
;; String -> Job
|
||||
(define (string->job s)
|
||||
(create-job (open-input-string s)))
|
||||
|
||||
(module+ test
|
||||
(test-case
|
||||
"two-line job parsing"
|
||||
(define input "a b c\nd e f")
|
||||
(define j (string->job input))
|
||||
(check-true (job? j))
|
||||
(match-define (job jid tasks) j)
|
||||
(check-true (id? jid))
|
||||
(check-true (list? tasks))
|
||||
(check-true (andmap task? tasks))
|
||||
(match tasks
|
||||
[(list-no-order (task rid (reduce-task left right))
|
||||
(task mid1 (map-task data1))
|
||||
(task mid2 (map-task data2)))
|
||||
(check-true (id? left))
|
||||
(check-true (id? right))
|
||||
(check-equal? (set left right) (set mid1 mid2))
|
||||
(check-equal? (set (list "a" "b" "c") (list "d" "e" "f"))
|
||||
(set data1 data2))]
|
||||
[_
|
||||
(displayln tasks)]))
|
||||
(test-case
|
||||
"empty input"
|
||||
(define input "")
|
||||
(define j (string->job input))
|
||||
(check-true (job? j))
|
||||
(match-define (job jid tasks) j)
|
||||
(check-true (id? jid))
|
||||
(check-true (list? tasks))
|
||||
(check-equal? (length tasks) 1)
|
||||
(check-equal? (task-desc (car tasks))
|
||||
(map-task ""))))
|
||||
|
||||
;; ---------------------------------------------------------------------------------------------------
|
||||
;; Client
|
||||
|
||||
(define (spawn-client)
|
||||
;; Job -> Void
|
||||
(define (spawn-client j)
|
||||
(spawn
|
||||
(define j (job 1
|
||||
(list (task 1 (map-task (list "a" "b" "c" "a" "b" "c")))
|
||||
(task 2 (map-task (list "a" "b")))
|
||||
(task 3 (map-task (list "a" "b")))
|
||||
(task 4 (map-task (list "a" "b")))
|
||||
(task 5 (reduce-task 1 2))
|
||||
(task 6 (reduce-task 3 4))
|
||||
(task 7 (reduce-task 5 6)))))
|
||||
(assert j)
|
||||
(on (asserted (job-finished (job-id j) $data))
|
||||
(printf "job done!\n~a\n" data))))
|
||||
|
@ -317,7 +435,12 @@
|
|||
;; ---------------------------------------------------------------------------------------------------
|
||||
;; Main
|
||||
|
||||
(spawn-client)
|
||||
(define input "a b c a b c\na b\n a b\na b")
|
||||
(define j (string->job input))
|
||||
;; expected:
|
||||
;; #hash((a . 5) (b . 5) (c . 2))
|
||||
|
||||
(spawn-client j)
|
||||
(spawn-job-manager)
|
||||
(spawn-task-manager)
|
||||
(spawn-task-runner)
|
||||
|
|
Loading…
Reference in New Issue