gen_leader.erl 33 KB


  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf.wiger@ericsson.com>
  17. %% @author Thomas Arts <thomas.arts@ituniv.se>
  18. %%
  19. %% @doc Leader election behaviour.
  20. %% <p>This application implements a leader election behaviour modeled after
  21. %% gen_server. This behaviour intends to make it reasonably
  22. %% straightforward to implement a fully distributed server with
  23. %% master-slave semantics.</p>
  24. %% <p>The gen_leader behaviour supports nearly everything that gen_server
  25. %% does (some functions, such as multicall() and the internal timeout,
  26. %% have been removed), and adds a few callbacks and API functions to
  27. %% support leader election etc.</p>
  28. %% <p>Also included is an example program, a global dictionary, based
  29. %% on the modules gen_leader and dict. The callback implementing the
  30. %% global dictionary is called 'test_cb', for no particularly logical
  31. %% reason.</p>
  32. %% @end
  33. %%
  34. %% @type election() = tuple(). Opaque state of the gen_leader behaviour.
  35. %% @type node() = atom(). A node name.
  36. %% @type name() = atom(). A locally registered name.
  37. %% @type serverRef() = Name | {name(),node()} | {global,Name} | pid().
  38. %% See gen_server.
  39. %% @type callerRef() = {pid(), reference()}. See gen_server.
  40. %%
  41. -module(gen_leader).
  42. -export([start/4, start/6,
  43. start_link/4, start_link/6,
  44. leader_call/2, leader_call/3, leader_cast/2,
  45. call/2, call/3, cast/2,
  46. reply/2]).
  47. %% Query functions
  48. -export([alive/1,
  49. down/1,
  50. candidates/1,
  51. workers/1]).
  52. -export([
  53. system_continue/3,
  54. system_terminate/4,
  55. system_code_change/4,
  56. format_status/2
  57. ]).
  58. -export([behaviour_info/1]).
  59. %% Internal exports
  60. -export([init_it/6, print_event/3
  61. %%, safe_send/2
  62. ]).
  63. -import(error_logger , [format/2]).
  64. -import(lists, [foldl/3,
  65. foreach/2,
  66. member/2,
  67. keydelete/3,
  68. keysearch/3,
  69. keymember/3]).
  70. -record(election,{leader = none,
  71. mode = global,
  72. name,
  73. leadernode = none,
  74. candidate_nodes = [],
  75. worker_nodes = [],
  76. alive = [],
  77. iteration,
  78. down = [],
  79. monitored = [],
  80. buffered = []
  81. }).
  82. -record(server, {parent,
  83. mod,
  84. state,
  85. debug}).
  86. %%% ---------------------------------------------------
  87. %%% Interface functions.
  88. %%% ---------------------------------------------------
  89. %% @hidden
  90. behaviour_info(callbacks) ->
  91. [{init,1},
  92. {elected,2},
  93. {surrendered,3},
  94. {handle_leader_call,4},
  95. {handle_leader_cast,3},
  96. {handle_local_only, 4},
  97. {from_leader,3},
  98. {handle_call,3},
  99. {handle_cast,2},
  100. {handle_DOWN,3},
  101. {handle_info,2},
  102. {terminate,2},
  103. {code_change,4}];
  104. behaviour_info(_Other) ->
  105. undefined.
  106. start(Name, Mod, Arg, Options) when is_atom(Name) ->
  107. gen:start(?MODULE, nolink, {local,Name},
  108. Mod, {local_only, Arg}, Options).
  109. %% @spec start(Name::node(), CandidateNodes::[node()],
  110. %% Workers::[node()], Mod::atom(), Arg, Options::list()) ->
  111. %% {ok,pid()}
  112. %%
  113. %% @doc Starts a gen_leader process without linking to the parent.
  114. %%
  115. start(Name, [_|_] = CandidateNodes, Workers, Mod, Arg, Options)
  116. when is_atom(Name) ->
  117. gen:start(?MODULE, nolink, {local,Name},
  118. Mod, {CandidateNodes, Workers, Arg}, Options).
  119. %% @spec start_link(Name::atom(), CandidateNodes::[atom()],
  120. %% Workers::[atom()], Mod::atom(), Arg, Options::list()) ->
  121. %% {ok, pid()}
  122. %%
  123. %% @doc Starts a gen_leader process.
  124. %% <table>
  125. %% <tr><td>Name</td><td>The locally registered name of the process</td></tr>
  126. %% <tr><td>CandidateNodes</td><td>The names of nodes capable of assuming
  127. %% a leadership role</td></tr>
  128. %% <tr><td>Workers</td>
  129. %% <td>The names of nodes that will be part of the "cluster",
  130. %% but cannot ever assume a leadership role.</td></tr>
  131. %% <tr><td>Mod</td><td>The name of the callback module</td></tr>
  132. %% <tr><td>Arg</td><td>Argument passed on to <code>Mod:init/1</code></td></tr>
  133. %% <tr><td>Options</td><td>Same as gen_server's Options</td></tr>
  134. %% </table>
  135. %%
  136. %% <p>The list of candidates needs to be known from the start. Workers
  137. %% can be added at runtime.</p>
  138. %% @end
  139. start_link(Name, [_|_] = CandidateNodes, Workers,
  140. Mod, Arg, Options) when is_atom(Name) ->
  141. gen:start(?MODULE, link, {local,Name}, Mod,
  142. {CandidateNodes, Workers, Arg}, Options).
  143. start_link(Name, Mod, Arg, Options) when is_atom(Name) ->
  144. gen:start(?MODULE, link, {local,Name}, Mod,
  145. {local_only, Arg}, Options).
  146. %% Query functions to be used from the callback module
  147. %% @spec alive(E::election()) -> [node()]
  148. %%
  149. %% @doc Returns a list of live nodes (candidates and workers).
  150. %%
  151. alive(#election{alive = Alive}) ->
  152. Alive.
  153. %% @spec down(E::election()) -> [node()]
  154. %%
  155. %% @doc Returns a list of candidates currently not running.
  156. %%
  157. down(#election{down = Down}) ->
  158. Down.
  159. %% @spec candidates(E::election()) -> [node()]
  160. %%
  161. %% @doc Returns a list of known candidates.
  162. %%
  163. candidates(#election{candidate_nodes = Cands}) ->
  164. Cands.
  165. %% @spec workers(E::election()) -> [node()]
  166. %%
  167. %% @doc Returns a list of known workers.
  168. %%
  169. workers(#election{worker_nodes = Workers}) ->
  170. Workers.
  171. %% @spec call(Name::serverRef(), Request) -> term()
  172. %%
  173. %% @doc Equivalent to <code>gen_server:call/2</code>, but with a slightly
  174. %% different exit reason if something goes wrong. This function calls
  175. %% the <code>gen_leader</code> process exactly as if it were a gen_server
  176. %% (which, for practical purposes, it is.)
  177. %% @end
  178. call(Name, Request) ->
  179. case catch gen:call(Name, '$gen_call', Request) of
  180. {ok,Res} ->
  181. Res;
  182. {'EXIT',Reason} ->
  183. exit({Reason, {?MODULE, local_call, [Name, Request]}})
  184. end.
  185. %% @spec call(Name::serverRef(), Request, Timeout::integer()) ->
  186. %% Reply
  187. %%
  188. %% Reply = term()
  189. %%
  190. %% @doc Equivalent to <code>gen_server:call/3</code>, but with a slightly
  191. %% different exit reason if something goes wrong. This function calls
  192. %% the <code>gen_leader</code> process exactly as if it were a gen_server
  193. %% (which, for practical purposes, it is.)
  194. %% @end
  195. call(Name, Request, Timeout) ->
  196. case catch gen:call(Name, '$gen_call', Request, Timeout) of
  197. {ok,Res} ->
  198. Res;
  199. {'EXIT',Reason} ->
  200. exit({Reason, {?MODULE, local_call, [Name, Request, Timeout]}})
  201. end.
  202. %% @spec leader_call(Name::name(), Request::term())
  203. %% -> Reply
  204. %%
  205. %% Reply = term()
  206. %%
  207. %% @doc Makes a call (similar to <code>gen_server:call/2</code>) to the
  208. %% leader. The call is forwarded via the local gen_leader instance, if
  209. %% that one isn't actually the leader. The client will exit if the
  210. %% leader dies while the request is outstanding.
  211. %% <p>This function uses <code>gen:call/3</code>, and is subject to the
  212. %% same default timeout as e.g. <code>gen_server:call/2</code>.</p>
  213. %% @end
  214. %%
  215. leader_call(Name, Request) ->
  216. case catch gen:call(Name, '$leader_call', Request) of
  217. {ok,{leader,reply,Res}} ->
  218. Res;
  219. {ok,{error, leader_died}} ->
  220. exit({leader_died, {?MODULE, leader_call, [Name, Request]}});
  221. {'EXIT',Reason} ->
  222. exit({Reason, {?MODULE, leader_call, [Name, Request]}})
  223. end.
  224. %% @spec leader_call(Name::name(), Request::term(), Timeout::integer())
  225. %% -> Reply
  226. %%
  227. %% Reply = term()
  228. %%
  229. %% @doc Makes a call (similar to <code>gen_server:call/3</code>) to the
  230. %% leader. The call is forwarded via the local gen_leader instance, if
  231. %% that one isn't actually the leader. The client will exit if the
  232. %% leader dies while the request is outstanding.
  233. %% @end
  234. %%
  235. leader_call(Name, Request, Timeout) ->
  236. case catch gen:call(Name, '$leader_call', Request, Timeout) of
  237. {ok,{leader,reply,Res}} ->
  238. Res;
  239. {ok,{error, leader_died}} ->
  240. exit({leader_died, {?MODULE, leader_call, [Name, Request]}});
  241. {'EXIT',Reason} ->
  242. exit({Reason, {?MODULE, leader_call, [Name, Request, Timeout]}})
  243. end.
  244. %% @equiv gen_server:cast/2
  245. cast(Name, Request) ->
  246. catch do_cast('$gen_cast', Name, Request),
  247. ok.
  248. %% @spec leader_cast(Name::name(), Msg::term()) -> ok
  249. %% @doc Similar to <code>gen_server:cast/2</code> but will be forwarded to
  250. %% the leader via the local gen_leader instance.
  251. leader_cast(Name, Request) ->
  252. catch do_cast('$leader_cast', Name, Request),
  253. ok.
  254. do_cast(Tag, Name, Request) when atom(Name) ->
  255. Name ! {Tag, Request};
  256. do_cast(Tag, Pid, Request) when pid(Pid) ->
  257. Pid ! {Tag, Request}.
  258. %% @spec reply(From::callerRef(), Reply::term()) -> Void
  259. %% @equiv gen_server:reply/2
  260. reply({To, Tag}, Reply) ->
  261. catch To ! {Tag, Reply}.
  262. %%% ---------------------------------------------------
  263. %%% Initiate the new process.
  264. %%% Register the name using the Rfunc function
  265. %%% Calls the Mod:init/Args function.
  266. %%% Finally an acknowledge is sent to Parent and the main
  267. %%% loop is entered.
  268. %%% ---------------------------------------------------
  269. %%% @hidden
  270. init_it(Starter, self, Name, Mod, {CandidateNodes, Workers, Arg}, Options) ->
  271. if CandidateNodes == [] ->
  272. erlang:error(no_candidates);
  273. true ->
  274. init_it(Starter, self(), Name, Mod,
  275. {CandidateNodes, Workers, Arg}, Options)
  276. end;
  277. init_it(Starter,Parent,Name,Mod,{local_only, _}=Arg,Options) ->
  278. Debug = debug_options(Name, Options),
  279. reg_behaviour(),
  280. case catch Mod:init(Arg) of
  281. {stop, Reason} ->
  282. proc_lib:init_ack(Starter, {error, Reason}),
  283. exit(Reason);
  284. ignore ->
  285. proc_lib:init_ack(Starter, ignore),
  286. exit(normal);
  287. {'EXIT', Reason} ->
  288. proc_lib:init_ack(Starter, {error, Reason}),
  289. exit(Reason);
  290. {ok, State} ->
  291. proc_lib:init_ack(Starter, {ok, self()}),
  292. Server = #server{parent = Parent,
  293. mod = Mod,
  294. state = State,
  295. debug = Debug},
  296. loop(Server, local_only, #election{name = Name, mode = local});
  297. Other ->
  298. Error = {bad_return_value, Other},
  299. proc_lib:init_ack(Starter, {error, Error}),
  300. exit(Error)
  301. end;
  302. init_it(Starter,Parent,Name,Mod,{CandidateNodes,Workers,Arg},Options) ->
  303. Debug = debug_options(Name, Options),
  304. reg_behaviour(),
  305. AmCandidate = member(node(), CandidateNodes),
  306. Election = init_election(CandidateNodes, Workers, #election{name = Name}),
  307. case {catch Mod:init(Arg), AmCandidate} of
  308. {{stop, Reason},_} ->
  309. proc_lib:init_ack(Starter, {error, Reason}),
  310. exit(Reason);
  311. {ignore,_} ->
  312. proc_lib:init_ack(Starter, ignore),
  313. exit(normal);
  314. {{'EXIT', Reason},_} ->
  315. proc_lib:init_ack(Starter, {error, Reason}),
  316. exit(Reason);
  317. {{ok, State}, true} ->
  318. %%% NewE = broadcast(capture,Workers++(CandidateNodes -- [node()]),
  319. %%% Election),
  320. proc_lib:init_ack(Starter, {ok, self()}),
  321. begin_election(#server{parent = Parent,
  322. mod = Mod,
  323. state = State,
  324. debug = Debug}, candidate, Election);
  325. {{ok, State}, false} ->
  326. %%% NewE = broadcast(add_worker, CandidateNodes, Election),
  327. proc_lib:init_ack(Starter, {ok, self()}),
  328. begin_election(#server{parent = Parent,
  329. mod = Mod,
  330. state = State,
  331. debug = Debug}, waiting_worker, Election);
  332. Else ->
  333. Error = {bad_return_value, Else},
  334. proc_lib:init_ack(Starter, {error, Error}),
  335. exit(Error)
  336. end.
  337. reg_behaviour() ->
  338. catch gproc:reg({p,l,behaviour}, ?MODULE).
  339. init_election(CandidateNodes, Workers, E) ->
  340. %%% dbg:tracer(),
  341. %%% dbg:tpl(?MODULE,lexcompare,[]),
  342. %%% dbg:p(self(),[m,c]),
  343. AmCandidate = member(node(), CandidateNodes),
  344. case AmCandidate of
  345. true ->
  346. E#election{mode = global,
  347. candidate_nodes = CandidateNodes,
  348. worker_nodes = Workers,
  349. iteration = {[],
  350. position(
  351. node(),CandidateNodes)}};
  352. false ->
  353. E#election{mode = global,
  354. candidate_nodes = CandidateNodes,
  355. worker_nodes = Workers}
  356. end.
  357. begin_election(#server{mod = Mod, state = State} = Server, candidate,
  358. #election{candidate_nodes = Cands,
  359. worker_nodes = Workers} = E) ->
  360. case Cands of
  361. [N] when N == node() ->
  362. {ok, Synch, NewState} = Mod:elected(State, E),
  363. NewE = broadcast({elect,Synch}, E),
  364. loop(Server#server{state = NewState}, elected, NewE);
  365. _ ->
  366. NewE = broadcast(capture,Workers++(Cands -- [node()]), E),
  367. safe_loop(Server, candidate, NewE)
  368. end;
  369. begin_election(Server, waiting_worker, #election{candidate_nodes = Cands}=E) ->
  370. NewE = broadcast(add_worker, Cands, E),
  371. safe_loop(Server, waiting_worker, NewE).
  372. %%% ---------------------------------------------------
  373. %%% The MAIN loop.
  374. %%% ---------------------------------------------------
  375. safe_loop(#server{mod = Mod, state = State} = Server, Role,
  376. #election{name = Name} = E) ->
  377. receive
  378. {system, From, Req} ->
  379. #server{parent = Parent, debug = Debug} = Server,
  380. sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
  381. [safe, Server, Role, E]);
  382. {'EXIT', _Parent, Reason} = Msg ->
  383. terminate(Reason, Msg, Server, Role, E);
  384. {leader,capture,Iteration,_Node,Candidate} ->
  385. case Role of
  386. candidate ->
  387. NewE =
  388. nodeup(node(Candidate),E),
  389. case lexcompare(NewE#election.iteration,Iteration) of
  390. less ->
  391. Candidate !
  392. {leader,accept,
  393. NewE#election.iteration,self()},
  394. safe_loop(Server, captured,
  395. NewE#election{leader = Candidate});
  396. greater ->
  397. %% I'll get either an accept or DOWN
  398. %% from Candidate later
  399. safe_loop(Server, Role, NewE);
  400. equal ->
  401. safe_loop(Server, Role, NewE)
  402. end;
  403. captured ->
  404. NewE = nodeup(node(Candidate), E),
  405. safe_loop(Server, Role, NewE);
  406. waiting_worker ->
  407. NewE =
  408. nodeup(node(Candidate),E),
  409. safe_loop(Server, Role, NewE)
  410. end;
  411. {leader,add_worker,Worker} ->
  412. NewE = nodeup(node(Worker), E),
  413. safe_loop(Server, Role, NewE);
  414. {leader,accept,Iteration,Candidate} ->
  415. case Role of
  416. candidate ->
  417. NewE =
  418. nodeup(node(Candidate),E),
  419. {Captured,_} = Iteration,
  420. NewIteration = % inherit all procs that have been
  421. % accepted by Candidate
  422. foldl(fun(C,Iter) ->
  423. add_captured(Iter,C)
  424. end,NewE#election.iteration,
  425. [node(Candidate)|Captured]),
  426. check_majority(NewE#election{
  427. iteration = NewIteration}, Server);
  428. captured ->
  429. %% forward this to the leader
  430. E#election.leader ! {leader,accept,Iteration,Candidate},
  431. NewE = nodeup(node(Candidate), E),
  432. safe_loop(Server, Role, NewE)
  433. end;
  434. {leader,elect,Synch,Candidate} ->
  435. NewE =
  436. case Role of
  437. waiting_worker ->
  438. nodeup(node(Candidate),
  439. E#election{
  440. leader = Candidate,
  441. leadernode = node(Candidate)});
  442. _ ->
  443. nodeup(node(Candidate),
  444. E#election{
  445. leader = Candidate,
  446. leadernode = node(Candidate),
  447. iteration = {[],
  448. position(
  449. node(),
  450. E#election.candidate_nodes)}
  451. })
  452. end,
  453. {ok,NewState} = Mod:surrendered(State,Synch,NewE),
  454. NewRole = case Role of
  455. waiting_worker ->
  456. worker;
  457. _ ->
  458. surrendered
  459. end,
  460. loop(Server#server{state = NewState}, NewRole, NewE);
  461. {leader, local_only, Node, Candidate} ->
  462. case lists:keysearch(node(Candidate), 2, E#election.monitored) of
  463. {value, {Ref, N}} ->
  464. NewE = down(Ref, {E#election.name,N},local_only,E),
  465. io:format("local_only received from ~p~n"
  466. "E0 = ~p~n"
  467. "E1 = ~p~n", [Node, E, NewE]),
  468. safe_after_down(Server, Role, NewE);
  469. false ->
  470. safe_loop(Server, Role, E)
  471. end;
  472. {'DOWN',Ref,process,{Name,_}=Who,Why} ->
  473. NewE =
  474. down(Ref,Who,Why,E),
  475. safe_after_down(Server, Role, NewE)
  476. end.
  477. safe_after_down(Server, Role, E) ->
  478. case {Role,E#election.leader} of
  479. {candidate,_} ->
  480. check_majority(E, Server);
  481. {captured,none} ->
  482. check_majority(broadcast(capture,E), Server);
  483. {waiting_worker,_} ->
  484. safe_loop(Server, Role, E)
  485. end.
  486. loop(#server{parent = Parent,
  487. mod = Mod,
  488. state = State,
  489. debug = Debug} = Server, Role,
  490. #election{mode = Mode, name = Name} = E) ->
  491. Msg = receive
  492. Input ->
  493. Input
  494. end,
  495. case Msg of
  496. {system, From, Req} ->
  497. sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
  498. [normal, Server, Role, E]);
  499. {'EXIT', Parent, Reason} ->
  500. terminate(Reason, Msg, Server, Role, E);
  501. {leader, local_only, _, _Candidate} ->
  502. loop(Server, Role, E);
  503. LeaderMsg when element(1,LeaderMsg) == leader, Mode == local ->
  504. Candidate = element(size(LeaderMsg), LeaderMsg),
  505. Candidate ! {leader, local_only, node(), self()},
  506. loop(Server, Role, E);
  507. {leader,capture,_Iteration,_Node,Candidate} ->
  508. NewE = nodeup(node(Candidate),E),
  509. case Role of
  510. R when R == surrendered; R == worker ->
  511. loop(Server, Role, NewE);
  512. elected ->
  513. {ok,Synch,NewState} = Mod:elected(State,NewE),
  514. Candidate ! {leader, elect, Synch, self()},
  515. loop(Server#server{state = NewState}, Role, NewE)
  516. end;
  517. {leader,accept,_Iteration,Candidate} ->
  518. NewE = nodeup(node(Candidate),E),
  519. case Role of
  520. surrendered ->
  521. loop(Server, Role, NewE);
  522. elected ->
  523. {ok,Synch,NewState} = Mod:elected(State,NewE),
  524. Candidate ! {leader, elect, Synch, self()},
  525. loop(Server#server{state = NewState}, Role, NewE)
  526. end;
  527. {leader,elect,Synch,Candidate} ->
  528. NewE =
  529. case Role of
  530. worker ->
  531. nodeup(node(Candidate),
  532. E#election{
  533. leader = Candidate,
  534. leadernode = node(Candidate)});
  535. surrendered ->
  536. nodeup(node(Candidate),
  537. E#election{
  538. leader = Candidate,
  539. leadernode = node(Candidate),
  540. iteration = {[],
  541. position(
  542. node(),
  543. E#election.candidate_nodes)}
  544. })
  545. end,
  546. {ok, NewState} = Mod:surrendered(State, Synch, NewE),
  547. loop(Server#server{state = NewState}, Role, NewE);
  548. {'DOWN',Ref,process,{Name,Node} = Who,Why} ->
  549. #election{alive = PreviouslyAlive} = E,
  550. NewE =
  551. down(Ref,Who,Why,E),
  552. case NewE#election.leader of
  553. none ->
  554. foreach(fun({_,From}) ->
  555. reply(From,{error,leader_died})
  556. end, E#election.buffered),
  557. NewE1 = NewE#election{buffered = []},
  558. case Role of
  559. surrendered ->
  560. check_majority(
  561. broadcast(capture,NewE1), Server);
  562. worker ->
  563. safe_loop(Server, waiting_worker, NewE1)
  564. end;
  565. L when L == self() ->
  566. case member(Node, PreviouslyAlive) of
  567. true ->
  568. case Mod:handle_DOWN(Node, State, E) of
  569. {ok, NewState} ->
  570. loop(Server#server{state = NewState},
  571. Role, NewE);
  572. {ok, Broadcast, NewState} ->
  573. NewE1 = broadcast(
  574. {from_leader,Broadcast}, NewE),
  575. loop(Server#server{state = NewState},
  576. Role, NewE1)
  577. end;
  578. false ->
  579. loop(Server, Role, NewE)
  580. end;
  581. _ ->
  582. loop(Server, Role, NewE)
  583. end;
  584. _Msg when Debug == [] ->
  585. handle_msg(Msg, Server, Role, E);
  586. _Msg ->
  587. Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
  588. E#election.name, {in, Msg}),
  589. handle_msg(Msg, Server#server{debug = Debug1}, Role, E)
  590. end.
  591. %%-----------------------------------------------------------------
  592. %% Callback functions for system messages handling.
  593. %%-----------------------------------------------------------------
  594. %% @hidden
  595. system_continue(_Parent, Debug, [safe, Server, Role, E]) ->
  596. safe_loop(Server#server{debug = Debug}, Role, E);
  597. system_continue(_Parent, Debug, [normal, Server, Role, E]) ->
  598. loop(Server#server{debug = Debug}, Role, E).
  599. %% @hidden
  600. system_terminate(Reason, _Parent, Debug, [_Mode, Server, Role, E]) ->
  601. terminate(Reason, [], Server#server{debug = Debug}, Role, E).
  602. %% @hidden
  603. system_code_change([Mode, Server, Role, E], _Module, OldVsn, Extra) ->
  604. #server{mod = Mod, state = State} = Server,
  605. case catch Mod:code_change(OldVsn, State, E, Extra) of
  606. {ok, NewState} ->
  607. NewServer = Server#server{state = NewState},
  608. {ok, [Mode, NewServer, Role, E]};
  609. {ok, NewState, NewE} ->
  610. NewServer = Server#server{state = NewState},
  611. {ok, [Mode, NewServer, Role, NewE]};
  612. Else -> Else
  613. end.
  614. %%-----------------------------------------------------------------
  615. %% Format debug messages. Print them as the call-back module sees
  616. %% them, not as the real erlang messages. Use trace for that.
  617. %%-----------------------------------------------------------------
  618. %% @hidden
  619. print_event(Dev, {in, Msg}, Name) ->
  620. case Msg of
  621. {'$gen_call', {From, _Tag}, Call} ->
  622. io:format(Dev, "*DBG* ~p got local call ~p from ~w~n",
  623. [Name, Call, From]);
  624. {'$leader_call', {From, _Tag}, Call} ->
  625. io:format(Dev, "*DBG* ~p got global call ~p from ~w~n",
  626. [Name, Call, From]);
  627. {'$gen_cast', Cast} ->
  628. io:format(Dev, "*DBG* ~p got local cast ~p~n",
  629. [Name, Cast]);
  630. {'$leader_cast', Cast} ->
  631. io:format(Dev, "*DBG* ~p got global cast ~p~n",
  632. [Name, Cast]);
  633. _ ->
  634. io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
  635. end;
  636. print_event(Dev, {out, Msg, To, State}, Name) ->
  637. io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
  638. [Name, Msg, To, State]);
  639. print_event(Dev, {noreply, State}, Name) ->
  640. io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
  641. print_event(Dev, Event, Name) ->
  642. io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
  643. handle_msg({'$leader_call', From, Request} = Msg,
  644. #server{mod = Mod, state = State} = Server, elected = Role, E) ->
  645. case catch Mod:handle_leader_call(Request, From, State, E) of
  646. {reply, Reply, NState} ->
  647. NewServer = reply(From, {leader,reply,Reply},
  648. Server#server{state = NState}, Role, E),
  649. loop(NewServer, Role, E);
  650. {reply, Reply, Broadcast, NState} ->
  651. NewE = broadcast({from_leader,Broadcast}, E),
  652. NewServer = reply(From, {leader,reply,Reply},
  653. Server#server{state = NState}, Role,
  654. NewE),
  655. loop(NewServer, Role, NewE);
  656. {noreply, NState} = Reply ->
  657. NewServer = handle_debug(Server#server{state = NState},
  658. Role, E, Reply),
  659. loop(NewServer, Role, E);
  660. {stop, Reason, Reply, NState} ->
  661. {'EXIT', R} =
  662. (catch terminate(Reason, Msg,
  663. Server#server{state = NState},
  664. Role, E)),
  665. reply(From, Reply),
  666. exit(R);
  667. Other ->
  668. handle_common_reply(Other, Msg, Server, Role, E)
  669. end;
  670. handle_msg({'$leader_call', From, Request} = Msg,
  671. #server{mod = Mod, state = State} = Server, Role,
  672. #election{mode = local} = E) ->
  673. Reply = (catch Mod:handle_leader_call(Request,From,State,E)),
  674. handle_call_reply(Reply, Msg, Server, Role, E);
  675. %%% handle_common_reply(Reply, Msg, Server, Role, E);
  676. handle_msg({'$leader_cast', Cast} = Msg,
  677. #server{mod = Mod, state = State} = Server, Role,
  678. #election{mode = local} = E) ->
  679. Reply = (catch Mod:handle_leader_cast(Cast,State,E)),
  680. handle_common_reply(Reply, Msg, Server, Role, E);
  681. handle_msg({'$leader_cast', Cast} = Msg,
  682. #server{mod = Mod, state = State} = Server, elected = Role, E) ->
  683. Reply = (catch Mod:handle_leader_cast(Cast, State, E)),
  684. handle_common_reply(Reply, Msg, Server, Role, E);
  685. handle_msg({from_leader, Cmd} = Msg,
  686. #server{mod = Mod, state = State} = Server, Role, E) ->
  687. handle_common_reply(catch Mod:from_leader(Cmd, State, E),
  688. Msg, Server, Role, E);
  689. handle_msg({'$leader_call', From, Request}, Server, Role,
  690. #election{buffered = Buffered, leader = Leader} = E) ->
  691. Ref = make_ref(),
  692. Leader ! {'$leader_call', {self(),Ref}, Request},
  693. NewBuffered = [{Ref,From}|Buffered],
  694. loop(Server, Role, E#election{buffered = NewBuffered});
  695. handle_msg({Ref, {leader,reply,Reply}}, Server, Role,
  696. #election{buffered = Buffered} = E) ->
  697. {value, {_,From}} = keysearch(Ref,1,Buffered),
  698. NewServer = reply(From, {leader,reply,Reply}, Server, Role,
  699. E#election{buffered = keydelete(Ref,1,Buffered)}),
  700. loop(NewServer, Role, E);
  701. handle_msg({'$gen_call', From, Request} = Msg,
  702. #server{mod = Mod, state = State} = Server, Role, E) ->
  703. Reply = (catch Mod:handle_call(Request, From, State)),
  704. handle_call_reply(Reply, Msg, Server, Role, E);
  705. handle_msg({'$gen_cast',Msg} = Cast,
  706. #server{mod = Mod, state = State} = Server, Role, E) ->
  707. handle_common_reply(catch Mod:handle_cast(Msg, State),
  708. Cast, Server, Role, E);
  709. handle_msg(Msg,
  710. #server{mod = Mod, state = State} = Server, Role, E) ->
  711. handle_common_reply(catch Mod:handle_info(Msg, State),
  712. Msg, Server, Role, E).
  713. handle_call_reply(CB_reply, {_, From, _Request} = Msg, Server, Role, E) ->
  714. case CB_reply of
  715. {reply, Reply, NState} ->
  716. NewServer = reply(From, Reply,
  717. Server#server{state = NState}, Role, E),
  718. loop(NewServer, Role, E);
  719. {noreply, NState} = Reply ->
  720. NewServer = handle_debug(Server#server{state = NState},
  721. Role, E, Reply),
  722. loop(NewServer, Role, E);
  723. {activate, Cands, Workers, Reply, NState}
  724. when E#election.mode == local ->
  725. NewRole = case member(node(), Cands) of
  726. true -> candidate;
  727. false -> waiting_worker
  728. end,
  729. reply(From, Reply),
  730. NServer = Server#server{state = NState},
  731. NewE = init_election(Cands, Workers, E),
  732. io:format("activating: NewE = ~p~n", [NewE]),
  733. begin_election(NServer, NewRole, NewE);
  734. {stop, Reason, Reply, NState} ->
  735. {'EXIT', R} =
  736. (catch terminate(Reason, Msg, Server#server{state = NState},
  737. Role, E)),
  738. reply(From, Reply),
  739. exit(R);
  740. Other ->
  741. handle_common_reply(Other, Msg, Server, Role, E)
  742. end.
  743. handle_common_reply(Reply, Msg, Server, Role, E) ->
  744. case Reply of
  745. {ok, NState} ->
  746. NewServer = handle_debug(Server#server{state = NState},
  747. Role, E, Reply),
  748. loop(NewServer, Role, E);
  749. {ok, Broadcast, NState} ->
  750. NewE = broadcast({from_leader,Broadcast}, E),
  751. NewServer = handle_debug(Server#server{state = NState},
  752. Role, E, Reply),
  753. loop(NewServer, Role, NewE);
  754. {stop, Reason, NState} ->
  755. terminate(Reason, Msg, Server#server{state = NState}, Role, E);
  756. {'EXIT', Reason} ->
  757. terminate(Reason, Msg, Server, Role, E);
  758. _ ->
  759. terminate({bad_return_value, Reply}, Msg, Server, Role, E)
  760. end.
  761. reply({To, Tag}, Reply, #server{state = State} = Server, Role, E) ->
  762. reply({To, Tag}, Reply),
  763. handle_debug(Server, Role, E, {out, Reply, To, State}).
  764. handle_debug(#server{debug = []} = Server, _Role, _E, _Event) ->
  765. Server;
  766. handle_debug(#server{debug = Debug} = Server, _Role, E, Event) ->
  767. Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
  768. E#election.name, Event),
  769. Server#server{debug = Debug1}.
  770. %%% ---------------------------------------------------
  771. %%% Terminate the server.
  772. %%% ---------------------------------------------------
  773. terminate(Reason, Msg, #server{mod = Mod,
  774. state = State,
  775. debug = Debug}, _Role,
  776. #election{name = Name}) ->
  777. case catch Mod:terminate(Reason, State) of
  778. {'EXIT', R} ->
  779. error_info(R, Name, Msg, State, Debug),
  780. exit(R);
  781. _ ->
  782. case Reason of
  783. normal ->
  784. exit(normal);
  785. shutdown ->
  786. exit(shutdown);
  787. _ ->
  788. error_info(Reason, Name, Msg, State, Debug),
  789. exit(Reason)
  790. end
  791. end.
  792. %% Maybe we shouldn't do this? We have the crash report...
  793. error_info(Reason, Name, Msg, State, Debug) ->
  794. format("** Generic leader ~p terminating \n"
  795. "** Last message in was ~p~n"
  796. "** When Server state == ~p~n"
  797. "** Reason for termination == ~n** ~p~n",
  798. [Name, Msg, State, Reason]),
  799. sys:print_log(Debug),
  800. ok.
  801. %%% ---------------------------------------------------
  802. %%% Misc. functions.
  803. %%% ---------------------------------------------------
  804. opt(Op, [{Op, Value}|_]) ->
  805. {ok, Value};
  806. opt(Op, [_|Options]) ->
  807. opt(Op, Options);
  808. opt(_, []) ->
  809. false.
  810. debug_options(Name, Opts) ->
  811. case opt(debug, Opts) of
  812. {ok, Options} -> dbg_options(Name, Options);
  813. _ -> dbg_options(Name, [])
  814. end.
  815. dbg_options(Name, []) ->
  816. Opts =
  817. case init:get_argument(generic_debug) of
  818. error ->
  819. [];
  820. _ ->
  821. [log, statistics]
  822. end,
  823. dbg_opts(Name, Opts);
  824. dbg_options(Name, Opts) ->
  825. dbg_opts(Name, Opts).
  826. dbg_opts(Name, Opts) ->
  827. case catch sys:debug_options(Opts) of
  828. {'EXIT',_} ->
  829. format("~p: ignoring erroneous debug options - ~p~n",
  830. [Name, Opts]),
  831. [];
  832. Dbg ->
  833. Dbg
  834. end.
  835. %%-----------------------------------------------------------------
  836. %% Status information
  837. %%-----------------------------------------------------------------
  838. %% @hidden
  839. format_status(Opt, StatusData) ->
  840. [PDict, SysState, Parent, Debug, [_Mode, Server, _Role, E]] = StatusData,
  841. Header = lists:concat(["Status for generic server ", E#election.name]),
  842. Log = sys:get_debug(log, Debug, []),
  843. #server{mod = Mod, state = State} = Server,
  844. Specific =
  845. case erlang:function_exported(Mod, format_status, 2) of
  846. true ->
  847. case catch apply(Mod, format_status, [Opt, [PDict, State]]) of
  848. {'EXIT', _} -> [{data, [{"State", State}]}];
  849. Else -> Else
  850. end;
  851. _ ->
  852. [{data, [{"State", State}]}]
  853. end,
  854. [{header, Header},
  855. {data, [{"Status", SysState},
  856. {"Parent", Parent},
  857. {"Logged events", Log}]} |
  858. Specific].
  859. broadcast(Msg, #election{monitored = Monitored} = E) ->
  860. %% When broadcasting the first time, we broadcast to all candidate nodes,
  861. %% using broadcast/3. This function is used for subsequent broadcasts,
  862. %% and we make sure only to broadcast to already known nodes.
  863. %% It's the responsibility of new nodes to make themselves known through
  864. %% a wider broadcast.
  865. ToNodes = [N || {_,N} <- Monitored],
  866. broadcast(Msg, ToNodes, E).
  867. broadcast(capture, ToNodes, #election{monitored = Monitored} = E) ->
  868. ToMonitor = [N || N <- ToNodes,
  869. not(keymember(N,2,Monitored))],
  870. NewE =
  871. foldl(fun(Node,Ex) ->
  872. Ref = erlang:monitor(
  873. process,{Ex#election.name,Node}),
  874. Ex#election{monitored = [{Ref,Node}|
  875. Ex#election.monitored]}
  876. end,E,ToMonitor),
  877. foreach(
  878. fun(Node) ->
  879. {NewE#election.name,Node} !
  880. {leader,capture,NewE#election.iteration,node(),self()}
  881. end,ToNodes),
  882. NewE;
  883. broadcast({elect,Synch},ToNodes,E) ->
  884. foreach(
  885. fun(Node) ->
  886. {E#election.name,Node} ! {leader,elect,Synch,self()}
  887. end,ToNodes),
  888. E;
  889. broadcast({from_leader, Msg}, ToNodes, E) ->
  890. foreach(
  891. fun(Node) ->
  892. {E#election.name,Node} ! {from_leader, Msg}
  893. end,ToNodes),
  894. E;
  895. broadcast(add_worker, ToNodes, E) ->
  896. foreach(
  897. fun(Node) ->
  898. {E#election.name,Node} ! {leader, add_worker, self()}
  899. end,ToNodes),
  900. E.
  901. check_majority(E, Server) ->
  902. {Captured,_} = E#election.iteration,
  903. AcceptMeAsLeader = length(Captured) + 1, % including myself
  904. NrCandidates = length(E#election.candidate_nodes),
  905. NrDown = E#election.down,
  906. if AcceptMeAsLeader > NrCandidates/2 ->
  907. NewE = E#election{leader = self(), leadernode = node()},
  908. {ok,Synch,NewState} =
  909. (Server#server.mod):elected(Server#server.state, NewE),
  910. NewE1 = broadcast({elect,Synch}, NewE),
  911. loop(Server#server{state = NewState}, elected, NewE1);
  912. AcceptMeAsLeader+length(NrDown) == NrCandidates ->
  913. NewE = E#election{leader = self(), leadernode = node()},
  914. {ok,Synch,NewState} =
  915. (Server#server.mod):elected(Server#server.state, NewE),
  916. NewE1 = broadcast({elect,Synch}, NewE),
  917. loop(Server#server{state = NewState}, elected, NewE1);
  918. true ->
  919. safe_loop(Server, candidate, E)
  920. end.
  921. down(Ref,_Who,Why,E) ->
  922. case lists:keysearch(Ref,1,E#election.monitored) of
  923. {value, {_,Node}} ->
  924. NewMonitored = if Why == local_only -> E#election.monitored;
  925. true ->
  926. E#election.monitored -- [{Ref,Node}]
  927. end,
  928. {Captured,Pos} = E#election.iteration,
  929. case Node == E#election.leadernode of
  930. true ->
  931. E#election{leader = none,
  932. leadernode = none,
  933. iteration = {Captured -- [Node],
  934. Pos}, % TAKE CARE !
  935. down = [Node|E#election.down],
  936. alive = E#election.alive -- [Node],
  937. monitored = NewMonitored};
  938. false ->
  939. Down = case member(Node,E#election.candidate_nodes) of
  940. true ->
  941. [Node|E#election.down];
  942. false ->
  943. E#election.down
  944. end,
  945. E#election{iteration = {Captured -- [Node],
  946. Pos}, % TAKE CARE !
  947. down = Down,
  948. alive = E#election.alive -- [Node],
  949. monitored = NewMonitored}
  950. end
  951. end.
  952. %% position of element counted from end of the list
  953. %%
  954. position(X,[Head|Tail]) ->
  955. case X==Head of
  956. true ->
  957. length(Tail);
  958. false ->
  959. position(X,Tail)
  960. end.
  961. %% This is a multi-level comment
  962. %% This is the second line of the comment
  963. lexcompare({C1,P1},{C2,P2}) ->
  964. lexcompare([{length(C1),length(C2)},{P1,P2}]).
  965. lexcompare([]) ->
  966. equal;
  967. lexcompare([{X,Y}|Rest]) ->
  968. if X<Y -> less;
  969. X==Y -> lexcompare(Rest);
  970. X>Y -> greater
  971. end.
  972. add_captured({Captured,Pos}, CandidateNode) ->
  973. {[CandidateNode|[ Node || Node <- Captured,
  974. Node =/= CandidateNode ]], Pos}.
  975. nodeup(Node, #election{monitored = Monitored,
  976. alive = Alive,
  977. down = Down} = E) ->
  978. %% make sure process is monitored from now on
  979. case [ N || {_,N}<-Monitored, N==Node] of
  980. [] ->
  981. Ref = erlang:monitor(process,{E#election.name,Node}),
  982. E#election{down = Down -- [Node],
  983. alive = [Node | Alive],
  984. monitored = [{Ref,Node}|Monitored]};
  985. _ -> % already monitored, thus not in down
  986. E#election{alive = [Node | [N || N <- Alive,
  987. N =/= Node]]}
  988. end.