hop-2012/server/queuenode.ml

118 lines
3.4 KiB
OCaml

(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
(* This file is part of Hop. *)
(* Hop is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
(* Hop is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
open Lwt
open Sexp
open Status
(* TODO: on unsubscribe, wake up the shoveller to make it clean out its waiters queue *)
type t = {
name: Node.name;
subscriptions: Subscription.set_t;
backlog_in: Sexp.t Lwt_stream.t;
backlog_out: Sexp.t option -> unit;
waiters_in: Subscription.t Lwt_stream.t;
waiters_out: Subscription.t option -> unit;
mutable backlog: int;
mutable waiters: int;
}
let classname = (Bytes.of_string "queue")
let report info =
while_lwt true do
lwt () = Log.info (Printf.sprintf "%s: %d backlog, %d waiters"
(Bytes.to_string info.name.Node.label)
info.backlog
info.waiters) [] in
Lwt_unix.sleep 1.0
done
let shoveller info =
let rec message_loop () =
lwt body = Lwt_stream.next info.backlog_in in
let rec waiter_loop () =
lwt sub = Lwt_stream.next info.waiters_in in
match_lwt Subscription.send_to_subscription info.name info.subscriptions sub body with
| true ->
info.backlog <- info.backlog - 1;
info.waiters_out (Some sub);
message_loop ()
| false ->
info.waiters <- info.waiters - 1;
waiter_loop ()
in waiter_loop ()
in message_loop ()
let queue_handler info n sexp =
match Message.message_of_sexp sexp with
| Message.Post (name, body, token) ->
lwt () =
if info.backlog > 1000 && info.waiters > 0
then Lwt_unix.yield ()
else return ()
in
info.backlog <- info.backlog + 1;
info.backlog_out (Some body);
return ()
| Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) ->
(match_lwt
Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name
with
| Subscription.New sub ->
info.waiters <- info.waiters + 1;
info.waiters_out (Some sub);
return ()
| Subscription.Old sub ->
return ())
| Message.Unsubscribe (Str token) ->
ignore (Subscription.delete info.name info.subscriptions token);
info.waiters <- info.waiters - 1;
return ()
| m ->
Util.message_not_understood "queue" m
let queue_factory arg =
match arg with
| (Arr [Str name_str]) ->
let (bin, bout) = Lwt_stream.create () in
let (win, wout) = Lwt_stream.create () in
let info = {
name = Node.name_of_bytes name_str;
subscriptions = Subscription.new_set ();
backlog_in = bin;
backlog_out = bout;
waiters_in = win;
waiters_out = wout;
backlog = 0;
waiters = 0;
} in
replace_ok
(Node.make_idempotent_named classname info.name
(fun () ->
ignore (Util.create_thread name_str None shoveller info);
ignore (report info);
return ())
(queue_handler info))
(Str name_str)
| _ ->
return (Problem (litstr "bad-arg"))
let init () =
Factory.register_class classname queue_factory