From 26779e1a4a9d1a2fb851b250e6fd16d89d4a9be8 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 31 May 2012 10:19:45 +0100 Subject: [PATCH] Erlang hop --- experiments/erlang/Makefile | 2 +- experiments/erlang/include/hop.hrl | 18 ++++ experiments/erlang/src/hop.erl | 58 ++++++++++++ experiments/erlang/src/hop_demo.erl | 27 ++++++ experiments/erlang/src/hop_factory.erl | 80 ++++++++++++++++ experiments/erlang/src/hop_queue.erl | 125 +++++++++++++++++++++++++ experiments/erlang/src/hop_relay.erl | 119 +++++++++++++++++++++++ experiments/erlang/src/hop_server.erl | 79 ++++++++++++++++ 8 files changed, 507 insertions(+), 1 deletion(-) create mode 100644 experiments/erlang/include/hop.hrl create mode 100644 experiments/erlang/src/hop.erl create mode 100644 experiments/erlang/src/hop_demo.erl create mode 100644 experiments/erlang/src/hop_factory.erl create mode 100644 experiments/erlang/src/hop_queue.erl create mode 100644 experiments/erlang/src/hop_relay.erl create mode 100644 experiments/erlang/src/hop_server.erl diff --git a/experiments/erlang/Makefile b/experiments/erlang/Makefile index 61a938b..5a5da5c 100644 --- a/experiments/erlang/Makefile +++ b/experiments/erlang/Makefile @@ -14,4 +14,4 @@ veryclean: clean run: compile erl -pa ebin \ -boot start_sasl \ - -run hop_demo start localhost + -run hop_demo start 5671 diff --git a/experiments/erlang/include/hop.hrl b/experiments/erlang/include/hop.hrl new file mode 100644 index 0000000..b2899ed --- /dev/null +++ b/experiments/erlang/include/hop.hrl @@ -0,0 +1,18 @@ +%% Copyright 2010, 2011, 2012 Tony Garnock-Jones . +%% +%% This file is part of Hop. +%% +%% Hop is free software: you can redistribute it and/or modify it +%% under the terms of the GNU General Public License as published by +%% the Free Software Foundation, either version 3 of the License, or +%% (at your option) any later version. +%% +%% Hop is distributed in the hope that it will be useful, but WITHOUT +%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +%% or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +%% License for more details. +%% +%% You should have received a copy of the GNU General Public License +%% along with Hop. If not, see . + +-record(hop_sub, {ref, filter, sink, name}). diff --git a/experiments/erlang/src/hop.erl b/experiments/erlang/src/hop.erl new file mode 100644 index 0000000..18b9980 --- /dev/null +++ b/experiments/erlang/src/hop.erl @@ -0,0 +1,58 @@ +%% Copyright 2010, 2011, 2012 Tony Garnock-Jones . +%% +%% This file is part of Hop. +%% +%% Hop is free software: you can redistribute it and/or modify it +%% under the terms of the GNU General Public License as published by +%% the Free Software Foundation, either version 3 of the License, or +%% (at your option) any later version. +%% +%% Hop is distributed in the hope that it will be useful, but WITHOUT +%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +%% or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +%% License for more details. +%% +%% You should have received a copy of the GNU General Public License +%% along with Hop. If not, see . + +-module(hop). + +-export([register_idempotent/3, class_of/1, send/2, post/4]). + +register_idempotent(Name, Pid, ClassModule) -> + case global:register_name(Name, Pid) of + yes -> + ok; + no -> + case class_of(Name) of + undefined -> + register_idempotent(Name, Pid, ClassModule); + ClassModule -> + ok; + _ -> + {error, <<"class-mismatch">>} + end + end. + +class_of(Name) -> + case global:whereis_name(Name) of + undefined -> + undefined; + Pid -> + gen_server:call(Pid, hop_class_module) + end. + +send(<<>>, _Body) -> + ok; +send(Name, Body) -> + case global:whereis_name(Name) of + undefined -> + error_logger:warning_report({?MODULE, send, undefined_name, Name}), + false; + Pid -> + Pid ! {hop, Body}, + true + end. + +post(Sink, Name, Body, Token) -> + send(Sink, [<<"post">>, Name, Body, Token]). diff --git a/experiments/erlang/src/hop_demo.erl b/experiments/erlang/src/hop_demo.erl new file mode 100644 index 0000000..a067371 --- /dev/null +++ b/experiments/erlang/src/hop_demo.erl @@ -0,0 +1,27 @@ +%% Copyright 2010, 2011, 2012 Tony Garnock-Jones . +%% +%% This file is part of Hop. +%% +%% Hop is free software: you can redistribute it and/or modify it +%% under the terms of the GNU General Public License as published by +%% the Free Software Foundation, either version 3 of the License, or +%% (at your option) any later version. +%% +%% Hop is distributed in the hope that it will be useful, but WITHOUT +%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +%% or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +%% License for more details. +%% +%% You should have received a copy of the GNU General Public License +%% along with Hop. If not, see . + +-module(hop_demo). + +-export([start/1]). + +start([Port]) -> + hop_factory:start_link([]), + ok = hop_factory:register_class(<<"queue">>, hop_queue), + hop_server:start_link(hop_relay, "0.0.0.0", list_to_integer(Port), + [{reuseaddr, true}, {active, false}], + []). diff --git a/experiments/erlang/src/hop_factory.erl b/experiments/erlang/src/hop_factory.erl new file mode 100644 index 0000000..2512a8a --- /dev/null +++ b/experiments/erlang/src/hop_factory.erl @@ -0,0 +1,80 @@ +%% Copyright 2010, 2011, 2012 Tony Garnock-Jones . +%% +%% This file is part of Hop. +%% +%% Hop is free software: you can redistribute it and/or modify it +%% under the terms of the GNU General Public License as published by +%% the Free Software Foundation, either version 3 of the License, or +%% (at your option) any later version. +%% +%% Hop is distributed in the hope that it will be useful, but WITHOUT +%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +%% or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +%% License for more details. +%% +%% You should have received a copy of the GNU General Public License +%% along with Hop. If not, see . + +-module(hop_factory). + +-behaviour(gen_server). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). + +-export([start_link/1, register_class/2]). + +start_link(Args) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Args, []). + +register_class(ClassName, ClassModule) -> + gen_server:call(?MODULE, {register_class, ClassName, ClassModule}). + +%%--------------------------------------------------------------------------- + +-record(state, {classes}). + +init([]) -> + yes = global:register_name(<<"factory">>, self()), + {ok, #state{classes = []}}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_call({register_class, ClassName, ClassModule}, _From, State = #state{classes = Classes}) -> + {reply, ok, State#state{classes = [{ClassName, ClassModule} | Classes]}}; +handle_call(_Request, _From, State) -> + {stop, {bad_call, _Request}, State}. + +handle_cast(_Request, State) -> + {stop, {bad_cast, _Request}, State}. + +handle_info({hop, Sexp}, State = #state{classes = Classes}) -> + case Sexp of + [<<"create">>, ClassName, Arg, ReplySink, ReplyName] -> + Reply = + case lists:keysearch(ClassName, 1, Classes) of + false -> + error_logger:warning_report({?MODULE, class_not_found, ClassName}), + [<<"create-failed">>, [<<"factory">>, <<"class-not-found">>]]; + {value, {_, ClassModule}} -> + case catch ClassModule:hop_create(Arg) of + {ok, Info} -> + [<<"create-ok">>, Info]; + {error, Info} -> + [<<"create-failed">>, [<<"constructor">>, Info]]; + Otherwise -> + error_logger:warning_report({?MODULE, creation_failed, + Sexp, Otherwise}), + [<<"create-failed">>, [<<"constructor">>]] + end + end, + hop:post(ReplySink, ReplyName, Reply, <<>>), + {noreply, State}; + _ -> + error_logger:warning_report({?MODULE, message_not_understood, Sexp}), + {noreply, State} + end; +handle_info(_Message, State) -> + {stop, {bad_info, _Message}, State}. diff --git a/experiments/erlang/src/hop_queue.erl b/experiments/erlang/src/hop_queue.erl new file mode 100644 index 0000000..f5c98ac --- /dev/null +++ b/experiments/erlang/src/hop_queue.erl @@ -0,0 +1,125 @@ +%% Copyright 2010, 2011, 2012 Tony Garnock-Jones . +%% +%% This file is part of Hop. +%% +%% Hop is free software: you can redistribute it and/or modify it +%% under the terms of the GNU General Public License as published by +%% the Free Software Foundation, either version 3 of the License, or +%% (at your option) any later version. +%% +%% Hop is distributed in the hope that it will be useful, but WITHOUT +%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +%% or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +%% License for more details. +%% +%% You should have received a copy of the GNU General Public License +%% along with Hop. If not, see . + +-module(hop_queue). + +-behaviour(gen_server). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). + +-export([hop_create/1]). + +-include("hop.hrl"). + +hop_create([Name]) -> + {ok, Pid} = gen_server:start(?MODULE, [], []), + case hop:register_idempotent(Name, Pid, ?MODULE) of + ok -> + {ok, []}; + {error, Info} -> + gen_server:cast(Pid, shutdown), + {error, Info} + end. + +%%--------------------------------------------------------------------------- + +-record(state, {backlog, waiters}). + +tick(State) -> + tick(State, 2). + +tick(State, 0) -> + {0, State}; +tick(State = #state{backlog = Backlog, waiters = Waiters}, TicksLeft) -> + case queue:out(Waiters) of + {empty, _} -> + {infinity, State}; + {{value, WaiterRef}, WaitersRemainder} -> + case get(WaiterRef) of + undefined -> + tick(State#state{waiters = WaitersRemainder}, TicksLeft); + #hop_sub{ref = Ref, sink = Sink, name = Name} -> + case queue:out(Backlog) of + {empty, _} -> + {infinity, State}; + {{value, Message}, BacklogRemainder} -> + case hop:post(Sink, Name, Message, term_to_binary(Ref)) of + true -> + NewState = State#state{backlog = BacklogRemainder, + waiters = queue:in(WaiterRef, + WaitersRemainder)}, + tick(NewState, TicksLeft - 1); + false -> + tick(State#state{waiters = WaitersRemainder}, TicksLeft) + end + end + end + end. + +noreply(State) -> + {Timeout, NewState} = tick(State), + {noreply, NewState, Timeout}. + +reply(Reply, State) -> + {Timeout, NewState} = tick(State), + {reply, Reply, NewState, Timeout}. + +init([]) -> + {ok, #state{backlog = queue:new(), waiters = queue:new()}}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_call(hop_class_module, _From, State) -> + reply(?MODULE, State); +handle_call(_Request, _From, State) -> + {stop, {bad_call, _Request}, State}. + +handle_cast(shutdown, State) -> + {stop, normal, State}; +handle_cast(_Request, State) -> + {stop, {bad_cast, _Request}, State}. + +handle_info({hop, Sexp}, State = #state{backlog = OldBacklog, waiters = OldWaiters}) -> + noreply(case Sexp of + [<<"post">>, _Name, Body, _Token] -> + State#state{backlog = queue:in(Body, OldBacklog)}; + [<<"subscribe">>, Filter, Sink, Name, ReplySink, ReplyName] -> + SubRef = make_ref(), + Sub = #hop_sub{ref = SubRef, filter = Filter, sink = Sink, name = Name}, + put(SubRef, Sub), + _ = hop:post(ReplySink, ReplyName, + [<<"subscribe-ok">>, term_to_binary(SubRef)], <<>>), + State#state{waiters = queue:in(SubRef, OldWaiters)}; + [<<"unsubscribe">>, Token] -> + case catch binary_to_term(Token) of + SubRef when is_reference(SubRef) -> + erase(SubRef), + State; + _ -> + State + end; + _ -> + error_logger:warning_report({?MODULE, message_not_understood, Sexp}), + State + end); +handle_info(timeout, State) -> + noreply(State); +handle_info(_Message, State) -> + {stop, {bad_info, _Message}, State}. diff --git a/experiments/erlang/src/hop_relay.erl b/experiments/erlang/src/hop_relay.erl new file mode 100644 index 0000000..e3ac376 --- /dev/null +++ b/experiments/erlang/src/hop_relay.erl @@ -0,0 +1,119 @@ +%% Copyright 2010, 2011, 2012 Tony Garnock-Jones . +%% +%% This file is part of Hop. +%% +%% Hop is free software: you can redistribute it and/or modify it +%% under the terms of the GNU General Public License as published by +%% the Free Software Foundation, either version 3 of the License, or +%% (at your option) any later version. +%% +%% Hop is distributed in the hope that it will be useful, but WITHOUT +%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +%% or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +%% License for more details. +%% +%% You should have received a copy of the GNU General Public License +%% along with Hop. If not, see . + +-module(hop_relay). + +-behaviour(gen_server). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). + +-export([start_link/1]). + +start_link(Args) -> + gen_server:start_link(?MODULE, Args, []). + +%%--------------------------------------------------------------------------- + +-record(state, {sock, parser}). + +send(Sexp, State = #state{sock = Sock}) -> + %% Using port_command is dicey, but done to avoid selective + %% receive in gen_tcp:send/2, which causes crippling slowdown when + %% the queue of outbound sexps waiting to be relayed gets long. + port_command(Sock, sexp:format_iolist(Sexp)), + State. + +request_data(Sock) -> + %% We use prim_inet:async_recv here, again to avoid selective + %% receives. + {ok, _Ref} = prim_inet:async_recv(Sock, 0, -1), + ok. + +send_error(Message, Details, State) -> + send([<<"error">>, list_to_binary(Message), Details], State). + +handle_sexp1(Sexp, State) -> + error_logger:info_report({?MODULE, self(), Sexp}), + handle_sexp(Sexp, State). + +handle_sexp([<<"post">>, Name, Body, _Token], State) -> + _ = hop:send(Name, Body), + State; +handle_sexp([<<"subscribe">>, Filter, _Sink, _Name, ReplySink, ReplyName], State) -> + case global:register_name(Filter, self()) of + yes -> + _ = hop:post(ReplySink, ReplyName, [<<"subscribe-ok">>, Filter], <<>>), + State; + no -> + error_logger:warning_report({?MODULE, bind_failed, Filter}), + State + end; +handle_sexp([<<"unsubscribe">>, Token], State) -> + global:unregister_name(Token), %% TODO security problem + State; +handle_sexp(Other, State) -> + send_error("Message not understood", [Other], State). + +%%--------------------------------------------------------------------------- + +init([Sock]) -> + {ok, #state{sock = Sock, parser = sexp:parse_state()}}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_call(_Request, _From, State) -> + {stop, {bad_call, _Request}, State}. + +handle_cast({socket_control_transferred, Sock}, State0 = #state{sock = Sock}) -> + inet:setopts(Sock, [binary]), + request_data(Sock), + State1 = send([<<"hop">>], State0), + State2 = send([<<"subscribe">>, list_to_binary(atom_to_list(node())), <<>>, <<>>, <<>>, <<>>], + State1), + {noreply, State2}; +handle_cast(_Request, State) -> + {stop, {bad_cast, _Request}, State}. + +handle_info({hop, Sexp}, State) -> + {noreply, send(Sexp, State)}; +handle_info({inet_async, Sock, _Ref, {ok, Bin}}, State = #state{sock = Sock, parser = Parser}) + when is_binary(Bin) -> + case catch sexp:parse(Bin, Parser) of + {'EXIT', Reason} -> + {stop, + {received_syntax_error, Reason}, + send_error("Syntax error", + [<<"http://people.csail.mit.edu/rivest/Sexp.txt">>], + State)}; + {Terms, NewParser} -> + NewState = lists:foldl(fun handle_sexp/2, State#state{parser = NewParser}, Terms), + request_data(Sock), + {noreply, NewState} + end; +handle_info({tcp_closed, Sock}, State = #state{sock = Sock}) -> + {stop, normal, State}; +handle_info({inet_reply, Sock, _}, State = #state{sock = Sock}) -> + %% These are the replies to the raw port_command we're doing above + %% in send/2. We ignore them because higher level protocols deal + %% with acknowledgements and errors, and we trust that the socket + %% will close eventually if write failures start to happen. + {noreply, State}; +handle_info(_Message, State) -> + {stop, {bad_info, _Message}, State}. diff --git a/experiments/erlang/src/hop_server.erl b/experiments/erlang/src/hop_server.erl new file mode 100644 index 0000000..36a94d8 --- /dev/null +++ b/experiments/erlang/src/hop_server.erl @@ -0,0 +1,79 @@ +%%--------------------------------------------------------------------------- +%% Copyright (c) 2007 Tony Garnock-Jones +%% Copyright (c) 2007 LShift Ltd. +%% +%% Permission is hereby granted, free of charge, to any person +%% obtaining a copy of this software and associated documentation +%% files (the "Software"), to deal in the Software without +%% restriction, including without limitation the rights to use, copy, +%% modify, merge, publish, distribute, sublicense, and/or sell copies +%% of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be +%% included in all copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +%% MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +%% BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +%% ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +%% CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%--------------------------------------------------------------------------- + +-module(hop_server). + +-behaviour(gen_server). + +-export([start_link/5]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). + +start_link(Module, Host, Port, ListenOpts, ModuleOpts) -> + gen_server:start_link(?MODULE, [Module, Host, Port, ListenOpts, ModuleOpts], []). + +%--------------------------------------------------------------------------- + +accept_and_start(Module, ModuleOpts, LSock) -> + spawn_link(fun () -> + case gen_tcp:accept(LSock) of + {ok, Sock} -> + accept_and_start(Module, ModuleOpts, LSock), + {ok, Pid} = gen_server:start(Module, [Sock | ModuleOpts], []), + gen_tcp:controlling_process(Sock, Pid), + gen_server:cast(Pid, {socket_control_transferred, Sock}); + {error, Reason} -> + exit({error, Reason}) + end + end). + +ip_listen_opt(any) -> + []; +ip_listen_opt(Host) -> + {ok, IP} = inet:getaddr(Host, inet), + [{ip, IP}]. + +%--------------------------------------------------------------------------- + +init([Module, Host, Port, ListenOpts, ModuleOpts]) -> + {ok, LSock} = gen_tcp:listen(Port, ip_listen_opt(Host) ++ ListenOpts), + accept_and_start(Module, ModuleOpts, LSock), + {ok, LSock}. + +terminate(_Reason, State) -> + LSock = State, + gen_tcp:close(LSock), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_call(_Request, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Message, State) -> + {noreply, State}.