syn_consistency.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. %% ==========================================================================================================
  2. %% Syn - A global process registry.
  3. %%
  4. %% Copyright (C) 2015, Roberto Ostinelli <roberto@ostinelli.net>.
  5. %% Portions of code from Ulf Wiger's unsplit server module:
  6. %% <https://github.com/uwiger/unsplit/blob/master/src/unsplit_server.erl>
  7. %%
  8. %% All rights reserved.
  9. %%
  10. %% The MIT License (MIT)
  11. %%
  12. %% Copyright (c) 2015 Roberto Ostinelli
  13. %%
  14. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  15. %% of this software and associated documentation files (the "Software"), to deal
  16. %% in the Software without restriction, including without limitation the rights
  17. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  18. %% copies of the Software, and to permit persons to whom the Software is
  19. %% furnished to do so, subject to the following conditions:
  20. %%
  21. %% The above copyright notice and this permission notice shall be included in
  22. %% all copies or substantial portions of the Software.
  23. %%
  24. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  25. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  26. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  27. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  28. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  29. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  30. %% THE SOFTWARE.
  31. %% ==========================================================================================================
  32. -module(syn_consistency).
  33. -behaviour(gen_server).
  34. %% API
  35. -export([start_link/0]).
  36. %% gen_server callbacks
  37. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  38. %% internal
  39. -export([get_processes_info_of_node/1]).
  40. -export([write_processes_info_to_node/2]).
  41. %% records
%% gen_server state.
%% Holds the module/function pair read in init/1 from the
%% `conflicting_process_callback` environment value. Both fields are
%% `undefined` when no callback is configured; purge_double_processes/3
%% then kills the local duplicate instead of invoking a callback.
-record(state, {
    %% module of the callback invoked for a conflicting (double) process
    conflicting_process_callback_module = undefined :: atom(),
    %% function in that module; called with (Key, Pid, Meta)
    conflicting_process_callback_function = undefined :: atom()
}).
  46. %% include
  47. -include("syn.hrl").
  48. %% ===================================================================
  49. %% API
  50. %% ===================================================================
  51. -spec start_link() -> {ok, pid()} | {error, any()}.
  52. start_link() ->
  53. Options = [],
  54. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  55. %% ===================================================================
  56. %% Callbacks
  57. %% ===================================================================
  58. %% ----------------------------------------------------------------------------------------------------------
  59. %% Init
  60. %% ----------------------------------------------------------------------------------------------------------
  61. -spec init([]) ->
  62. {ok, #state{}} |
  63. {ok, #state{}, Timeout :: non_neg_integer()} |
  64. ignore |
  65. {stop, Reason :: any()}.
  66. init([]) ->
  67. %% monitor mnesia events
  68. mnesia:subscribe(system),
  69. %% get options
  70. {ok, [ConflictingProcessCallbackModule, ConflictingProcessCallbackFunction]} = syn_utils:get_env_value(
  71. conflicting_process_callback,
  72. [undefined, undefined]
  73. ),
  74. %% build state
  75. {ok, #state{
  76. conflicting_process_callback_module = ConflictingProcessCallbackModule,
  77. conflicting_process_callback_function = ConflictingProcessCallbackFunction
  78. }}.
  79. %% ----------------------------------------------------------------------------------------------------------
  80. %% Call messages
  81. %% ----------------------------------------------------------------------------------------------------------
  82. -spec handle_call(Request :: any(), From :: any(), #state{}) ->
  83. {reply, Reply :: any(), #state{}} |
  84. {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
  85. {noreply, #state{}} |
  86. {noreply, #state{}, Timeout :: non_neg_integer()} |
  87. {stop, Reason :: any(), Reply :: any(), #state{}} |
  88. {stop, Reason :: any(), #state{}}.
  89. handle_call(Request, From, State) ->
  90. error_logger:warning_msg("Received from ~p an unknown call message: ~p", [Request, From]),
  91. {reply, undefined, State}.
  92. %% ----------------------------------------------------------------------------------------------------------
  93. %% Cast messages
  94. %% ----------------------------------------------------------------------------------------------------------
  95. -spec handle_cast(Msg :: any(), #state{}) ->
  96. {noreply, #state{}} |
  97. {noreply, #state{}, Timeout :: non_neg_integer()} |
  98. {stop, Reason :: any(), #state{}}.
  99. handle_cast(Msg, State) ->
  100. error_logger:warning_msg("Received an unknown cast message: ~p", [Msg]),
  101. {noreply, State}.
  102. %% ----------------------------------------------------------------------------------------------------------
  103. %% All non Call / Cast messages
  104. %% ----------------------------------------------------------------------------------------------------------
  105. -spec handle_info(Info :: any(), #state{}) ->
  106. {noreply, #state{}} |
  107. {noreply, #state{}, Timeout :: non_neg_integer()} |
  108. {stop, Reason :: any(), #state{}}.
  109. handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
  110. error_logger:warning_msg("MNESIA signalled an inconsistent database on node: ~p with context: ~p, initiating automerge", [Node, Context]),
  111. automerge(Node),
  112. {noreply, State};
  113. handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
  114. error_logger:warning_msg("Received a MNESIA down event, removing all pids of node ~p", [Node]),
  115. delete_pids_of_disconnected_node(Node),
  116. {noreply, State};
  117. handle_info({mnesia_system_event, _MnesiaEvent}, State) ->
  118. %% ignore mnesia event
  119. {noreply, State};
  120. handle_info({purge_double_processes, DoubleProcessesInfo}, #state{
  121. conflicting_process_callback_module = ConflictingProcessCallbackModule,
  122. conflicting_process_callback_function = ConflictingProcessCallbackFunction
  123. } = State) ->
  124. %% is in a test environment, wait to ensure we can get the process exit callback messages
  125. case application:get_env(is_test) of
  126. undefined -> ok;
  127. {ok, true} -> timer:sleep(2000);
  128. _ -> ok
  129. end,
  130. error_logger:warning_msg("About to purge double processes after netsplit"),
  131. purge_double_processes(ConflictingProcessCallbackModule, ConflictingProcessCallbackFunction, DoubleProcessesInfo),
  132. {noreply, State};
  133. handle_info(Info, State) ->
  134. error_logger:warning_msg("Received an unknown info message: ~p", [Info]),
  135. {noreply, State}.
  136. %% ----------------------------------------------------------------------------------------------------------
  137. %% Terminate
  138. %% ----------------------------------------------------------------------------------------------------------
  139. -spec terminate(Reason :: any(), #state{}) -> terminated.
  140. terminate(Reason, _State) ->
  141. error_logger:info_msg("Terminating syn netsplits with reason: ~p", [Reason]),
  142. terminated.
  143. %% ----------------------------------------------------------------------------------------------------------
  144. %% Convert process state when code is changed.
  145. %% ----------------------------------------------------------------------------------------------------------
  146. -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  147. code_change(_OldVsn, State, _Extra) ->
  148. {ok, State}.
  149. %% ===================================================================
  150. %% Internal
  151. %% ===================================================================
  152. -spec delete_pids_of_disconnected_node(Node :: atom()) -> pid().
  153. delete_pids_of_disconnected_node(Node) ->
  154. %% don't lock gen server
  155. spawn(fun() ->
  156. %% build match specs
  157. MatchHead = #syn_processes_table{key = '$1', node = '$2', _ = '_'},
  158. Guard = {'=:=', '$2', Node},
  159. IdFormat = '$1',
  160. %% delete
  161. DelF = fun(Id) -> mnesia:dirty_delete({syn_processes_table, Id}) end,
  162. NodePids = mnesia:dirty_select(syn_processes_table, [{MatchHead, [Guard], [IdFormat]}]),
  163. lists:foreach(DelF, NodePids)
  164. end).
  165. -spec automerge(RemoteNode :: atom()) -> ok.
  166. automerge(RemoteNode) ->
  167. global:trans({{?MODULE, automerge}, self()},
  168. fun() ->
  169. error_logger:warning_msg("AUTOMERGE starting for remote node ~s (global lock is set)", [RemoteNode]),
  170. check_stitch(RemoteNode),
  171. error_logger:warning_msg("AUTOMERGE done (global lock about to be unset)")
  172. end).
  173. -spec check_stitch(RemoteNode :: atom()) -> ok.
  174. check_stitch(RemoteNode) ->
  175. case catch lists:member(RemoteNode, mnesia:system_info(running_db_nodes)) of
  176. true ->
  177. ok;
  178. false ->
  179. catch stitch(RemoteNode),
  180. ok;
  181. Error ->
  182. error_logger:error_msg("Could not check if node is stiched: ~p", [Error]),
  183. ok
  184. end.
%% Asks mnesia to (re)connect to RemoteNode, supplying a merge callback.
%% The callback receives a merge function from mnesia, applies it to
%% syn_processes_table and, when that reports `{merged, _, _}`, reconciles
%% the table contents with stitch_tab/1 before handing the merge result back.
%% Any other merge result is returned unchanged.
%% NOTE(review): mnesia_controller is an internal mnesia module; the exact
%% contract of connect_nodes/2 and of the merge function is not visible
%% here - confirm against the mnesia version in use.
-spec stitch(RemoteNode :: atom()) -> {ok, any()} | {error, any()}.
stitch(RemoteNode) ->
    mnesia_controller:connect_nodes(
        [RemoteNode],
        fun(MergeF) ->
            %% `catch` covers the whole case expression, so an exception raised by
            %% MergeF or stitch_tab/1 becomes the callback's return value instead
            %% of propagating
            catch case MergeF([syn_processes_table]) of
                {merged, _, _} = Res ->
                    stitch_tab(RemoteNode),
                    Res;
                Other ->
                    Other
            end
        end).
  198. -spec stitch_tab(RemoteNode :: atom()) -> ok.
  199. stitch_tab(RemoteNode) ->
  200. %% get remote processes info
  201. RemoteProcessesInfo = rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]),
  202. %% get local processes info
  203. LocalProcessesInfo = get_processes_info_of_node(node()),
  204. %% purge doubles (if any)
  205. {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_mnesia(
  206. LocalProcessesInfo,
  207. RemoteProcessesInfo
  208. ),
  209. %% write
  210. write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo1),
  211. write_local_processes_to_remote(RemoteNode, LocalProcessesInfo1).
  212. -spec purge_double_processes_from_local_mnesia(
  213. LocalProcessesInfo :: list(),
  214. RemoteProcessesInfo :: list()
  215. ) ->
  216. {LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()}.
  217. purge_double_processes_from_local_mnesia(LocalProcessesInfo, RemoteProcessesInfo) ->
  218. %% create ETS table
  219. Tab = ets:new(syn_automerge_doubles_table, [set]),
  220. %% insert local processes info
  221. ets:insert(Tab, LocalProcessesInfo),
  222. %% find doubles
  223. F = fun({Key, _RemoteProcessPid, _RemoteProcessMeta}, Acc) ->
  224. case ets:lookup(Tab, Key) of
  225. [] -> Acc;
  226. [{Key, LocalProcessPid, LocalProcessMeta}] ->
  227. %% found a double process, remove it from local mnesia table
  228. mnesia:dirty_delete(syn_processes_table, Key),
  229. %% remove it from ETS
  230. ets:delete(Tab, Key),
  231. %% add it to acc
  232. [{Key, LocalProcessPid, LocalProcessMeta} | Acc]
  233. end
  234. end,
  235. DoubleProcessesInfo = lists:foldl(F, [], RemoteProcessesInfo),
  236. %% send to syn_consistency gen_server to handle double processes once merging is done
  237. ?MODULE ! {purge_double_processes, DoubleProcessesInfo},
  238. %% compute local processes without doubles
  239. LocalProcessesInfo1 = ets:tab2list(Tab),
  240. %% delete ETS table
  241. ets:delete(Tab),
  242. %% return
  243. {LocalProcessesInfo1, RemoteProcessesInfo}.
  244. -spec write_remote_processes_to_local(RemoteNode :: atom(), RemoteProcessesInfo :: list()) -> ok.
  245. write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo) ->
  246. write_processes_info_to_node(RemoteNode, RemoteProcessesInfo).
  247. -spec write_local_processes_to_remote(RemoteNode :: atom(), LocalProcessesInfo :: list()) -> ok.
  248. write_local_processes_to_remote(RemoteNode, LocalProcessesInfo) ->
  249. ok = rpc:call(RemoteNode, ?MODULE, write_processes_info_to_node, [node(), LocalProcessesInfo]).
  250. -spec get_processes_info_of_node(Node :: atom()) -> list().
  251. get_processes_info_of_node(Node) ->
  252. %% build match specs
  253. MatchHead = #syn_processes_table{key = '$1', pid = '$2', node = '$3', meta = '$4'},
  254. Guard = {'=:=', '$3', Node},
  255. ProcessInfoFormat = {{'$1', '$2', '$4'}},
  256. %% select
  257. mnesia:dirty_select(syn_processes_table, [{MatchHead, [Guard], [ProcessInfoFormat]}]).
  258. -spec write_processes_info_to_node(Node :: atom(), ProcessesInfo :: list()) -> ok.
  259. write_processes_info_to_node(Node, ProcessesInfo) ->
  260. FWrite = fun({Key, ProcessPid, ProcessMeta}) ->
  261. mnesia:dirty_write(#syn_processes_table{
  262. key = Key,
  263. pid = ProcessPid,
  264. node = Node,
  265. meta = ProcessMeta
  266. })
  267. end,
  268. lists:foreach(FWrite, ProcessesInfo).
  269. -spec purge_double_processes(
  270. ConflictingProcessCallbackModule :: atom(),
  271. ConflictingProcessCallbackFunction :: atom(),
  272. DoubleProcessesInfo :: list()
  273. ) -> ok.
  274. purge_double_processes(undefined, _, DoubleProcessesInfo) ->
  275. F = fun({Key, LocalProcessPid, _LocalProcessMeta}) ->
  276. error_logger:warning_msg("Found a double process for ~s, killing it on local node ~p", [Key, node()]),
  277. exit(LocalProcessPid, kill)
  278. end,
  279. lists:foreach(F, DoubleProcessesInfo);
  280. purge_double_processes(ConflictingProcessCallbackModule, ConflictingProcessCallbackFunction, DoubleProcessesInfo) ->
  281. F = fun({Key, LocalProcessPid, LocalProcessMeta}) ->
  282. spawn(
  283. fun() ->
  284. error_logger:warning_msg("Found a double process for ~s, about to trigger callback on local node ~p", [Key, node()]),
  285. ConflictingProcessCallbackModule:ConflictingProcessCallbackFunction(Key, LocalProcessPid, LocalProcessMeta)
  286. end)
  287. end,
  288. lists:foreach(F, DoubleProcessesInfo).