%% ==========================================================================================================
%% Syn - A global Process Registry and Process Group manager.
%%
%% The MIT License (MIT)
%%
%% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in
%% all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
%% THE SOFTWARE.
%% ==========================================================================================================
-module(syn_gen_scope).
-behaviour(gen_server).

%% API
-export([
    start_link/2,
    get_subcluster_nodes/2,
    call/3, call/4
]).
-export([
    broadcast/2, broadcast/3,
    broadcast_all_cluster/2,
    send_to_node/3
]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    handle_continue/2,
    terminate/2,
    code_change/3
]).

%% internal
-export([multicast_loop/0]).

%% includes
-include("syn.hrl").
%% callbacks
-callback init(#state{}) ->
    {ok, HandlerState :: term()}.

-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    #state{}) ->
    {reply, Reply :: term(), #state{}} |
    {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
    {noreply, #state{}} |
    {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
    {stop, Reason :: term(), Reply :: term(), #state{}} |
    {stop, Reason :: term(), #state{}}.

-callback handle_info(Info :: timeout | term(), #state{}) ->
    {noreply, #state{}} |
    {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
    {stop, Reason :: term(), #state{}}.

-callback save_remote_data(RemoteData :: term(), #state{}) -> any().

-callback get_local_data(#state{}) -> {ok, Data :: term()} | undefined.

-callback purge_local_data_for_node(Node :: node(), #state{}) -> any().
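
%% Example (illustrative sketch only, not part of this module): a minimal handler
%% implementing this behaviour. The module name, the entry shape and the ETS usage
%% are hypothetical; the real handlers live in modules such as syn_registry.
%% Note that handle_cast/2 is also dispatched to the handler (see handle_cast/2
%% below) even though it is not declared as a callback here.
%%
%%   -module(my_scope_handler).
%%   -behaviour(syn_gen_scope).
%%   -include("syn.hrl").
%%
%%   init(#state{}) ->
%%       {ok, undefined}.                              %% no extra handler state
%%
%%   handle_call(Request, _From, State) ->
%%       {reply, {unhandled, Request}, State}.
%%
%%   handle_cast(_Msg, State) ->
%%       {noreply, State}.
%%
%%   handle_info(_Info, State) ->
%%       {noreply, State}.
%%
%%   get_local_data(#state{table_by_name = Table}) ->
%%       {ok, ets:tab2list(Table)}.                    %% ship the whole local table
%%
%%   save_remote_data(Entries, #state{table_by_name = Table}) ->
%%       ets:insert(Table, Entries).
%%
%%   purge_local_data_for_node(Node, #state{table_by_name = Table}) ->
%%       %% hypothetical entry shape {Name, Pid, Meta, Node}
%%       ets:match_delete(Table, {'_', '_', '_', Node}).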
%% ===================================================================
%% API
%% ===================================================================
-spec start_link(Handler :: module(), Scope :: atom()) ->
    {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: term()}.
start_link(Handler, Scope) when is_atom(Scope) ->
    %% build name
    HandlerBin = atom_to_binary(Handler),
    ScopeBin = atom_to_binary(Scope),
    ProcessName = binary_to_atom(<<HandlerBin/binary, "_", ScopeBin/binary>>),
    %% save to lookup table
    syn_backbone:save_process_name({Handler, Scope}, ProcessName),
    %% create process
    gen_server:start_link({local, ProcessName}, ?MODULE, [Handler, Scope, ProcessName], []).
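
%% Usage sketch (assumptions: syn_registry is a handler module and my_scope is a
%% hypothetical scope name): `syn_gen_scope:start_link(syn_registry, my_scope)`
%% starts a scope process registered locally as `syn_registry_my_scope`, following
%% the name composition above. Being an OTP start_link, it is intended to run
%% under a supervisor, one scope process per {Handler, Scope} pair on each node.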
-spec get_subcluster_nodes(Handler :: module(), Scope :: atom()) -> [node()].
get_subcluster_nodes(Handler, Scope) ->
    case get_process_name_for_scope(Handler, Scope) of
        undefined -> error({invalid_scope, Scope});
        ProcessName -> gen_server:call(ProcessName, get_subcluster_nodes)
    end.

-spec call(Handler :: module(), Scope :: atom(), Message :: term()) -> Response :: term().
call(Handler, Scope, Message) ->
    call(Handler, node(), Scope, Message).

-spec call(Handler :: module(), Node :: atom(), Scope :: atom(), Message :: term()) -> Response :: term().
call(Handler, Node, Scope, Message) ->
    case get_process_name_for_scope(Handler, Scope) of
        undefined -> error({invalid_scope, Scope});
        ProcessName ->
            try gen_server:call({ProcessName, Node}, Message)
            catch exit:{noproc, {gen_server, call, _}} -> error({invalid_scope, Scope})
            end
    end.
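
%% Usage sketch (hypothetical wrapper and message term): a handler module would
%% typically expose its own API on top of call/3,4, routing the request to the
%% scope process on the node that owns the target pid, e.g. in the handler module:
%%
%%   register(Scope, Name, Pid) ->
%%       syn_gen_scope:call(?MODULE, node(Pid), Scope, {register, Name, Pid}).
%%
%% If no scope process exists for Scope (locally, or on the target node),
%% the call raises {invalid_scope, Scope}.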
%% ===================================================================
%% In-Process API
%% ===================================================================
-spec broadcast(Message :: term(), #state{}) -> any().
broadcast(Message, State) ->
    broadcast(Message, [], State).

-spec broadcast(Message :: term(), ExcludedNodes :: [node()], #state{}) -> any().
broadcast(Message, ExcludedNodes, #state{multicast_pid = MulticastPid} = State) ->
    MulticastPid ! {broadcast, Message, ExcludedNodes, State}.

-spec broadcast_all_cluster(Message :: term(), #state{}) -> any().
broadcast_all_cluster(Message, #state{multicast_pid = MulticastPid} = State) ->
    MulticastPid ! {broadcast_all_cluster, Message, State}.

-spec send_to_node(RemoteNode :: node(), Message :: term(), #state{}) -> any().
send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
    erlang:send({ProcessName, RemoteNode}, Message, [noconnect]).
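
%% Usage sketch (hypothetical message term): these functions are meant to be
%% called from inside a handler callback, which already holds the #state{}.
%% For example, after a local update a handler could propagate it to every
%% known subcluster node:
%%
%%   syn_gen_scope:broadcast({'3.0', sync_register, Name, Pid, Meta}, State)
%%
%% broadcast/2,3 and broadcast_all_cluster/2 hand the message over to the linked
%% multicast process (multicast_loop/0 below), so the actual sends to remote
%% nodes happen outside the scope gen_server; send_to_node/3 sends directly.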
%% ===================================================================
%% Callbacks
%% ===================================================================
%% ----------------------------------------------------------------------------------------------------------
%% Init
%% ----------------------------------------------------------------------------------------------------------
-spec init([term()]) ->
    {ok, #state{}} |
    {ok, #state{}, timeout() | hibernate | {continue, term()}} |
    {stop, Reason :: term()} | ignore.
init([Handler, Scope, ProcessName]) ->
    %% monitor nodes
    ok = net_kernel:monitor_nodes(true),
    %% start multicast process
    MulticastPid = spawn_link(?MODULE, multicast_loop, []),
    %% table names
    HandlerBin = atom_to_binary(Handler),
    TableByName = syn_backbone:get_table_name(binary_to_atom(<<HandlerBin/binary, "_by_name">>), Scope),
    TableByPid = syn_backbone:get_table_name(binary_to_atom(<<HandlerBin/binary, "_by_pid">>), Scope),
    %% build state
    State = #state{
        handler = Handler,
        scope = Scope,
        process_name = ProcessName,
        multicast_pid = MulticastPid,
        table_by_name = TableByName,
        table_by_pid = TableByPid
    },
    %% call init
    {ok, HandlerState} = Handler:init(State),
    State1 = State#state{handler_state = HandlerState},
    {ok, State1, {continue, after_init}}.
%% ----------------------------------------------------------------------------------------------------------
%% Call messages
%% ----------------------------------------------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, #state{}) ->
    {reply, Reply :: term(), #state{}} |
    {reply, Reply :: term(), #state{}, timeout() | hibernate | {continue, term()}} |
    {noreply, #state{}} |
    {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
    {stop, Reason :: term(), Reply :: term(), #state{}} |
    {stop, Reason :: term(), #state{}}.
handle_call(get_subcluster_nodes, _From, #state{
    nodes_map = NodesMap
} = State) ->
    Nodes = maps:keys(NodesMap),
    {reply, Nodes, State};

handle_call(Request, From, #state{handler = Handler} = State) ->
    Handler:handle_call(Request, From, State).
%% ----------------------------------------------------------------------------------------------------------
%% Cast messages
%% ----------------------------------------------------------------------------------------------------------
-spec handle_cast(Request :: term(), #state{}) ->
    {noreply, #state{}} |
    {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
    {stop, Reason :: term(), #state{}}.
handle_cast(Msg, #state{handler = Handler} = State) ->
    Handler:handle_cast(Msg, State).
%% ----------------------------------------------------------------------------------------------------------
%% Info messages
%% ----------------------------------------------------------------------------------------------------------
-spec handle_info(Info :: timeout | term(), #state{}) ->
    {noreply, #state{}} |
    {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
    {stop, Reason :: term(), #state{}}.
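%% Subcluster sync protocol handled by the clauses below:
%% * {'3.0', discover, Pid}: a peer scope process announces itself; reply with an
%%   ack_sync carrying the local data and, if the peer is new, monitor it.
%% * {'3.0', ack_sync, Pid, Data}: save the peer's data; if the peer is new,
%%   monitor it and send the local data back so the exchange is symmetric.
%% * 'DOWN' from a monitored remote scope process: drop that node from nodes_map
%%   and purge its data via the handler's purge_local_data_for_node/2.
%% * {nodeup, Node}: send a discover message to the scope process on that node.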
handle_info({'3.0', discover, RemoteScopePid}, #state{
    handler = Handler,
    scope = Scope,
    nodes_map = NodesMap
} = State) ->
    RemoteScopeNode = node(RemoteScopePid),
    error_logger:info_msg("SYN[~s|~s] Received DISCOVER request from node '~s'",
        [Handler, Scope, RemoteScopeNode]
    ),
    %% send local data to remote
    {ok, LocalData} = Handler:get_local_data(State),
    send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
    %% is this a new node?
    case maps:is_key(RemoteScopeNode, NodesMap) of
        true ->
            %% already known, ignore
            {noreply, State};

        false ->
            %% monitor
            _MRef = monitor(process, RemoteScopePid),
            {noreply, State#state{nodes_map = NodesMap#{RemoteScopeNode => RemoteScopePid}}}
    end;

handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
    handler = Handler,
    nodes_map = NodesMap,
    scope = Scope
} = State) ->
    RemoteScopeNode = node(RemoteScopePid),
    error_logger:info_msg("SYN[~s|~s] Received ACK SYNC (~w entries) from node '~s'",
        [Handler, Scope, length(Data), RemoteScopeNode]
    ),
    %% save remote data
    Handler:save_remote_data(Data, State),
    %% is this a new node?
    case maps:is_key(RemoteScopeNode, NodesMap) of
        true ->
            %% already known
            {noreply, State};

        false ->
            %% monitor
            _MRef = monitor(process, RemoteScopePid),
            %% send local to remote
            {ok, LocalData} = Handler:get_local_data(State),
            send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
            %% return
            {noreply, State#state{nodes_map = NodesMap#{RemoteScopeNode => RemoteScopePid}}}
    end;

handle_info({'DOWN', MRef, process, Pid, Reason}, #state{
    handler = Handler,
    scope = Scope,
    nodes_map = NodesMap
} = State) when node(Pid) =/= node() ->
    %% scope process down
    RemoteNode = node(Pid),
    case maps:take(RemoteNode, NodesMap) of
        {Pid, NodesMap1} ->
            error_logger:info_msg("SYN[~s|~s] Scope Process is DOWN on node '~s': ~p",
                [Handler, Scope, RemoteNode, Reason]
            ),
            Handler:purge_local_data_for_node(RemoteNode, State),
            {noreply, State#state{nodes_map = NodesMap1}};

        error ->
            %% relay to handler
            Handler:handle_info({'DOWN', MRef, process, Pid, Reason}, State)
    end;

handle_info({nodedown, _Node}, State) ->
    %% ignore & wait for monitor DOWN message
    {noreply, State};

handle_info({nodeup, RemoteNode}, #state{
    handler = Handler,
    scope = Scope
} = State) ->
    error_logger:info_msg("SYN[~s|~s] Node '~s' has joined the cluster, sending discover message",
        [Handler, Scope, RemoteNode]
    ),
    send_to_node(RemoteNode, {'3.0', discover, self()}, State),
    {noreply, State};

handle_info(Info, #state{handler = Handler} = State) ->
    Handler:handle_info(Info, State).
%% ----------------------------------------------------------------------------------------------------------
%% Continue messages
%% ----------------------------------------------------------------------------------------------------------
-spec handle_continue(Info :: term(), #state{}) ->
    {noreply, #state{}} |
    {noreply, #state{}, timeout() | hibernate | {continue, term()}} |
    {stop, Reason :: term(), #state{}}.
handle_continue(after_init, #state{
    handler = Handler,
    scope = Scope
} = State) ->
    error_logger:info_msg("SYN[~s|~s] Discovering the cluster", [Handler, Scope]),
    broadcast_all_cluster({'3.0', discover, self()}, State),
    {noreply, State}.
%% ----------------------------------------------------------------------------------------------------------
%% Terminate
%% ----------------------------------------------------------------------------------------------------------
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), #state{}) -> any().
terminate(Reason, #state{handler = Handler, scope = Scope}) ->
    error_logger:info_msg("SYN[~s|~s] ~s terminating with reason: ~p", [Handler, Scope, Handler, Reason]).

%% ----------------------------------------------------------------------------------------------------------
%% Convert process state when code is changed.
%% ----------------------------------------------------------------------------------------------------------
-spec code_change(OldVsn :: (term() | {down, term()}), #state{}, Extra :: term()) ->
    {ok, NewState :: term()} | {error, Reason :: term()}.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% ===================================================================
%% Internal
%% ===================================================================
-spec get_process_name_for_scope(Handler :: module(), Scope :: atom()) -> ProcessName :: atom() | undefined.
get_process_name_for_scope(Handler, Scope) ->
    syn_backbone:get_process_name({Handler, Scope}).

-spec multicast_loop() -> terminated.
multicast_loop() ->
    receive
        {broadcast, Message, ExcludedNodes, #state{process_name = ProcessName, nodes_map = NodesMap}} ->
            lists:foreach(fun(RemoteNode) ->
                erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
            end, maps:keys(NodesMap) -- ExcludedNodes),
            multicast_loop();

        {broadcast_all_cluster, Message, #state{process_name = ProcessName}} ->
            lists:foreach(fun(RemoteNode) ->
                erlang:send({ProcessName, RemoteNode}, Message, [noconnect])
            end, nodes()),
            multicast_loop();

        terminate ->
            terminated
    end.