syn_netsplits.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. %% ==========================================================================================================
  2. %% Syn - A global process registry.
  3. %%
  4. %% Copyright (C) 2015, Roberto Ostinelli <roberto@ostinelli.net>.
  5. %% All rights reserved.
  6. %%
  7. %% The MIT License (MIT)
  8. %%
  9. %% Copyright (c) 2015 Roberto Ostinelli
  10. %%
  11. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  12. %% of this software and associated documentation files (the "Software"), to deal
  13. %% in the Software without restriction, including without limitation the rights
  14. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15. %% copies of the Software, and to permit persons to whom the Software is
  16. %% furnished to do so, subject to the following conditions:
  17. %%
  18. %% The above copyright notice and this permission notice shall be included in
  19. %% all copies or substantial portions of the Software.
  20. %%
  21. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  24. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27. %% THE SOFTWARE.
  28. %% ==========================================================================================================
  29. -module(syn_netsplits).
  30. -behaviour(gen_server).
  31. %% API
  32. -export([start_link/0]).
  33. %% gen_server callbacks
  34. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  35. %% internal
  36. -export([get_processes_info_of_node/1]).
  37. -export([write_processes_info_to_node/2]).
  38. %% records
%% gen_server state.
%% Holds the module / function pair read from the
%% `netsplit_conflicting_process_callback` option in init/1. Both fields stay
%% `undefined` when the option is not set, in which case conflicting local
%% processes found during an automerge are killed instead of being handed to
%% a callback.
-record(state, {
    netsplit_conflicting_process_callback_module = undefined :: atom(),
    netsplit_conflicting_process_callback_function = undefined :: atom()
}).
  43. %% include
  44. -include("syn.hrl").
  45. %% ===================================================================
  46. %% API
  47. %% ===================================================================
  48. -spec start_link() -> {ok, pid()} | {error, any()}.
  49. start_link() ->
  50. Options = [],
  51. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  52. %% ===================================================================
  53. %% Callbacks
  54. %% ===================================================================
  55. %% ----------------------------------------------------------------------------------------------------------
  56. %% Init
  57. %% ----------------------------------------------------------------------------------------------------------
  58. -spec init([]) ->
  59. {ok, #state{}} |
  60. {ok, #state{}, Timeout :: non_neg_integer()} |
  61. ignore |
  62. {stop, Reason :: any()}.
  63. init([]) ->
  64. %% monitor mnesia events
  65. mnesia:subscribe(system),
  66. %% get options
  67. {ok, [NetsplitConflictingProcessCallbackModule, NetsplitConflictingProcessCallbackFunction]} = syn_utils:get_env_value(
  68. netsplit_conflicting_process_callback,
  69. [undefined, undefined]
  70. ),
  71. %% build state
  72. {ok, #state{
  73. netsplit_conflicting_process_callback_module = NetsplitConflictingProcessCallbackModule,
  74. netsplit_conflicting_process_callback_function = NetsplitConflictingProcessCallbackFunction
  75. }}.
  76. %% ----------------------------------------------------------------------------------------------------------
  77. %% Call messages
  78. %% ----------------------------------------------------------------------------------------------------------
  79. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  80. {reply, Reply :: any(), #state{}} |
  81. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  82. {noreply, #state{}} |
  83. {noreply, #state{}, Timeout :: non_neg_integer()} |
  84. {stop, Reason :: any(), Reply :: any(), #state{}} |
  85. {stop, Reason :: any(), #state{}}.
  86. handle_call(Request, From, State) ->
  87. error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [Request, From]),
  88. {reply, undefined, State}.
  89. %% ----------------------------------------------------------------------------------------------------------
  90. %% Cast messages
  91. %% ----------------------------------------------------------------------------------------------------------
  92. -spec handle_cast(Msg :: any(), #state{}) ->
  93. {noreply, #state{}} |
  94. {noreply, #state{}, Timeout :: non_neg_integer()} |
  95. {stop, Reason :: any(), #state{}}.
  96. handle_cast(Msg, State) ->
  97. error_logger:warning_msg("Received an unknown cast message: ~p~n", [Msg]),
  98. {noreply, State}.
  99. %% ----------------------------------------------------------------------------------------------------------
  100. %% All non Call / Cast messages
  101. %% ----------------------------------------------------------------------------------------------------------
  102. -spec handle_info(Info :: any(), #state{}) ->
  103. {noreply, #state{}} |
  104. {noreply, #state{}, Timeout :: non_neg_integer()} |
  105. {stop, Reason :: any(), #state{}}.
  106. handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, #state{
  107. netsplit_conflicting_process_callback_module = NetsplitConflictingProcessCallbackModule,
  108. netsplit_conflicting_process_callback_function = NetsplitConflictingProcessCallbackFunction
  109. } = State) ->
  110. error_logger:warning_msg("MNESIA signalled an inconsistent database on node: ~p with context: ~p, initiating automerge~n", [Node, Context]),
  111. automerge(Node, NetsplitConflictingProcessCallbackModule, NetsplitConflictingProcessCallbackFunction),
  112. {noreply, State};
  113. handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
  114. error_logger:warning_msg("Received a MNESIA down event, removing all pids of node ~p~n", [Node]),
  115. delete_pids_of_disconnected_node(Node),
  116. {noreply, State};
  117. handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
  118. %% ignore mnesia event
  119. {noreply, State};
  120. handle_info(Info, State) ->
  121. error_logger:warning_msg("Received an unknown info message: ~p~n", [Info]),
  122. {noreply, State}.
  123. %% ----------------------------------------------------------------------------------------------------------
  124. %% Terminate
  125. %% ----------------------------------------------------------------------------------------------------------
  126. -spec terminate(Reason :: any(), #state{}) -> terminated.
  127. terminate(Reason, _State) ->
  128. error_logger:info_msg("Terminating syn netsplits with reason: ~p~n", [Reason]),
  129. terminated.
  130. %% ----------------------------------------------------------------------------------------------------------
  131. %% Convert process state when code is changed.
  132. %% ----------------------------------------------------------------------------------------------------------
  133. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  134. code_change(_OldVsn, State, _Extra) ->
  135. {ok, State}.
  136. %% ===================================================================
  137. %% Internal
  138. %% ===================================================================
  139. -spec delete_pids_of_disconnected_node(Node :: atom()) -> pid().
  140. delete_pids_of_disconnected_node(Node) ->
  141. %% don't lock gen server
  142. spawn(fun() ->
  143. %% build match specs
  144. MatchHead = #syn_processes_table{key = '$1', node = '$2', _ = '_'},
  145. Guard = {'=:=', '$2', Node},
  146. IdFormat = '$1',
  147. %% delete
  148. DelF = fun(Id) -> mnesia:dirty_delete({syn_processes_table, Id}) end,
  149. NodePids = mnesia:dirty_select(syn_processes_table, [{MatchHead, [Guard], [IdFormat]}]),
  150. lists:foreach(DelF, NodePids)
  151. end).
  152. -spec automerge(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) -> ok.
  153. automerge(RemoteNode, CallbackModule, CallbackFunction) ->
  154. global:trans({{?MODULE, automerge}, self()},
  155. fun() ->
  156. error_logger:warning_msg("AUTOMERGE starting for remote node ~s (global lock is set)~n", [RemoteNode]),
  157. check_stitch(RemoteNode, CallbackModule, CallbackFunction),
  158. error_logger:warning_msg("AUTOMERGE done (global lock will be unset)~n")
  159. end).
  160. -spec check_stitch(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) -> ok.
  161. check_stitch(RemoteNode, CallbackModule, CallbackFunction) ->
  162. case catch lists:member(RemoteNode, mnesia:system_info(running_db_nodes)) of
  163. true ->
  164. ok;
  165. false ->
  166. stitch(RemoteNode, CallbackModule, CallbackFunction),
  167. ok;
  168. Error ->
  169. error_logger:error_msg("Could not check if node is stiched: ~p~n", [Error])
  170. end.
  171. -spec stitch(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) ->
  172. {'ok', any()} | {'error', any()}.
  173. stitch(RemoteNode, CallbackModule, CallbackFunction) ->
  174. mnesia_controller:connect_nodes(
  175. [RemoteNode],
  176. fun(MergeF) ->
  177. catch case MergeF([syn_processes_table]) of
  178. {merged, _, _} = Res ->
  179. stitch_tab(RemoteNode, CallbackModule, CallbackFunction),
  180. Res;
  181. Other ->
  182. Other
  183. end
  184. end).
  185. -spec stitch_tab(RemoteNode :: atom(), CallbackModule :: atom(), CallbackFunction :: atom()) -> ok.
  186. stitch_tab(RemoteNode, CallbackModule, CallbackFunction) ->
  187. %% get remote processes info
  188. RemoteProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]),
  189. %% get local processes info
  190. LocalProcessesInfo = get_processes_info_of_node(node()),
  191. %% purge doubles (if any)
  192. {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_node(
  193. LocalProcessesInfo,
  194. RemoteProcessesInfo,
  195. CallbackModule,
  196. CallbackFunction
  197. ),
  198. %% write
  199. write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo1),
  200. write_local_processes_to_remote(RemoteNode, LocalProcessesInfo1).
  201. -spec purge_double_processes_from_local_node(
  202. LocalProcessesInfo :: list(),
  203. RemoteProcessesInfo :: list(),
  204. ConflictingMode :: kill | send_message,
  205. Message :: any()
  206. ) ->
  207. {LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()}.
  208. purge_double_processes_from_local_node(LocalProcessesInfo, RemoteProcessesInfo, CallbackModule, CallbackFunction) ->
  209. %% create ETS table
  210. Tab = ets:new(syn_automerge_doubles_table, [set]),
  211. %% insert local processes info
  212. ets:insert(Tab, LocalProcessesInfo),
  213. %% find doubles
  214. F = fun({Key, _RemoteProcessPid}) ->
  215. case ets:lookup(Tab, Key) of
  216. [] -> ok;
  217. [{Key, LocalProcessPid}] ->
  218. error_logger:warning_msg("Found a double process for ~s, killing it on local node~n", [Key]),
  219. %% remove it from local mnesia table
  220. mnesia:dirty_delete(syn_processes_table, Key),
  221. %% remove it from ETS
  222. ets:delete(Tab, Key),
  223. %% kill or send message
  224. case CallbackModule of
  225. undefined -> exit(LocalProcessPid, kill);
  226. _ -> spawn(fun() -> CallbackModule:CallbackFunction(Key, LocalProcessPid) end)
  227. end
  228. end
  229. end,
  230. lists:foreach(F, RemoteProcessesInfo),
  231. %% compute local processes without doubles
  232. LocalProcessesInfo1 = ets:tab2list(Tab),
  233. %% delete ETS table
  234. ets:delete(Tab),
  235. %% return
  236. {LocalProcessesInfo1, RemoteProcessesInfo}.
  237. -spec write_remote_processes_to_local(RemoteNode :: atom(), RemoteProcessesInfo :: list()) -> ok.
  238. write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo) ->
  239. write_processes_info_to_node(RemoteNode, RemoteProcessesInfo).
  240. -spec write_local_processes_to_remote(RemoteNode :: atom(), LocalProcessesInfo :: list()) -> ok.
  241. write_local_processes_to_remote(RemoteNode, LocalProcessesInfo) ->
  242. ok = rpc:call(RemoteNode, ?MODULE, write_processes_info_to_node, [node(), LocalProcessesInfo]).
  243. -spec get_processes_info_of_node(Node :: atom()) -> list().
  244. get_processes_info_of_node(Node) ->
  245. %% build match specs
  246. MatchHead = #syn_processes_table{key = '$1', pid = '$2', node = '$3'},
  247. Guard = {'=:=', '$3', Node},
  248. ProcessInfoFormat = {{'$1', '$2'}},
  249. %% select
  250. mnesia:dirty_select(syn_processes_table, [{MatchHead, [Guard], [ProcessInfoFormat]}]).
  251. -spec write_processes_info_to_node(Node :: atom(), ProcessesInfo :: list()) -> ok.
  252. write_processes_info_to_node(Node, ProcessesInfo) ->
  253. FWrite = fun({Key, ProcessPid}) ->
  254. mnesia:dirty_write(#syn_processes_table{
  255. key = Key,
  256. pid = ProcessPid,
  257. node = Node
  258. })
  259. end,
  260. lists:foreach(FWrite, ProcessesInfo).