|
@@ -21,7 +21,8 @@ when ?IS_STRING(Task_Id), is_integer(Proc_Id), ((Type =:= static) orelse (Type =
|
|
|
end,
|
|
|
% додаємо запис в СУБД
|
|
|
% postgres("INSERT INTO events_table (task_id, proc_id, time) VALUES ($1, $2, $3)"
|
|
|
- % " ON CONFLICT (unique_key) UPDATE SET time = $4", [Task_Id, Proc_Id, Time2, Time2]),
|
|
|
+ % " ON CONFLICT ON CONSTRAINT events_table_ids_uniq DO UPDATE SET time = $4, deleted = NULL",
|
|
|
+ % [Task_Id, Proc_Id, Time2, Time2]),
|
|
|
|
|
|
ok.
|
|
|
|
|
@@ -29,7 +30,7 @@ delete_task(Task_Id, Proc_Id)
|
|
|
when ?IS_STRING(Task_Id), is_integer(Proc_Id) ->
|
|
|
% позначаємо запис видаленим в СУБД
|
|
|
% postgres("UPDATE events_table SET deleted = 1"
|
|
|
- % " WHERE task_id = $1, proc_id = $2 LIMIT 1", [Task_Id, Proc_Id]),
|
|
|
+ % " WHERE task_id = $1 AND proc_id = $2", [Task_Id, Proc_Id]),
|
|
|
|
|
|
% видаляємо зі State (якщо є)
|
|
|
gen_server:cast(?MODULE, {delete_task, {Task_Id, Proc_Id}}),
|
|
@@ -41,8 +42,9 @@ start_link() ->
|
|
|
|
|
|
|
|
|
init([]) ->
|
|
|
- %erlang:send_after(10, self(), load_data_from_db),
|
|
|
- self() ! load_data_from_db,
|
|
|
+ ets:new(events_table, [ordered_set, named_table, { keypos, 1 }, private, {write_concurrency, true}]),
|
|
|
+
|
|
|
+ self() ! check_time_process,
|
|
|
State = [],
|
|
|
{ok, State}.
|
|
|
|
|
@@ -63,41 +65,52 @@ handle_cast(_Req, State) ->
|
|
|
|
|
|
|
|
|
handle_info(check_time_process, State) ->
|
|
|
- Timestamp_Now = erlang:system_time(second),
|
|
|
-
|
|
|
- %ets:fun2ms(fun({{A,B},C}) when C =< Timestamp_Now -> {{A,B},C} end).
|
|
|
- %[{{{'$1','$2'},'$3'}, [{'=<','$3',{const,1642507256}}], [{{{{'$1','$2'}},'$3'}}]}]
|
|
|
-
|
|
|
- L = ets:select(events_table, [{{{'$1','$2'},'$3'}, [{'=<','$3',{const, Timestamp_Now}}], [{{{{'$1','$2'}},'$3'}}]}] ),
|
|
|
-
|
|
|
- lists:foldl(fun({{Task_Id0, Proc_Id0}=V, _Time0}, ok) ->
|
|
|
- % send_to_rabbitmq({call_timer, Proc_Id0, Task_Id0}),
|
|
|
- % позначаємо запис видаленим в СУБД
|
|
|
- % postgres("UPDATE events_table SET deleted = 1"
|
|
|
- % " WHERE task_id = $1, proc_id = $2 LIMIT 1", [Task_Id0, Proc_Id0]),
|
|
|
-
|
|
|
- ets:delete(events_table, V),
|
|
|
- io:format("tick: ~p ~p~n", [Task_Id0, Proc_Id0]),
|
|
|
- ok
|
|
|
- end, ok, L),
|
|
|
-
|
|
|
- %erlang:send_after(10, self(), check_time_process),
|
|
|
- %self() ! load_data_from_db,
|
|
|
+ N0 = ets:info(events_table, 'size'),
|
|
|
+ if N0 =:= 0 ; N0 < ?N_COUNT ->
|
|
|
+ % отримаємо записи з СУБД
|
|
|
+ N = ?N_COUNT - N0,
|
|
|
+ ok = get_n_from_db(N),
|
|
|
+ self() ! check_time_process;
|
|
|
+
|
|
|
+ true ->
|
|
|
+ % записи є, працюємо з ними
|
|
|
+ erlang:send_after(500, self(), check_time_process),
|
|
|
+
|
|
|
+ Timestamp_Now = erlang:system_time(second),
|
|
|
+ %ets:fun2ms(fun({{A,B},C}) when C =< Timestamp_Now -> {{A,B},C} end).
|
|
|
+ %[{{{'$1','$2'},'$3'}, [{'=<','$3',{const,1642507256}}], [{{{{'$1','$2'}},'$3'}}]}]
|
|
|
+
|
|
|
+ L = ets:select(events_table, [{{{'$1','$2'},'$3'}, [{'=<','$3',{const, Timestamp_Now}}], [{{{{'$1','$2'}},'$3'}}]}] ),
|
|
|
+
|
|
|
+ lists:foldl(fun({{Task_Id0, Proc_Id0}=V, _Time0}, ok) ->
|
|
|
+ % send_to_rabbitmq({call_timer, Proc_Id0, Task_Id0}),
|
|
|
+
|
|
|
+ % позначаємо запис видаленим в СУБД
|
|
|
+ % postgres("UPDATE events_table SET deleted = 1"
|
|
|
+ % " WHERE task_id = $1 AND proc_id = $2", [Task_Id0, Proc_Id0]),
|
|
|
+
|
|
|
+ ets:delete(events_table, V),
|
|
|
+ io:format("tick: ~p ~p~n", [Task_Id0, Proc_Id0]),
|
|
|
+ ok
|
|
|
+ end, ok, L)
|
|
|
+
|
|
|
+ end,
|
|
|
{noreply, State};
|
|
|
|
|
|
|
|
|
-handle_info(load_data_from_db, State) ->
|
|
|
- erlang:send_after(1000, self(), load_data_from_db),
|
|
|
-
|
|
|
- %ets:fun2ms(fun({{A,B},C}) -> true end).
|
|
|
- %[{{{'$1','$2'},'$3'},[],[true]}]
|
|
|
-
|
|
|
- %ets:select_count(events_table, [{{{'$1','$2'},'$3'},[],[true]}]).
|
|
|
- %proplists:get_value('size', ets:info(events_table)).
|
|
|
- %ets:foldl(fun({{_,_},_}, A) -> A + 1;(_, A) -> A end, 0, events_table).
|
|
|
-
|
|
|
- N0 = proplists:get_value('size', ets:info(events_table)),
|
|
|
- N = ?N_COUNT - N0,
|
|
|
+handle_info(_Request, State) ->
|
|
|
+ {noreply, State}.
|
|
|
+
|
|
|
+
|
|
|
+terminate(_Reason, _State) ->
|
|
|
+ ok.
|
|
|
+
|
|
|
+code_change(_OldVsn, State, _Extra) ->
|
|
|
+ {ok, State}.
|
|
|
+
|
|
|
+
|
|
|
+% отримати N записів з СУБД, додати в ETS
|
|
|
+get_n_from_db(N) ->
|
|
|
% отримуємо записи з СУБД
|
|
|
% L = postgres("SELECT task_id, proc_id, time FROM events_table"
|
|
|
% " WHERE deleted IS NULL ORDER BY time DESC LIMIT $1", [N]),
|
|
@@ -108,21 +121,6 @@ handle_info(load_data_from_db, State) ->
|
|
|
% ok
|
|
|
% end, ok, L),
|
|
|
|
|
|
-
|
|
|
- %erlang:send_after(10, self(), check_time_process),
|
|
|
- self() ! check_time_process,
|
|
|
- %erlang:send_after(1000, self(), load_data_from_db),
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
-
|
|
|
-handle_info(_Request, State) ->
|
|
|
- {noreply, State}.
|
|
|
-
|
|
|
-
|
|
|
-terminate(_Reason, _State) ->
|
|
|
ok.
|
|
|
|
|
|
-code_change(_OldVsn, State, _Extra) ->
|
|
|
- {ok, State}.
|
|
|
-
|
|
|
|