Multiserver.

This commit is contained in:
Tony Garnock-Jones 2016-07-28 15:44:00 -04:00
parent 56d2fc2c0d
commit 9241775879
1 changed files with 104 additions and 0 deletions

View File

@ -0,0 +1,104 @@
#lang syndicate/actor
(require racket/file)
(require racket/serialize)
(require racket/set)
(require operational-transformation)
(require operational-transformation/text/simple-document)
(require/activate syndicate/drivers/tcp)
(require/activate syndicate/drivers/line-reader)
(struct snapshot-for (filename snap) #:prefab)
(struct proposed-op (filename p) #:prefab)
(struct accepted-op (filename p) #:prefab)
(struct client-seen-up-to (filename revision) #:prefab)
(define cmdline-port (make-parameter 5889))
(define cmdline-filenames (make-parameter '()))
(actor (for [(filename (cmdline-filenames))]
(run-one-server filename)))
(define (run-one-server filename)
(actor (react (field [state (make-server (simple-document
(if (file-exists? filename)
(begin (log-info "loading ~v" filename)
(file->string filename))
(begin (log-info "will create ~v" filename)
""))))])
(assert (snapshot-for filename (extract-snapshot (state))))
(define/query-set client-seen-revs (client-seen-up-to filename $rev) rev)
(field [oldest-needed-rev #f])
(begin/dataflow
(define min-rev
(or (for/fold [(min-rev #f)] [(rev (client-seen-revs))]
(min (or min-rev rev) rev))
(server-state-revision (state))))
(when (not (equal? (oldest-needed-rev) min-rev))
(oldest-needed-rev min-rev)
(state (forget-operation-history (state) min-rev))))
(begin/dataflow
(display-to-file (simple-document-text (server-state-document (state)))
filename
#:exists 'replace))
(on (message (proposed-op filename $p))
(state (incorporate-operation-from-client (state) p))
(define sp (extract-operation (state)))
(when sp (send! (accepted-op filename sp)))))))
(actor (define s (tcp-listener (cmdline-port)))
(log-info "listening on port ~v" (cmdline-port))
(forever (assert (advertise (observe (tcp-channel _ s _))))
(during/actor (advertise (tcp-channel $c s _))
(assert (advertise (tcp-channel s c _)))
(on-start (log-info "~a: connected" c))
(on-stop (log-info "~a: disconnected" c))
(connection-react c s))))
(define (connection-react c s)
(define (output v)
;; (log-info "~a: sending them ~v" c v)
(define p (open-output-bytes))
(write (serialize v) p)
(newline p)
(send! (tcp-channel s c (get-output-bytes p))))
(field [seen-up-to 0])
(assert #:when (selected-filename) (client-seen-up-to (selected-filename) (seen-up-to)))
(define/query-set available-filenames (observe (proposed-op $f _)) f)
(begin/dataflow
(output (set->list (available-filenames))))
(field [selected-filename #f])
(begin/dataflow
(when (selected-filename)
(log-info "~a: attached to file ~a" c (selected-filename))
(let-event [(asserted (snapshot-for (selected-filename) $snapshot))]
(output snapshot)
(seen-up-to (server-snapshot-revision snapshot)))))
(on #:when (selected-filename)
(message (accepted-op (selected-filename) $p))
(output p))
(on (message (tcp-channel-line c s $line))
(match (deserialize (read (open-input-bytes line)))
[(? string? new-filename)
(when (selected-filename) (log-info "~a: detached from file ~a" c (selected-filename)))
(seen-up-to 0)
(selected-filename new-filename)]
[(? number? n) (seen-up-to n)]
[(? pending-operation? p) (send! (proposed-op (selected-filename) p))])))
(module+ main
(require racket/cmdline)
(command-line
#:once-each
[("-p" "--port") server-port ((format "Server port (default ~v)" (cmdline-port)))
(cmdline-port (string->number server-port))]
#:args filenames
(cmdline-filenames filenames)))