syn_consistency.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. %% ==========================================================================================================
  2. %% Syn - A global process registry.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2016 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %% Portions of code from Ulf Wiger's unsplit server module:
  8. %% <https://github.com/uwiger/unsplit/blob/master/src/unsplit_server.erl>
  9. %%
  10. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  11. %% of this software and associated documentation files (the "Software"), to deal
  12. %% in the Software without restriction, including without limitation the rights
  13. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  14. %% copies of the Software, and to permit persons to whom the Software is
  15. %% furnished to do so, subject to the following conditions:
  16. %%
  17. %% The above copyright notice and this permission notice shall be included in
  18. %% all copies or substantial portions of the Software.
  19. %%
  20. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  21. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  22. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  23. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  24. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  25. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  26. %% THE SOFTWARE.
  27. %% ==========================================================================================================
  28. -module(syn_consistency).
  29. -behaviour(gen_server).
  30. %% API
  31. -export([start_link/0]).
  32. %% gen_server callbacks
  33. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  34. %% internal
  35. -export([get_processes_info_of_node/1]).
  36. -export([write_processes_info_to_node/2]).
  37. %% records
  -record(state, {
      %% Module of the conflict-resolution callback. When left `undefined',
      %% purge_double_processes/3 kills the local double instead of calling back.
      conflicting_process_callback_module = undefined :: module(),
      %% Function (arity 3: Key, Pid, Meta) invoked in the module above for each double process.
      conflicting_process_callback_function = undefined :: atom()
  }).
  42. %% include
  43. -include("syn.hrl").
  44. %% ===================================================================
  45. %% API
  46. %% ===================================================================
  %% Starts the consistency server, registered locally under the module name.
  %% init/1 receives an empty argument list and no gen_server options are set.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  51. %% ===================================================================
  52. %% Callbacks
  53. %% ===================================================================
  54. %% ----------------------------------------------------------------------------------------------------------
  55. %% Init
  56. %% ----------------------------------------------------------------------------------------------------------
  %% Subscribes this process to mnesia system events and loads the optional
  %% conflict-resolution callback into the server state.
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      %% system events are delivered to handle_info/2 as {mnesia_system_event, Event}
      mnesia:subscribe(system),
      %% callback is read as a [Module, Function] pair; both default to undefined
      {ok, [CallbackModule, CallbackFunction]} =
          syn_utils:get_env_value(conflicting_process_callback, [undefined, undefined]),
      InitialState = #state{
          conflicting_process_callback_module = CallbackModule,
          conflicting_process_callback_function = CallbackFunction
      },
      {ok, InitialState}.
  75. %% ----------------------------------------------------------------------------------------------------------
  76. %% Call messages
  77. %% ----------------------------------------------------------------------------------------------------------
  %% Handles synchronous calls. This server defines no call protocol, so every
  %% request is logged together with its sender and answered with `undefined'.
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call(Request, From, State) ->
      %% argument order must follow the format string: sender first, then the message
      error_logger:warning_msg("Received from ~p an unknown call message: ~p", [From, Request]),
      {reply, undefined, State}.
  88. %% ----------------------------------------------------------------------------------------------------------
  89. %% Cast messages
  90. %% ----------------------------------------------------------------------------------------------------------
  %% Handles asynchronous casts. No cast is expected by this server, so the
  %% message is only logged and the state is returned unchanged.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast(UnknownCast, CurrentState) ->
      error_logger:warning_msg("Received an unknown cast message: ~p", [UnknownCast]),
      {noreply, CurrentState}.
  98. %% ----------------------------------------------------------------------------------------------------------
  99. %% All non Call / Cast messages
  100. %% ----------------------------------------------------------------------------------------------------------
  %% Handles mnesia system events and the internal purge request:
  %%   - inconsistent_database: run the automerge against the remote node;
  %%   - mnesia_down of another node: drop that node's entries from the local table;
  %%   - any other mnesia system event: ignored;
  %%   - {purge_double_processes, Info}: resolve the doubles collected during a merge;
  %%   - anything else: logged as unknown.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_info({mnesia_system_event, {inconsistent_database, MnesiaContext, Node}}, State) ->
      error_logger:error_msg(
          "MNESIA signalled an inconsistent database on node ~p for remote node ~p with context: ~p, initiating automerge",
          [node(), Node, MnesiaContext]
      ),
      automerge(Node),
      {noreply, State};
  handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
      error_logger:error_msg(
          "Received a MNESIA down event, removing on node ~p all pids of node ~p",
          [node(), Node]
      ),
      delete_pids_of_disconnected_node(Node),
      {noreply, State};
  handle_info({mnesia_system_event, _OtherEvent}, State) ->
      %% remaining mnesia events are of no interest here
      {noreply, State};
  handle_info({purge_double_processes, Doubles}, #state{
      conflicting_process_callback_module = CallbackModule,
      conflicting_process_callback_function = CallbackFunction
  } = State) ->
      error_logger:warning_msg("About to purge double processes after netsplit"),
      purge_double_processes(CallbackModule, CallbackFunction, Doubles),
      {noreply, State};
  handle_info(UnknownInfo, State) ->
      error_logger:warning_msg("Received an unknown info message: ~p", [UnknownInfo]),
      {noreply, State}.
  126. %% ----------------------------------------------------------------------------------------------------------
  127. %% Terminate
  128. %% ----------------------------------------------------------------------------------------------------------
  %% Logs the termination reason; there is nothing to clean up. Returns `terminated'.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(StopReason, _FinalState) ->
      error_logger:info_msg("Terminating syn netsplits with reason: ~p", [StopReason]),
      terminated.
  133. %% ----------------------------------------------------------------------------------------------------------
  134. %% Convert process state when code is changed.
  135. %% ----------------------------------------------------------------------------------------------------------
  %% Code upgrade hook: the state is carried over unchanged.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_PreviousVersion, CurrentState, _UpgradeExtra) ->
      {ok, CurrentState}.
  139. %% ===================================================================
  140. %% Internal
  141. %% ===================================================================
  %% Removes from the local syn_processes_table every entry whose node field
  %% equals RemoteNode, using dirty mnesia operations.
  -spec delete_pids_of_disconnected_node(RemoteNode :: atom()) -> ok.
  delete_pids_of_disconnected_node(RemoteNode) ->
      %% select only the keys of the records that belong to RemoteNode
      MatchSpec = [{
          #syn_processes_table{key = '$1', node = '$2', _ = '_'},
          [{'=:=', '$2', RemoteNode}],
          ['$1']
      }],
      KeysOfNode = mnesia:dirty_select(syn_processes_table, MatchSpec),
      lists:foreach(
          fun(Key) -> mnesia:dirty_delete({syn_processes_table, Key}) end,
          KeysOfNode
      ).
  %% Runs the stitch check for RemoteNode inside a global:trans/2 lock identified
  %% by {?MODULE, automerge}, with the calling process as lock requester.
  -spec automerge(RemoteNode :: atom()) -> ok.
  automerge(RemoteNode) ->
      LockId = {{?MODULE, automerge}, self()},
      MergeUnderLock = fun() ->
          error_logger:warning_msg(
              "AUTOMERGE starting on node ~p for remote node ~p (global lock is set)",
              [node(), RemoteNode]
          ),
          check_stitch(RemoteNode),
          error_logger:warning_msg("AUTOMERGE done (global lock about to be unset)")
      end,
      global:trans(LockId, MergeUnderLock).
  %% Stitches RemoteNode back into the local mnesia cluster unless it is already
  %% listed among the running db nodes. Best effort: every failure is logged and
  %% the function always returns `ok'.
  -spec check_stitch(RemoteNode :: atom()) -> ok.
  check_stitch(RemoteNode) ->
      %% only the membership check is protected here; stitching has its own handler
      try lists:member(RemoteNode, mnesia:system_info(running_db_nodes)) of
          true ->
              error_logger:warning_msg("Remote node ~p is already stitched.", [RemoteNode]),
              ok;
          false ->
              try_stitch(RemoteNode)
      catch
          Class:Reason ->
              error_logger:error_msg("Could not check if node is stiched: ~p", [{Class, Reason}]),
              ok
      end.

  %% Calls stitch/1 and reports, instead of silently discarding, any non-{ok, _}
  %% result or exception. Always returns `ok'.
  -spec try_stitch(RemoteNode :: atom()) -> ok.
  try_stitch(RemoteNode) ->
      try stitch(RemoteNode) of
          {ok, _} ->
              ok;
          Failure ->
              error_logger:error_msg("Could not stitch remote node ~p: ~p", [RemoteNode, Failure]),
              ok
      catch
          Class:Reason ->
              error_logger:error_msg(
                  "Could not stitch remote node ~p: ~p",
                  [RemoteNode, {Class, Reason}]
              ),
              ok
      end.
  %% Reconnects RemoteNode through mnesia_controller:connect_nodes/2. The supplied
  %% callback asks mnesia to merge syn_processes_table and, when the merge reports
  %% {merged, _, _}, reconciles the table contents via stitch_tab/1. The callback
  %% always hands the merge outcome back to the caller.
  -spec stitch(RemoteNode :: atom()) -> {ok, any()} | {error, any()}.
  stitch(RemoteNode) ->
      OnMerge = fun(MergeF) ->
          %% old-style catch kept on purpose: a failure becomes the fun's return value
          catch begin
              Outcome = MergeF([syn_processes_table]),
              case Outcome of
                  {merged, _, _} -> stitch_tab(RemoteNode);
                  _ -> ok
              end,
              Outcome
          end
      end,
      mnesia_controller:connect_nodes([RemoteNode], OnMerge).
  %% Reconciles syn_processes_table between this node and RemoteNode after a merge:
  %% fetches the entries each node owns, removes local entries whose key also exists
  %% remotely (the remote entry wins), then writes remote entries locally and the
  %% remaining local entries remotely.
  %% Raises error({badrpc, RemoteNode, Reason}) when the remote entries cannot be fetched.
  -spec stitch_tab(RemoteNode :: atom()) -> ok.
  stitch_tab(RemoteNode) ->
      %% get remote processes info; fail fast on an rpc failure instead of letting a
      %% {badrpc, _} tuple be treated as a list of entries further down
      RemoteProcessesInfo =
          case rpc:call(RemoteNode, ?MODULE, get_processes_info_of_node, [RemoteNode]) of
              Info when is_list(Info) -> Info;
              {badrpc, Reason} -> error({badrpc, RemoteNode, Reason})
          end,
      %% get local processes info
      LocalProcessesInfo = get_processes_info_of_node(node()),
      %% purge doubles (if any)
      {LocalProcessesInfo1, RemoteProcessesInfo1} = purge_double_processes_from_local_mnesia(
          LocalProcessesInfo,
          RemoteProcessesInfo
      ),
      %% write
      write_remote_processes_to_local(RemoteNode, RemoteProcessesInfo1),
      write_local_processes_to_remote(RemoteNode, LocalProcessesInfo1).
  %% Finds keys registered on both sides of the split. For each such key the local
  %% entry is deleted from the local mnesia table and collected; the collected
  %% entries are sent to the ?MODULE registered process as
  %% {purge_double_processes, Doubles}.
  %% Returns the local entries without the doubles, and the remote entries untouched.
  -spec purge_double_processes_from_local_mnesia(
      LocalProcessesInfo :: list(),
      RemoteProcessesInfo :: list()
  ) ->
      {LocalProcessesInfo :: list(), RemoteProcessesInfo :: list()}.
  purge_double_processes_from_local_mnesia(LocalProcessesInfo, RemoteProcessesInfo) ->
      %% create ETS table used as a temporary index of the local entries
      Tab = ets:new(syn_automerge_doubles_table, [set]),
      try
          %% insert local processes info
          ets:insert(Tab, LocalProcessesInfo),
          %% find doubles
          F = fun({Key, _RemoteProcessPid, _RemoteProcessMeta}, Acc) ->
              case ets:lookup(Tab, Key) of
                  [] ->
                      Acc;
                  [{Key, LocalProcessPid, LocalProcessMeta}] ->
                      %% found a double process, remove it from local mnesia table
                      mnesia:dirty_delete(syn_processes_table, Key),
                      %% remove it from ETS
                      ets:delete(Tab, Key),
                      %% add it to acc
                      [{Key, LocalProcessPid, LocalProcessMeta} | Acc]
              end
          end,
          DoubleProcessesInfo = lists:foldl(F, [], RemoteProcessesInfo),
          %% send to syn_consistency gen_server to handle double processes once merging is done
          ?MODULE ! {purge_double_processes, DoubleProcessesInfo},
          %% local processes without doubles, remote processes unchanged
          {ets:tab2list(Tab), RemoteProcessesInfo}
      after
          %% always release the temporary table, even when a step above raises
          ets:delete(Tab)
      end.
  %% Writes the entries owned by RemoteNode into the local table, tagging each
  %% record with RemoteNode as its node.
  -spec write_remote_processes_to_local(RemoteNode :: atom(), RemoteProcessesInfo :: list()) -> ok.
  write_remote_processes_to_local(RemoteNode, ProcessesOfRemote) ->
      write_processes_info_to_node(RemoteNode, ProcessesOfRemote).
  %% Asks RemoteNode (via rpc) to write this node's entries into its table, tagged
  %% with this node's name. Crashes with badmatch unless the remote call returns `ok'.
  -spec write_local_processes_to_remote(RemoteNode :: atom(), LocalProcessesInfo :: list()) -> ok.
  write_local_processes_to_remote(RemoteNode, LocalProcessesInfo) ->
      CallArgs = [node(), LocalProcessesInfo],
      ok = rpc:call(RemoteNode, ?MODULE, write_processes_info_to_node, CallArgs).
  %% Returns a list of {Key, Pid, Meta} tuples for every syn_processes_table record
  %% whose node field equals Node (dirty read). Exported so it can be invoked on a
  %% remote node through rpc.
  -spec get_processes_info_of_node(Node :: atom()) -> list().
  get_processes_info_of_node(Node) ->
      Pattern = #syn_processes_table{key = '$1', pid = '$2', node = '$3', meta = '$4'},
      OnlyThisNode = {'=:=', '$3', Node},
      %% double braces: a match-spec result that builds a 3-tuple
      Result = {{'$1', '$2', '$4'}},
      mnesia:dirty_select(syn_processes_table, [{Pattern, [OnlyThisNode], [Result]}]).
  %% Dirty-writes one syn_processes_table record per {Key, Pid, Meta} tuple, setting
  %% the record's node field to Node. Exported so it can be invoked on a remote
  %% node through rpc.
  -spec write_processes_info_to_node(Node :: atom(), ProcessesInfo :: list()) -> ok.
  write_processes_info_to_node(Node, ProcessesInfo) ->
      lists:foreach(
          fun({Key, Pid, Meta}) ->
              Record = #syn_processes_table{key = Key, pid = Pid, node = Node, meta = Meta},
              mnesia:dirty_write(Record)
          end,
          ProcessesInfo
      ).
  %% Resolves the double processes found during a merge.
  %%   - No callback module configured (undefined): each local double is killed.
  %%   - Otherwise: for each double a new process is spawned that calls
  %%     Module:Function(Key, Pid, Meta).
  -spec purge_double_processes(
      ConflictingProcessCallbackModule :: atom(),
      ConflictingProcessCallbackFunction :: atom(),
      DoubleProcessesInfo :: list()
  ) -> ok.
  purge_double_processes(undefined, _, DoubleProcessesInfo) ->
      lists:foreach(
          fun({Key, Pid, _Meta}) ->
              error_logger:warning_msg(
                  "Found a double process for ~s, killing it on local node ~p",
                  [Key, node()]
              ),
              exit(Pid, kill)
          end,
          DoubleProcessesInfo
      );
  purge_double_processes(CallbackModule, CallbackFunction, DoubleProcessesInfo) ->
      lists:foreach(
          fun({Key, Pid, Meta}) ->
              %% run each callback in its own process so one callback cannot block the rest
              spawn(fun() ->
                  error_logger:warning_msg(
                      "Found a double process for ~s, about to trigger callback on local node ~p",
                      [Key, node()]
                  ),
                  CallbackModule:CallbackFunction(Key, Pid, Meta)
              end)
          end,
          DoubleProcessesInfo
      ).