%% File: syn_netsplits.erl
  1. %% ==========================================================================================================
  2. %% Syn - A global process registry.
  3. %%
  4. %% Copyright (C) 2015, Roberto Ostinelli <roberto@ostinelli.net>.
  5. %% All rights reserved.
  6. %%
  7. %% The MIT License (MIT)
  8. %%
  9. %% Copyright (c) 2015 Roberto Ostinelli
  10. %%
  11. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  12. %% of this software and associated documentation files (the "Software"), to deal
  13. %% in the Software without restriction, including without limitation the rights
  14. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15. %% copies of the Software, and to permit persons to whom the Software is
  16. %% furnished to do so, subject to the following conditions:
  17. %%
  18. %% The above copyright notice and this permission notice shall be included in
  19. %% all copies or substantial portions of the Software.
  20. %%
  21. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  24. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27. %% THE SOFTWARE.
  28. %% ==========================================================================================================
  29. -module(syn_netsplits).
  30. -behaviour(gen_server).
  31. %% API
  32. -export([start_link/0]).
  33. %% gen_server callbacks
  34. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  35. %% internal
  36. -export([get_processes_info_of_node/1]).
  37. -export([write_processes_info_to_node/2]).
  38. -export([conflicting_mode/1]).
  39. %% records
  %% gen_server state: how a local process is treated when the same key is also
  %% registered on the remote node during an automerge.
  -record(state, {
      %% kill: the conflicting local process is killed;
      %% send_message: `message` is sent to it instead
      conflicting_mode = kill :: kill | send_message,
      %% term sent to the conflicting local process in send_message mode
      message = undefined :: any()
  }).
  44. %% include
  45. -include("syn.hrl").
  46. %% ===================================================================
  47. %% API
  48. %% ===================================================================
  %% Starts the netsplits handler, linked to the caller and registered locally
  %% under the module name. No init arguments or start options are used.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  %% Sets how conflicting local processes are handled during an automerge.
  %% `undefined` is a no-op; any other supported mode is forwarded unchanged to
  %% the locally registered server with a synchronous call.
  -spec conflicting_mode(undefined | kill | {send_message, any()}) -> ok.
  conflicting_mode(undefined) ->
      ok;
  conflicting_mode(kill = Mode) ->
      gen_server:call(?MODULE, {conflicting_mode, Mode});
  conflicting_mode({send_message, _Message} = Mode) ->
      gen_server:call(?MODULE, {conflicting_mode, Mode}).
  57. %% ===================================================================
  58. %% Callbacks
  59. %% ===================================================================
  60. %% ----------------------------------------------------------------------------------------------------------
  61. %% Init
  62. %% ----------------------------------------------------------------------------------------------------------
  %% Initializes the server: traps exits, subscribes to mnesia system events and
  %% starts with the default state (conflicting_mode = kill).
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      %% trap linked processes signal
      process_flag(trap_exit, true),
      %% monitor mnesia events; the netsplit handling in handle_info/2 depends
      %% entirely on this subscription, so a failure must not go unnoticed
      case mnesia:subscribe(system) of
          {ok, _Node} ->
              ok;
          {error, Reason} ->
              error_logger:error_msg("Could not subscribe to MNESIA system events: ~p~n", [Reason])
      end,
      %% init state
      {ok, #state{}}.
  75. %% ----------------------------------------------------------------------------------------------------------
  76. %% Call messages
  77. %% ----------------------------------------------------------------------------------------------------------
  %% Handles synchronous requests.
  %%   {conflicting_mode, kill}                    -> store kill mode, clear the message
  %%   {conflicting_mode, {send_message, Message}} -> store send_message mode and the message
  %%   anything else                               -> log a warning and reply `undefined`
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call({conflicting_mode, kill}, _From, State) ->
      {reply, ok, State#state{
          conflicting_mode = kill,
          message = undefined
      }};
  handle_call({conflicting_mode, {send_message, Message}}, _From, State) ->
      {reply, ok, State#state{
          conflicting_mode = send_message,
          message = Message
      }};
  handle_call(Request, From, State) ->
      %% arguments follow the order of the format string: sender first, then the request
      error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [From, Request]),
      {reply, undefined, State}.
  98. %% ----------------------------------------------------------------------------------------------------------
  99. %% Cast messages
  100. %% ----------------------------------------------------------------------------------------------------------
  %% No cast is part of this server's protocol: every cast is logged as unknown
  %% and the state is returned untouched.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast(UnknownCast, State) ->
      error_logger:warning_msg("Received an unknown cast message: ~p~n", [UnknownCast]),
      {noreply, State}.
  108. %% ----------------------------------------------------------------------------------------------------------
  109. %% All non Call / Cast messages
  110. %% ----------------------------------------------------------------------------------------------------------
  %% Handles raw messages, i.e. the mnesia system events subscribed to in init/1.
  %%   inconsistent_database        -> run the automerge against the reported node
  %%   mnesia_down of another node  -> remove that node's entries from the table
  %%   any other mnesia event       -> ignored
  %%   anything else                -> logged as unknown
  %% The state is never modified here.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_info(
      {mnesia_system_event, {inconsistent_database, Context, RemoteNode}},
      #state{conflicting_mode = Mode, message = ConflictMessage} = State
  ) ->
      error_logger:warning_msg(
          "MNESIA signalled an inconsistent database on node: ~p with context: ~p, initiating automerge~n",
          [RemoteNode, Context]
      ),
      automerge(RemoteNode, Mode, ConflictMessage),
      {noreply, State};
  handle_info({mnesia_system_event, {mnesia_down, DownNode}}, State) when DownNode =/= node() ->
      error_logger:warning_msg(
          "Received a MNESIA down event, removing all pids of node ~p~n",
          [DownNode]
      ),
      delete_pids_of_disconnected_node(DownNode),
      {noreply, State};
  handle_info({mnesia_system_event, _OtherEvent}, State) ->
      %% every other mnesia system event is of no interest here
      {noreply, State};
  handle_info(UnknownInfo, State) ->
      error_logger:warning_msg("Received an unknown info message: ~p~n", [UnknownInfo]),
      {noreply, State}.
  132. %% ----------------------------------------------------------------------------------------------------------
  133. %% Terminate
  134. %% ----------------------------------------------------------------------------------------------------------
  %% Logs the termination reason; there is nothing to clean up.
  %% Returns the atom `terminated`.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(StopReason, _State) ->
      error_logger:info_msg("Terminating syn netsplits with reason: ~p~n", [StopReason]),
      terminated.
  139. %% ----------------------------------------------------------------------------------------------------------
  140. %% Convert process state when code is changed.
  141. %% ----------------------------------------------------------------------------------------------------------
  %% Hot code upgrade hook: the state is carried over as is, no conversion is done.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_FromVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  145. %% ===================================================================
  146. %% Internal
  147. %% ===================================================================
  %% Deletes every syn_processes_table entry whose node field is Node.
  %% The work is done in a separately spawned (unlinked) process so the
  %% gen_server is not blocked; the pid of that process is returned.
  -spec delete_pids_of_disconnected_node(Node :: atom()) -> pid().
  delete_pids_of_disconnected_node(Node) ->
      Purge = fun() ->
          %% select the keys of all entries registered on Node
          MatchSpec = [{
              #syn_processes_table{key = '$1', node = '$2', _ = '_'},
              [{'=:=', '$2', Node}],
              ['$1']
          }],
          Keys = mnesia:dirty_select(syn_processes_table, MatchSpec),
          %% delete them one by one
          lists:foreach(
              fun(Key) -> mnesia:dirty_delete({syn_processes_table, Key}) end,
              Keys
          )
      end,
      spawn(Purge).
  %% Runs the stitch check for RemoteNode inside a global:trans/2 transaction,
  %% using {{?MODULE, automerge}, self()} as the lock id, and logs when the
  %% merge starts and when it is done.
  -spec automerge(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) -> ok.
  automerge(RemoteNode, ConflictingMode, Message) ->
      LockId = {{?MODULE, automerge}, self()},
      Merge = fun() ->
          error_logger:warning_msg(
              "AUTOMERGE starting for remote node ~s (global lock is set)~n",
              [RemoteNode]
          ),
          check_stitch(RemoteNode, ConflictingMode, Message),
          error_logger:warning_msg("AUTOMERGE done (global lock will be unset)~n")
      end,
      global:trans(LockId, Merge).
  %% Stitches RemoteNode back in unless mnesia already lists it among its
  %% running db nodes. If the membership check itself raises, the failure is
  %% logged and no stitch is attempted.
  -spec check_stitch(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) -> ok.
  check_stitch(RemoteNode, ConflictingMode, Message) ->
      %% try ... of protects only the membership check (as the previous
      %% `case catch` did), while keeping thrown values and errors distinct from
      %% a regular true/false result
      try lists:member(RemoteNode, mnesia:system_info(running_db_nodes)) of
          true ->
              %% already connected, nothing to do
              ok;
          false ->
              stitch(RemoteNode, ConflictingMode, Message),
              ok
      catch
          Class:Reason ->
              error_logger:error_msg("Could not check if node is stiched: ~p~n", [{Class, Reason}])
      end.
  %% Asks mnesia_controller to connect to RemoteNode, handing it a merge callback.
  %% The callback invokes the supplied merge fun for syn_processes_table; when
  %% that reports {merged, _, _} the process tables are reconciled with
  %% stitch_tab/3 and the merge result is returned. Any other result is returned
  %% untouched. The whole callback body runs under `catch`, so an exception is
  %% returned as a value rather than raised.
  -spec stitch(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) ->
      {'ok', any()} | {'error', any()}.
  stitch(RemoteNode, ConflictingMode, Message) ->
      UserMergeFun = fun(MergeF) ->
          catch begin
              MergeResult = MergeF([syn_processes_table]),
              case MergeResult of
                  {merged, _, _} ->
                      stitch_tab(RemoteNode, ConflictingMode, Message),
                      MergeResult;
                  _NotMerged ->
                      MergeResult
              end
          end
      end,
      mnesia_controller:connect_nodes([RemoteNode], UserMergeFun).
  %% Reconciles the process entries of the local node and of RemoteNode:
  %%   1. fetches the entries RemoteNode holds for itself (via rpc),
  %%   2. fetches the entries the local node holds for itself,
  %%   3. resolves keys registered on both sides in favour of the remote one,
  %%   4. writes the remote entries locally and the surviving local entries remotely.
  %% Raises error({badrpc, RemoteNode, Reason}) if the remote entries cannot be
  %% fetched, before any local process or table entry has been touched.
  -spec stitch_tab(RemoteNode :: atom(), ConflictingMode :: kill | send_message, Message :: any()) -> ok.
  stitch_tab(RemoteNode, ConflictingMode, Message) ->
      %% get remote processes info; a failed rpc must not be mistaken for a list
      RemoteProcessesInfo =
          case rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]) of
              {badrpc, Reason} ->
                  error({badrpc, RemoteNode, Reason});
              RemoteInfo when is_list(RemoteInfo) ->
                  RemoteInfo
          end,
      %% get local processes info
      LocalProcessesInfo = get_processes_info_of_node(node()),
      %% purge doubles (if any)
      {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_node(
          LocalProcessesInfo,
          RemoteProcessesInfo,
          ConflictingMode,
          Message
      ),
      %% write
      write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo1),
      write_local_processes_to_remote(RemoteNode, LocalProcessesInfo1).
  %% Resolves keys that are registered both locally and remotely.
  %% For every {Key, Pid} in RemoteProcessesInfo whose Key also appears in
  %% LocalProcessesInfo, the local entry is deleted from syn_processes_table and
  %% the local process is either killed (ConflictingMode = kill) or sent Message
  %% (ConflictingMode = send_message).
  %% Returns {LocalProcessesInfoWithoutDoubles, RemoteProcessesInfo}; the remote
  %% list is returned unchanged.
  -spec purge_double_processes_from_local_node(
      LocalProcessesInfo :: list(),
      RemoteProcessesInfo :: list(),
      ConflictingMode :: kill | send_message,
      Message :: any()
  ) ->
      {LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()}.
  purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo, ConflictingMode, Message) ->
      %% scratch ETS table indexing the local entries by key
      Tab = ets:new(syn_automerge_doubles_table, [set]),
      try
          %% insert local processes info
          ets:insert(Tab, LocalProcessesInfo),
          %% find doubles
          F = fun({Key, _RemoteProcessPid}) ->
              case ets:lookup(Tab, Key) of
                  [] ->
                      ok;
                  [{Key, LocalProcessPid}] ->
                      %% keys can be any term, hence ~p rather than ~s
                      error_logger:warning_msg("Found a double process for ~p, killing it on local node~n", [Key]),
                      %% remove it from local mnesia table
                      mnesia:dirty_delete(syn_processes_table, Key),
                      %% remove it from ETS
                      ets:delete(Tab, Key),
                      %% kill or send message
                      case ConflictingMode of
                          kill -> exit(LocalProcessPid, kill);
                          send_message -> LocalProcessPid ! Message
                      end
              end
          end,
          lists:foreach(F, RemoteProcessesInfo),
          %% local processes without doubles, remote processes untouched
          {ets:tab2list(Tab), RemoteProcessesInfo}
      after
          %% always release the scratch table, even if something above raised
          ets:delete(Tab)
      end.
  %% Stores the entries received from RemoteNode in the table as seen from this
  %% node, tagging each one with RemoteNode as its node.
  -spec write_remote_processes_to_local(RemoteNode :: atom(), RemoteProcessesInfo :: list()) -> ok.
  write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo) ->
      %% plain local write, the entries belong to RemoteNode
      write_processes_info_to_node(RemoteNode, RemoteProcessesInfo).
  %% Has RemoteNode store this node's entries by calling
  %% write_processes_info_to_node/2 there via rpc, tagging them with the local
  %% node name. Any rpc result other than `ok` fails the match.
  -spec write_local_processes_to_remote(RemoteNode :: atom(), LocalProcessesInfo :: list()) -> ok.
  write_local_processes_to_remote(RemoteNode, LocalProcessesInfo) ->
      LocalNode = node(),
      RemoteArgs = [LocalNode, LocalProcessesInfo],
      ok = rpc:call(RemoteNode, ?MODULE, write_processes_info_to_node, RemoteArgs).
  %% Returns the {Key, Pid} pairs of all syn_processes_table entries whose node
  %% field is Node, read with a dirty select on the node where this is called.
  -spec get_processes_info_of_node(Node :: atom()) -> list().
  get_processes_info_of_node(Node) ->
      %% build match specs; `_ = '_'` leaves any record field other than
      %% key/pid/node unconstrained, matching delete_pids_of_disconnected_node/1.
      %% Without it such fields would be pinned to their record default and
      %% entries holding other values would be skipped.
      MatchHead = #syn_processes_table{key = '$1', pid = '$2', node = '$3', _ = '_'},
      Guard = {'=:=', '$3', Node},
      ProcessInfoFormat = {{'$1', '$2'}},
      %% select
      mnesia:dirty_select(syn_processes_table, [{MatchHead, [Guard], [ProcessInfoFormat]}]).
  %% Writes one syn_processes_table entry per {Key, Pid} pair in ProcessesInfo,
  %% each tagged with Node, using dirty writes on the node where this is called.
  -spec write_processes_info_to_node(Node :: atom(), ProcessesInfo :: list()) -> ok.
  write_processes_info_to_node(Node, ProcessesInfo) ->
      lists:foreach(
          fun({Key, Pid}) ->
              Entry = #syn_processes_table{key = Key, pid = Pid, node = Node},
              mnesia:dirty_write(Entry)
          end,
          ProcessesInfo
      ).