hop-2012/server/queuenode.ml

118 lines
3.4 KiB
OCaml
Raw Normal View History

2012-03-07 18:23:41 +00:00
(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
2012-05-01 21:36:38 +00:00
(* This file is part of Hop. *)
2012-03-07 18:23:41 +00:00
2012-05-01 21:36:38 +00:00
(* Hop is free software: you can redistribute it and/or modify it *)
2012-03-07 18:23:41 +00:00
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
2012-05-01 21:36:38 +00:00
(* Hop is distributed in the hope that it will be useful, but *)
2012-03-07 18:23:41 +00:00
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
2012-05-01 21:36:38 +00:00
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
2012-03-07 18:23:41 +00:00
2012-05-05 22:18:23 +00:00
open Lwt
2012-01-08 17:41:04 +00:00
open Sexp
open Status
2012-01-08 17:41:04 +00:00
2012-05-05 22:18:23 +00:00
(* TODO: on unsubscribe, wake up the shoveller to make it clean out its waiters queue *)
2012-01-08 17:41:04 +00:00
(* Per-queue state shared by [queue_handler], [shoveller] and [report].
   The [*_in] stream and [*_out] push function of each pair come from a
   single [Lwt_stream.create ()] call in [queue_factory]. *)
type t = {
  (* Node name; its [Node.label] is what [report] prints. *)
  name : Node.name;
  (* Subscription set handed to every [Subscription] call. *)
  subscriptions : Subscription.set_t;
  (* Message bodies waiting for delivery: read side / push side. *)
  backlog_in : Sexp.t Lwt_stream.t;
  backlog_out : Sexp.t option -> unit;
  (* Subscriptions waiting for a message: read side / push side. *)
  waiters_in : Subscription.t Lwt_stream.t;
  waiters_out : Subscription.t option -> unit;
  (* Counter: +1 per accepted Post, -1 per successful delivery. *)
  mutable backlog : int;
  (* Counter: +1 per new subscription, -1 per Unsubscribe and per failed
     delivery attempt. *)
  mutable waiters : int;
}
2012-01-08 17:41:04 +00:00
(* Class name under which [init] registers [queue_factory]. *)
let classname = Bytes.of_string "queue"
2012-01-08 17:41:04 +00:00
(* Periodic status logger.  Each round logs the queue's label and its
   current [backlog] and [waiters] counters through [Log.info], then sleeps
   for one second before starting the next round.  The loop has no exit
   condition. *)
let report info =
  let describe () =
    Printf.sprintf "%s: %d backlog, %d waiters"
      (Bytes.to_string info.name.Node.label)
      info.backlog
      info.waiters
  in
  let rec tick () =
    Log.info (describe ()) [] >>= fun () ->
    Lwt_unix.sleep 1.0 >>= tick
  in
  tick ()
2012-01-08 19:03:09 +00:00
2012-01-08 17:41:04 +00:00
(* Delivery loop pairing queued message bodies with waiting subscriptions.

   [next_message] blocks until a body is available on [backlog_in];
   [deliver] then pulls subscriptions from [waiters_in] one at a time and
   offers them the body via [Subscription.send_to_subscription]:
   - on [true], the backlog counter drops by one, the subscription is pushed
     back onto the waiters stream, and the loop moves on to the next body;
   - on [false], the subscription is not re-queued, the waiters counter
     drops by one, and the same body is offered to the next subscription.

   The loop has no exit condition. *)
let shoveller info =
  let rec next_message () =
    Lwt_stream.next info.backlog_in >>= deliver
  and deliver body =
    Lwt_stream.next info.waiters_in >>= fun candidate ->
    Subscription.send_to_subscription info.name info.subscriptions candidate body
    >>= function
    | true ->
      info.backlog <- info.backlog - 1;
      info.waiters_out (Some candidate);
      next_message ()
    | false ->
      info.waiters <- info.waiters - 1;
      deliver body
  in
  next_message ()
(* Message handler for a queue node.  [sexp] is decoded with
   [Message.message_of_sexp] and dispatched:

   - [Post]: bumps the backlog counter and pushes the body onto the backlog
     stream.  When more than 1000 messages are backlogged and at least one
     waiter is counted, it first yields to the scheduler.
   - [Subscribe]: calls [Subscription.create]; only a [New] subscription is
     counted and pushed onto the waiters stream, an [Old] one is ignored.
   - [Unsubscribe]: calls [Subscription.delete] (result discarded) and
     decrements the waiters counter.
   - anything else is passed to [Util.message_not_understood].

   The [n] argument is not used here.

   NOTE(review): [Unsubscribe] decrements [waiters] unconditionally, and
   [shoveller] decrements it again whenever a delivery attempt returns
   [false].  If a deleted subscription is later pulled by the shoveller and
   rejected, the counter would be decremented twice for it; it would also
   drop for an unknown token.  Depends on [Subscription] semantics not
   visible in this file -- confirm. *)
let queue_handler info n sexp =
  match Message.message_of_sexp sexp with
  | Message.Post (_, body, _) ->
    let throttle =
      if info.backlog > 1000 && info.waiters > 0
      then Lwt_unix.yield ()
      else return ()
    in
    throttle >>= (fun () ->
      info.backlog <- info.backlog + 1;
      info.backlog_out (Some body);
      return ())
  | Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) ->
    Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name
    >>= (function
      | Subscription.New sub ->
        info.waiters <- info.waiters + 1;
        info.waiters_out (Some sub);
        return ()
      | Subscription.Old _ ->
        return ())
  | Message.Unsubscribe (Str token) ->
    ignore (Subscription.delete info.name info.subscriptions token);
    info.waiters <- info.waiters - 1;
    return ()
  | other ->
    Util.message_not_understood "queue" other
2012-01-08 17:41:04 +00:00
(* Factory for queue nodes.

   For an argument of the form [Arr [Str name_str]] it creates the backlog
   and waiters streams, builds a fresh [t] with both counters at zero, and
   hands [Node.make_idempotent_named] a startup callback plus
   [queue_handler info]; the outcome is passed to [replace_ok] together
   with [Str name_str].  The startup callback launches [shoveller] through
   [Util.create_thread] and starts [report], discarding both results.
   (Presumably the callback runs only when the node is actually created --
   confirm in [Node].)

   Any other argument yields [Problem (litstr "bad-arg")]. *)
let queue_factory arg =
  match arg with
  | Arr [Str name_str] ->
    let (backlog_in, backlog_out) = Lwt_stream.create () in
    let (waiters_in, waiters_out) = Lwt_stream.create () in
    let info = {
      name = Node.name_of_bytes name_str;
      subscriptions = Subscription.new_set ();
      backlog_in;
      backlog_out;
      waiters_in;
      waiters_out;
      backlog = 0;
      waiters = 0;
    } in
    (* Background threads are fire-and-forget: their results are dropped. *)
    let start () =
      ignore (Util.create_thread name_str None shoveller info);
      ignore (report info);
      return ()
    in
    replace_ok
      (Node.make_idempotent_named classname info.name start (queue_handler info))
      (Str name_str)
  | _ ->
    return (Problem (litstr "bad-arg"))
2012-01-08 17:41:04 +00:00
(* Registers [queue_factory] with [Factory] under [classname]. *)
let init () = Factory.register_class classname queue_factory