hop-2012/experiments/erlang/src/hop_relay.erl

130 lines
4.6 KiB
Erlang

%% Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
%%
%% This file is part of Hop.
%%
%% Hop is free software: you can redistribute it and/or modify it
%% under the terms of the GNU General Public License as published by
%% the Free Software Foundation, either version 3 of the License, or
%% (at your option) any later version.
%%
%% Hop is distributed in the hope that it will be useful, but WITHOUT
%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
%% or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
%% License for more details.
%%
%% You should have received a copy of the GNU General Public License
%% along with Hop. If not, see <http://www.gnu.org/licenses/>.
-module(hop_relay).
-behaviour(gen_server).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
-export([hop_create/1, start_link/1]).
%% @doc Connect to a remote peer and start a relay process for the
%% connection.
%%
%% Takes `[HostBin, PortBin]', both binaries; `PortBin' holds the port
%% number in decimal. Returns `{ok, []}' once a relay has been started
%% and given the socket, or `{error, Message}' (a binary) when the TCP
%% connection cannot be established. The relay is started with
%% start_link/1 and is therefore linked to the calling process.
hop_create([HostBin, PortBin]) ->
    Host = binary_to_list(HostBin),
    Port = list_to_integer(binary_to_list(PortBin)),
    %% Passive mode: the relay asks for data explicitly (request_data/1).
    case gen_tcp:connect(Host, Port, [{active, false}]) of
        {ok, Sock} ->
            {ok, Pid} = start_link([Sock]),
            %% The connecting process owns the socket at this point.
            %% Hand ownership to the relay, then tell it so: the
            %% socket_control_transferred cast is what makes the relay
            %% switch to binary mode, issue its first read and send its
            %% greeting. Nothing else in this module triggers that.
            ok = gen_tcp:controlling_process(Sock, Pid),
            ok = gen_server:cast(Pid, {socket_control_transferred, Sock}),
            {ok, []};
        {error, Reason} ->
            {error, iolist_to_binary(io_lib:format("Connect failed: ~p", [Reason]))}
    end.
%% @doc Start a relay gen_server linked to the caller.
%% `Args' is handed unchanged to init/1, which expects `[Sock]'.
start_link(Args) ->
    ServerOptions = [],
    gen_server:start_link(?MODULE, Args, ServerOptions).
%%---------------------------------------------------------------------------
%% Server state.
%%   sock   - the socket passed to init/1; all reads and writes use it.
%%   parser - parser state obtained from sexp:parse_state/0 and replaced
%%            by the value returned from each successful sexp:parse/2.
-record(state, {sock, parser}).
%% Write one s-expression to the relay's socket and return the state
%% unchanged.
%%
%% The raw port_command BIF is used here instead of gen_tcp:send/2.
%% That is dicey, but it sidesteps the selective receive inside
%% gen_tcp:send/2, which slows to a crawl once many outbound sexps are
%% queued up waiting to be relayed. The resulting inet_reply messages
%% are consumed in handle_info/2.
send(Sexp, #state{sock = Sock} = State) ->
    Encoded = sexp:format_iolist(Sexp),
    erlang:port_command(Sock, Encoded),
    State.
%% Ask the socket for the next chunk of input without blocking; the
%% data arrives later as an inet_async message (see handle_info/2).
%%
%% prim_inet:async_recv/3 is used, as with send/2, to stay clear of
%% selective receives. Crashes with badmatch if the request is refused.
request_data(Sock) ->
    %% NOTE(review): 0 / -1 presumably mean "any amount" / "no timeout";
    %% prim_inet is an undocumented internal API - confirm.
    Length = 0,
    Timeout = -1,
    {ok, _} = prim_inet:async_recv(Sock, Length, Timeout),
    ok.
%% Send an `error' s-expression to the peer and return the state.
%% `Message' is a string (converted to a binary); `Details' is passed
%% through untouched as the third element.
send_error(Message, Details, State) ->
    Reply = [<<"error">>, list_to_binary(Message), Details],
    send(Reply, State).
%% Logging variant of handle_sexp/2: reports the incoming s-expression
%% through error_logger and then dispatches it as usual.
%% NOTE(review): no caller is visible in this file - presumably swapped
%% in for handle_sexp/2 while debugging; confirm before removing.
handle_sexp1(Sexp, State) ->
    Report = {?MODULE, self(), Sexp},
    error_logger:info_report(Report),
    handle_sexp(Sexp, State).
%% Dispatch one s-expression received from the peer and return the
%% (possibly updated) server state.
%%
%%   [post, Name, Body, Token]   - forward Body to Name via hop:send/2.
%%   [subscribe, Filter, Sink, Name, ReplySink, ReplyName]
%%                               - register this relay globally under
%%                                 Filter; on success post a
%%                                 subscribe-ok to the reply sink, on
%%                                 failure log a warning.
%%   [unsubscribe, Token]        - drop the global registration Token,
%%                                 provided this relay holds it.
%%   anything else               - answer with an error sexp.
handle_sexp([<<"post">>, Name, Body, _Token], State) ->
    _ = hop:send(Name, Body),
    State;
handle_sexp([<<"subscribe">>, Filter, _Sink, _Name, ReplySink, ReplyName], State) ->
    case global:register_name(Filter, self()) of
        yes ->
            _ = hop:post(ReplySink, ReplyName, [<<"subscribe-ok">>, Filter], <<>>),
            State;
        no ->
            error_logger:warning_report({?MODULE, bind_failed, Filter}),
            State
    end;
handle_sexp([<<"unsubscribe">>, Token], State) ->
    %% Token comes straight off the wire, so it must not be allowed to
    %% remove registrations belonging to other processes. Only names
    %% currently registered to this relay (i.e. created by the
    %% subscribe clause above) are unregistered.
    Self = self(),
    case global:whereis_name(Token) of
        Self ->
            global:unregister_name(Token);
        undefined ->
            %% Nothing registered under that name; nothing to do.
            ok;
        _OtherPid ->
            error_logger:warning_report({?MODULE, unsubscribe_refused, Token})
    end,
    State;
handle_sexp(Other, State) ->
    send_error("Message not understood", [Other], State).
%%---------------------------------------------------------------------------
%% gen_server callback: build the initial state from the socket and a
%% fresh parser state. No socket I/O happens here; that waits for the
%% socket_control_transferred cast.
init([Sock]) ->
    InitialState = #state{sock = Sock, parser = sexp:parse_state()},
    {ok, InitialState}.
%% gen_server callback: no explicit cleanup is performed on shutdown.
terminate(_, _) ->
    ok.
%% gen_server callback: the state needs no conversion between code
%% versions, so it is returned as-is.
code_change(_FromVersion, CurrentState, _ExtraData) ->
    {ok, CurrentState}.
%% gen_server callback: this server supports no synchronous calls, so
%% any call stops it with reason {bad_call, Request}. No reply is sent.
handle_call(Request, _Caller, State) ->
    StopReason = {bad_call, Request},
    {stop, StopReason, State}.
%% gen_server callback.
%%
%% {socket_control_transferred, Sock}: sent once the relay has been
%% given its socket. Switches the socket to binary mode, requests the
%% first chunk of input, then sends the `hop' greeting followed by a
%% subscribe for this node's name. Sock must be the socket held in the
%% state.
%%
%% Any other cast stops the server with reason {bad_cast, Request}.
handle_cast({socket_control_transferred, Sock}, #state{sock = Sock} = State) ->
    inet:setopts(Sock, [binary]),
    request_data(Sock),
    Greeted = send([<<"hop">>], State),
    Subscription = [<<"subscribe">>, hop:name(), <<>>, <<>>, <<>>, <<>>],
    {noreply, send(Subscription, Greeted)};
handle_cast(Unexpected, State) ->
    {stop, {bad_cast, Unexpected}, State}.
%% gen_server callback for raw messages.
%%
%%   {hop, Sexp}                          - relay Sexp to the peer.
%%   {inet_async, Sock, Ref, {ok, Bin}}   - input requested through
%%       request_data/1 has arrived: parse it, dispatch every complete
%%       s-expression, then request more. If parsing raises, an error
%%       sexp is sent to the peer and the server stops with
%%       {received_syntax_error, Reason}.
%%   {inet_async, Sock, Ref, {error, closed}}
%%                                        - the peer closed the
%%       connection while a read was outstanding: stop normally.
%%   {tcp_closed, Sock}                   - stop normally.
%%   {inet_reply, Sock, _}                - ignored (see below).
%%   anything else                        - stop with {bad_info, Msg}.
handle_info({hop, Sexp}, State) ->
    {noreply, send(Sexp, State)};
handle_info({inet_async, Sock, _Ref, {ok, Bin}}, State = #state{sock = Sock, parser = Parser})
  when is_binary(Bin) ->
    %% try ... of keeps only the parse call protected: exceptions from
    %% dispatching the parsed terms still propagate, and a value thrown
    %% by the parser can no longer be mistaken for a parse result.
    try sexp:parse(Bin, Parser) of
        {Terms, NewParser} ->
            NewState = lists:foldl(fun handle_sexp/2, State#state{parser = NewParser}, Terms),
            request_data(Sock),
            {noreply, NewState}
    catch
        _Class:Reason ->
            {stop,
             {received_syntax_error, Reason},
             send_error("Syntax error",
                        [<<"http://people.csail.mit.edu/rivest/Sexp.txt">>],
                        State)}
    end;
handle_info({inet_async, Sock, _Ref, {error, closed}}, State = #state{sock = Sock}) ->
    %% The socket is read through async_recv, so a peer close shows up
    %% as the error result of the outstanding read. Treat it as an
    %% ordinary disconnect, the same as tcp_closed below.
    {stop, normal, State};
handle_info({tcp_closed, Sock}, State = #state{sock = Sock}) ->
    {stop, normal, State};
handle_info({inet_reply, Sock, _}, State = #state{sock = Sock}) ->
    %% These are the replies to the raw port_command we're doing
    %% in send/2. We ignore them because higher level protocols deal
    %% with acknowledgements and errors, and we trust that the socket
    %% will close eventually if write failures start to happen.
    {noreply, State};
handle_info(Unexpected, State) ->
    {stop, {bad_info, Unexpected}, State}.