|
- %% ``The contents of this file are subject to the Erlang Public License,
- %% Version 1.1, (the "License"); you may not use this file except in
- %% compliance with the License. You should have received a copy of the
- %% Erlang Public License along with this software. If not, it can be
- %% retrieved via the world wide web at http://www.erlang.org/.
- %%
- %% Software distributed under the License is distributed on an "AS IS"
- %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- %% the License for the specific language governing rights and limitations
- %% under the License.
- %%
- %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
- %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
- %% AB. All Rights Reserved.''
- %%
- %% @author Ulf Wiger <ulf.wiger@ericsson.com>
- %% @author Thomas Arts <thomas.arts@ituniv.se>
- %%
- %% @doc Leader election behaviour.
- %% <p>This application implements a leader election behaviour modeled after
- %% gen_server. This behaviour intends to make it reasonably
- %% straightforward to implement a fully distributed server with
- %% master-slave semantics.</p>
- %% <p>The gen_leader behaviour supports nearly everything that gen_server
- %% does (some functions, such as multicall() and the internal timeout,
- %% have been removed), and adds a few callbacks and API functions to
- %% support leader election etc.</p>
- %% <p>Also included is an example program, a global dictionary, based
- %% on the modules gen_leader and dict. The callback implementing the
- %% global dictionary is called 'test_cb', for no particularly logical
- %% reason.</p>
- %% @end
- %%
- %% @type election() = tuple(). Opaque state of the gen_leader behaviour.
- %% @type node() = atom(). A node name.
- %% @type name() = atom(). A locally registered name.
- %% @type serverRef() = Name | {name(),node()} | {global,Name} | pid().
- %% See gen_server.
- %% @type callerRef() = {pid(), reference()}. See gen_server.
- %%
- -module(gen_leader).
- -export([start/4, start/6,
- start_link/4, start_link/6,
- leader_call/2, leader_call/3, leader_cast/2,
- call/2, call/3, cast/2,
- reply/2]).
- %% Query functions
- -export([alive/1,
- down/1,
- candidates/1,
- workers/1]).
- -export([
- system_continue/3,
- system_terminate/4,
- system_code_change/4,
- format_status/2
- ]).
- -export([behaviour_info/1]).
- %% Internal exports
- -export([init_it/6, print_event/3
- %%, safe_send/2
- ]).
- -import(error_logger , [format/2]).
- -import(lists, [foldl/3,
- foreach/2,
- member/2,
- keydelete/3,
- keysearch/3,
- keymember/3]).
- -record(election,{leader = none,
- mode = global,
- name,
- leadernode = none,
- candidate_nodes = [],
- worker_nodes = [],
- alive = [],
- iteration,
- down = [],
- monitored = [],
- buffered = []
- }).
- -record(server, {parent,
- mod,
- state,
- debug}).
- %%% ---------------------------------------------------
- %%% Interface functions.
- %%% ---------------------------------------------------
- %% @hidden
- behaviour_info(callbacks) ->
- [{init,1},
- {elected,2},
- {surrendered,3},
- {handle_leader_call,4},
- {handle_leader_cast,3},
- {handle_local_only, 4},
- {from_leader,3},
- {handle_call,3},
- {handle_cast,2},
- {handle_DOWN,3},
- {handle_info,2},
- {terminate,2},
- {code_change,4}];
- behaviour_info(_Other) ->
- undefined.
- start(Name, Mod, Arg, Options) when is_atom(Name) ->
- gen:start(?MODULE, nolink, {local,Name},
- Mod, {local_only, Arg}, Options).
- %% @spec start(Name::node(), CandidateNodes::[node()],
- %% Workers::[node()], Mod::atom(), Arg, Options::list()) ->
- %% {ok,pid()}
- %%
- %% @doc Starts a gen_leader process without linking to the parent.
- %%
- start(Name, [_|_] = CandidateNodes, Workers, Mod, Arg, Options)
- when is_atom(Name) ->
- gen:start(?MODULE, nolink, {local,Name},
- Mod, {CandidateNodes, Workers, Arg}, Options).
- %% @spec start_link(Name::atom(), CandidateNodes::[atom()],
- %% Workers::[atom()], Mod::atom(), Arg, Options::list()) ->
- %% {ok, pid()}
- %%
- %% @doc Starts a gen_leader process.
- %% <table>
- %% <tr><td>Name</td><td>The locally registered name of the process</td></tr>
- %% <tr><td>CandidateNodes</td><td>The names of nodes capable of assuming
- %% a leadership role</td></tr>
- %% <tr><td>Workers</td>
- %% <td>The names of nodes that will be part of the "cluster",
- %% but cannot ever assume a leadership role.</td></tr>
- %% <tr><td>Mod</td><td>The name of the callback module</td></tr>
- %% <tr><td>Arg</td><td>Argument passed on to <code>Mod:init/1</code></td></tr>
- %% <tr><td>Options</td><td>Same as gen_server's Options</td></tr>
- %% </table>
- %%
- %% <p>The list of candidates needs to be known from the start. Workers
- %% can be added at runtime.</p>
- %% @end
- start_link(Name, [_|_] = CandidateNodes, Workers,
- Mod, Arg, Options) when is_atom(Name) ->
- gen:start(?MODULE, link, {local,Name}, Mod,
- {CandidateNodes, Workers, Arg}, Options).
- start_link(Name, Mod, Arg, Options) when is_atom(Name) ->
- gen:start(?MODULE, link, {local,Name}, Mod,
- {local_only, Arg}, Options).
- %% Query functions to be used from the callback module
- %% @spec alive(E::election()) -> [node()]
- %%
- %% @doc Returns a list of live nodes (candidates and workers).
- %%
- alive(#election{alive = Alive}) ->
- Alive.
- %% @spec down(E::election()) -> [node()]
- %%
- %% @doc Returns a list of candidates currently not running.
- %%
- down(#election{down = Down}) ->
- Down.
- %% @spec candidates(E::election()) -> [node()]
- %%
- %% @doc Returns a list of known candidates.
- %%
- candidates(#election{candidate_nodes = Cands}) ->
- Cands.
- %% @spec workers(E::election()) -> [node()]
- %%
- %% @doc Returns a list of known workers.
- %%
- workers(#election{worker_nodes = Workers}) ->
- Workers.
- %% @spec call(Name::serverRef(), Request) -> term()
- %%
- %% @doc Equivalent to <code>gen_server:call/2</code>, but with a slightly
- %% different exit reason if something goes wrong. This function calls
- %% the <code>gen_leader</code> process exactly as if it were a gen_server
- %% (which, for practical purposes, it is.)
- %% @end
- call(Name, Request) ->
- case catch gen:call(Name, '$gen_call', Request) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, local_call, [Name, Request]}})
- end.
- %% @spec call(Name::serverRef(), Request, Timeout::integer()) ->
- %% Reply
- %%
- %% Reply = term()
- %%
- %% @doc Equivalent to <code>gen_server:call/3</code>, but with a slightly
- %% different exit reason if something goes wrong. This function calls
- %% the <code>gen_leader</code> process exactly as if it were a gen_server
- %% (which, for practical purposes, it is.)
- %% @end
- call(Name, Request, Timeout) ->
- case catch gen:call(Name, '$gen_call', Request, Timeout) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, local_call, [Name, Request, Timeout]}})
- end.
- %% @spec leader_call(Name::name(), Request::term())
- %% -> Reply
- %%
- %% Reply = term()
- %%
- %% @doc Makes a call (similar to <code>gen_server:call/2</code>) to the
- %% leader. The call is forwarded via the local gen_leader instance, if
- %% that one isn't actually the leader. The client will exit if the
- %% leader dies while the request is outstanding.
- %% <p>This function uses <code>gen:call/3</code>, and is subject to the
- %% same default timeout as e.g. <code>gen_server:call/2</code>.</p>
- %% @end
- %%
- leader_call(Name, Request) ->
- case catch gen:call(Name, '$leader_call', Request) of
- {ok,{leader,reply,Res}} ->
- Res;
- {ok,{error, leader_died}} ->
- exit({leader_died, {?MODULE, leader_call, [Name, Request]}});
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, leader_call, [Name, Request]}})
- end.
- %% @spec leader_call(Name::name(), Request::term(), Timeout::integer())
- %% -> Reply
- %%
- %% Reply = term()
- %%
- %% @doc Makes a call (similar to <code>gen_server:call/3</code>) to the
- %% leader. The call is forwarded via the local gen_leader instance, if
- %% that one isn't actually the leader. The client will exit if the
- %% leader dies while the request is outstanding.
- %% @end
- %%
- leader_call(Name, Request, Timeout) ->
- case catch gen:call(Name, '$leader_call', Request, Timeout) of
- {ok,{leader,reply,Res}} ->
- Res;
- {ok,{error, leader_died}} ->
- exit({leader_died, {?MODULE, leader_call, [Name, Request]}});
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, leader_call, [Name, Request, Timeout]}})
- end.
- %% @equiv gen_server:cast/2
- cast(Name, Request) ->
- catch do_cast('$gen_cast', Name, Request),
- ok.
- %% @spec leader_cast(Name::name(), Msg::term()) -> ok
- %% @doc Similar to <code>gen_server:cast/2</code> but will be forwarded to
- %% the leader via the local gen_leader instance.
- leader_cast(Name, Request) ->
- catch do_cast('$leader_cast', Name, Request),
- ok.
- do_cast(Tag, Name, Request) when atom(Name) ->
- Name ! {Tag, Request};
- do_cast(Tag, Pid, Request) when pid(Pid) ->
- Pid ! {Tag, Request}.
- %% @spec reply(From::callerRef(), Reply::term()) -> Void
- %% @equiv gen_server:reply/2
- reply({To, Tag}, Reply) ->
- catch To ! {Tag, Reply}.
- %%% ---------------------------------------------------
- %%% Initiate the new process.
- %%% Register the name using the Rfunc function
- %%% Calls the Mod:init/Args function.
- %%% Finally an acknowledge is sent to Parent and the main
- %%% loop is entered.
- %%% ---------------------------------------------------
- %%% @hidden
- init_it(Starter, self, Name, Mod, {CandidateNodes, Workers, Arg}, Options) ->
- if CandidateNodes == [] ->
- erlang:error(no_candidates);
- true ->
- init_it(Starter, self(), Name, Mod,
- {CandidateNodes, Workers, Arg}, Options)
- end;
- init_it(Starter,Parent,Name,Mod,{local_only, _}=Arg,Options) ->
- Debug = debug_options(Name, Options),
- reg_behaviour(),
- case catch Mod:init(Arg) of
- {stop, Reason} ->
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- ignore ->
- proc_lib:init_ack(Starter, ignore),
- exit(normal);
- {'EXIT', Reason} ->
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- {ok, State} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- Server = #server{parent = Parent,
- mod = Mod,
- state = State,
- debug = Debug},
- loop(Server, local_only, #election{name = Name, mode = local});
- Other ->
- Error = {bad_return_value, Other},
- proc_lib:init_ack(Starter, {error, Error}),
- exit(Error)
- end;
- init_it(Starter,Parent,Name,Mod,{CandidateNodes,Workers,Arg},Options) ->
- Debug = debug_options(Name, Options),
- reg_behaviour(),
- AmCandidate = member(node(), CandidateNodes),
- Election = init_election(CandidateNodes, Workers, #election{name = Name}),
- case {catch Mod:init(Arg), AmCandidate} of
- {{stop, Reason},_} ->
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- {ignore,_} ->
- proc_lib:init_ack(Starter, ignore),
- exit(normal);
- {{'EXIT', Reason},_} ->
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- {{ok, State}, true} ->
- %%% NewE = broadcast(capture,Workers++(CandidateNodes -- [node()]),
- %%% Election),
- proc_lib:init_ack(Starter, {ok, self()}),
- begin_election(#server{parent = Parent,
- mod = Mod,
- state = State,
- debug = Debug}, candidate, Election);
- {{ok, State}, false} ->
- %%% NewE = broadcast(add_worker, CandidateNodes, Election),
- proc_lib:init_ack(Starter, {ok, self()}),
- begin_election(#server{parent = Parent,
- mod = Mod,
- state = State,
- debug = Debug}, waiting_worker, Election);
- Else ->
- Error = {bad_return_value, Else},
- proc_lib:init_ack(Starter, {error, Error}),
- exit(Error)
- end.
- reg_behaviour() ->
- catch gproc:reg({p,l,behaviour}, ?MODULE).
- init_election(CandidateNodes, Workers, E) ->
- %%% dbg:tracer(),
- %%% dbg:tpl(?MODULE,lexcompare,[]),
- %%% dbg:p(self(),[m,c]),
- AmCandidate = member(node(), CandidateNodes),
- case AmCandidate of
- true ->
- E#election{mode = global,
- candidate_nodes = CandidateNodes,
- worker_nodes = Workers,
- iteration = {[],
- position(
- node(),CandidateNodes)}};
- false ->
- E#election{mode = global,
- candidate_nodes = CandidateNodes,
- worker_nodes = Workers}
- end.
- begin_election(#server{mod = Mod, state = State} = Server, candidate,
- #election{candidate_nodes = Cands,
- worker_nodes = Workers} = E) ->
- case Cands of
- [N] when N == node() ->
- {ok, Synch, NewState} = Mod:elected(State, E),
- NewE = broadcast({elect,Synch}, E),
- loop(Server#server{state = NewState}, elected, NewE);
- _ ->
- NewE = broadcast(capture,Workers++(Cands -- [node()]), E),
- safe_loop(Server, candidate, NewE)
- end;
- begin_election(Server, waiting_worker, #election{candidate_nodes = Cands}=E) ->
- NewE = broadcast(add_worker, Cands, E),
- safe_loop(Server, waiting_worker, NewE).
-
- %%% ---------------------------------------------------
- %%% The MAIN loop.
- %%% ---------------------------------------------------
- safe_loop(#server{mod = Mod, state = State} = Server, Role,
- #election{name = Name} = E) ->
- receive
- {system, From, Req} ->
- #server{parent = Parent, debug = Debug} = Server,
- sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
- [safe, Server, Role, E]);
- {'EXIT', _Parent, Reason} = Msg ->
- terminate(Reason, Msg, Server, Role, E);
- {leader,capture,Iteration,_Node,Candidate} ->
- case Role of
- candidate ->
- NewE =
- nodeup(node(Candidate),E),
- case lexcompare(NewE#election.iteration,Iteration) of
- less ->
- Candidate !
- {leader,accept,
- NewE#election.iteration,self()},
- safe_loop(Server, captured,
- NewE#election{leader = Candidate});
- greater ->
- %% I'll get either an accept or DOWN
- %% from Candidate later
- safe_loop(Server, Role, NewE);
- equal ->
- safe_loop(Server, Role, NewE)
- end;
- captured ->
- NewE = nodeup(node(Candidate), E),
- safe_loop(Server, Role, NewE);
- waiting_worker ->
- NewE =
- nodeup(node(Candidate),E),
- safe_loop(Server, Role, NewE)
- end;
- {leader,add_worker,Worker} ->
- NewE = nodeup(node(Worker), E),
- safe_loop(Server, Role, NewE);
- {leader,accept,Iteration,Candidate} ->
- case Role of
- candidate ->
- NewE =
- nodeup(node(Candidate),E),
- {Captured,_} = Iteration,
- NewIteration = % inherit all procs that have been
- % accepted by Candidate
- foldl(fun(C,Iter) ->
- add_captured(Iter,C)
- end,NewE#election.iteration,
- [node(Candidate)|Captured]),
- check_majority(NewE#election{
- iteration = NewIteration}, Server);
- captured ->
- %% forward this to the leader
- E#election.leader ! {leader,accept,Iteration,Candidate},
- NewE = nodeup(node(Candidate), E),
- safe_loop(Server, Role, NewE)
- end;
- {leader,elect,Synch,Candidate} ->
- NewE =
- case Role of
- waiting_worker ->
- nodeup(node(Candidate),
- E#election{
- leader = Candidate,
- leadernode = node(Candidate)});
- _ ->
- nodeup(node(Candidate),
- E#election{
- leader = Candidate,
- leadernode = node(Candidate),
- iteration = {[],
- position(
- node(),
- E#election.candidate_nodes)}
- })
- end,
- {ok,NewState} = Mod:surrendered(State,Synch,NewE),
- NewRole = case Role of
- waiting_worker ->
- worker;
- _ ->
- surrendered
- end,
- loop(Server#server{state = NewState}, NewRole, NewE);
- {leader, local_only, Node, Candidate} ->
- case lists:keysearch(node(Candidate), 2, E#election.monitored) of
- {value, {Ref, N}} ->
- NewE = down(Ref, {E#election.name,N},local_only,E),
- io:format("local_only received from ~p~n"
- "E0 = ~p~n"
- "E1 = ~p~n", [Node, E, NewE]),
- safe_after_down(Server, Role, NewE);
- false ->
- safe_loop(Server, Role, E)
- end;
- {'DOWN',Ref,process,{Name,_}=Who,Why} ->
- NewE =
- down(Ref,Who,Why,E),
- safe_after_down(Server, Role, NewE)
- end.
- safe_after_down(Server, Role, E) ->
- case {Role,E#election.leader} of
- {candidate,_} ->
- check_majority(E, Server);
- {captured,none} ->
- check_majority(broadcast(capture,E), Server);
- {waiting_worker,_} ->
- safe_loop(Server, Role, E)
- end.
- loop(#server{parent = Parent,
- mod = Mod,
- state = State,
- debug = Debug} = Server, Role,
- #election{mode = Mode, name = Name} = E) ->
- Msg = receive
- Input ->
- Input
- end,
- case Msg of
- {system, From, Req} ->
- sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
- [normal, Server, Role, E]);
- {'EXIT', Parent, Reason} ->
- terminate(Reason, Msg, Server, Role, E);
- {leader, local_only, _, _Candidate} ->
- loop(Server, Role, E);
- LeaderMsg when element(1,LeaderMsg) == leader, Mode == local ->
- Candidate = element(size(LeaderMsg), LeaderMsg),
- Candidate ! {leader, local_only, node(), self()},
- loop(Server, Role, E);
- {leader,capture,_Iteration,_Node,Candidate} ->
- NewE = nodeup(node(Candidate),E),
- case Role of
- R when R == surrendered; R == worker ->
- loop(Server, Role, NewE);
- elected ->
- {ok,Synch,NewState} = Mod:elected(State,NewE),
- Candidate ! {leader, elect, Synch, self()},
- loop(Server#server{state = NewState}, Role, NewE)
- end;
- {leader,accept,_Iteration,Candidate} ->
- NewE = nodeup(node(Candidate),E),
- case Role of
- surrendered ->
- loop(Server, Role, NewE);
- elected ->
- {ok,Synch,NewState} = Mod:elected(State,NewE),
- Candidate ! {leader, elect, Synch, self()},
- loop(Server#server{state = NewState}, Role, NewE)
- end;
- {leader,elect,Synch,Candidate} ->
- NewE =
- case Role of
- worker ->
- nodeup(node(Candidate),
- E#election{
- leader = Candidate,
- leadernode = node(Candidate)});
- surrendered ->
- nodeup(node(Candidate),
- E#election{
- leader = Candidate,
- leadernode = node(Candidate),
- iteration = {[],
- position(
- node(),
- E#election.candidate_nodes)}
- })
- end,
- {ok, NewState} = Mod:surrendered(State, Synch, NewE),
- loop(Server#server{state = NewState}, Role, NewE);
- {'DOWN',Ref,process,{Name,Node} = Who,Why} ->
- #election{alive = PreviouslyAlive} = E,
- NewE =
- down(Ref,Who,Why,E),
- case NewE#election.leader of
- none ->
- foreach(fun({_,From}) ->
- reply(From,{error,leader_died})
- end, E#election.buffered),
- NewE1 = NewE#election{buffered = []},
- case Role of
- surrendered ->
- check_majority(
- broadcast(capture,NewE1), Server);
- worker ->
- safe_loop(Server, waiting_worker, NewE1)
- end;
- L when L == self() ->
- case member(Node, PreviouslyAlive) of
- true ->
- case Mod:handle_DOWN(Node, State, E) of
- {ok, NewState} ->
- loop(Server#server{state = NewState},
- Role, NewE);
- {ok, Broadcast, NewState} ->
- NewE1 = broadcast(
- {from_leader,Broadcast}, NewE),
- loop(Server#server{state = NewState},
- Role, NewE1)
- end;
- false ->
- loop(Server, Role, NewE)
- end;
- _ ->
- loop(Server, Role, NewE)
- end;
- _Msg when Debug == [] ->
- handle_msg(Msg, Server, Role, E);
- _Msg ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
- E#election.name, {in, Msg}),
- handle_msg(Msg, Server#server{debug = Debug1}, Role, E)
- end.
- %%-----------------------------------------------------------------
- %% Callback functions for system messages handling.
- %%-----------------------------------------------------------------
- %% @hidden
- system_continue(_Parent, Debug, [safe, Server, Role, E]) ->
- safe_loop(Server#server{debug = Debug}, Role, E);
- system_continue(_Parent, Debug, [normal, Server, Role, E]) ->
- loop(Server#server{debug = Debug}, Role, E).
- %% @hidden
- system_terminate(Reason, _Parent, Debug, [_Mode, Server, Role, E]) ->
- terminate(Reason, [], Server#server{debug = Debug}, Role, E).
- %% @hidden
- system_code_change([Mode, Server, Role, E], _Module, OldVsn, Extra) ->
- #server{mod = Mod, state = State} = Server,
- case catch Mod:code_change(OldVsn, State, E, Extra) of
- {ok, NewState} ->
- NewServer = Server#server{state = NewState},
- {ok, [Mode, NewServer, Role, E]};
- {ok, NewState, NewE} ->
- NewServer = Server#server{state = NewState},
- {ok, [Mode, NewServer, Role, NewE]};
- Else -> Else
- end.
- %%-----------------------------------------------------------------
- %% Format debug messages. Print them as the call-back module sees
- %% them, not as the real erlang messages. Use trace for that.
- %%-----------------------------------------------------------------
- %% @hidden
- print_event(Dev, {in, Msg}, Name) ->
- case Msg of
- {'$gen_call', {From, _Tag}, Call} ->
- io:format(Dev, "*DBG* ~p got local call ~p from ~w~n",
- [Name, Call, From]);
- {'$leader_call', {From, _Tag}, Call} ->
- io:format(Dev, "*DBG* ~p got global call ~p from ~w~n",
- [Name, Call, From]);
- {'$gen_cast', Cast} ->
- io:format(Dev, "*DBG* ~p got local cast ~p~n",
- [Name, Cast]);
- {'$leader_cast', Cast} ->
- io:format(Dev, "*DBG* ~p got global cast ~p~n",
- [Name, Cast]);
- _ ->
- io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
- end;
- print_event(Dev, {out, Msg, To, State}, Name) ->
- io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
- [Name, Msg, To, State]);
- print_event(Dev, {noreply, State}, Name) ->
- io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
- print_event(Dev, Event, Name) ->
- io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
- handle_msg({'$leader_call', From, Request} = Msg,
- #server{mod = Mod, state = State} = Server, elected = Role, E) ->
- case catch Mod:handle_leader_call(Request, From, State, E) of
- {reply, Reply, NState} ->
- NewServer = reply(From, {leader,reply,Reply},
- Server#server{state = NState}, Role, E),
- loop(NewServer, Role, E);
- {reply, Reply, Broadcast, NState} ->
- NewE = broadcast({from_leader,Broadcast}, E),
- NewServer = reply(From, {leader,reply,Reply},
- Server#server{state = NState}, Role,
- NewE),
- loop(NewServer, Role, NewE);
- {noreply, NState} = Reply ->
- NewServer = handle_debug(Server#server{state = NState},
- Role, E, Reply),
- loop(NewServer, Role, E);
- {stop, Reason, Reply, NState} ->
- {'EXIT', R} =
- (catch terminate(Reason, Msg,
- Server#server{state = NState},
- Role, E)),
- reply(From, Reply),
- exit(R);
- Other ->
- handle_common_reply(Other, Msg, Server, Role, E)
- end;
- handle_msg({'$leader_call', From, Request} = Msg,
- #server{mod = Mod, state = State} = Server, Role,
- #election{mode = local} = E) ->
- Reply = (catch Mod:handle_leader_call(Request,From,State,E)),
- handle_call_reply(Reply, Msg, Server, Role, E);
- %%% handle_common_reply(Reply, Msg, Server, Role, E);
- handle_msg({'$leader_cast', Cast} = Msg,
- #server{mod = Mod, state = State} = Server, Role,
- #election{mode = local} = E) ->
- Reply = (catch Mod:handle_leader_cast(Cast,State,E)),
- handle_common_reply(Reply, Msg, Server, Role, E);
- handle_msg({'$leader_cast', Cast} = Msg,
- #server{mod = Mod, state = State} = Server, elected = Role, E) ->
- Reply = (catch Mod:handle_leader_cast(Cast, State, E)),
- handle_common_reply(Reply, Msg, Server, Role, E);
- handle_msg({from_leader, Cmd} = Msg,
- #server{mod = Mod, state = State} = Server, Role, E) ->
- handle_common_reply(catch Mod:from_leader(Cmd, State, E),
- Msg, Server, Role, E);
- handle_msg({'$leader_call', From, Request}, Server, Role,
- #election{buffered = Buffered, leader = Leader} = E) ->
- Ref = make_ref(),
- Leader ! {'$leader_call', {self(),Ref}, Request},
- NewBuffered = [{Ref,From}|Buffered],
- loop(Server, Role, E#election{buffered = NewBuffered});
- handle_msg({Ref, {leader,reply,Reply}}, Server, Role,
- #election{buffered = Buffered} = E) ->
- {value, {_,From}} = keysearch(Ref,1,Buffered),
- NewServer = reply(From, {leader,reply,Reply}, Server, Role,
- E#election{buffered = keydelete(Ref,1,Buffered)}),
- loop(NewServer, Role, E);
- handle_msg({'$gen_call', From, Request} = Msg,
- #server{mod = Mod, state = State} = Server, Role, E) ->
- Reply = (catch Mod:handle_call(Request, From, State)),
- handle_call_reply(Reply, Msg, Server, Role, E);
- handle_msg({'$gen_cast',Msg} = Cast,
- #server{mod = Mod, state = State} = Server, Role, E) ->
- handle_common_reply(catch Mod:handle_cast(Msg, State),
- Cast, Server, Role, E);
- handle_msg(Msg,
- #server{mod = Mod, state = State} = Server, Role, E) ->
- handle_common_reply(catch Mod:handle_info(Msg, State),
- Msg, Server, Role, E).
- handle_call_reply(CB_reply, {_, From, _Request} = Msg, Server, Role, E) ->
- case CB_reply of
- {reply, Reply, NState} ->
- NewServer = reply(From, Reply,
- Server#server{state = NState}, Role, E),
- loop(NewServer, Role, E);
- {noreply, NState} = Reply ->
- NewServer = handle_debug(Server#server{state = NState},
- Role, E, Reply),
- loop(NewServer, Role, E);
- {activate, Cands, Workers, Reply, NState}
- when E#election.mode == local ->
- NewRole = case member(node(), Cands) of
- true -> candidate;
- false -> waiting_worker
- end,
- reply(From, Reply),
- NServer = Server#server{state = NState},
- NewE = init_election(Cands, Workers, E),
- io:format("activating: NewE = ~p~n", [NewE]),
- begin_election(NServer, NewRole, NewE);
- {stop, Reason, Reply, NState} ->
- {'EXIT', R} =
- (catch terminate(Reason, Msg, Server#server{state = NState},
- Role, E)),
- reply(From, Reply),
- exit(R);
- Other ->
- handle_common_reply(Other, Msg, Server, Role, E)
- end.
- handle_common_reply(Reply, Msg, Server, Role, E) ->
- case Reply of
- {ok, NState} ->
- NewServer = handle_debug(Server#server{state = NState},
- Role, E, Reply),
- loop(NewServer, Role, E);
- {ok, Broadcast, NState} ->
- NewE = broadcast({from_leader,Broadcast}, E),
- NewServer = handle_debug(Server#server{state = NState},
- Role, E, Reply),
- loop(NewServer, Role, NewE);
- {stop, Reason, NState} ->
- terminate(Reason, Msg, Server#server{state = NState}, Role, E);
- {'EXIT', Reason} ->
- terminate(Reason, Msg, Server, Role, E);
- _ ->
- terminate({bad_return_value, Reply}, Msg, Server, Role, E)
- end.
- reply({To, Tag}, Reply, #server{state = State} = Server, Role, E) ->
- reply({To, Tag}, Reply),
- handle_debug(Server, Role, E, {out, Reply, To, State}).
- handle_debug(#server{debug = []} = Server, _Role, _E, _Event) ->
- Server;
- handle_debug(#server{debug = Debug} = Server, _Role, E, Event) ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
- E#election.name, Event),
- Server#server{debug = Debug1}.
- %%% ---------------------------------------------------
- %%% Terminate the server.
- %%% ---------------------------------------------------
- terminate(Reason, Msg, #server{mod = Mod,
- state = State,
- debug = Debug}, _Role,
- #election{name = Name}) ->
- case catch Mod:terminate(Reason, State) of
- {'EXIT', R} ->
- error_info(R, Name, Msg, State, Debug),
- exit(R);
- _ ->
- case Reason of
- normal ->
- exit(normal);
- shutdown ->
- exit(shutdown);
- _ ->
- error_info(Reason, Name, Msg, State, Debug),
- exit(Reason)
- end
- end.
- %% Maybe we shouldn't do this? We have the crash report...
- error_info(Reason, Name, Msg, State, Debug) ->
- format("** Generic leader ~p terminating \n"
- "** Last message in was ~p~n"
- "** When Server state == ~p~n"
- "** Reason for termination == ~n** ~p~n",
- [Name, Msg, State, Reason]),
- sys:print_log(Debug),
- ok.
- %%% ---------------------------------------------------
- %%% Misc. functions.
- %%% ---------------------------------------------------
- opt(Op, [{Op, Value}|_]) ->
- {ok, Value};
- opt(Op, [_|Options]) ->
- opt(Op, Options);
- opt(_, []) ->
- false.
- debug_options(Name, Opts) ->
- case opt(debug, Opts) of
- {ok, Options} -> dbg_options(Name, Options);
- _ -> dbg_options(Name, [])
- end.
- dbg_options(Name, []) ->
- Opts =
- case init:get_argument(generic_debug) of
- error ->
- [];
- _ ->
- [log, statistics]
- end,
- dbg_opts(Name, Opts);
- dbg_options(Name, Opts) ->
- dbg_opts(Name, Opts).
- dbg_opts(Name, Opts) ->
- case catch sys:debug_options(Opts) of
- {'EXIT',_} ->
- format("~p: ignoring erroneous debug options - ~p~n",
- [Name, Opts]),
- [];
- Dbg ->
- Dbg
- end.
- %%-----------------------------------------------------------------
- %% Status information
- %%-----------------------------------------------------------------
- %% @hidden
- format_status(Opt, StatusData) ->
- [PDict, SysState, Parent, Debug, [_Mode, Server, _Role, E]] = StatusData,
- Header = lists:concat(["Status for generic server ", E#election.name]),
- Log = sys:get_debug(log, Debug, []),
- #server{mod = Mod, state = State} = Server,
- Specific =
- case erlang:function_exported(Mod, format_status, 2) of
- true ->
- case catch apply(Mod, format_status, [Opt, [PDict, State]]) of
- {'EXIT', _} -> [{data, [{"State", State}]}];
- Else -> Else
- end;
- _ ->
- [{data, [{"State", State}]}]
- end,
- [{header, Header},
- {data, [{"Status", SysState},
- {"Parent", Parent},
- {"Logged events", Log}]} |
- Specific].
- broadcast(Msg, #election{monitored = Monitored} = E) ->
- %% When broadcasting the first time, we broadcast to all candidate nodes,
- %% using broadcast/3. This function is used for subsequent broadcasts,
- %% and we make sure only to broadcast to already known nodes.
- %% It's the responsibility of new nodes to make themselves known through
- %% a wider broadcast.
- ToNodes = [N || {_,N} <- Monitored],
- broadcast(Msg, ToNodes, E).
- broadcast(capture, ToNodes, #election{monitored = Monitored} = E) ->
- ToMonitor = [N || N <- ToNodes,
- not(keymember(N,2,Monitored))],
- NewE =
- foldl(fun(Node,Ex) ->
- Ref = erlang:monitor(
- process,{Ex#election.name,Node}),
- Ex#election{monitored = [{Ref,Node}|
- Ex#election.monitored]}
- end,E,ToMonitor),
- foreach(
- fun(Node) ->
- {NewE#election.name,Node} !
- {leader,capture,NewE#election.iteration,node(),self()}
- end,ToNodes),
- NewE;
- broadcast({elect,Synch},ToNodes,E) ->
- foreach(
- fun(Node) ->
- {E#election.name,Node} ! {leader,elect,Synch,self()}
- end,ToNodes),
- E;
- broadcast({from_leader, Msg}, ToNodes, E) ->
- foreach(
- fun(Node) ->
- {E#election.name,Node} ! {from_leader, Msg}
- end,ToNodes),
- E;
- broadcast(add_worker, ToNodes, E) ->
- foreach(
- fun(Node) ->
- {E#election.name,Node} ! {leader, add_worker, self()}
- end,ToNodes),
- E.
- check_majority(E, Server) ->
- {Captured,_} = E#election.iteration,
- AcceptMeAsLeader = length(Captured) + 1, % including myself
- NrCandidates = length(E#election.candidate_nodes),
- NrDown = E#election.down,
- if AcceptMeAsLeader > NrCandidates/2 ->
- NewE = E#election{leader = self(), leadernode = node()},
- {ok,Synch,NewState} =
- (Server#server.mod):elected(Server#server.state, NewE),
- NewE1 = broadcast({elect,Synch}, NewE),
- loop(Server#server{state = NewState}, elected, NewE1);
- AcceptMeAsLeader+length(NrDown) == NrCandidates ->
- NewE = E#election{leader = self(), leadernode = node()},
- {ok,Synch,NewState} =
- (Server#server.mod):elected(Server#server.state, NewE),
- NewE1 = broadcast({elect,Synch}, NewE),
- loop(Server#server{state = NewState}, elected, NewE1);
- true ->
- safe_loop(Server, candidate, E)
- end.
- down(Ref,_Who,Why,E) ->
- case lists:keysearch(Ref,1,E#election.monitored) of
- {value, {_,Node}} ->
- NewMonitored = if Why == local_only -> E#election.monitored;
- true ->
- E#election.monitored -- [{Ref,Node}]
- end,
- {Captured,Pos} = E#election.iteration,
- case Node == E#election.leadernode of
- true ->
- E#election{leader = none,
- leadernode = none,
- iteration = {Captured -- [Node],
- Pos}, % TAKE CARE !
- down = [Node|E#election.down],
- alive = E#election.alive -- [Node],
- monitored = NewMonitored};
- false ->
- Down = case member(Node,E#election.candidate_nodes) of
- true ->
- [Node|E#election.down];
- false ->
- E#election.down
- end,
- E#election{iteration = {Captured -- [Node],
- Pos}, % TAKE CARE !
- down = Down,
- alive = E#election.alive -- [Node],
- monitored = NewMonitored}
- end
- end.
- %% position of element counted from end of the list
- %%
- position(X,[Head|Tail]) ->
- case X==Head of
- true ->
- length(Tail);
- false ->
- position(X,Tail)
- end.
- %% This is a multi-level comment
- %% This is the second line of the comment
- lexcompare({C1,P1},{C2,P2}) ->
- lexcompare([{length(C1),length(C2)},{P1,P2}]).
- lexcompare([]) ->
- equal;
- lexcompare([{X,Y}|Rest]) ->
- if X<Y -> less;
- X==Y -> lexcompare(Rest);
- X>Y -> greater
- end.
- add_captured({Captured,Pos}, CandidateNode) ->
- {[CandidateNode|[ Node || Node <- Captured,
- Node =/= CandidateNode ]], Pos}.
- nodeup(Node, #election{monitored = Monitored,
- alive = Alive,
- down = Down} = E) ->
- %% make sure process is monitored from now on
- case [ N || {_,N}<-Monitored, N==Node] of
- [] ->
- Ref = erlang:monitor(process,{E#election.name,Node}),
- E#election{down = Down -- [Node],
- alive = [Node | Alive],
- monitored = [{Ref,Node}|Monitored]};
- _ -> % already monitored, thus not in down
- E#election{alive = [Node | [N || N <- Alive,
- N =/= Node]]}
- end.
|