%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%% @author Ulf Wiger <ulf.wiger@ericsson.com>
%% @author Thomas Arts <thomas.arts@ituniv.se>
%% 
%% @doc Leader election behaviour.
%% <p>This application implements a leader election behaviour modeled after
%% gen_server. This behaviour intends to make it reasonably
%% straightforward to implement a fully distributed server with
%% master-slave semantics.</p>
%% <p>The gen_leader behaviour supports nearly everything that gen_server
%% does (some functions, such as multicall() and the internal timeout,
%% have been removed), and adds a few callbacks and API functions to 
%% support leader election etc.</p>
%% <p>Also included is an example program, a global dictionary, based
%% on the modules gen_leader and dict. The callback implementing the
%% global dictionary is called 'test_cb', for no particularly logical
%% reason.</p>
%% @end
%%
%% @type election() = tuple(). Opaque state of the gen_leader behaviour.
%% @type node() = atom(). A node name.
%% @type name() = atom(). A locally registered name.
%% @type serverRef() = Name | {name(),node()} | {global,Name} | pid(). 
%%   See gen_server.
%% @type callerRef() = {pid(), reference()}. See gen_server.
%%
-module(gen_leader).


-export([start/4, start/6,
	 start_link/4, start_link/6,
	 leader_call/2, leader_call/3, leader_cast/2,
	 call/2, call/3, cast/2,
	 reply/2]).

%% Query functions
-export([alive/1,
	 down/1,
	 candidates/1,
	 workers/1]).

-export([
	 system_continue/3,
	 system_terminate/4,
	 system_code_change/4,
	 format_status/2
	]).

-export([behaviour_info/1]).

%% Internal exports
-export([init_it/6, print_event/3
	 %%, safe_send/2
	]).

-import(error_logger , [format/2]).
-import(lists, [foldl/3,
		foreach/2,
		member/2,
		keydelete/3,
		keysearch/3,
		keymember/3]).


-record(election,{leader = none,
		  mode = global,
		  name,
		  leadernode = none,
		  candidate_nodes = [],	
		  worker_nodes = [],
		  alive = [],
		  iteration,
		  down = [],
		  monitored = [],
		  buffered = []
		 }).

-record(server, {parent,
		 mod,
		 state,
		 debug}).

%%% ---------------------------------------------------
%%% Interface functions.
%%% ---------------------------------------------------

%% @hidden
behaviour_info(callbacks) ->
    [{init,1},
     {elected,2},
     {surrendered,3},
     {handle_leader_call,4},
     {handle_leader_cast,3},
     {handle_local_only, 4},
     {from_leader,3},
     {handle_call,3},
     {handle_cast,2},
     {handle_DOWN,3},
     {handle_info,2},
     {terminate,2},
     {code_change,4}];
behaviour_info(_Other) ->
    undefined.

start(Name, Mod, Arg, Options) when is_atom(Name) ->
    gen:start(?MODULE, nolink, {local,Name},
	      Mod, {local_only, Arg}, Options).

%% @spec start(Name::node(), CandidateNodes::[node()],
%%             Workers::[node()], Mod::atom(), Arg, Options::list()) ->
%%    {ok,pid()}
%%
%% @doc Starts a gen_leader process without linking to the parent.
%%
start(Name, [_|_] = CandidateNodes, Workers, Mod, Arg, Options)
  when is_atom(Name) ->
    gen:start(?MODULE, nolink, {local,Name},
	      Mod, {CandidateNodes, Workers, Arg}, Options).

%% @spec start_link(Name::atom(), CandidateNodes::[atom()],
%%             Workers::[atom()], Mod::atom(), Arg, Options::list()) ->
%%  {ok, pid()}
%%
%% @doc Starts a gen_leader process.
%% <table>
%%  <tr><td>Name</td><td>The locally registered name of the process</td></tr>
%%  <tr><td>CandidateNodes</td><td>The names of nodes capable of assuming
%%     a leadership role</td></tr>
%%  <tr><td>Workers</td>
%%     <td>The names of nodes that will be part of the "cluster",
%%         but cannot ever assume a leadership role.</td></tr>
%%  <tr><td>Mod</td><td>The name of the callback module</td></tr>
%%  <tr><td>Arg</td><td>Argument passed on to <code>Mod:init/1</code></td></tr>
%%  <tr><td>Options</td><td>Same as gen_server's Options</td></tr>
%% </table>
%%
%% <p>The list of candidates needs to be known from the start. Workers 
%% can be added at runtime.</p>
%% @end
start_link(Name, [_|_] = CandidateNodes, Workers, 
	   Mod, Arg, Options) when is_atom(Name) ->
    gen:start(?MODULE, link, {local,Name}, Mod,
	      {CandidateNodes, Workers, Arg}, Options).

start_link(Name, Mod, Arg, Options) when is_atom(Name) ->
    gen:start(?MODULE, link, {local,Name}, Mod,
	      {local_only, Arg}, Options).

%% Query functions to be used from the callback module

%% @spec alive(E::election()) -> [node()]
%%
%% @doc Returns a list of live nodes (candidates and workers).
%%
alive(#election{alive = Alive}) ->
    Alive.

%% @spec down(E::election()) -> [node()]
%%
%% @doc Returns a list of candidates currently not running.
%%
down(#election{down = Down}) ->
    Down.

%% @spec candidates(E::election()) -> [node()]
%%
%% @doc Returns a list of known candidates.
%%
candidates(#election{candidate_nodes = Cands}) ->
    Cands.

%% @spec workers(E::election()) -> [node()]
%%
%% @doc Returns a list of known workers.
%%
workers(#election{worker_nodes = Workers}) ->
    Workers.

%% @spec call(Name::serverRef(), Request) -> term()
%%
%% @doc Equivalent to <code>gen_server:call/2</code>, but with a slightly
%% different exit reason if something goes wrong. This function calls 
%% the <code>gen_leader</code> process exactly as if it were a gen_server
%% (which, for practical purposes, it is.)
%% @end
call(Name, Request) ->
    case catch gen:call(Name, '$gen_call', Request) of
	{ok,Res} ->
	    Res;
	{'EXIT',Reason} ->
	    exit({Reason, {?MODULE, local_call, [Name, Request]}})
    end.

%% @spec call(Name::serverRef(), Request, Timeout::integer()) ->
%%     Reply
%%
%%     Reply = term()
%%
%% @doc Equivalent to <code>gen_server:call/3</code>, but with a slightly
%% different exit reason if something goes wrong. This function calls 
%% the <code>gen_leader</code> process exactly as if it were a gen_server
%% (which, for practical purposes, it is.)
%% @end
call(Name, Request, Timeout) ->
    case catch gen:call(Name, '$gen_call', Request, Timeout) of
	{ok,Res} ->
	    Res;
	{'EXIT',Reason} ->
	    exit({Reason, {?MODULE, local_call, [Name, Request, Timeout]}})
    end.

%% @spec leader_call(Name::name(), Request::term())
%%    -> Reply
%%
%%    Reply = term()
%%
%% @doc Makes a call (similar to <code>gen_server:call/2</code>) to the 
%% leader. The call is forwarded via the local gen_leader instance, if 
%% that one isn't actually the leader. The client will exit if the 
%% leader dies while the request is outstanding.
%% <p>This function uses <code>gen:call/3</code>, and is subject to the
%% same default timeout as e.g. <code>gen_server:call/2</code>.</p>
%% @end
%%
leader_call(Name, Request) ->
    case catch gen:call(Name, '$leader_call', Request) of
	{ok,{leader,reply,Res}} ->
	    Res;
	{ok,{error, leader_died}} ->
	    exit({leader_died, {?MODULE, leader_call, [Name, Request]}});
	{'EXIT',Reason} ->
	    exit({Reason, {?MODULE, leader_call, [Name, Request]}})
    end.

%% @spec leader_call(Name::name(), Request::term(), Timeout::integer())
%%    -> Reply
%%
%%    Reply = term()
%%
%% @doc Makes a call (similar to <code>gen_server:call/3</code>) to the 
%% leader. The call is forwarded via the local gen_leader instance, if 
%% that one isn't actually the leader. The client will exit if the 
%% leader dies while the request is outstanding.
%% @end
%%
leader_call(Name, Request, Timeout) ->
    case catch gen:call(Name, '$leader_call', Request, Timeout) of
	{ok,{leader,reply,Res}} ->
	    Res;
	{ok,{error, leader_died}} ->
	    exit({leader_died, {?MODULE, leader_call, [Name, Request]}});
	{'EXIT',Reason} ->
	    exit({Reason, {?MODULE, leader_call, [Name, Request, Timeout]}})
    end.



%% @equiv gen_server:cast/2
cast(Name, Request) ->
    catch do_cast('$gen_cast', Name, Request),
    ok.

%% @spec leader_cast(Name::name(), Msg::term()) -> ok
%% @doc Similar to <code>gen_server:cast/2</code> but will be forwarded to
%% the leader via the local gen_leader instance.
leader_cast(Name, Request) ->
    catch do_cast('$leader_cast', Name, Request),
    ok.


do_cast(Tag, Name, Request) when atom(Name) ->
    Name ! {Tag, Request};
do_cast(Tag, Pid, Request) when pid(Pid) ->
    Pid ! {Tag, Request}.


%% @spec reply(From::callerRef(), Reply::term()) -> Void
%% @equiv gen_server:reply/2
reply({To, Tag}, Reply) ->
    catch To ! {Tag, Reply}.


%%% ---------------------------------------------------
%%% Initiate the new process.
%%% Register the name using the Rfunc function
%%% Calls the Mod:init/Args function.
%%% Finally an acknowledge is sent to Parent and the main
%%% loop is entered.
%%% ---------------------------------------------------
%%% @hidden
init_it(Starter, self, Name, Mod, {CandidateNodes, Workers, Arg}, Options) ->
    if CandidateNodes == [] ->
	    erlang:error(no_candidates);
       true ->
	    init_it(Starter, self(), Name, Mod, 
		    {CandidateNodes, Workers, Arg}, Options)
    end;
init_it(Starter,Parent,Name,Mod,{local_only, _}=Arg,Options) ->
    Debug = debug_options(Name, Options),
    reg_behaviour(),
    case catch Mod:init(Arg) of
	{stop, Reason} ->
	    proc_lib:init_ack(Starter, {error, Reason}),
	    exit(Reason);
	ignore ->
	    proc_lib:init_ack(Starter, ignore),
	    exit(normal);
	{'EXIT', Reason} ->
	    proc_lib:init_ack(Starter, {error, Reason}),
	    exit(Reason);
	{ok, State} ->
	    proc_lib:init_ack(Starter, {ok, self()}),
	    Server = #server{parent = Parent,
			     mod = Mod,
			     state = State,
			     debug = Debug},
	    loop(Server, local_only, #election{name = Name, mode = local});
	Other ->
	    Error = {bad_return_value, Other},
	    proc_lib:init_ack(Starter, {error, Error}),
	    exit(Error)
    end;
init_it(Starter,Parent,Name,Mod,{CandidateNodes,Workers,Arg},Options) ->
    Debug = debug_options(Name, Options),
    reg_behaviour(),
    AmCandidate = member(node(), CandidateNodes),
    Election = init_election(CandidateNodes, Workers, #election{name = Name}),
    case {catch Mod:init(Arg), AmCandidate} of
	{{stop, Reason},_} ->
	    proc_lib:init_ack(Starter, {error, Reason}),
	    exit(Reason);
	{ignore,_} ->
	    proc_lib:init_ack(Starter, ignore),
	    exit(normal);
	{{'EXIT', Reason},_} ->
	    proc_lib:init_ack(Starter, {error, Reason}),
	    exit(Reason);
	{{ok, State}, true} ->
%%%	    NewE = broadcast(capture,Workers++(CandidateNodes -- [node()]),
%%%			     Election),
	    proc_lib:init_ack(Starter, {ok, self()}), 	  
	    begin_election(#server{parent = Parent,
				   mod = Mod,
				   state = State,
				   debug = Debug}, candidate, Election);
	{{ok, State}, false} ->
%%%	    NewE = broadcast(add_worker, CandidateNodes, Election),
	    proc_lib:init_ack(Starter, {ok, self()}),
	    begin_election(#server{parent = Parent,
				   mod = Mod,
				   state = State,
				   debug = Debug}, waiting_worker, Election);
	Else ->
	    Error = {bad_return_value, Else},
	    proc_lib:init_ack(Starter, {error, Error}),
	    exit(Error)
    end.

reg_behaviour() ->
    catch gproc:reg({p,l,behaviour}, ?MODULE).

init_election(CandidateNodes, Workers, E) ->
%%%    dbg:tracer(),
%%%    dbg:tpl(?MODULE,lexcompare,[]),
%%%    dbg:p(self(),[m,c]),
    AmCandidate = member(node(), CandidateNodes),
    case AmCandidate of
	true ->
	    E#election{mode = global,
		       candidate_nodes = CandidateNodes,
		       worker_nodes = Workers,
		       iteration = {[], 
				    position(
				      node(),CandidateNodes)}};
	false ->
	    E#election{mode = global,
		       candidate_nodes = CandidateNodes,
		       worker_nodes = Workers}
    end.

begin_election(#server{mod = Mod, state = State} = Server, candidate,
	       #election{candidate_nodes = Cands,
			 worker_nodes = Workers} = E) ->
    case Cands of
	[N] when N == node() ->
	    {ok, Synch, NewState} = Mod:elected(State, E),
	    NewE = broadcast({elect,Synch}, E),
	    loop(Server#server{state = NewState}, elected, NewE);
	_ ->
	    NewE = broadcast(capture,Workers++(Cands -- [node()]), E),
	    safe_loop(Server, candidate, NewE)
    end;
begin_election(Server, waiting_worker, #election{candidate_nodes = Cands}=E) ->
    NewE = broadcast(add_worker, Cands, E),
    safe_loop(Server, waiting_worker, NewE).
    

%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------


safe_loop(#server{mod = Mod, state = State} = Server, Role,
	  #election{name = Name} = E) ->
    receive
	{system, From, Req} ->
	    #server{parent = Parent, debug = Debug} = Server,
	    sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
				  [safe, Server, Role, E]);
	{'EXIT', _Parent, Reason} = Msg ->
	    terminate(Reason, Msg, Server, Role, E);
	{leader,capture,Iteration,_Node,Candidate} ->
	    case Role of
		candidate ->
		    NewE =
			nodeup(node(Candidate),E),
		    case lexcompare(NewE#election.iteration,Iteration) of
			less ->
			    Candidate ! 
				{leader,accept,
				 NewE#election.iteration,self()},
			    safe_loop(Server, captured, 
				      NewE#election{leader = Candidate});
			greater ->
			    %% I'll get either an accept or DOWN
			    %% from Candidate later
			    safe_loop(Server, Role, NewE);
			equal ->
			    safe_loop(Server, Role, NewE)
		    end;
		captured ->
		    NewE = nodeup(node(Candidate), E),
		    safe_loop(Server, Role, NewE);
		waiting_worker ->
		    NewE = 
			nodeup(node(Candidate),E),
		    safe_loop(Server, Role, NewE)
	    end;
	{leader,add_worker,Worker} ->
	    NewE = nodeup(node(Worker), E),
	    safe_loop(Server, Role, NewE);
	{leader,accept,Iteration,Candidate} ->
	    case Role of
		candidate ->
		    NewE =
			nodeup(node(Candidate),E),
		    {Captured,_} = Iteration,
		    NewIteration =   % inherit all procs that have been
				     % accepted by Candidate
			foldl(fun(C,Iter) ->
				      add_captured(Iter,C)
			      end,NewE#election.iteration,
			      [node(Candidate)|Captured]),
		    check_majority(NewE#election{
				     iteration = NewIteration}, Server);
		captured ->
		    %% forward this to the leader
		    E#election.leader ! {leader,accept,Iteration,Candidate},
		    NewE = nodeup(node(Candidate), E),
		    safe_loop(Server, Role, NewE)
	    end;
	{leader,elect,Synch,Candidate} ->
	    NewE = 
		case Role of
		    waiting_worker ->
			nodeup(node(Candidate),
			       E#election{
				 leader = Candidate,
				 leadernode = node(Candidate)});
		    _ ->
			nodeup(node(Candidate),
			       E#election{
				 leader = Candidate,
				 leadernode = node(Candidate),
				 iteration = {[],
					      position(
						node(),
						E#election.candidate_nodes)}
				})
		end,
	    {ok,NewState} = Mod:surrendered(State,Synch,NewE),
	    NewRole = case Role of
			  waiting_worker ->
			      worker;
			  _ ->
			      surrendered
		      end,
	    loop(Server#server{state = NewState}, NewRole, NewE);
	{leader, local_only, Node, Candidate} ->
	    case lists:keysearch(node(Candidate), 2, E#election.monitored) of
		{value, {Ref, N}} ->
		    NewE = down(Ref, {E#election.name,N},local_only,E),
		    io:format("local_only received from ~p~n"
			      "E0 = ~p~n"
			      "E1 = ~p~n", [Node, E, NewE]),
		    safe_after_down(Server, Role, NewE);
		false ->
		    safe_loop(Server, Role, E)
	    end;
	{'DOWN',Ref,process,{Name,_}=Who,Why} ->
	    NewE = 
		down(Ref,Who,Why,E),
	    safe_after_down(Server, Role, NewE)
    end.

safe_after_down(Server, Role, E) ->
    case {Role,E#election.leader} of
	{candidate,_} ->
	    check_majority(E, Server);
	{captured,none} ->
	    check_majority(broadcast(capture,E), Server);
	{waiting_worker,_} ->
	    safe_loop(Server, Role, E)
    end.


loop(#server{parent = Parent,
	     mod = Mod,
	     state = State,
	     debug = Debug} = Server, Role,
     #election{mode = Mode, name = Name} = E) ->
    Msg = receive

	      Input ->
		    Input
	  end,
    case Msg of
	{system, From, Req} ->
	    sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
				  [normal, Server, Role, E]);
	{'EXIT', Parent, Reason} ->
	    terminate(Reason, Msg, Server, Role, E);
	{leader, local_only, _, _Candidate} ->
	    loop(Server, Role, E);
	LeaderMsg when element(1,LeaderMsg) == leader, Mode == local ->
	    Candidate = element(size(LeaderMsg), LeaderMsg),
	    Candidate ! {leader, local_only, node(), self()},
	    loop(Server, Role, E);
	{leader,capture,_Iteration,_Node,Candidate} ->
	    NewE = nodeup(node(Candidate),E),
	    case Role of
		R when R == surrendered; R == worker ->
		    loop(Server, Role, NewE);
		elected ->
		    {ok,Synch,NewState} = Mod:elected(State,NewE),
		    Candidate ! {leader, elect, Synch, self()},
		    loop(Server#server{state = NewState}, Role, NewE)
	    end;
	{leader,accept,_Iteration,Candidate} ->
	    NewE = nodeup(node(Candidate),E),
	    case Role of
		surrendered ->
		    loop(Server, Role, NewE);
		elected ->
		    {ok,Synch,NewState} = Mod:elected(State,NewE),
		    Candidate ! {leader, elect, Synch, self()},
		    loop(Server#server{state = NewState}, Role, NewE)
	    end;
	{leader,elect,Synch,Candidate} ->
	    NewE = 
		case Role of
		    worker ->
			nodeup(node(Candidate),
			       E#election{
				 leader = Candidate,
				 leadernode = node(Candidate)});
		    surrendered ->
			nodeup(node(Candidate),
			       E#election{
				 leader = Candidate,
				 leadernode = node(Candidate),
				 iteration = {[],
					      position(
						node(),
						E#election.candidate_nodes)}
				})
		end,
	    {ok, NewState} = Mod:surrendered(State, Synch, NewE),
	    loop(Server#server{state = NewState}, Role, NewE);
	{'DOWN',Ref,process,{Name,Node} = Who,Why} ->
	    #election{alive = PreviouslyAlive} = E,
	    NewE = 
		down(Ref,Who,Why,E),
	    case NewE#election.leader of
		none ->
		    foreach(fun({_,From}) ->
				    reply(From,{error,leader_died})
			    end, E#election.buffered),
		    NewE1 = NewE#election{buffered = []},
		    case Role of 
			surrendered ->
			    check_majority(
			      broadcast(capture,NewE1), Server);
			worker ->
			    safe_loop(Server, waiting_worker, NewE1)
		    end;
		L when L == self() ->
		    case member(Node, PreviouslyAlive) of
			true ->
			    case Mod:handle_DOWN(Node, State, E) of
				{ok, NewState} ->
				    loop(Server#server{state = NewState},
					 Role, NewE);
				{ok, Broadcast, NewState} ->
				    NewE1 = broadcast(
					      {from_leader,Broadcast}, NewE),
				    loop(Server#server{state = NewState},
					 Role, NewE1)
			    end;
			false ->
			    loop(Server, Role, NewE)
		    end;
		_ ->
		    loop(Server, Role, NewE)
	    end;
	_Msg when Debug == [] ->
	    handle_msg(Msg, Server, Role, E);
	_Msg ->
	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
				      E#election.name, {in, Msg}),
	    handle_msg(Msg, Server#server{debug = Debug1}, Role, E)
    end.

%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------

%% @hidden
system_continue(_Parent, Debug, [safe, Server, Role, E]) ->
    safe_loop(Server#server{debug = Debug}, Role, E);
system_continue(_Parent, Debug, [normal, Server, Role, E]) ->
    loop(Server#server{debug = Debug}, Role, E).

%% @hidden
system_terminate(Reason, _Parent, Debug, [_Mode, Server, Role, E]) ->
    terminate(Reason, [], Server#server{debug = Debug}, Role, E).

%% @hidden
system_code_change([Mode, Server, Role, E], _Module, OldVsn, Extra) ->
    #server{mod = Mod, state = State} = Server,
    case catch Mod:code_change(OldVsn, State, E, Extra) of
	{ok, NewState} ->
	    NewServer = Server#server{state = NewState},
	    {ok, [Mode, NewServer, Role, E]};
	{ok, NewState, NewE} ->
	    NewServer = Server#server{state = NewState},
	    {ok, [Mode, NewServer, Role, NewE]};
	Else -> Else
    end.

%%-----------------------------------------------------------------
%% Format debug messages.  Print them as the call-back module sees
%% them, not as the real erlang messages.  Use trace for that.
%%-----------------------------------------------------------------
%% @hidden
print_event(Dev, {in, Msg}, Name) ->
    case Msg of
	{'$gen_call', {From, _Tag}, Call} ->
	    io:format(Dev, "*DBG* ~p got local call ~p from ~w~n",
		      [Name, Call, From]);
	{'$leader_call', {From, _Tag}, Call} ->
	    io:format(Dev, "*DBG* ~p got global call ~p from ~w~n",
		      [Name, Call, From]);
	{'$gen_cast', Cast} ->
	    io:format(Dev, "*DBG* ~p got local cast ~p~n",
		      [Name, Cast]);
	{'$leader_cast', Cast} ->
	    io:format(Dev, "*DBG* ~p got global cast ~p~n",
		      [Name, Cast]);
	_ ->
	    io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
    end;
print_event(Dev, {out, Msg, To, State}, Name) ->
    io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", 
	      [Name, Msg, To, State]);
print_event(Dev, {noreply, State}, Name) ->
    io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
print_event(Dev, Event, Name) ->
    io:format(Dev, "*DBG* ~p dbg  ~p~n", [Name, Event]).


handle_msg({'$leader_call', From, Request} = Msg, 
	   #server{mod = Mod, state = State} = Server, elected = Role, E) ->
    case catch Mod:handle_leader_call(Request, From, State, E) of
	{reply, Reply, NState} ->
	    NewServer = reply(From, {leader,reply,Reply},
			      Server#server{state = NState}, Role, E),
	    loop(NewServer, Role, E);
	{reply, Reply, Broadcast, NState} ->
	    NewE = broadcast({from_leader,Broadcast}, E),
	    NewServer = reply(From, {leader,reply,Reply},
			      Server#server{state = NState}, Role,
			      NewE),
	    loop(NewServer, Role, NewE);
	{noreply, NState} = Reply ->
	    NewServer = handle_debug(Server#server{state = NState},
				     Role, E, Reply),
	    loop(NewServer, Role, E);
	{stop, Reason, Reply, NState} ->
	    {'EXIT', R} = 
		(catch terminate(Reason, Msg, 
				 Server#server{state = NState},
				 Role, E)),
	    reply(From, Reply),
	    exit(R);
	Other ->
	    handle_common_reply(Other, Msg, Server, Role, E)
    end;
handle_msg({'$leader_call', From, Request} = Msg,
	   #server{mod = Mod, state = State} = Server, Role,
	   #election{mode = local} = E) ->
    Reply = (catch Mod:handle_leader_call(Request,From,State,E)),
    handle_call_reply(Reply, Msg, Server, Role, E);
%%%    handle_common_reply(Reply, Msg, Server, Role, E);
handle_msg({'$leader_cast', Cast} = Msg,
	   #server{mod = Mod, state = State} = Server, Role,
	   #election{mode = local} = E) ->
    Reply = (catch Mod:handle_leader_cast(Cast,State,E)),
    handle_common_reply(Reply, Msg, Server, Role, E);
handle_msg({'$leader_cast', Cast} = Msg, 
	   #server{mod = Mod, state = State} = Server, elected = Role, E) ->
    Reply = (catch Mod:handle_leader_cast(Cast, State, E)),
    handle_common_reply(Reply, Msg, Server, Role, E);
handle_msg({from_leader, Cmd} = Msg, 
	   #server{mod = Mod, state = State} = Server, Role, E) ->
    handle_common_reply(catch Mod:from_leader(Cmd, State, E), 
			Msg, Server, Role, E);
handle_msg({'$leader_call', From, Request}, Server, Role,
	   #election{buffered = Buffered, leader = Leader} = E) ->
    Ref = make_ref(),
    Leader ! {'$leader_call', {self(),Ref}, Request},
    NewBuffered = [{Ref,From}|Buffered],
    loop(Server, Role, E#election{buffered = NewBuffered});
handle_msg({Ref, {leader,reply,Reply}}, Server, Role,
	   #election{buffered = Buffered} = E) ->
    {value, {_,From}} = keysearch(Ref,1,Buffered),
    NewServer = reply(From, {leader,reply,Reply}, Server, Role,
		      E#election{buffered = keydelete(Ref,1,Buffered)}),
    loop(NewServer, Role, E);
handle_msg({'$gen_call', From, Request} = Msg, 
	   #server{mod = Mod, state = State} = Server, Role, E) ->
    Reply = (catch Mod:handle_call(Request, From, State)),
    handle_call_reply(Reply, Msg, Server, Role, E);
handle_msg({'$gen_cast',Msg} = Cast,
	   #server{mod = Mod, state = State} = Server, Role, E) ->
    handle_common_reply(catch Mod:handle_cast(Msg, State), 
			Cast, Server, Role, E);
handle_msg(Msg,
	   #server{mod = Mod, state = State} = Server, Role, E) ->
    handle_common_reply(catch Mod:handle_info(Msg, State),
			Msg, Server, Role, E).


handle_call_reply(CB_reply, {_, From, _Request} = Msg, Server, Role, E) ->
    case CB_reply of
    	{reply, Reply, NState} ->
	    NewServer = reply(From, Reply, 
			      Server#server{state = NState}, Role, E),
	    loop(NewServer, Role, E);
	{noreply, NState} = Reply ->
	    NewServer = handle_debug(Server#server{state = NState},
				     Role, E, Reply),
	    loop(NewServer, Role, E);
	{activate, Cands, Workers, Reply, NState}
	when E#election.mode == local ->
	    NewRole = case member(node(), Cands) of
			  true -> candidate;
			  false -> waiting_worker
		      end,
	    reply(From, Reply),
	    NServer = Server#server{state = NState},
	    NewE = init_election(Cands, Workers, E),
	    io:format("activating: NewE = ~p~n", [NewE]),
	    begin_election(NServer, NewRole, NewE);
	{stop, Reason, Reply, NState} ->
	    {'EXIT', R} = 
		(catch terminate(Reason, Msg, Server#server{state = NState},
				 Role, E)),
	    reply(From, Reply),
	    exit(R);
	Other ->
	    handle_common_reply(Other, Msg, Server, Role, E)
    end.


handle_common_reply(Reply, Msg, Server, Role, E) ->
    case Reply of
	{ok, NState} ->
	    NewServer = handle_debug(Server#server{state = NState},
				     Role, E, Reply),
	    loop(NewServer, Role, E);
	{ok, Broadcast, NState} ->
	    NewE = broadcast({from_leader,Broadcast}, E),
	    NewServer = handle_debug(Server#server{state = NState},
				     Role, E, Reply),
	    loop(NewServer, Role, NewE);
	{stop, Reason, NState} ->
	    terminate(Reason, Msg, Server#server{state = NState}, Role, E);
	{'EXIT', Reason} ->
	    terminate(Reason, Msg, Server, Role, E);
	_ ->
	    terminate({bad_return_value, Reply}, Msg, Server, Role, E)
    end.


reply({To, Tag}, Reply, #server{state = State} = Server, Role, E) ->
    reply({To, Tag}, Reply),
    handle_debug(Server, Role, E, {out, Reply, To, State}).


handle_debug(#server{debug = []} = Server, _Role, _E, _Event) ->
    Server;
handle_debug(#server{debug = Debug} = Server, _Role, E, Event) ->
    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
			      E#election.name, Event),
    Server#server{debug = Debug1}.

%%% ---------------------------------------------------
%%% Terminate the server.
%%% ---------------------------------------------------

terminate(Reason, Msg, #server{mod = Mod, 
			       state = State,
			       debug = Debug}, _Role,
	  #election{name = Name}) ->
    case catch Mod:terminate(Reason, State) of
	{'EXIT', R} ->
	    error_info(R, Name, Msg, State, Debug),
	    exit(R);
	_ ->
	    case Reason of
		normal ->
		    exit(normal);
		shutdown ->
		    exit(shutdown);
		_ ->
		    error_info(Reason, Name, Msg, State, Debug),
		    exit(Reason)
	    end
    end.

%% Maybe we shouldn't do this?  We have the crash report...
error_info(Reason, Name, Msg, State, Debug) ->
    format("** Generic leader ~p terminating \n"
           "** Last message in was ~p~n"
           "** When Server state == ~p~n"
           "** Reason for termination == ~n** ~p~n",
	   [Name, Msg, State, Reason]),
    sys:print_log(Debug),
    ok.

%%% ---------------------------------------------------
%%% Misc. functions.
%%% ---------------------------------------------------

opt(Op, [{Op, Value}|_]) ->
    {ok, Value};
opt(Op, [_|Options]) ->
    opt(Op, Options);
opt(_, []) ->
    false.

debug_options(Name, Opts) ->
    case opt(debug, Opts) of
	{ok, Options} -> dbg_options(Name, Options);
	_ -> dbg_options(Name, [])
    end.

dbg_options(Name, []) ->
    Opts = 
	case init:get_argument(generic_debug) of
	    error ->
		[];
	    _ ->
		[log, statistics]
	end,
    dbg_opts(Name, Opts);
dbg_options(Name, Opts) ->
    dbg_opts(Name, Opts).

dbg_opts(Name, Opts) ->
    case catch sys:debug_options(Opts) of
	{'EXIT',_} ->
	    format("~p: ignoring erroneous debug options - ~p~n",
		   [Name, Opts]),
	    [];
	Dbg ->
	    Dbg
    end.

%%-----------------------------------------------------------------
%% Status information
%%-----------------------------------------------------------------
%% @hidden
format_status(Opt, StatusData) ->
    [PDict, SysState, Parent, Debug, [_Mode, Server, _Role, E]] = StatusData,
    Header = lists:concat(["Status for generic server ", E#election.name]),
    Log = sys:get_debug(log, Debug, []),
    #server{mod = Mod, state = State} = Server,
    Specific = 
	case erlang:function_exported(Mod, format_status, 2) of
	    true ->
		case catch apply(Mod, format_status, [Opt, [PDict, State]]) of
		    {'EXIT', _} -> [{data, [{"State", State}]}];
		    Else -> Else
		end;
	    _ ->
		[{data, [{"State", State}]}]
	end,
    [{header, Header},
     {data, [{"Status", SysState},
	     {"Parent", Parent},
	     {"Logged events", Log}]} |
     Specific].




broadcast(Msg, #election{monitored = Monitored} = E) ->
    %% When broadcasting the first time, we broadcast to all candidate nodes,
    %% using broadcast/3. This function is used for subsequent broadcasts,
    %% and we make sure only to broadcast to already known nodes.
    %% It's the responsibility of new nodes to make themselves known through
    %% a wider broadcast.
    ToNodes = [N || {_,N} <- Monitored],
    broadcast(Msg, ToNodes, E).

broadcast(capture, ToNodes, #election{monitored = Monitored} = E) ->
    ToMonitor = [N || N <- ToNodes,
                      not(keymember(N,2,Monitored))],
    NewE = 
        foldl(fun(Node,Ex) ->
                      Ref = erlang:monitor(
			      process,{Ex#election.name,Node}),
                      Ex#election{monitored = [{Ref,Node}|
					      Ex#election.monitored]}
              end,E,ToMonitor),
    foreach(
      fun(Node) ->
	      {NewE#election.name,Node} !
		  {leader,capture,NewE#election.iteration,node(),self()}
      end,ToNodes),
    NewE;
broadcast({elect,Synch},ToNodes,E) ->
    foreach(
      fun(Node) ->
	      {E#election.name,Node} ! {leader,elect,Synch,self()}
      end,ToNodes),
    E;
broadcast({from_leader, Msg}, ToNodes, E) ->
    foreach(
      fun(Node) ->
	      {E#election.name,Node} ! {from_leader, Msg}
      end,ToNodes),
    E;
broadcast(add_worker, ToNodes, E) ->
    foreach(
      fun(Node) ->
	      {E#election.name,Node} ! {leader, add_worker, self()}
      end,ToNodes),
    E.



check_majority(E, Server) ->
    {Captured,_} = E#election.iteration,
    AcceptMeAsLeader = length(Captured) + 1,   % including myself
    NrCandidates = length(E#election.candidate_nodes),
    NrDown = E#election.down,
    if AcceptMeAsLeader > NrCandidates/2 ->
	    NewE = E#election{leader = self(), leadernode = node()},
	    {ok,Synch,NewState} =
		(Server#server.mod):elected(Server#server.state, NewE),
	    NewE1 = broadcast({elect,Synch}, NewE),
	    loop(Server#server{state = NewState}, elected, NewE1);
       AcceptMeAsLeader+length(NrDown) == NrCandidates -> 
	    NewE = E#election{leader = self(), leadernode = node()},
	    {ok,Synch,NewState} =
		(Server#server.mod):elected(Server#server.state, NewE),
	    NewE1 = broadcast({elect,Synch}, NewE),
	    loop(Server#server{state = NewState}, elected, NewE1);
       true ->
	    safe_loop(Server, candidate, E)
    end.


down(Ref,_Who,Why,E) ->
    case lists:keysearch(Ref,1,E#election.monitored) of
	{value, {_,Node}} ->
	    NewMonitored = if Why == local_only -> E#election.monitored;
			      true ->
				   E#election.monitored -- [{Ref,Node}]
			   end,
	    {Captured,Pos} = E#election.iteration,
	    case Node == E#election.leadernode of
		true ->
		    E#election{leader = none,
			       leadernode = none,
			       iteration = {Captured -- [Node],
					    Pos},  % TAKE CARE !
			       down = [Node|E#election.down],
			       alive = E#election.alive -- [Node],
			       monitored = NewMonitored};
		false ->
		    Down = case member(Node,E#election.candidate_nodes) of
			       true ->
				   [Node|E#election.down];
			       false ->
				   E#election.down
			   end,
		    E#election{iteration = {Captured -- [Node],
					    Pos},  % TAKE CARE !
			       down = Down,
			       alive = E#election.alive -- [Node],
			       monitored = NewMonitored}
	    end
    end.



%% position of element counted from end of the list
%%
position(X,[Head|Tail]) ->
    case X==Head of
        true ->
            length(Tail);
        false ->
            position(X,Tail)
    end.

%% This is a multi-level comment
%% This is the second line of the comment
lexcompare({C1,P1},{C2,P2}) ->
    lexcompare([{length(C1),length(C2)},{P1,P2}]).

lexcompare([]) ->
    equal;
lexcompare([{X,Y}|Rest]) ->
    if X<Y  -> less;
       X==Y -> lexcompare(Rest);
       X>Y  -> greater
    end.

add_captured({Captured,Pos}, CandidateNode) ->
    {[CandidateNode|[ Node || Node <- Captured,
			      Node =/= CandidateNode ]], Pos}.

nodeup(Node, #election{monitored = Monitored,
		       alive = Alive,
		       down = Down} = E) ->
    %% make sure process is monitored from now on
    case [ N || {_,N}<-Monitored, N==Node] of
        [] ->
            Ref = erlang:monitor(process,{E#election.name,Node}),
            E#election{down = Down -- [Node],
		       alive = [Node | Alive],
		       monitored = [{Ref,Node}|Monitored]};
        _ ->    % already monitored, thus not in down
            E#election{alive = [Node | [N || N <- Alive,
					     N =/= Node]]}
    end.