syn_consistency.erl 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Portions of code from Ulf Wiger's unsplit server module:
  9. %% <https://github.com/uwiger/unsplit/blob/master/src/unsplit_server.erl>
  10. %%
  11. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  12. %% of this software and associated documentation files (the "Software"), to deal
  13. %% in the Software without restriction, including without limitation the rights
  14. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15. %% copies of the Software, and to permit persons to whom the Software is
  16. %% furnished to do so, subject to the following conditions:
  17. %%
  18. %% The above copyright notice and this permission notice shall be included in
  19. %% all copies or substantial portions of the Software.
  20. %%
  21. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  24. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27. %% THE SOFTWARE.
  28. %% ==========================================================================================================
  -module(syn_consistency).
  -behaviour(gen_server).

  %% Public API.
  -export([start_link/0]).

  %% gen_server callbacks.
  -export([
      init/1,
      handle_call/3,
      handle_cast/2,
      handle_info/2,
      terminate/2,
      code_change/3
  ]).

  %% Internal helpers, exported because this module invokes them on other
  %% nodes through rpc:call/4.
  -export([
      get_registry_processes_info_of_node/1,
      write_registry_processes_info_to_node/2,
      get_groups_processes_info_of_node/1,
      write_groups_processes_info_to_node/2
  ]).

  %% Server state: the module / function pair invoked for every registry
  %% process found to be registered twice after a merge. Both stay
  %% `undefined` when no callback has been configured.
  -record(state, {
      registry_conflicting_process_callback_module = undefined :: atom(),
      registry_conflicting_process_callback_function = undefined :: atom()
  }).

  %% Shared record definitions.
  -include("syn.hrl").
  47. %% ===================================================================
  48. %% API
  49. %% ===================================================================
  %% Starts the consistency server linked to the caller and registers it
  %% locally under the module name. No gen_server options are passed.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  54. %% ===================================================================
  55. %% Callbacks
  56. %% ===================================================================
  57. %% ----------------------------------------------------------------------------------------------------------
  58. %% Init
  59. %% ----------------------------------------------------------------------------------------------------------
  %% Subscribes the server to mnesia system events and loads the optional
  %% conflict-resolution callback from the application environment.
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      %% mnesia system events are delivered to this process as plain messages
      mnesia:subscribe(system),
      %% the callback is configured as a [Module, Function] pair and defaults
      %% to [undefined, undefined]
      {ok, [CallbackModule, CallbackFunction]} = syn_utils:get_env_value(
          registry_conflicting_process_callback,
          [undefined, undefined]
      ),
      InitialState = #state{
          registry_conflicting_process_callback_module = CallbackModule,
          registry_conflicting_process_callback_function = CallbackFunction
      },
      {ok, InitialState}.
  78. %% ----------------------------------------------------------------------------------------------------------
  79. %% Call messages
  80. %% ----------------------------------------------------------------------------------------------------------
  %% No synchronous requests are supported: every call is logged and answered
  %% with `undefined`, leaving the state untouched.
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call(Request, From, State) ->
      %% argument order follows the format string: caller first, then request
      error_logger:warning_msg("Received from ~p an unknown call message: ~p", [From, Request]),
      {reply, undefined, State}.
  91. %% ----------------------------------------------------------------------------------------------------------
  92. %% Cast messages
  93. %% ----------------------------------------------------------------------------------------------------------
  %% No asynchronous requests are supported: every cast is logged and dropped.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast(UnknownCast, ServerState) ->
      error_logger:warning_msg(
          "Received an unknown cast message: ~p",
          [UnknownCast]
      ),
      {noreply, ServerState}.
  101. %% ----------------------------------------------------------------------------------------------------------
  102. %% All non Call / Cast messages
  103. %% ----------------------------------------------------------------------------------------------------------
  %% Handles mnesia system events and the deferred "purge doubles" message
  %% that this module sends to itself while merging registry tables.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  %% mnesia reports an inconsistent database: start the automerge
  handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
      error_logger:error_msg(
          "MNESIA signalled an inconsistent database on node ~p for remote node ~p with context: ~p, initiating automerge",
          [node(), Node, Context]
      ),
      automerge(Node),
      {noreply, State};
  %% mnesia went down on another node: drop every local entry owned by it
  handle_info({mnesia_system_event, {mnesia_down, Node}}, State) when Node =/= node() ->
      error_logger:error_msg(
          "Received a MNESIA down event, removing on node ~p all pids of node ~p",
          [node(), Node]
      ),
      delete_registry_pids_of_disconnected_node(Node),
      delete_groups_pids_of_disconnected_node(Node),
      {noreply, State};
  %% any other mnesia system event is ignored
  handle_info({mnesia_system_event, _Event}, State) ->
      {noreply, State};
  %% doubles found while merging the registry: resolve them with the
  %% callback held in the state
  handle_info(
      {handle_purged_registry_double_processes, Doubles},
      #state{
          registry_conflicting_process_callback_module = CallbackModule,
          registry_conflicting_process_callback_function = CallbackFunction
      } = State
  ) ->
      error_logger:warning_msg("About to purge double processes after netsplit"),
      handle_purged_registry_double_processes(CallbackModule, CallbackFunction, Doubles),
      {noreply, State};
  %% anything else is logged and dropped
  handle_info(Unknown, State) ->
      error_logger:warning_msg("Received an unknown info message: ~p", [Unknown]),
      {noreply, State}.
  130. %% ----------------------------------------------------------------------------------------------------------
  131. %% Terminate
  132. %% ----------------------------------------------------------------------------------------------------------
  %% Logs the shutdown reason; nothing else is cleaned up here.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(ShutdownReason, _ServerState) ->
      error_logger:info_msg(
          "Terminating syn consistency with reason: ~p",
          [ShutdownReason]
      ),
      terminated.
  137. %% ----------------------------------------------------------------------------------------------------------
  138. %% Convert process state when code is changed.
  139. %% ----------------------------------------------------------------------------------------------------------
  %% Hot code upgrade hook: the state is carried over unchanged.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_PreviousVersion, ServerState, _ExtraData) ->
      {ok, ServerState}.
  143. %% ===================================================================
  144. %% Internal
  145. %% ===================================================================
  %% Removes from the local registry table every entry whose `node` field is
  %% RemoteNode, using dirty (non-transactional) mnesia operations.
  -spec delete_registry_pids_of_disconnected_node(RemoteNode :: atom()) -> ok.
  delete_registry_pids_of_disconnected_node(RemoteNode) ->
      %% select only the keys of the records owned by RemoteNode
      MatchSpec = [{
          #syn_registry_table{key = '$1', node = '$2', _ = '_'},
          [{'=:=', '$2', RemoteNode}],
          ['$1']
      }],
      Keys = mnesia:dirty_select(syn_registry_table, MatchSpec),
      lists:foreach(
          fun(Key) -> mnesia:dirty_delete({syn_registry_table, Key}) end,
          Keys
      ).
  %% Removes from the local groups table every record whose `node` field is
  %% RemoteNode. Records are deleted as whole objects, not by key.
  -spec delete_groups_pids_of_disconnected_node(RemoteNode :: atom()) -> ok.
  delete_groups_pids_of_disconnected_node(RemoteNode) ->
      StaleRecords = mnesia:dirty_match_object(
          syn_groups_table,
          #syn_groups_table{node = RemoteNode, _ = '_'}
      ),
      lists:foreach(
          fun(StaleRecord) -> mnesia:dirty_delete_object(syn_groups_table, StaleRecord) end,
          StaleRecords
      ).
  %% Merges the local tables with those of RemoteNode after mnesia reported
  %% an inconsistent database.
  %%
  %% The processes able to modify the mnesia tables (syn_registry and
  %% syn_groups) are suspended for the duration of the merge, which runs
  %% inside a global:trans/2 call keyed on {?MODULE, automerge}. Both
  %% processes are resumed even if the merge raises, so a failure here
  %% cannot leave them suspended.
  -spec automerge(RemoteNode :: atom()) -> ok.
  automerge(RemoteNode) ->
      %% suspend processes able to modify mnesia tables
      sys:suspend(syn_registry),
      sys:suspend(syn_groups),
      try
          %% resolve conflicts
          global:trans({{?MODULE, automerge}, self()},
              fun() ->
                  error_logger:warning_msg("AUTOMERGE starting on node ~p for remote node ~p (global lock is set)", [node(), RemoteNode]),
                  check_stitch(RemoteNode),
                  error_logger:warning_msg("AUTOMERGE done (global lock about to be unset)")
              end
          )
      after
          %% resume processes able to modify mnesia tables, whatever happened
          sys:resume(syn_registry),
          sys:resume(syn_groups)
      end,
      ok.
  %% Stitches RemoteNode back in unless mnesia already lists it among its
  %% running db nodes.
  %%
  %% Best effort: always returns ok. Failures of the membership check or of
  %% the stitch itself are logged rather than raised.
  -spec check_stitch(RemoteNode :: atom()) -> ok.
  check_stitch(RemoteNode) ->
      try lists:member(RemoteNode, mnesia:system_info(running_db_nodes)) of
          true ->
              error_logger:warning_msg("Remote node ~p is already stitched.", [RemoteNode]),
              ok;
          false ->
              try_stitch(RemoteNode)
      catch
          Class:Reason ->
              error_logger:error_msg("Could not check if node is stiched: ~p", [{Class, Reason}]),
              ok
      end.

  %% Runs stitch/1 and logs an {error, _} result or an exception instead of
  %% discarding it silently. Always returns ok.
  -spec try_stitch(RemoteNode :: atom()) -> ok.
  try_stitch(RemoteNode) ->
      try stitch(RemoteNode) of
          {error, Reason} ->
              error_logger:error_msg("Could not stitch remote node ~p: ~p", [RemoteNode, Reason]),
              ok;
          _Other ->
              ok
      catch
          Class:Reason ->
              error_logger:error_msg("Could not stitch remote node ~p: ~p", [RemoteNode, {Class, Reason}]),
              ok
      end.
  %% Asks mnesia_controller to connect RemoteNode, handing it a merge fun.
  %% When merging the two syn tables reports `merged`, the registry and
  %% groups tables are stitched; any other merge result is returned as is.
  -spec stitch(RemoteNode :: atom()) -> {ok, any()} | {error, any()}.
  stitch(RemoteNode) ->
      %% the whole fun body sits under `catch`, so an exception is returned
      %% to the caller of the fun as a value instead of propagating
      MergeAndStitch = fun(MergeF) ->
          catch case MergeF([syn_registry_table, syn_groups_table]) of
              {merged, _, _} = Merged ->
                  stitch_registry_tab(RemoteNode),
                  stitch_group_tab(RemoteNode),
                  Merged;
              NotMerged ->
                  NotMerged
          end
      end,
      mnesia_controller:connect_nodes([RemoteNode], MergeAndStitch).
  %% Reconciles the registry table with RemoteNode: fetches the entries each
  %% node owns, purges locally the keys registered on both sides, then
  %% copies each node's entries to the other.
  -spec stitch_registry_tab(RemoteNode :: atom()) -> ok.
  stitch_registry_tab(RemoteNode) ->
      %% entries owned by the remote node, as seen by the remote node
      RemoteInfo = rpc:call(RemoteNode, ?MODULE, get_registry_processes_info_of_node, [RemoteNode]),
      %% entries owned by this node
      LocalInfo = get_registry_processes_info_of_node(node()),
      %% drop the keys registered on both sides from the local set
      {LocalInfoNoDoubles, RemoteInfoToWrite} =
          purge_registry_double_processes_from_local_mnesia(LocalInfo, RemoteInfo),
      %% cross-write
      write_remote_registry_processes_to_local(RemoteNode, RemoteInfoToWrite),
      write_local_registry_processes_to_remote(RemoteNode, LocalInfoNoDoubles).
  %% Finds the keys registered on both the local and the remote node and
  %% removes them from the local side.
  %%
  %% Both arguments are lists of {Key, Pid, Meta} tuples. For every remote
  %% key that is also present locally, the local record is deleted from the
  %% mnesia registry table and remembered as a "double". The doubles are
  %% sent to the process registered as ?MODULE in a
  %% {handle_purged_registry_double_processes, Doubles} message, to be
  %% handled once merging is done.
  %%
  %% Returns {LocalWithoutDoubles, RemoteRegistryProcessesInfo}; the remote
  %% list is returned untouched.
  %%
  %% The scratch ETS table is deleted in an `after` clause so it is not
  %% leaked when anything in between raises (e.g. the remote list is not a
  %% list because the rpc that produced it failed).
  -spec purge_registry_double_processes_from_local_mnesia(
      LocalRegistryProcessesInfo :: list(),
      RemoteRegistryProcessesInfo :: list()
  ) ->
      {LocalRegistryProcessesInfo :: list(), RemoteRegistryProcessesInfo :: list()}.
  purge_registry_double_processes_from_local_mnesia(LocalRegistryProcessesInfo, RemoteRegistryProcessesInfo) ->
      %% create ETS table
      Tab = ets:new(syn_automerge_doubles_table, [set]),
      try
          %% insert local processes info
          ets:insert(Tab, LocalRegistryProcessesInfo),
          %% find doubles
          F = fun({Key, _RemoteProcessPid, _RemoteProcessMeta}, Acc) ->
              case ets:lookup(Tab, Key) of
                  [] ->
                      Acc;
                  [{Key, LocalProcessPid, LocalProcessMeta}] ->
                      %% found a double process, remove it from local mnesia table
                      mnesia:dirty_delete(syn_registry_table, Key),
                      %% remove it from ETS
                      ets:delete(Tab, Key),
                      %% add it to acc
                      [{Key, LocalProcessPid, LocalProcessMeta} | Acc]
              end
          end,
          DoubleRegistryProcessesInfo = lists:foldl(F, [], RemoteRegistryProcessesInfo),
          %% send to syn_consistency gen_server to handle double processes once merging is done
          ?MODULE ! {handle_purged_registry_double_processes, DoubleRegistryProcessesInfo},
          %% what is left in the table is the local info without doubles
          {ets:tab2list(Tab), RemoteRegistryProcessesInfo}
      after
          %% delete ETS table, also on failure
          ets:delete(Tab)
      end.
  %% Writes the registry entries owned by RemoteNode into the local table,
  %% tagging each record with RemoteNode as its node.
  -spec write_remote_registry_processes_to_local(RemoteNode :: atom(), RemoteRegistryProcessesInfo :: list()) -> ok.
  write_remote_registry_processes_to_local(OwnerNode, RemoteInfo) ->
      write_registry_processes_info_to_node(OwnerNode, RemoteInfo).
  %% Asks RemoteNode, via rpc, to write this node's registry entries into
  %% its own table, tagged with this node's name. Anything other than `ok`
  %% coming back from the rpc fails the match.
  -spec write_local_registry_processes_to_remote(RemoteNode :: atom(), LocalRegistryProcessesInfo :: list()) -> ok.
  write_local_registry_processes_to_remote(TargetNode, LocalInfo) ->
      RpcArgs = [node(), LocalInfo],
      ok = rpc:call(TargetNode, ?MODULE, write_registry_processes_info_to_node, RpcArgs).
  %% Returns a {Key, Pid, Meta} tuple for every registry record whose `node`
  %% field is Node, read with a dirty select.
  -spec get_registry_processes_info_of_node(Node :: atom()) -> list().
  get_registry_processes_info_of_node(Node) ->
      MatchSpec = [{
          #syn_registry_table{key = '$1', pid = '$2', node = '$3', meta = '$4'},
          [{'=:=', '$3', Node}],
          %% double braces: a match-spec body needs them to build a tuple
          [{{'$1', '$2', '$4'}}]
      }],
      mnesia:dirty_select(syn_registry_table, MatchSpec).
  %% Dirty-writes one registry record per {Key, Pid, Meta} tuple, each with
  %% Node as its `node` field.
  -spec write_registry_processes_info_to_node(Node :: atom(), RegistryProcessesInfo :: list()) -> ok.
  write_registry_processes_info_to_node(Node, RegistryProcessesInfo) ->
      lists:foreach(
          fun({Key, Pid, Meta}) ->
              Record = #syn_registry_table{key = Key, pid = Pid, node = Node, meta = Meta},
              mnesia:dirty_write(Record)
          end,
          RegistryProcessesInfo
      ).
  %% Reconciles the groups table with RemoteNode by copying each node's own
  %% group memberships to the other node. No duplicate detection is done
  %% here.
  -spec stitch_group_tab(RemoteNode :: atom()) -> ok.
  stitch_group_tab(RemoteNode) ->
      %% memberships owned by the remote node, as seen by the remote node
      RemoteInfo = rpc:call(RemoteNode, ?MODULE, get_groups_processes_info_of_node, [RemoteNode]),
      %% memberships owned by this node
      LocalInfo = get_groups_processes_info_of_node(node()),
      %% cross-write
      write_remote_groups_processes_info_to_local(RemoteNode, RemoteInfo),
      write_local_groups_processes_info_to_remote(RemoteNode, LocalInfo).
  %% Returns a {Name, Pid} tuple for every groups record whose `node` field
  %% is Node, read with a dirty select.
  -spec get_groups_processes_info_of_node(Node :: atom()) -> list().
  get_groups_processes_info_of_node(Node) ->
      MatchSpec = [{
          #syn_groups_table{name = '$1', pid = '$2', node = '$3'},
          [{'=:=', '$3', Node}],
          %% double braces: a match-spec body needs them to build a tuple
          [{{'$1', '$2'}}]
      }],
      mnesia:dirty_select(syn_groups_table, MatchSpec).
  %% Writes the group memberships owned by RemoteNode into the local table,
  %% tagging each record with RemoteNode as its node.
  -spec write_remote_groups_processes_info_to_local(RemoteNode :: atom(), RemoteGroupsRegistryProcessesInfo :: list()) -> ok.
  write_remote_groups_processes_info_to_local(OwnerNode, RemoteInfo) ->
      write_groups_processes_info_to_node(OwnerNode, RemoteInfo).
  %% Asks RemoteNode, via rpc, to write this node's group memberships into
  %% its own table, tagged with this node's name. Anything other than `ok`
  %% coming back from the rpc fails the match.
  -spec write_local_groups_processes_info_to_remote(RemoteNode :: atom(), LocalGroupsRegistryProcessesInfo :: list()) -> ok.
  write_local_groups_processes_info_to_remote(TargetNode, LocalInfo) ->
      RpcArgs = [node(), LocalInfo],
      ok = rpc:call(TargetNode, ?MODULE, write_groups_processes_info_to_node, RpcArgs).
  %% Dirty-writes one groups record per {Name, Pid} tuple, each with Node as
  %% its `node` field.
  -spec write_groups_processes_info_to_node(Node :: atom(), GroupsRegistryProcessesInfo :: list()) -> ok.
  write_groups_processes_info_to_node(Node, GroupsRegistryProcessesInfo) ->
      lists:foreach(
          fun({GroupName, MemberPid}) ->
              Record = #syn_groups_table{name = GroupName, pid = MemberPid, node = Node},
              mnesia:dirty_write(Record)
          end,
          GroupsRegistryProcessesInfo
      ).
  %% Resolves the registry processes that were found registered on both
  %% sides of a merge. Each element of DoubleRegistryProcessesInfo is a
  %% {Key, LocalPid, LocalMeta} tuple describing the local copy.
  %%
  %% - With no callback module configured (`undefined`), the local process
  %%   is killed.
  %% - Otherwise Module:Function(Key, LocalPid, LocalMeta) is invoked in a
  %%   freshly spawned, unlinked process, so a failing callback does not
  %%   affect the caller.
  %%
  %% Keys are logged with ~p rather than ~s: ~s only accepts chardata or
  %% atoms, and nothing here guarantees that a key is one of those.
  -spec handle_purged_registry_double_processes(
      RegistryConflictingProcessCallbackModule :: atom(),
      RegistryConflictingProcessCallbackFunction :: atom(),
      DoubleRegistryProcessesInfo :: list()
  ) -> ok.
  handle_purged_registry_double_processes(undefined, _, DoubleRegistryProcessesInfo) ->
      F = fun({Key, LocalProcessPid, _LocalProcessMeta}) ->
          error_logger:warning_msg("Found a double process for ~p, killing it on local node ~p", [Key, node()]),
          exit(LocalProcessPid, kill)
      end,
      lists:foreach(F, DoubleRegistryProcessesInfo);
  handle_purged_registry_double_processes(RegistryConflictingProcessCallbackModule, RegistryConflictingProcessCallbackFunction, DoubleRegistryProcessesInfo) ->
      F = fun({Key, LocalProcessPid, LocalProcessMeta}) ->
          spawn(
              fun() ->
                  error_logger:warning_msg("Found a double process for ~p, about to trigger callback on local node ~p", [Key, node()]),
                  RegistryConflictingProcessCallbackModule:RegistryConflictingProcessCallbackFunction(Key, LocalProcessPid, LocalProcessMeta)
              end
          )
      end,
      lists:foreach(F, DoubleRegistryProcessesInfo).