syn_gen_scope.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. -module(syn_gen_scope).
  27. -behaviour(gen_server).
  28. %% API
  29. -export([
  30. start_link/3,
  31. get_subcluster_nodes/2,
  32. call/3, call/4
  33. ]).
  34. -export([
  35. broadcast/2, broadcast/3,
  36. broadcast_all_cluster/2,
  37. send_to_node/3
  38. ]).
  39. %% gen_server callbacks
  40. -export([
  41. init/1,
  42. handle_call/3,
  43. handle_cast/2,
  44. handle_info/2,
  45. handle_continue/2,
  46. terminate/2,
  47. code_change/3
  48. ]).
  49. %% callbacks
  %% Called once from init/1 of the scope process with the Args passed to
  %% start_link/3. The returned State is kept in the handler_state field of
  %% the scope process state.
  -callback init(Args :: term()) ->
      {ok, State :: term()}.

  %% Receives every gen_server call that the scope process does not answer
  %% itself. The State argument is the scope process state, and the returned
  %% tuple is handed back to gen_server unchanged.
  -callback handle_call(
      Request :: term(),
      From :: {pid(), Tag :: term()},
      State :: term()
  ) ->
      {reply, Reply :: term(), NewState :: term()} |
      {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
      {stop, Reason :: term(), NewState :: term()}.

  %% Receives every info message that the scope process does not consume
  %% itself, including 'DOWN' messages for remote pids it does not track.
  -callback handle_info(Info :: timeout | term(), State :: term()) ->
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), NewState :: term()}.

  %% Called when an ack_sync message arrives, with the remote scope pid and
  %% the data it sent. The return value is ignored by this module.
  -callback save_remote_data(RemoteScopePid :: pid(), RemoteData :: any(), State :: term()) -> any().

  %% Called to obtain the data shipped to a remote scope process in an
  %% ack_sync message.
  %% NOTE(review): the call sites in this module match the result against
  %% {ok, Data}, so an 'undefined' return fails that match.
  -callback get_local_data(State :: term()) -> {ok, Data :: any()} | undefined.

  %% Called when the tracked scope process of a remote node goes down.
  %% The return value is ignored by this module.
  -callback purge_local_data_for_node(Node :: node(), State :: term()) -> any().
  67. %% includes
  68. -include("syn.hrl").
  69. %% ===================================================================
  70. %% API
  71. %% ===================================================================
  %% Starts a scope process linked to the caller and registers it locally
  %% under the name derived from Handler and Scope. Scope must be an atom;
  %% any other value fails the guard with function_clause.
  -spec start_link(Handler :: module(), Scope :: atom(), Args :: [any()]) ->
      {ok, Pid :: pid()} | {error, {already_started, Pid :: pid()}} | {error, Reason :: any()}.
  start_link(Handler, Scope, Args) when is_atom(Scope) ->
      RegisteredName = get_process_name_for_scope(Handler, Scope),
      %% the registered name is also passed to init/1 so it ends up in the state
      InitArgs = [Handler, Scope, RegisteredName, Args],
      gen_server:start_link({local, RegisteredName}, ?MODULE, InitArgs, []).
  %% Asks the local scope process of Handler/Scope for the remote nodes it
  %% currently tracks.
  %%
  %% The reply is the scope's node map as stored in its state: each key is a
  %% remote node and each value is the pid of the scope process on that node.
  %% The spec previously advertised [node()], which did not match the map that
  %% the server actually replies with; use maps:keys/1 on the result when only
  %% the node names are needed.
  %%
  %% Uses gen_server:call/2, so it exits if the scope process is not running
  %% or does not answer within the default call timeout.
  -spec get_subcluster_nodes(Handler :: module(), Scope :: atom()) -> #{node() => pid()}.
  get_subcluster_nodes(Handler, Scope) ->
      ProcessName = get_process_name_for_scope(Handler, Scope),
      gen_server:call(ProcessName, get_subcluster_nodes).
  %% Sends Message as a synchronous call to the scope process of
  %% Handler/Scope running on the local node.
  -spec call(Handler :: module(), Scope :: atom(), Message :: any()) -> Response :: any().
  call(Handler, Scope, Message) ->
      LocalNode = node(),
      call(Handler, LocalNode, Scope, Message).
  %% Sends Message as a synchronous call to the scope process of
  %% Handler/Scope registered on Node, and returns its reply.
  -spec call(Handler :: module(), Node :: atom(), Scope :: atom(), Message :: any()) -> Response :: any().
  call(Handler, Node, Scope, Message) ->
      %% {RegisteredName, Node} addresses a locally registered process on Node
      ServerRef = {get_process_name_for_scope(Handler, Scope), Node},
      gen_server:call(ServerRef, Message).
  88. %% ===================================================================
  89. %% In-Process API
  90. %% ===================================================================
  %% Sends Message to the scope process on every node tracked in State,
  %% without excluding any of them.
  -spec broadcast(Message :: any(), #state{}) -> any().
  broadcast(Message, State) ->
      NoExclusions = [],
      broadcast(Message, NoExclusions, State).
  %% Sends Message to the scope process on every node tracked in State,
  %% except the nodes listed in ExcludedNodes.
  -spec broadcast(Message :: any(), ExcludedNodes :: [node()], #state{}) -> any().
  broadcast(Message, ExcludedNodes, #state{process_name = ProcessName, nodes = Nodes}) ->
      Recipients = maps:keys(Nodes) -- ExcludedNodes,
      %% noconnect: never set up a new distribution connection just to deliver
      Deliver = fun(Node) ->
          erlang:send({ProcessName, Node}, Message, [noconnect])
      end,
      lists:foreach(Deliver, Recipients).
  %% Sends Message to the identically named scope process on every node
  %% returned by nodes(), whether or not that node is tracked in State.
  -spec broadcast_all_cluster(Message :: any(), #state{}) -> any().
  broadcast_all_cluster(Message, #state{process_name = ProcessName}) ->
      ConnectedNodes = nodes(),
      Deliver = fun(Node) ->
          erlang:send({ProcessName, Node}, Message, [noconnect])
      end,
      lists:foreach(Deliver, ConnectedNodes).
  %% Sends Message to the process registered on RemoteNode under the same name
  %% as this scope process. The noconnect option prevents the send from
  %% establishing a connection to a node that is not already connected.
  %% Returns whatever erlang:send/3 returns.
  -spec send_to_node(RemoteNode :: node(), Message :: any(), #state{}) -> any().
  send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) ->
      Destination = {ProcessName, RemoteNode},
      erlang:send(Destination, Message, [noconnect]).
  107. %% ===================================================================
  108. %% Callbacks
  109. %% ===================================================================
  110. %% ----------------------------------------------------------------------------------------------------------
  111. %% Init
  112. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server init callback.
  %%
  %% Expects the argument list built by start_link/3:
  %% [Handler, Scope, ProcessName, Args]. It initialises the handler,
  %% subscribes to nodeup/nodedown messages and returns the initial state
  %% together with a continue instruction, so that cluster discovery runs in
  %% handle_continue/2 after init has returned.
  %%
  %% Handler:init/1 must return {ok, HandlerState}; any other value fails the
  %% match below and crashes the start.
  %%
  %% The spec lists the only tuple this function returns. The previous spec
  %% contained {continue, any()}, which is not a valid init/1 return value,
  %% and omitted the {ok, State, {continue, after_init}} tuple.
  -spec init([term()]) ->
      {ok, #state{}, {continue, after_init}}.
  init([Handler, Scope, ProcessName, Args]) ->
      %% call init
      {ok, HandlerState} = Handler:init(Args),
      %% monitor nodes
      ok = net_kernel:monitor_nodes(true),
      %% build state
      State = #state{
          handler = Handler,
          handler_state = HandlerState,
          scope = Scope,
          process_name = ProcessName
      },
      {ok, State, {continue, after_init}}.
  132. %% ----------------------------------------------------------------------------------------------------------
  133. %% Call messages
  134. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server call callback.
  %%
  %% get_subcluster_nodes is answered here with the nodes field of the state.
  %% Every other request is delegated to the handler module, which receives
  %% the scope process state and whose return value is passed back to
  %% gen_server as is.
  -spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
      State :: term()) ->
      {reply, Reply :: term(), NewState :: term()} |
      {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
      {stop, Reason :: term(), NewState :: term()}.
  handle_call(get_subcluster_nodes, _From, #state{} = State) ->
      {reply, State#state.nodes, State};
  handle_call(Request, From, #state{} = State) ->
      Handler = State#state.handler,
      Handler:handle_call(Request, From, State).
  149. %% ----------------------------------------------------------------------------------------------------------
  150. %% Cast messages
  151. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server cast callback.
  %%
  %% Casts are delegated to Handler:handle_cast/2 when the handler exports it.
  %% handle_cast/2 is not among the callbacks this behaviour declares, so a
  %% handler is free to omit it. Calling it unconditionally would then crash
  %% the scope process with undef on any cast. In that case the message is
  %% logged and dropped instead.
  -spec handle_cast(Request :: term(), State :: term()) ->
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), NewState :: term()}.
  handle_cast(Msg, #state{handler = Handler} = State) ->
      %% the handler module is already loaded at this point: init/1 called it
      case erlang:function_exported(Handler, handle_cast, 2) of
          true ->
              Handler:handle_cast(Msg, State);
          false ->
              error_logger:warning_msg("SYN[~s] Received an unknown cast message: ~p", [node(), Msg]),
              {noreply, State}
      end.
  158. %% ----------------------------------------------------------------------------------------------------------
  159. %% Info messages
  160. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server info callback. Implements the scope discovery protocol and
  %% delegates everything else to the handler module.
  %%
  %% Messages handled here:
  %%   {'3.0', discover, Pid}       - a remote scope process announces itself;
  %%                                  reply with local data and start tracking
  %%                                  its node if it is not tracked yet.
  %%   {'3.0', ack_sync, Pid, Data} - a remote scope process sent its data;
  %%                                  save it and, if its node is new, track
  %%                                  it and send local data back.
  %%   {'DOWN', ...} for a remote pid - if the pid is the tracked scope process
  %%                                  of its node, purge that node's data and
  %%                                  stop tracking it; otherwise relay to the
  %%                                  handler.
  %%   {nodedown, Node}             - ignored; cleanup is driven by 'DOWN'.
  %%   {nodeup, Node}               - send a discover message to the new node.
  %%
  %% Handler:get_local_data/1 is matched against {ok, LocalData}; any other
  %% return value fails that match.
  -spec handle_info(Info :: timeout | term(), State :: term()) ->
      {noreply, NewState :: term()} |
      {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
      {stop, Reason :: term(), NewState :: term()}.
  handle_info({'3.0', discover, RemoteScopePid}, #state{
      handler = Handler,
      scope = Scope,
      nodes = Nodes
  } = State) ->
      RemoteScopeNode = node(RemoteScopePid),
      error_logger:info_msg("SYN[~s] Received DISCOVER request from node '~s' and scope '~s'",
          [node(), RemoteScopeNode, Scope]
      ),
      %% send local data to remote
      {ok, LocalData} = Handler:get_local_data(State),
      send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
      %% is this a new node?
      case maps:is_key(RemoteScopeNode, Nodes) of
          true ->
              %% already known, ignore
              {noreply, State};
          false ->
              %% monitor the remote scope process so its exit triggers cleanup
              _MRef = monitor(process, RemoteScopePid),
              {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
      end;
  handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
      handler = Handler,
      nodes = Nodes,
      scope = Scope
  } = State) ->
      RemoteScopeNode = node(RemoteScopePid),
      error_logger:info_msg("SYN[~s] Received ACK SYNC from node '~s' and scope '~s'",
          [node(), RemoteScopeNode, Scope]
      ),
      %% save remote data
      Handler:save_remote_data(RemoteScopePid, Data, State),
      %% is this a new node?
      case maps:is_key(RemoteScopeNode, Nodes) of
          true ->
              %% already known
              {noreply, State};
          false ->
              %% monitor
              _MRef = monitor(process, RemoteScopePid),
              %% send local to remote
              {ok, LocalData} = Handler:get_local_data(State),
              send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
              %% return
              {noreply, State#state{nodes = Nodes#{RemoteScopeNode => RemoteScopePid}}}
      end;
  handle_info({'DOWN', MRef, process, Pid, Reason}, #state{
      handler = Handler,
      scope = Scope,
      nodes = Nodes
  } = State) when node(Pid) =/= node() ->
      RemoteNode = node(Pid),
      case maps:take(RemoteNode, Nodes) of
          {Pid, Nodes1} ->
              %% the tracked scope process of RemoteNode is down
              error_logger:info_msg("SYN[~s] Scope Process '~s' is DOWN on node '~s': ~p",
                  [node(), Scope, RemoteNode, Reason]
              ),
              Handler:purge_local_data_for_node(RemoteNode, State),
              {noreply, State#state{nodes = Nodes1}};
          _ ->
              %% Either RemoteNode is not tracked (maps:take/2 returned error),
              %% or it is tracked with a different scope pid, which means Pid
              %% is some other remote process. Both cases are relayed to the
              %% handler and the node map is left untouched. Matching only on
              %% error here would raise case_clause in the second case.
              Handler:handle_info({'DOWN', MRef, process, Pid, Reason}, State)
      end;
  handle_info({nodedown, _Node}, State) ->
      %% ignore & wait for monitor DOWN message
      {noreply, State};
  handle_info({nodeup, RemoteNode}, #state{scope = Scope} = State) ->
      error_logger:info_msg("SYN[~s] Node '~s' has joined the cluster, sending discover message for scope '~s'",
          [node(), RemoteNode, Scope]
      ),
      send_to_node(RemoteNode, {'3.0', discover, self()}, State),
      {noreply, State};
  handle_info(Info, #state{handler = Handler} = State) ->
      Handler:handle_info(Info, State).
  241. %% ----------------------------------------------------------------------------------------------------------
  242. %% Continue messages
  243. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server continue callback.
  %%
  %% Runs right after init/1 (which returns {continue, after_init}) and
  %% announces this scope process to the same-named process on every node
  %% returned by nodes(), by broadcasting a discover message.
  %%
  %% NOTE(review): only after_init is handled. The handler callback specs
  %% allow {continue, term()} in their return values; any other continue
  %% term reaching this function has no matching clause.
  -spec handle_continue(after_init, #state{}) -> {noreply, #state{}}.
  handle_continue(after_init, #state{scope = Scope} = State) ->
      error_logger:info_msg("SYN[~s] Discovering the cluster with scope '~s'", [node(), Scope]),
      broadcast_all_cluster({'3.0', discover, self()}, State),
      {noreply, State}.
  248. %% ----------------------------------------------------------------------------------------------------------
  249. %% Terminate
  250. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server terminate callback: logs the termination reason together
  %% with the local node name. The state is not inspected.
  -spec terminate(Reason :: any(), #state{}) -> terminated.
  terminate(Reason, _State) ->
      LogArgs = [node(), Reason],
      error_logger:info_msg("SYN[~s] Terminating with reason: ~p", LogArgs),
      terminated.
  255. %% ----------------------------------------------------------------------------------------------------------
  256. %% Convert process state when code is changed.
  257. %% ----------------------------------------------------------------------------------------------------------
  %% gen_server code_change callback: the state is carried over unchanged
  %% across a code upgrade.
  -spec code_change(OldVsn :: any(), #state{}, Extra :: any()) -> {ok, #state{}}.
  code_change(_PreviousVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  261. %% ===================================================================
  262. %% Internal
  263. %% ===================================================================
  %% Builds the registered name of a scope process: the handler module name
  %% and the scope name joined by an underscore, e.g. handler 'a' and scope
  %% 'b' give 'a_b'.
  -spec get_process_name_for_scope(Handler :: module(), Scope :: atom()) -> atom().
  get_process_name_for_scope(Handler, Scope) ->
      Prefix = atom_to_binary(Handler),
      Suffix = atom_to_binary(Scope),
      %% $_ is the single byte for the underscore separator
      binary_to_atom(<<Prefix/binary, $_, Suffix/binary>>).