syn_backbone.erl 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. %% ==========================================================================================================
  2. %% Syn - A global Process Registry and Process Group manager.
  3. %%
  4. %% The MIT License (MIT)
  5. %%
  6. %% Copyright (c) 2015-2021 Roberto Ostinelli <roberto@ostinelli.net> and Neato Robotics, Inc.
  7. %%
  8. %% Permission is hereby granted, free of charge, to any person obtaining a copy
  9. %% of this software and associated documentation files (the "Software"), to deal
  10. %% in the Software without restriction, including without limitation the rights
  11. %% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. %% copies of the Software, and to permit persons to whom the Software is
  13. %% furnished to do so, subject to the following conditions:
  14. %%
  15. %% The above copyright notice and this permission notice shall be included in
  16. %% all copies or substantial portions of the Software.
  17. %%
  18. %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. %% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. %% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. %% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. %% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  24. %% THE SOFTWARE.
  25. %% ==========================================================================================================
  26. %% @private
  27. -module(syn_backbone).
  28. -behaviour(gen_server).
  29. %% API
  30. -export([start_link/0]).
  31. -export([create_tables_for_scope/1]).
  32. -export([get_table_name/2]).
  33. -export([save_process_name/2, get_process_name/1]).
  34. %% gen_server callbacks
  35. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  36. - if (?OTP_RELEASE >= 23).
  37. -define(ETS_OPTIMIZATIONS, [{decentralized_counters, true}]).
  38. -else.
  39. -define(ETS_OPTIMIZATIONS, []).
  40. -endif.
  41. %% includes
  42. -include("syn.hrl").
  43. %% ===================================================================
  44. %% API
  45. %% ===================================================================
  46. -spec start_link() -> {ok, pid()} | {error, term()}.
  47. start_link() ->
  48. Options = [],
  49. gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
  50. -spec create_tables_for_scope(Scope :: atom()) -> ok.
  51. create_tables_for_scope(Scope) ->
  52. gen_server:call(?MODULE, {create_tables_for_scope, Scope}).
  53. -spec save_process_name(Key :: term(), ProcessName :: atom()) -> true.
  54. save_process_name(Key, ProcessName) ->
  55. true = ets:insert(syn_process_names, {Key, ProcessName}).
  56. -spec get_process_name(Key :: term()) -> ProcessName :: atom().
  57. get_process_name(Key) ->
  58. case ets:lookup(syn_process_names, Key) of
  59. [{_, ProcessName}] -> ProcessName;
  60. [] -> undefined
  61. end.
  62. -spec get_table_name(TableId :: atom(), Scope :: atom()) -> TableName :: atom() | undefined.
  63. get_table_name(TableId, Scope) ->
  64. case ets:lookup(syn_table_names, {TableId, Scope}) of
  65. [{_, TableName}] -> TableName;
  66. [] -> undefined
  67. end.
  68. %% ===================================================================
  69. %% Callbacks
  70. %% ===================================================================
  71. %% ----------------------------------------------------------------------------------------------------------
  72. %% Init
  73. %% ----------------------------------------------------------------------------------------------------------
  74. -spec init([]) ->
  75. {ok, State :: map()} |
  76. {ok, State :: map(), Timeout :: non_neg_integer()} |
  77. ignore |
  78. {stop, Reason :: term()}.
  79. init([]) ->
  80. %% create table names table
  81. ets:new(syn_table_names, [set, public, named_table, {read_concurrency, true}] ++ ?ETS_OPTIMIZATIONS),
  82. ets:new(syn_process_names, [set, public, named_table, {read_concurrency, true}] ++ ?ETS_OPTIMIZATIONS),
  83. %% init
  84. {ok, #{}}.
  85. %% ----------------------------------------------------------------------------------------------------------
  86. %% Call messages
  87. %% ----------------------------------------------------------------------------------------------------------
  88. -spec handle_call(Request :: term(), From :: term(), State :: map()) ->
  89. {reply, Reply :: term(), State :: map()} |
  90. {reply, Reply :: term(), State :: map(), Timeout :: non_neg_integer()} |
  91. {noreply, State :: map()} |
  92. {noreply, State :: map(), Timeout :: non_neg_integer()} |
  93. {stop, Reason :: term(), Reply :: term(), State :: map()} |
  94. {stop, Reason :: term(), State :: map()}.
  95. handle_call({create_tables_for_scope, Scope}, _From, State) ->
  96. error_logger:info_msg("SYN[~s] Creating tables for scope '~s'", [?MODULE, Scope]),
  97. ensure_table_existence(set, syn_registry_by_name, Scope),
  98. ensure_table_existence(bag, syn_registry_by_pid, Scope),
  99. ensure_table_existence(ordered_set, syn_pg_by_name, Scope),
  100. ensure_table_existence(ordered_set, syn_pg_by_pid, Scope),
  101. {reply, ok, State};
  102. handle_call(Request, From, State) ->
  103. error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [?MODULE, From, Request]),
  104. {reply, undefined, State}.
  105. %% ----------------------------------------------------------------------------------------------------------
  106. %% Cast messages
  107. %% ----------------------------------------------------------------------------------------------------------
  108. -spec handle_cast(Msg :: term(), State :: map()) ->
  109. {noreply, State :: map()} |
  110. {noreply, State :: map(), Timeout :: non_neg_integer()} |
  111. {stop, Reason :: term(), State :: map()}.
  112. handle_cast(Msg, State) ->
  113. error_logger:warning_msg("SYN[~s] Received an unknown cast message: ~p", [?MODULE, Msg]),
  114. {noreply, State}.
  115. %% ----------------------------------------------------------------------------------------------------------
  116. %% All non Call / Cast messages
  117. %% ----------------------------------------------------------------------------------------------------------
  118. -spec handle_info(Info :: term(), State :: map()) ->
  119. {noreply, State :: map()} |
  120. {noreply, State :: map(), Timeout :: non_neg_integer()} |
  121. {stop, Reason :: term(), State :: map()}.
  122. handle_info(Info, State) ->
  123. error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [?MODULE, Info]),
  124. {noreply, State}.
  125. %% ----------------------------------------------------------------------------------------------------------
  126. %% Terminate
  127. %% ----------------------------------------------------------------------------------------------------------
  128. -spec terminate(Reason :: term(), State :: map()) -> terminated.
  129. terminate(Reason, _State) ->
  130. error_logger:info_msg("SYN[~s] Terminating with reason: ~p", [?MODULE, Reason]),
  131. %% return
  132. terminated.
  133. %% ----------------------------------------------------------------------------------------------------------
  134. %% Convert process state when code is changed.
  135. %% ----------------------------------------------------------------------------------------------------------
  136. -spec code_change(OldVsn :: term(), State :: map(), Extra :: term()) -> {ok, State :: map()}.
  137. code_change(_OldVsn, State, _Extra) ->
  138. {ok, State}.
  139. %% ===================================================================
  140. %% Internal
  141. %% ===================================================================
  142. -spec ensure_table_existence(Type :: ets:type(), TableId :: atom(), Scope :: atom()) -> any().
  143. ensure_table_existence(Type, TableId, Scope) ->
  144. %% build name
  145. TableIdBin = list_to_binary(atom_to_list(TableId)),
  146. ScopeBin = list_to_binary(atom_to_list(Scope)),
  147. TableName = list_to_atom(binary_to_list(<<TableIdBin/binary, "_", ScopeBin/binary>>)),
  148. %% save to loopkup table
  149. true = ets:insert(syn_table_names, {{TableId, Scope}, TableName}),
  150. %% check or create
  151. case ets:whereis(TableName) of
  152. undefined ->
  153. %% regarding decentralized_counters: <https://blog.erlang.org/scalable-ets-counters/>
  154. ets:new(TableName, [Type, public, named_table, {read_concurrency, true}] ++ ?ETS_OPTIMIZATIONS);
  155. _ ->
  156. ok
  157. end.