2012-03-07 18:23:41 +00:00
|
|
|
(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
|
|
|
|
|
2012-05-01 21:36:38 +00:00
|
|
|
(* This file is part of Hop. *)
|
2012-03-07 18:23:41 +00:00
|
|
|
|
2012-05-01 21:36:38 +00:00
|
|
|
(* Hop is free software: you can redistribute it and/or modify it *)
|
2012-03-07 18:23:41 +00:00
|
|
|
(* under the terms of the GNU General Public License as published by the *)
|
|
|
|
(* Free Software Foundation, either version 3 of the License, or (at your *)
|
|
|
|
(* option) any later version. *)
|
|
|
|
|
2012-05-01 21:36:38 +00:00
|
|
|
(* Hop is distributed in the hope that it will be useful, but *)
|
2012-03-07 18:23:41 +00:00
|
|
|
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
|
|
|
|
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
|
|
|
|
(* General Public License for more details. *)
|
|
|
|
|
|
|
|
(* You should have received a copy of the GNU General Public License *)
|
2012-05-01 21:36:38 +00:00
|
|
|
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
|
2012-03-07 18:23:41 +00:00
|
|
|
|
2012-05-05 22:18:23 +00:00
|
|
|
open Lwt
|
2012-01-08 17:41:04 +00:00
|
|
|
open Sexp
|
2012-03-04 17:12:20 +00:00
|
|
|
open Status
|
2012-01-08 17:41:04 +00:00
|
|
|
|
2012-05-05 22:18:23 +00:00
|
|
|
(* TODO: on unsubscribe, wake up the shoveller to make it clean out its waiters queue *)
|
|
|
|
|
2012-01-08 17:41:04 +00:00
|
|
|
(** State shared by a queue node's message handler ([queue_handler]), its
    delivery thread ([shoveller]) and its periodic logger ([report]). *)
type t = {
  name : Node.name;
  (** Name of the queue node; its [label] field is what [report] prints. *)

  subscriptions : Subscription.set_t;
  (** Subscription set handed to [Subscription.create],
      [Subscription.delete] and [Subscription.send_to_subscription]. *)

  backlog_in : Sexp.t Lwt_stream.t;
  (** Stream of posted message bodies, read by [shoveller]. *)

  backlog_out : Sexp.t option -> unit;
  (** Push function paired with [backlog_in]; [queue_handler] pushes
      [Some body] for every Post. *)

  waiters_in : Subscription.t Lwt_stream.t;
  (** Stream of subscriptions available for a delivery attempt, read by
      [shoveller]. *)

  waiters_out : Subscription.t option -> unit;
  (** Push function paired with [waiters_in]; used on Subscribe and again by
      [shoveller] after a successful delivery. *)

  mutable backlog : int;
  (** Counter: incremented on each Post, decremented on each successful
      delivery. *)

  mutable waiters : int;
  (** Counter: incremented on each Subscribe, decremented on each
      Unsubscribe. *)
}
|
2012-01-08 17:41:04 +00:00
|
|
|
|
|
|
|
(** Class name under which the queue factory is registered (see [init]) and
    which is passed to [Node.make_idempotent_named] when a queue is built. *)
let classname = "queue"
|
|
|
|
|
2012-05-06 03:00:10 +00:00
|
|
|
(** [report info] loops forever: it logs the queue's label together with its
    current backlog and waiter counters through [Log.info], sleeps for
    [1.0] via [Lwt_unix.sleep], and starts over.  The counters are re-read
    from [info] on every round. *)
let report info =
  (* The result type is pinned to [unit Lwt.t] so the inferred type of
     [report] does not become more general than a terminating loop's. *)
  let rec tick () : unit Lwt.t =
    let line =
      Printf.sprintf "%s: %d backlog, %d waiters"
        info.name.Node.label
        info.backlog
        info.waiters
    in
    lwt () = Log.info line [] in
    lwt () = Lwt_unix.sleep 1.0 in
    tick ()
  in
  tick ()
|
2012-01-08 19:03:09 +00:00
|
|
|
|
2012-01-08 17:41:04 +00:00
|
|
|
(** [shoveller info] is the delivery loop of a queue.  It never returns
    normally: it takes the next message body from [info.backlog_in], then
    takes subscriptions from [info.waiters_in] one at a time and offers the
    body to each through [Subscription.send_to_subscription] until one call
    yields [true].

    On success the backlog counter is decremented, the subscription is pushed
    back onto the waiters stream so it can receive later messages, and the
    loop moves on to the next body.  A subscription for which the send yields
    [false] is not pushed back. *)
let shoveller info =
  (* Offer [body] to successive waiters until one of them accepts it. *)
  let rec deliver body =
    lwt candidate = Lwt_stream.next info.waiters_in in
    lwt accepted =
      Subscription.send_to_subscription
        info.name info.subscriptions candidate body
    in
    if accepted then begin
      info.backlog <- info.backlog - 1;
      (* Re-enqueue the subscription for subsequent messages. *)
      info.waiters_out (Some candidate);
      pump ()
    end else
      deliver body

  (* Wait for the next queued body and hand it to [deliver]. *)
  and pump () =
    lwt body = Lwt_stream.next info.backlog_in in
    deliver body
  in
  pump ()
|
|
|
|
|
|
|
|
(** [queue_handler info n sexp] decodes [sexp] with
    [Message.message_of_sexp] and updates the queue state in [info]:

    - Post: when the backlog counter exceeds 1000 and the waiter counter is
      positive, first calls [Lwt_unix.yield ()] (presumably to let the
      shoveller drain the backlog -- confirm); then increments the backlog
      counter and pushes the body onto the backlog stream.
    - Subscribe (with string sink and reply sink): creates a subscription via
      [Subscription.create], increments the waiter counter and pushes the
      subscription onto the waiters stream.
    - Unsubscribe (with string token): calls [Subscription.delete], discards
      its result and decrements the waiter counter.
    - Anything else is passed to [Util.message_not_understood].

    The second argument is not used. *)
let queue_handler info _n sexp =
  let yield_threshold = 1000 in
  match Message.message_of_sexp sexp with
  | Message.Post (_name, body, _token) ->
    let congested = info.backlog > yield_threshold && info.waiters > 0 in
    lwt () = if congested then Lwt_unix.yield () else return () in
    info.backlog <- info.backlog + 1;
    info.backlog_out (Some body);
    return ()
  | Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) ->
    lwt subscription =
      Subscription.create
        info.name info.subscriptions filter sink name reply_sink reply_name
    in
    info.waiters <- info.waiters + 1;
    info.waiters_out (Some subscription);
    return ()
  | Message.Unsubscribe (Str token) ->
    (* NOTE(review): the counter is decremented whatever [Subscription.delete]
       returns; if an unknown token is possible the counter may drift --
       confirm against [Subscription.delete]. *)
    ignore (Subscription.delete info.name info.subscriptions token);
    info.waiters <- info.waiters - 1;
    return ()
  | unhandled ->
    Util.message_not_understood "queue" unhandled
|
2012-01-08 17:41:04 +00:00
|
|
|
|
|
|
|
(** [queue_factory arg] builds a queue node from a factory argument.

    When [arg] is [Arr [Str name_str]], it creates the backlog and waiters
    streams, assembles the queue state with both counters at zero, and calls
    [Node.make_idempotent_named] with [classname], the node name, a start-up
    closure and the message handler.  The start-up closure launches the
    [shoveller] through [Util.create_thread] and starts [report], discarding
    both results (presumably it runs only when the node is actually created
    -- confirm against [Node.make_idempotent_named]).  The outcome is passed
    to [replace_ok] together with [Str name_str].

    Any other argument shape yields [Problem (Str "bad-arg")]. *)
let queue_factory = function
  | Arr [Str name_str] ->
    let (backlog_stream, push_backlog) = Lwt_stream.create () in
    let (waiter_stream, push_waiter) = Lwt_stream.create () in
    let info = {
      name = Node.name_of_string name_str;
      subscriptions = Subscription.new_set ();
      backlog_in = backlog_stream;
      backlog_out = push_backlog;
      waiters_in = waiter_stream;
      waiters_out = push_waiter;
      backlog = 0;
      waiters = 0;
    } in
    (* Start-up closure handed to the node constructor. *)
    let start_threads () =
      ignore (Util.create_thread name_str None shoveller info);
      ignore (report info);
      return ()
    in
    let handler = queue_handler info in
    let creation =
      Node.make_idempotent_named classname info.name start_threads handler
    in
    replace_ok creation (Str name_str)
  | _ ->
    return (Problem (Str "bad-arg"))
|
2012-01-08 17:41:04 +00:00
|
|
|
|
|
|
|
(** [init ()] registers [queue_factory] under [classname] by calling
    [Factory.register_class]. *)
let init () = Factory.register_class classname queue_factory
|