syn_gen_scope.erl 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_gen_scope).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([
  30. start_link/2,
  31. subcluster_nodes/2,
  32. call/3, call/4
  33. ]).
  34. -export([
  35. broadcast/2,
  36. broadcast/3,
  37. send_to_node/3
  38. ]).
  39. %% gen_server callbacks
  40. -export([
  41. init/1,
  42. handle_call/3,
  43. handle_cast/2,
  44. handle_info/2,
  45. handle_continue/2,
  46. terminate/2,
  47. code_change/3
  48. ]).
  49. %% internal
  50. -export([multicast_loop/0]).
  51. %% includes
  52. -include("syn.hrl").
  53. %% callbacks
  %% Contract implemented by scope handler modules. Every callback is handed the
  %% scope's #state{} record.

  %% Called from init/1 once the scope state has been built; the returned term is
  %% stored in the `handler_state' field of the state.
  -callback init(#state{}) ->
      {ok, HandlerState :: term()}.

  %% Receives every gen_server call that this module does not answer itself.
  -callback handle_call(
      Request :: term(),
      From :: {pid(), Tag :: term()},
      #state{}
  ) ->
      {reply, Reply :: term(), #state{}}
      | {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}}
      | {noreply, #state{}}
      | {noreply, #state{}, timeout() | hibernate | {continue, term()}}
      | {stop, Reason :: term(), Reply :: term(), #state{}}
      | {stop, Reason :: term(), #state{}}.

  %% Receives the info messages that this module relays to the handler.
  -callback handle_info(Info :: timeout | term(), #state{}) ->
      {noreply, #state{}}
      | {noreply, #state{}, timeout() | hibernate | {continue, term()}}
      | {stop, Reason :: term(), #state{}}.

  %% Called with the data carried by an `ack_sync' message; the result is ignored.
  -callback save_remote_data(RemoteData :: term(), #state{}) -> any().

  %% Called to obtain the data sent to a remote scope process in an `ack_sync' message.
  -callback get_local_data(#state{}) -> {ok, Data :: term()} | undefined.

  %% Called when the scope process of `Node' is detected as down; the result is ignored.
  -callback purge_local_data_for_node(Node :: node(), #state{}) -> any().
  71. %% ===================================================================
  72. %% API
  73. %% ===================================================================
  %% Starts and links the scope process of `Handler' for `Scope'.
  %% The process is registered locally under the atom `<Handler>_<Scope>'; that name is
  %% recorded through syn_backbone:save_process_name/2 before the process is started.
  -spec start_link(Handler :: module(), Scope :: atom()) ->
      {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
  start_link(Handler, Scope) when is_atom(Scope) ->
      %% latin1 conversions give the same atom as a list-based round trip
      HandlerPart = atom_to_binary(Handler, latin1),
      ScopePart = atom_to_binary(Scope, latin1),
      RegisteredName = binary_to_atom(<<HandlerPart/binary, "_", ScopePart/binary>>, latin1),
      %% make the name retrievable from the {Handler, Scope} pair
      syn_backbone:save_process_name({Handler, Scope}, RegisteredName),
      %% spawn the gen_server under that name
      InitArgs = [Handler, Scope, RegisteredName],
      gen_server:start_link({local, RegisteredName}, ?MODULE, InitArgs, []).
  %% Asks the local scope process of `Handler' / `Scope' for the nodes it currently tracks.
  %% Raises `{invalid_scope, Scope}' when no process name is known for the pair.
  -spec subcluster_nodes(Handler :: module(), Scope :: atom()) -> [node()].
  subcluster_nodes(Handler, Scope) ->
      case get_process_name_for_scope(Handler, Scope) of
          ScopeProcess when ScopeProcess =/= undefined ->
              gen_server:call(ScopeProcess, {'3.0', subcluster_nodes});
          undefined ->
              error({invalid_scope, Scope})
      end.
  %% Same as call/4, addressed to the scope process on the local node.
  -spec call(Handler :: module(), Scope :: atom(), Message :: term()) -> Response :: term().
  call(Handler, Scope, Message) ->
      LocalNode = node(),
      call(Handler, LocalNode, Scope, Message).
  %% Performs a gen_server call to the scope process of `Handler' / `Scope' on `Node'.
  %% Raises `{invalid_scope, Scope}' when no process name is known locally for the pair,
  %% and `{invalid_remote_scope, Scope, Node}' when the call exits with `noproc' and
  %% `Node' is not the local node.
  -spec call(Handler :: module(), Node :: atom(), Scope :: atom(), Message :: term()) -> Response :: term().
  call(Handler, Node, Scope, Message) ->
      case get_process_name_for_scope(Handler, Scope) of
          ScopeProcess when ScopeProcess =/= undefined ->
              try
                  gen_server:call({ScopeProcess, Node}, Message)
              catch
                  %% a local noproc is left to propagate untouched
                  exit:{noproc, {gen_server, call, _}} when Node =/= node() ->
                      error({invalid_remote_scope, Scope, Node})
              end;
          undefined ->
              error({invalid_scope, Scope})
      end.
  104. %% ===================================================================
  105. %% In-Process API
  106. %% ===================================================================
  %% Broadcasts `Message' without excluding any node; see broadcast/3.
  -spec broadcast(Message :: term(), #state{}) -> any().
  broadcast(Message, State) ->
      NoExclusions = [],
      broadcast(Message, NoExclusions, State).
  %% Hands a broadcast request over to the multicast process referenced by the state.
  %% The request carries the message, the nodes to skip and the full state.
  -spec broadcast(Message :: term(), ExcludedNodes :: [node()], #state{}) -> any().
  broadcast(Message, ExcludedNodes, State = #state{multicast_pid = Multicaster}) ->
      Request = {broadcast, Message, ExcludedNodes, State},
      erlang:send(Multicaster, Request).
  %% Sends `Message' to the process registered on `RemoteNode' under this scope's
  %% process name.
  -spec send_to_node(RemoteNode :: node(), Message :: term(), #state{}) -> any().
  send_to_node(RemoteNode, Message, #state{process_name = RegisteredName}) ->
      Destination = {RegisteredName, RemoteNode},
      erlang:send(Destination, Message).
  116. %% ===================================================================
  117. %% Callbacks
  118. %% ===================================================================
  119. %% ----------------------------------------------------------------------------------------------------------
  120. %% Init
  121. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server init callback: subscribes to node up/down events, starts the linked
  %% multicast process, resolves the two table names, lets the handler initialise
  %% itself and schedules the `after_init' continuation.
  -spec init([term()]) ->
      {ok, #state{}} |
      {ok, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term()} | ignore.
  init([Handler, Scope, ProcessName]) ->
      %% nodeup / nodedown notifications
      ok = net_kernel:monitor_nodes(true),
      %% helper process performing the broadcasts
      Multicaster = spawn_link(?MODULE, multicast_loop, []),
      %% tables are looked up as `<Handler>_by_name' and `<Handler>_by_pid'
      Prefix = atom_to_binary(Handler, latin1),
      [ByName, ByPid] = [
          syn_backbone:get_table_name(binary_to_atom(<<Prefix/binary, Suffix/binary>>, latin1), Scope)
          || Suffix <- [<<"_by_name">>, <<"_by_pid">>]
      ],
      InitialState = #state{
          handler = Handler,
          scope = Scope,
          process_name = ProcessName,
          multicast_pid = Multicaster,
          table_by_name = ByName,
          table_by_pid = ByPid
      },
      %% the handler's own state travels inside the scope state
      {ok, HandlerState} = Handler:init(InitialState),
      {ok, InitialState#state{handler_state = HandlerState}, {continue, after_init}}.
  148. %% ----------------------------------------------------------------------------------------------------------
  149. %% Call messages
  150. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server call callback. `{'3.0', subcluster_nodes}' is answered here with the keys
  %% of the nodes map; every other request is delegated to the handler module.
  -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
      {reply, Reply :: term(), #state{}} |
      {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), Reply :: term(), #state{}} |
      {stop, Reason :: term(), #state{}}.
  handle_call({'3.0', subcluster_nodes}, _From, State = #state{nodes_map = KnownNodes}) ->
      {reply, maps:keys(KnownNodes), State};
  handle_call(Request, From, State = #state{handler = Handler}) ->
      Handler:handle_call(Request, From, State).
  165. %% ----------------------------------------------------------------------------------------------------------
  166. %% Cast messages
  167. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server cast callback: relays the cast to the handler module when it implements
  %% handle_cast/2.
  %% handle_cast/2 is not among the callbacks this behaviour declares, so a handler may
  %% legitimately lack it. In that case the message is logged and dropped instead of
  %% crashing the scope process with `undef'.
  -spec handle_cast(Request :: term(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), #state{}}.
  handle_cast(Msg, #state{handler = Handler, scope = Scope} = State) ->
      %% the handler module is already loaded at this point: init/1 called into it
      case erlang:function_exported(Handler, handle_cast, 2) of
          true ->
              Handler:handle_cast(Msg, State);
          false ->
              error_logger:warning_msg("SYN[~s<~s>] Received an unknown cast message: ~p",
                  [Handler, Scope, Msg]
              ),
              {noreply, State}
      end.
  174. %% ----------------------------------------------------------------------------------------------------------
  175. %% Info messages
  176. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server info callback. Handles the scope protocol and relays the rest:
  %%   * `{'3.0', discover, Pid}': a remote scope process announces itself; reply with the
  %%     local data and start tracking its node if it is new.
  %%   * `{'3.0', ack_sync, Pid, Data}': a remote scope process sends its data; store it
  %%     and, if its node is new, track it and send the local data back.
  %%   * `'DOWN'' for a remote pid: if it is the tracked scope process of its node, purge
  %%     that node's data and forget the node; otherwise relay to the handler.
  %%   * `nodedown': ignored, the monitor `'DOWN'' message is used instead.
  %%   * `nodeup': send a discover message to the scope process on the new node.
  %%   * anything else: delegated to the handler module.
  -spec handle_info(Info :: timeout | term(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), #state{}}.
  handle_info({'3.0', discover, RemoteScopePid}, #state{
      handler = Handler,
      scope = Scope,
      nodes_map = NodesMap
  } = State) ->
      RemoteScopeNode = node(RemoteScopePid),
      error_logger:info_msg("SYN[~s<~s>] Received DISCOVER request from node '~s'",
          [Handler, Scope, RemoteScopeNode]
      ),
      %% send local data to remote
      %% NOTE(review): the get_local_data callback spec also permits `undefined', which
      %% would fail this match; confirm handlers always return {ok, _}
      {ok, LocalData} = Handler:get_local_data(State),
      send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
      %% is this a new node?
      case maps:is_key(RemoteScopeNode, NodesMap) of
          true ->
              %% already known, ignore
              {noreply, State};
          false ->
              %% monitor
              _MRef = monitor(process, RemoteScopePid),
              {noreply, State#state{nodes_map = NodesMap#{RemoteScopeNode => RemoteScopePid}}}
      end;
  handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
      handler = Handler,
      nodes_map = NodesMap,
      scope = Scope
  } = State) ->
      RemoteScopeNode = node(RemoteScopePid),
      error_logger:info_msg("SYN[~s<~s>] Received ACK SYNC (~w entries) from node '~s'",
          [Handler, Scope, length(Data), RemoteScopeNode]
      ),
      %% save remote data
      Handler:save_remote_data(Data, State),
      %% is this a new node?
      case maps:is_key(RemoteScopeNode, NodesMap) of
          true ->
              %% already known
              {noreply, State};
          false ->
              %% monitor
              _MRef = monitor(process, RemoteScopePid),
              %% send local to remote
              {ok, LocalData} = Handler:get_local_data(State),
              send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
              %% return
              {noreply, State#state{nodes_map = NodesMap#{RemoteScopeNode => RemoteScopePid}}}
      end;
  handle_info({'DOWN', MRef, process, Pid, Reason}, #state{
      handler = Handler,
      scope = Scope,
      nodes_map = NodesMap
  } = State) when node(Pid) =/= node() ->
      RemoteNode = node(Pid),
      case maps:take(RemoteNode, NodesMap) of
          {Pid, NodesMap1} ->
              %% the tracked scope process of that node is down
              error_logger:info_msg("SYN[~s<~s>] Scope Process is DOWN on node '~s': ~p",
                  [Handler, Scope, RemoteNode, Reason]
              ),
              Handler:purge_local_data_for_node(RemoteNode, State),
              {noreply, State#state{nodes_map = NodesMap1}};
          _NotTrackedScopeProcess ->
              %% either the node is unknown (`error') or the node is tracked with a
              %% different scope pid: in both cases this is not our scope monitor, so
              %% relay to handler instead of failing with case_clause
              Handler:handle_info({'DOWN', MRef, process, Pid, Reason}, State)
      end;
  handle_info({nodedown, _Node}, State) ->
      %% ignore & wait for monitor DOWN message
      {noreply, State};
  handle_info({nodeup, RemoteNode}, #state{
      handler = Handler,
      scope = Scope
  } = State) ->
      error_logger:info_msg("SYN[~s<~s>] Node '~s' has joined the cluster, sending discover message",
          [Handler, Scope, RemoteNode]
      ),
      send_to_node(RemoteNode, {'3.0', discover, self()}, State),
      {noreply, State};
  handle_info(Info, #state{handler = Handler} = State) ->
      Handler:handle_info(Info, State).
  260. %% ----------------------------------------------------------------------------------------------------------
  261. %% Continue messages
  262. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server continue callback. After init, announces this scope process to the
  %% same-named process on every node currently returned by nodes().
  -spec handle_continue(Info :: term(), #state{}) ->
      {noreply, #state{}} |
      {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), #state{}}.
  handle_continue(after_init, State = #state{
      handler = Handler,
      scope = Scope,
      process_name = RegisteredName
  }) ->
      error_logger:info_msg("SYN[~s<~s>] Discovering the cluster", [Handler, Scope]),
      %% sent from the scope process itself (not the multicast process) to avoid issues
      %% with ordering guarantees
      Discover = {'3.0', discover, self()},
      _ = [{RegisteredName, Node} ! Discover || Node <- nodes()],
      {noreply, State}.
  278. %% ----------------------------------------------------------------------------------------------------------
  279. %% Terminate
  280. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server terminate callback: logs the termination reason for this handler / scope.
  -spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), #state{}) -> any().
  terminate(Reason, #state{handler = Handler, scope = Scope}) ->
      LogArgs = [Handler, Scope, Handler, Reason],
      error_logger:info_msg("SYN[~s<~s>] ~s terminating with reason: ~p", LogArgs).
  284. %% ----------------------------------------------------------------------------------------------------------
  285. %% Convert process state when code is changed.
  286. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server code_change callback: the state is carried over unchanged.
  -spec code_change(OldVsn :: (term() | {down, term()}), #state{}, Extra :: term()) ->
      {ok, NewState :: term()} | {error, Reason :: term()}.
  code_change(_PreviousVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  291. %% ===================================================================
  292. %% Internal
  293. %% ===================================================================
  %% Looks up, through syn_backbone, the process name stored for the
  %% `{Handler, Scope}' pair.
  -spec get_process_name_for_scope(Handler :: module(), Scope :: atom()) -> ProcessName :: atom() | undefined.
  get_process_name_for_scope(Handler, Scope) ->
      LookupKey = {Handler, Scope},
      syn_backbone:get_process_name(LookupKey).
  %% Receive loop of the multicast process.
  %% A `broadcast' request sends the message to the scope process on every node of the
  %% supplied nodes map except the excluded ones, then loops; `terminate' ends the loop
  %% and returns `terminated'.
  -spec multicast_loop() -> terminated.
  multicast_loop() ->
      receive
          terminate ->
              terminated;
          {broadcast, Message, ExcludedNodes, #state{process_name = RegisteredName, nodes_map = KnownNodes}} ->
              Recipients = maps:keys(KnownNodes) -- ExcludedNodes,
              _ = [{RegisteredName, Node} ! Message || Node <- Recipients],
              multicast_loop()
      end.