%% hop-2012/experiments/erlang/src/hop_queue.erl
%%
%% 126 lines
%% 4.5 KiB
%% Erlang

%% Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
%%
%% This file is part of Hop.
%%
%% Hop is free software: you can redistribute it and/or modify it
%% under the terms of the GNU General Public License as published by
%% the Free Software Foundation, either version 3 of the License, or
%% (at your option) any later version.
%%
%% Hop is distributed in the hope that it will be useful, but WITHOUT
%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
%% or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
%% License for more details.
%%
%% You should have received a copy of the GNU General Public License
%% along with Hop. If not, see <http://www.gnu.org/licenses/>.
-module(hop_queue).
-behaviour(gen_server).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
-export([hop_create/1]).
-include("hop.hrl").
%% Create a queue process and register it under Name.
%%
%% The argument is a one-element list holding the name. A fresh, unlinked
%% gen_server running this module is started first and is then handed to
%% hop:register_idempotent/3. When registration succeeds the result is
%% {ok, []}. When it fails with {error, Info}, the freshly started process
%% is asked to stop via a `shutdown' cast and the same error tuple is
%% returned to the caller.
hop_create([Name]) ->
    {ok, QueuePid} = gen_server:start(?MODULE, [], []),
    Registration = hop:register_idempotent(Name, QueuePid, ?MODULE),
    case Registration of
        ok ->
            {ok, []};
        {error, Info} ->
            %% Registration was refused, so the new process is not needed.
            gen_server:cast(QueuePid, shutdown),
            {error, Info}
    end.
%%---------------------------------------------------------------------------
%% gen_server state for a queue process.
%%   backlog - queue (stdlib `queue') of message bodies that have been posted
%%             to this process and not yet delivered to a subscriber.
%%   waiters - queue (stdlib `queue') of subscription references. The
%%             #hop_sub{} record belonging to each reference is stored in the
%%             process dictionary under that reference; a reference whose
%%             dictionary entry has been erased is skipped by tick/2.
-record(state, {backlog, waiters}).
%% Run one delivery pass over the queue with a budget of two ticks.
%% Returns the {Timeout, NewState} pair produced by tick/2.
tick(Current) ->
    tick(Current, 2).
%% Deliver backlog messages to waiting subscribers.
%%
%% TicksLeft is the number of successful deliveries still allowed in this
%% pass. The result is {Timeout, NewState}; the callers in this module use
%% Timeout as the gen_server callback timeout:
%%   0        - the delivery budget is used up; returning a zero timeout
%%              lets the server come back for another pass via
%%              handle_info(timeout, ...).
%%   infinity - there is no waiter, or no backlog, so nothing more can be
%%              delivered until a new message reaches the process.
tick(State, 0) ->
    {0, State};
tick(State = #state{waiters = Waiters}, TicksLeft) ->
    case queue:out(Waiters) of
        {empty, _} ->
            {infinity, State};
        {{value, WaiterRef}, WaitersRemainder} ->
            tick_waiter(get(WaiterRef), WaiterRef, WaitersRemainder, State, TicksLeft)
    end.

%% Handle the waiter at the head of the queue. The first argument is what
%% the process dictionary holds for WaiterRef; State is the state *before*
%% the waiter was taken off the queue.
tick_waiter(undefined, _WaiterRef, WaitersRemainder, State, TicksLeft) ->
    %% The subscription record is gone (for example after an unsubscribe):
    %% forget the stale reference and look at the next waiter. This does
    %% not consume a tick.
    tick(State#state{waiters = WaitersRemainder}, TicksLeft);
tick_waiter(#hop_sub{ref = Ref, sink = Sink, name = Name}, WaiterRef, WaitersRemainder,
            State = #state{backlog = Backlog}, TicksLeft) ->
    case queue:out(Backlog) of
        {empty, _} ->
            %% Nothing to deliver; the waiter keeps its place at the head.
            {infinity, State};
        {{value, Message}, BacklogRemainder} ->
            case hop:post(Sink, Name, Message, term_to_binary(Ref)) of
                true ->
                    %% Delivered: consume the message and move the waiter
                    %% to the back of the queue so waiters take turns.
                    NewState = State#state{backlog = BacklogRemainder,
                                           waiters = queue:in(WaiterRef,
                                                              WaitersRemainder)},
                    tick(NewState, TicksLeft - 1);
                false ->
                    %% Delivery failed: the message stays in the backlog and
                    %% the waiter is dropped. Its record is erased as well,
                    %% because once the reference has left the waiters queue
                    %% nothing would ever read or clean up that entry.
                    erase(WaiterRef),
                    tick(State#state{waiters = WaitersRemainder}, TicksLeft)
            end
    end.
%% Build a `noreply' callback result after running a delivery pass.
%% The timeout chosen by tick/1 becomes the gen_server timeout.
noreply(State0) ->
    {Wait, State1} = tick(State0),
    {noreply, State1, Wait}.
%% Build a `reply' callback result carrying Reply after running a delivery
%% pass. The timeout chosen by tick/1 becomes the gen_server timeout.
reply(Reply, State0) ->
    {Wait, State1} = tick(State0),
    {reply, Reply, State1, Wait}.
%% gen_server callback: start with an empty backlog and no waiters.
init([]) ->
    Empty = queue:new(),
    InitialState = #state{backlog = Empty, waiters = Empty},
    {ok, InitialState}.
%% gen_server callback: nothing to clean up on termination.
terminate(_Why, _FinalState) ->
    ok.
%% gen_server callback: the state is carried over unchanged on code upgrade.
code_change(_FromVsn, Unchanged, _Extra) ->
    {ok, Unchanged}.
%% gen_server callback for synchronous requests.
%%
%% `hop_class_module' is answered with this module's name (going through
%% reply/2, which also runs a delivery pass). Any other request stops the
%% server with reason {bad_call, Request}.
handle_call(hop_class_module, _From, State) ->
    reply(?MODULE, State);
handle_call(Unknown, _From, State) ->
    {stop, {bad_call, Unknown}, State}.
%% gen_server callback for asynchronous requests.
%%
%% `shutdown' stops the server normally. Any other cast stops the server
%% with reason {bad_cast, Request}.
handle_cast(shutdown, State) ->
    {stop, normal, State};
handle_cast(Unknown, State) ->
    {stop, {bad_cast, Unknown}, State}.
%% gen_server callback for plain messages.
%%
%%   {hop, Sexp} - a protocol message; see handle_hop/2. A delivery pass is
%%                 run afterwards via noreply/1.
%%   timeout     - raised by the timeout returned from a previous callback;
%%                 simply runs another delivery pass.
%%   anything else stops the server with reason {bad_info, Message}.
handle_info({hop, Sexp}, State) ->
    noreply(handle_hop(Sexp, State));
handle_info(timeout, State) ->
    noreply(State);
handle_info(Message, State) ->
    {stop, {bad_info, Message}, State}.

%% Apply one protocol message to the state and return the new state.
%%
%%   post        - append Body to the backlog.
%%   subscribe   - create a subscription reference, store its #hop_sub{}
%%                 record in the process dictionary, send "subscribe-ok"
%%                 with the encoded reference to the reply sink, and add
%%                 the reference to the waiters queue.
%%   unsubscribe - erase the subscription record named by Token, if Token
%%                 decodes to a reference. The reference is left in the
%%                 waiters queue; tick/2 skips references with no record.
%%   otherwise   - log a warning and leave the state untouched.
handle_hop([<<"post">>, _Name, Body, _Token], State = #state{backlog = Backlog}) ->
    State#state{backlog = queue:in(Body, Backlog)};
handle_hop([<<"subscribe">>, Filter, Sink, Name, ReplySink, ReplyName],
           State = #state{waiters = Waiters}) ->
    SubRef = make_ref(),
    Sub = #hop_sub{ref = SubRef, filter = Filter, sink = Sink, name = Name},
    put(SubRef, Sub),
    %% The outcome of the acknowledgement is deliberately ignored.
    _ = hop:post(ReplySink, ReplyName,
                 [<<"subscribe-ok">>, term_to_binary(SubRef)], <<>>),
    State#state{waiters = queue:in(SubRef, Waiters)};
handle_hop([<<"unsubscribe">>, Token], State) ->
    case decode_sub_ref(Token) of
        {ok, SubRef} ->
            erase(SubRef),
            State;
        error ->
            State
    end;
handle_hop(Sexp, State) ->
    error_logger:warning_report({?MODULE, message_not_understood, Sexp}),
    State.

%% Decode an unsubscribe token into a subscription reference.
%%
%% The token arrives in a message from outside this process, so it is
%% decoded with the `safe' option. Returns {ok, Ref} when the token decodes
%% to a reference, and `error' when it is not a valid external term or
%% decodes to some other kind of term.
decode_sub_ref(Token) ->
    try binary_to_term(Token, [safe]) of
        SubRef when is_reference(SubRef) ->
            {ok, SubRef};
        _ ->
            error
    catch
        error:badarg ->
            error
    end.