%% File: syn_consistency.erl (8.2 KB)
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2019 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  -module(syn_consistency).
  -behaviour(gen_server).

  %% Public API
  -export([
      start_link/0,
      resume_local_syn_registry/0
  ]).

  %% gen_server callback functions
  -export([
      init/1,
      handle_call/3,
      handle_cast/2,
      handle_info/2,
      terminate/2,
      code_change/3
  ]).

  %% Server state: this process carries no data between callbacks
  -record(state, {}).

  %% Shared record definitions (this module uses #syn_registry_table{} from here)
  -include("syn_records.hrl").
  37. %% ===================================================================
  38. %% API
  39. %% ===================================================================
  %% @doc Starts the consistency server linked to the caller and registers it
  %% locally under the module name. No extra gen_server options are passed.
  -spec start_link() -> {ok, pid()} | {error, any()}.
  start_link() ->
      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  %% @doc Resumes the locally registered `syn_registry' process through
  %% `sys:resume/1', so that it can modify the mnesia tables again.
  -spec resume_local_syn_registry() -> ok.
  resume_local_syn_registry() ->
      RegistryProcess = syn_registry,
      sys:resume(RegistryProcess).
  48. %% ===================================================================
  49. %% Callbacks
  50. %% ===================================================================
  51. %% ----------------------------------------------------------------------------------------------------------
  52. %% Init
  53. %% ----------------------------------------------------------------------------------------------------------
  %% @doc gen_server init callback.
  %% Subscribes this process to nodeup / nodedown messages and then waits up to
  %% 10 seconds for `syn_registry_table' to become available. Anything other
  %% than `ok' from the wait stops the server.
  -spec init([]) ->
      {ok, #state{}} |
      {ok, #state{}, Timeout :: non_neg_integer()} |
      ignore |
      {stop, Reason :: any()}.
  init([]) ->
      %% receive {nodeup, Node} / {nodedown, Node} in handle_info/2
      ok = net_kernel:monitor_nodes(true),
      %% block until the registry table is loaded (or the wait fails)
      WaitResult = mnesia:wait_for_tables([syn_registry_table], 10000),
      init_result(WaitResult).

  %% Maps the outcome of the table wait onto the init/1 return value.
  init_result(ok) ->
      {ok, #state{}};
  init_result(Reason) ->
      {stop, {error_waiting_for_process_registry_table, Reason}}.
  69. %% ----------------------------------------------------------------------------------------------------------
  70. %% Call messages
  71. %% ----------------------------------------------------------------------------------------------------------
  %% @doc gen_server call callback.
  %% This server defines no synchronous requests: every call is logged as
  %% unknown and answered with `undefined', leaving the state untouched.
  -spec handle_call(Request :: any(), From :: any(), #state{}) ->
      {reply, Reply :: any(), #state{}} |
      {reply, Reply :: any(), #state{}, Timeout :: non_neg_integer()} |
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), Reply :: any(), #state{}} |
      {stop, Reason :: any(), #state{}}.
  handle_call(Request, From, State) ->
      %% argument order must follow the format string: sender first, then message
      error_logger:warning_msg("Received from ~p an unknown call message: ~p~n", [From, Request]),
      {reply, undefined, State}.
  82. %% ----------------------------------------------------------------------------------------------------------
  83. %% Cast messages
  84. %% ----------------------------------------------------------------------------------------------------------
  %% @doc gen_server cast callback.
  %% No casts are defined for this server: the message is logged as unknown
  %% and dropped, and the state is returned as received.
  -spec handle_cast(Msg :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_cast(Msg, State) ->
      LogArgs = [Msg],
      error_logger:warning_msg("Received an unknown cast message: ~p~n", LogArgs),
      {noreply, State}.
  92. %% ----------------------------------------------------------------------------------------------------------
  93. %% All non Call / Cast messages
  94. %% ----------------------------------------------------------------------------------------------------------
  %% @doc gen_server info callback.
  %%
  %% `{nodeup, Node}': merges the registry entries of the joining node into the
  %% local registry while holding a `global' lock, then resumes the remote
  %% `syn_registry' process.
  %% `{nodedown, Node}': removes every registry entry that belongs to the node
  %% that left.
  %% Any other message is logged as unknown and ignored.
  -spec handle_info(Info :: any(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, Timeout :: non_neg_integer()} |
      {stop, Reason :: any(), #state{}}.
  handle_info({nodeup, RemoteNode}, State) ->
      error_logger:info_msg("Node ~p has joined the cluster of local node ~p~n", [RemoteNode, node()]),
      %% run the merge under a global lock keyed on this module
      global:trans(
          {{?MODULE, auto_merge_node_up}, self()},
          fun() -> merge_remote_registry(RemoteNode) end
      ),
      %% resume remote processes able to modify tables; attempted even when the
      %% merge failed, because the fetch may have suspended the remote registry
      %% before the failure was reported
      resume_remote_registry(RemoteNode),
      {noreply, State};
  handle_info({nodedown, RemoteNode}, State) ->
      error_logger:warning_msg("Node ~p has left the cluster of local node ~p~n", [RemoteNode, node()]),
      purge_registry_entries_for_node(RemoteNode),
      {noreply, State};
  handle_info(Info, State) ->
      error_logger:warning_msg("Received an unknown info message: ~p~n", [Info]),
      {noreply, State}.

  %% Fetches the registry tuples of RemoteNode (the remote call also suspends
  %% the remote registry) and syncs them into the local registry. A failed rpc
  %% is logged instead of being handed to the sync step as if it were a list.
  merge_remote_registry(RemoteNode) ->
      error_logger:info_msg("Merge: ----> Initiating on ~p for remote node ~p~n", [node(), RemoteNode]),
      %% request remote node process info & suspend remote registry
      case rpc:call(RemoteNode, syn_registry, get_local_registry_tuples_and_suspend, [node()]) of
          {badrpc, Reason} ->
              error_logger:error_msg(
                  "Merge: failed on ~p for remote node ~p, could not fetch registry tuples: ~p~n",
                  [node(), RemoteNode, Reason]
              );
          RegistryTuples ->
              sync_registry_tuples(RemoteNode, RegistryTuples),
              %% successful completion is informational, not an error
              error_logger:info_msg("Merge: <---- Done on ~p for remote node ~p~n", [node(), RemoteNode])
      end.

  %% Resumes the `syn_registry' process on RemoteNode. A failure (typically the
  %% node going away again) is logged rather than crashing this server.
  resume_remote_registry(RemoteNode) ->
      case rpc:call(RemoteNode, sys, resume, [syn_registry]) of
          ok ->
              ok;
          Error ->
              error_logger:error_msg(
                  "Could not resume syn_registry on remote node ~p: ~p~n",
                  [RemoteNode, Error]
              )
      end.
  %% Replaces the local view of RemoteNode's registrations with RegistryTuples.
  %% Existing entries for RemoteNode are purged first, then each tuple
  %% `{Name, Pid, Node, Meta}' is registered through `syn_registry'.
  sync_registry_tuples(RemoteNode, RegistryTuples) ->
      %% ensure that registry doesn't have any joining node's entries
      purge_registry_entries_for_node(RemoteNode),
      %% add every remote tuple to the table
      lists:foreach(fun sync_registry_tuple/1, RegistryTuples).

  %% Handles one remote tuple: resolves a possible name clash, then enqueues
  %% the registration (done on syn_registry so that it can set up the monitor).
  sync_registry_tuple({Name, RemotePid, _RemoteNode, RemoteMeta}) ->
      ExistingEntry = syn_registry:find_process_entry_by_name(Name),
      kill_conflicting_process(Name, RemotePid, ExistingEntry),
      syn_registry:sync_register(Name, RemotePid, RemoteMeta).

  %% When the name is already present in the registry, logs the conflict and
  %% kills the process currently holding it; does nothing when the name is free.
  kill_conflicting_process(_Name, _RemotePid, undefined) ->
      %% no conflict
      ok;
  kill_conflicting_process(Name, RemotePid, Entry) ->
      ConflictingPid = Entry#syn_registry_table.pid,
      error_logger:warning_msg(
          "Conflicting name process found for: ~p, processes are ~p, ~p, killing local~n",
          [Name, ConflictingPid, RemotePid]
      ),
      %% kill the local one
      exit(ConflictingPid, kill).
  144. %% ----------------------------------------------------------------------------------------------------------
  145. %% Terminate
  146. %% ----------------------------------------------------------------------------------------------------------
  %% @doc gen_server terminate callback.
  %% Logs the shutdown reason and returns the atom `terminated'.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(Reason, _State) ->
      LogArgs = [Reason],
      error_logger:info_msg("Terminating with reason: ~p~n", LogArgs),
      terminated.
  151. %% ----------------------------------------------------------------------------------------------------------
  152. %% Convert process state when code is changed.
  153. %% ----------------------------------------------------------------------------------------------------------
  %% @doc gen_server code_change callback.
  %% The state is passed through unchanged: no conversion is performed on upgrade.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_OldVsn, State, _Extra) ->
      UnchangedState = State,
      {ok, UnchangedState}.
  157. %% ===================================================================
  158. %% Internal
  159. %% ===================================================================
  %% Removes from `syn_registry_table' every entry whose `node' field equals
  %% Node. The `name' field of each matching record is selected with a dirty
  %% read and then used as the key for a dirty delete.
  -spec purge_registry_entries_for_node(Node :: atom()) -> ok.
  purge_registry_entries_for_node(Node) ->
      %% match spec: bind name to '$1' and node to '$2', keep records whose node
      %% is exactly Node, and return only the name
      MatchSpec = [{
          #syn_registry_table{name = '$1', node = '$2', _ = '_'},
          [{'=:=', '$2', Node}],
          ['$1']
      }],
      Names = mnesia:dirty_select(syn_registry_table, MatchSpec),
      %% delete each selected entry by its key
      lists:foreach(
          fun(Name) -> mnesia:dirty_delete({syn_registry_table, Name}) end,
          Names
      ).
  167. NodePids = mnesia:dirty_select(syn_registry_table, [{MatchHead, [Guard], [IdFormat]}]),
  168. DelF = fun(Id) -> mnesia:dirty_delete({syn_registry_table, Id}) end,
  169. lists:foreach(DelF, NodePids).