myapp_events2.erl 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. -module(myapp_events2).
  2. -behaviour(gen_server).
  3. %-include_lib("stdlib/include/qlc.hrl").
  4. -export([start_link/0]).
  5. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  6. -export([add_task/4, delete_task/2]).
  7. -export([add_task_b/4, delete_task_b/2]).
  8. -export([check_ets_count/0, check_ets_count2/0]).
  %% Guard-safe test used by the public API: true when S is a non-empty list
  %% whose first element is an integer. Only the head is inspected, so this is
  %% a cheap "looks like a string" check, not a full validation of every element.
  -define(IS_STRING(S), (is_list(S) andalso S =/= [] andalso is_integer(hd(S)))).
  %% @doc Fire-and-forget request to register (or re-register) a task.
  %%
  %% Task_Id must satisfy ?IS_STRING, Proc_Id and Time must be integers and
  %% Type must be `static' or `dynamic'; any other input fails with
  %% function_clause.
  %%
  %%   dynamic - Time is an offset in seconds added to the current
  %%             erlang:system_time(second).
  %%   static  - Time is passed through unchanged.
  %%
  %% The request is sent with gen_server:cast/2 to the process registered as
  %% ?MODULE, so this function returns `ok' immediately without waiting for the
  %% server; what happens to the request is decided by handle_cast/2.
  add_task(Task_Id, Proc_Id, Type, Time) when
      ?IS_STRING(Task_Id),
      is_integer(Proc_Id),
      (Type =:= static orelse Type =:= dynamic),
      is_integer(Time)
  ->
      Deadline =
          if
              Type =:= dynamic -> erlang:system_time(second) + Time;
              % static: the guard above leaves no other possibility
              true -> Time
          end,
      Request = {add_task, {{Task_Id, Proc_Id}, Deadline}},
      gen_server:cast(?MODULE, Request),
      ok.
  %% @doc Fire-and-forget request to remove the task keyed by {Task_Id, Proc_Id}.
  %%
  %% Task_Id must satisfy ?IS_STRING and Proc_Id must be an integer, otherwise
  %% the call fails with function_clause. The request is sent with
  %% gen_server:cast/2 to the process registered as ?MODULE and `ok' is returned
  %% immediately; what happens to the request is decided by handle_cast/2.
  delete_task(Task_Id, Proc_Id) when ?IS_STRING(Task_Id), is_integer(Proc_Id) ->
      Request = {delete_task, {Task_Id, Proc_Id}},
      gen_server:cast(?MODULE, Request),
      ok.
  %% @doc Blocking variant of add_task/4.
  %%
  %% Argument rules are the same: Task_Id must satisfy ?IS_STRING, Proc_Id and
  %% Time must be integers and Type must be `static' or `dynamic'.
  %%
  %%   dynamic - Time is an offset in seconds added to the current
  %%             erlang:system_time(second).
  %%   static  - Time is passed through unchanged.
  %%
  %% The request goes through gen_server:call/2 to the process registered as
  %% ?MODULE, so the caller waits for the server and receives its reply
  %% (handle_call/3 answers `ok' for add_task requests).
  add_task_b(Task_Id, Proc_Id, Type, Time) when
      ?IS_STRING(Task_Id),
      is_integer(Proc_Id),
      (Type =:= static orelse Type =:= dynamic),
      is_integer(Time)
  ->
      Deadline =
          if
              Type =:= dynamic -> erlang:system_time(second) + Time;
              % static: the guard above leaves no other possibility
              true -> Time
          end,
      Request = {add_task, {{Task_Id, Proc_Id}, Deadline}},
      gen_server:call(?MODULE, Request).
  %% @doc Blocking variant of delete_task/2.
  %%
  %% Task_Id must satisfy ?IS_STRING and Proc_Id must be an integer. The request
  %% goes through gen_server:call/2 to the process registered as ?MODULE and the
  %% server's reply is returned (handle_call/3 answers `ok' for delete_task
  %% requests, whether or not the task existed).
  delete_task_b(Task_Id, Proc_Id) when ?IS_STRING(Task_Id), is_integer(Proc_Id) ->
      Request = {delete_task, {Task_Id, Proc_Id}},
      gen_server:call(?MODULE, Request).
  %% @doc Starts the server linked to the caller and registers it locally under
  %% the module name, which is the name every API function in this module
  %% addresses. Returns whatever gen_server:start_link/4 returns.
  start_link() ->
      ServerName = {local, ?MODULE},
      InitArg = [],
      StartOpts = [],
      gen_server:start_link(ServerName, ?MODULE, InitArg, StartOpts).
  %% @doc gen_server init callback.
  %%
  %% Creates the two named, private, ordered_set ETS tables used by the server:
  %%
  %%   events_time_table - key is {Time, Task_Id, Proc_Id}
  %%   events_table      - key is {Task_Id, Proc_Id}
  %%
  %% It then posts `check_time_process' to itself so the first expiry sweep is
  %% handled by handle_info/2 as soon as init returns. The server state is an
  %% empty list.
  init([]) ->
      TableOpts = [ordered_set, named_table, {keypos, 1}, private],
      lists:foreach(
          fun(TableName) -> ets:new(TableName, TableOpts) end,
          [events_time_table, events_table]
      ),
      self() ! check_time_process,
      {ok, []}.
  %% @doc gen_server call callback: synchronous add/delete of tasks.
  %%
  %% Two tables are kept in step:
  %%   events_table      holds {{Task_Id, Proc_Id}, Time}
  %%   events_time_table holds {{Time, Task_Id, Proc_Id}} (1-tuples, key only)
  %%
  %% {add_task, {{Task_Id, Proc_Id}, Time}}
  %%     Inserts the task, or moves it to the new Time if it already exists.
  %%     Replies `ok'.
  %% {delete_task, {Task_Id, Proc_Id}}
  %%     Removes the task from both tables; a missing task is not an error.
  %%     Replies `ok'.
  %% anything else
  %%     Replies `not_handled'.
  %%
  %% Both tables are ordered_set, where keys are compared with `==', so a lookup
  %% can return an entry whose key is equal to but not identical with the
  %% requested one (e.g. Proc_Id 1 versus 1.0). The time-index entry is therefore
  %% removed using the key fields as they were stored, not as they were passed
  %% in; otherwise a stale index entry would be left behind.
  handle_call({add_task, {{Task_Id, Proc_Id} = Task_Key, Time} = Task}, _From, State) ->
      case ets:lookup(events_table, Task_Key) of
          [{{Old_Task_Id, Old_Proc_Id}, Old_Time}] ->
              % Re-scheduling: drop the index entry for the previous deadline.
              true = ets:delete(events_time_table, {Old_Time, Old_Task_Id, Old_Proc_Id});
          [] ->
              true
      end,
      true = ets:insert(events_time_table, {{Time, Task_Id, Proc_Id}}),
      true = ets:insert(events_table, Task),
      {reply, ok, State};
  handle_call({delete_task, {_Task_Id, _Proc_Id} = Task_Key}, _From, State) ->
      case ets:lookup(events_table, Task_Key) of
          [{{Old_Task_Id, Old_Proc_Id}, Old_Time}] ->
              true = ets:delete(events_time_table, {Old_Time, Old_Task_Id, Old_Proc_Id}),
              true = ets:delete(events_table, Task_Key);
          [] ->
              % Unknown task: nothing to remove.
              true
      end,
      {reply, ok, State};
  handle_call(_Req, _From, State) ->
      {reply, not_handled, State}.
  %% @doc gen_server cast callback: asynchronous add/delete of tasks.
  %%
  %% These clauses serve the fire-and-forget API (add_task/4, delete_task/2).
  %% They apply the same two-table update as the synchronous requests, so a task
  %% submitted through a cast is really stored or removed instead of being
  %% silently discarded.
  %%
  %% {add_task, {{Task_Id, Proc_Id}, Time}} - insert, or move to the new Time.
  %% {delete_task, {Task_Id, Proc_Id}}      - remove; a missing task is ignored.
  %% anything else                          - ignored.
  handle_cast({add_task, {{Task_Id, Proc_Id}, Time}}, State) ->
      ok = upsert_task(Task_Id, Proc_Id, Time),
      {noreply, State};
  handle_cast({delete_task, {Task_Id, Proc_Id}}, State) ->
      ok = remove_task(Task_Id, Proc_Id),
      {noreply, State};
  handle_cast(_Req, State) ->
      {noreply, State}.

  %% Stores the task under its new deadline, replacing any previous deadline.
  upsert_task(Task_Id, Proc_Id, Time) ->
      ok = drop_time_index({Task_Id, Proc_Id}),
      true = ets:insert(events_time_table, {{Time, Task_Id, Proc_Id}}),
      true = ets:insert(events_table, {{Task_Id, Proc_Id}, Time}),
      ok.

  %% Removes the task from both tables; does nothing if it is not present.
  remove_task(Task_Id, Proc_Id) ->
      Task_Key = {Task_Id, Proc_Id},
      ok = drop_time_index(Task_Key),
      true = ets:delete(events_table, Task_Key),
      ok.

  %% Deletes the events_time_table entry belonging to Task_Key, if the task is
  %% known. The stored key fields are used because ordered_set lookups compare
  %% keys with `==' and may return a key that is equal but not identical.
  drop_time_index(Task_Key) ->
      case ets:lookup(events_table, Task_Key) of
          [{{Old_Task_Id, Old_Proc_Id}, Old_Time}] ->
              true = ets:delete(events_time_table, {Old_Time, Old_Task_Id, Old_Proc_Id}),
              ok;
          [] ->
              ok
      end.
  %% @doc gen_server info callback: drives the periodic expiry sweep.
  %%
  %% On `check_time_process' the next tick is scheduled 1000 ms ahead before any
  %% work is done, and then check_time_h/3 walks events_time_table from its
  %% first key, using the current erlang:system_time(second) as "now".
  %% A result other than `ok' from the sweep crashes the server (badmatch).
  %%
  %% Every other message is ignored.
  handle_info(check_time_process, State) ->
      erlang:send_after(1000, self(), check_time_process),
      First_Key = ets:first(events_time_table),
      Timestamp_Now = erlang:system_time(second),
      ok = check_time_h(First_Key, false, Timestamp_Now),
      {noreply, State};
  handle_info(_Request, State) ->
      {noreply, State}.
  %% @doc gen_server terminate callback. Performs no cleanup of its own and
  %% always returns `ok'.
  terminate(_StopReason, _FinalState) ->
      ok.
  %% @doc gen_server code_change callback. No state migration is performed: the
  %% state is returned unchanged.
  code_change(_FromVsn, CurrentState, _UpgradeExtra) ->
      {ok, CurrentState}.
  %% check_time_h(Key, Prev_Key, Timestamp_Now) -> ok
  %%
  %% Walks events_time_table in key order, starting at Key, and removes every
  %% entry whose Time is =< Timestamp_Now together with its events_table row.
  %%
  %%   Key           - current {Time, Task_Id, Proc_Id} key, or '$end_of_table'.
  %%   Prev_Key      - key handled in the previous step, or `false' on the first
  %%                   step. It is removed from events_time_table only after the
  %%                   walk has moved past it, so ets:next/2 is always called
  %%                   with a key that is still in the table.
  %%   Timestamp_Now - deadline threshold, in the same unit as the stored Time.
  %%
  %% The walk stops at the first key whose Time is in the future or at the end
  %% of the table.
  check_time_h('$end_of_table', Prev_Key, _Timestamp_Now) ->
      drop_visited(Prev_Key);
  check_time_h({Time, Task_Id, Proc_Id} = Key, Prev_Key, Timestamp_Now) when
      Time =< Timestamp_Now
  ->
      drop_visited(Prev_Key),
      % send_to_rabbitmq({call_timer, Proc_Id, Task_Id}),
      ets:delete(events_table, {Task_Id, Proc_Id}),
      check_time_h(ets:next(events_time_table, Key), Key, Timestamp_Now);
  check_time_h({_Time, _Task_Id, _Proc_Id}, Prev_Key, _Timestamp_Now) ->
      % First entry that is not due yet: finish off the previous one and stop.
      drop_visited(Prev_Key).

  %% Removes an already-processed key from events_time_table; `false' means
  %% there is no previous key yet.
  drop_visited(false) ->
      ok;
  drop_visited(Visited_Key) ->
      ets:delete_object(events_time_table, {Visited_Key}),
      ok.
  %% @doc Returns the `size' item that ets:info/2 reports for events_table,
  %% i.e. the number of objects currently stored in it.
  check_ets_count() ->
      ets:info(events_table, size).
  %% @doc Returns the `size' item that ets:info/2 reports for events_time_table,
  %% i.e. the number of objects currently stored in it.
  check_ets_count2() ->
      ets:info(events_time_table, size).