|
@@ -1,12 +1,14 @@
|
|
-module(myapp_events2).
|
|
-module(myapp_events2).
|
|
-behaviour(gen_server).
|
|
-behaviour(gen_server).
|
|
|
|
|
|
|
|
+%-include_lib("stdlib/include/qlc.hrl").
|
|
|
|
+
|
|
-export([start_link/0]).
|
|
-export([start_link/0]).
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
|
|
|
|
-export([add_task/4, delete_task/2]).
|
|
-export([add_task/4, delete_task/2]).
|
|
-export([add_task_b/4, delete_task_b/2]).
|
|
-export([add_task_b/4, delete_task_b/2]).
|
|
--export([check_ets_count/0]).
|
|
|
|
|
|
+-export([check_ets_count/0, check_ets_count2/0]).
|
|
|
|
|
|
|
|
|
|
-define(IS_STRING(S), (is_list(S) andalso S /= [] andalso is_integer(hd(S)))).
|
|
-define(IS_STRING(S), (is_list(S) andalso S /= [] andalso is_integer(hd(S)))).
|
|
@@ -53,7 +55,16 @@ start_link() ->
|
|
|
|
|
|
|
|
|
|
init([]) ->
|
|
init([]) ->
|
|
- ets:new(events_table, [ordered_set, named_table, { keypos, 1 }, private]),
|
|
|
|
|
|
+ %ets:new(events_time_table, [bag, named_table, { keypos, 2 }, private, {write_concurrency, true}, {read_concurrency, true}]), % Key = Time
|
|
|
|
+ %ets:new(events_table, [ordered_set, named_table, { keypos, 1 }, private, {write_concurrency, true}, {read_concurrency, true}]),
|
|
|
|
+
|
|
|
|
+ %ets:new(events_time_table, [bag, named_table, { keypos, 2 }, private]), % Key = Time
|
|
|
|
+
|
|
|
|
+ %ets:new(events_time_table, [ordered_set, named_table, { keypos, 2 }, private]), % Key = Time
|
|
|
|
+ %ets:new(events_table, [ordered_set, named_table, { keypos, 1 }, private]), % Key = {Task_Id, Proc_Id}
|
|
|
|
+
|
|
|
|
+ ets:new(events_time_table, [ordered_set, named_table, { keypos, 1 }, private]), % Key = {Time, Task_Id, Proc_Id}
|
|
|
|
+ ets:new(events_table, [ordered_set, named_table, { keypos, 1 }, private]), % Key = {Task_Id, Proc_Id}
|
|
|
|
|
|
%erlang:send_after(100, self(), check_time_process),
|
|
%erlang:send_after(100, self(), check_time_process),
|
|
self() ! check_time_process,
|
|
self() ! check_time_process,
|
|
@@ -61,14 +72,70 @@ init([]) ->
|
|
{ok, State}.
|
|
{ok, State}.
|
|
|
|
|
|
|
|
|
|
-handle_call({add_task, {{_Task_Id, _Proc_Id}, _Time}=V}, _From, State) ->
|
|
|
|
|
|
+handle_call({add_task, {{Task_Id, Proc_Id}=V0, Time}=V}, _From, State) ->
|
|
|
|
+ %true = ets:insert(events_time_table, V),
|
|
|
|
+ %true = ets:insert(events_table, V),
|
|
|
|
+
|
|
|
|
+ case ets:lookup(events_table, V0) of
|
|
|
|
+ [{{Task_Id, Proc_Id}, Time0}] ->
|
|
|
|
+ ets:delete_object(events_time_table, {{Time0, Task_Id, Proc_Id}}),
|
|
|
|
+ true = ets:insert(events_time_table, {{Time, Task_Id, Proc_Id}});
|
|
|
|
+ [] ->
|
|
|
|
+ true = ets:insert(events_time_table, {{Time, Task_Id, Proc_Id}});
|
|
|
|
+ Z ->
|
|
|
|
+ io:format("~p~n", [Z]),
|
|
|
|
+ true = ets:insert(events_time_table, {{Time, Task_Id, Proc_Id}})
|
|
|
|
+ end,
|
|
true = ets:insert(events_table, V),
|
|
true = ets:insert(events_table, V),
|
|
|
|
+
|
|
%io:format("add/update: ~p ~p~n", [Task_Id, Proc_Id]),
|
|
%io:format("add/update: ~p ~p~n", [Task_Id, Proc_Id]),
|
|
{reply, ok, State};
|
|
{reply, ok, State};
|
|
|
|
|
|
|
|
|
|
-handle_call({delete_task, {_Task_Id, _Proc_Id}=V}, _From, State) ->
|
|
|
|
- true = ets:delete(events_table, V),
|
|
|
|
|
|
+handle_call({delete_task, {Task_Id, Proc_Id}=V}, _From, State) ->
|
|
|
|
+ %qlc:e(qlc:q([
|
|
|
|
+ % {ets:delete_object(events_time_table, X), ets:delete(events_table, V)}
|
|
|
|
+ % || {{Task_Id0, Proc_Id0}, _Time} = X <- ets:table(events_table),
|
|
|
|
+ % Task_Id0 =:= Task_Id, Proc_Id0 =:= Proc_Id ])), % [{true,true}] | [] % повільно
|
|
|
|
+
|
|
|
|
+ %qlc:e(qlc:q([
|
|
|
|
+ % {ets:delete_object(events_time_table, X), ets:delete(events_table, V)}
|
|
|
|
+ % || X <- ets:lookup(events_table, V) ])), % [{true,true}] | []
|
|
|
|
+
|
|
|
|
+ %%ets:fun2ms(fun({{A,B},C}) when A =:= X andalso B =:= Y -> {{A,B},C} end).
|
|
|
|
+ %%[{{{'$1','$2'},'$3'}, [{'andalso',{'=:=','$1',{const,"2"}}, {'=:=','$2',{const,2}}}], [{{{{'$1','$2'}},'$3'}}]}]
|
|
|
|
+ %case ets:select(events_table, [{{{'$1','$2'},'$3'},
|
|
|
|
+ % [{'andalso',{'=:=','$1',{const, Task_Id}}, {'=:=','$2',{const, Proc_Id}}}],
|
|
|
|
+ % [{{{{'$1','$2'}},'$3'}}]}] ) of
|
|
|
|
+ %
|
|
|
|
+ % [X] ->
|
|
|
|
+ % ets:delete_object(events_time_table, X),
|
|
|
|
+ % ets:delete(events_table, V);
|
|
|
|
+ % _ -> ok
|
|
|
|
+ %end,
|
|
|
|
+
|
|
|
|
+ %case ets:lookup(events_table, V) of
|
|
|
|
+ % [X] ->
|
|
|
|
+ % ets:delete_object(events_time_table, X),
|
|
|
|
+ % ets:delete(events_table, V);
|
|
|
|
+ % _ -> ok
|
|
|
|
+ %end,
|
|
|
|
+
|
|
|
|
+ %case ets:match_object(events_table, {{Task_Id, Proc_Id}, '$3'}) of
|
|
|
|
+ % [X] ->
|
|
|
|
+ % ets:delete_object(events_time_table, X),
|
|
|
|
+ % ets:delete(events_table, V);
|
|
|
|
+ % _ -> ok
|
|
|
|
+ %end,
|
|
|
|
+
|
|
|
|
+ %case ets:match_object(events_table, {{Task_Id, Proc_Id}, '$3'}) of
|
|
|
|
+ case ets:lookup(events_table, V) of
|
|
|
|
+ [{{Task_Id, Proc_Id}, Time}] ->
|
|
|
|
+ ets:delete_object(events_time_table, {{Time, Task_Id, Proc_Id}}),
|
|
|
|
+ ets:delete(events_table, V);
|
|
|
|
+ _ -> ok
|
|
|
|
+ end,
|
|
|
|
+
|
|
%io:format("deleted: ~p ~p~n", [Task_Id, Proc_Id]),
|
|
%io:format("deleted: ~p ~p~n", [Task_Id, Proc_Id]),
|
|
{reply, ok, State};
|
|
{reply, ok, State};
|
|
|
|
|
|
@@ -77,14 +144,14 @@ handle_call(_Req, _From, State) ->
|
|
{reply, not_handled, State}.
|
|
{reply, not_handled, State}.
|
|
|
|
|
|
|
|
|
|
-handle_cast({add_task, {{_Task_Id, _Proc_Id}, _Time}=V}, State) ->
|
|
|
|
- ets:insert(events_table, V),
|
|
|
|
|
|
+handle_cast({add_task, {{_Task_Id, _Proc_Id}, _Time}=_V}, State) ->
|
|
|
|
+ %ets:insert(events_table, V),
|
|
%io:format("add/update: ~p ~p~n", [Task_Id, Proc_Id]),
|
|
%io:format("add/update: ~p ~p~n", [Task_Id, Proc_Id]),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
|
|
|
|
-handle_cast({delete_task, {_Task_Id, _Proc_Id}=V}, State) ->
|
|
|
|
- ets:delete(events_table, V),
|
|
|
|
|
|
+handle_cast({delete_task, {_Task_Id, _Proc_Id}=_V}, State) ->
|
|
|
|
+ %ets:delete(events_table, V),
|
|
%io:format("deleted: ~p ~p~n", [Task_Id, Proc_Id]),
|
|
%io:format("deleted: ~p ~p~n", [Task_Id, Proc_Id]),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
|
|
|
|
@@ -96,19 +163,22 @@ handle_cast(_Req, State) ->
|
|
handle_info(check_time_process, State) ->
|
|
handle_info(check_time_process, State) ->
|
|
erlang:send_after(1000, self(), check_time_process),
|
|
erlang:send_after(1000, self(), check_time_process),
|
|
|
|
|
|
- Timestamp_Now = erlang:system_time(second),
|
|
|
|
|
|
+ %Timestamp_Now = erlang:system_time(second),
|
|
|
|
|
|
- %ets:fun2ms(fun({{A,B},C}) when C =< Timestamp_Now -> {{A,B},C} end).
|
|
|
|
- %[{{{'$1','$2'},'$3'}, [{'=<','$3',{const,1642507256}}], [{{{{'$1','$2'}},'$3'}}]}]
|
|
|
|
|
|
+ %%ets:fun2ms(fun({{A,B},C}) when C =< Timestamp_Now -> {{A,B},C} end).
|
|
|
|
+ %%[{{{'$1','$2'},'$3'}, [{'=<','$3',{const,1642507256}}], [{{{{'$1','$2'}},'$3'}}]}]
|
|
|
|
|
|
- L = ets:select(events_table, [{{{'$1','$2'},'$3'}, [{'=<','$3',{const, Timestamp_Now}}], [{{{{'$1','$2'}},'$3'}}]}] ),
|
|
|
|
|
|
+ %L = ets:select(events_table, [{{{'$1','$2'},'$3'}, [{'=<','$3',{const, Timestamp_Now}}], [{{{{'$1','$2'}},'$3'}}]}] ),
|
|
|
|
|
|
- lists:foldl(fun({{Task_Id0, Proc_Id0}=V, _Time0}, ok) ->
|
|
|
|
|
|
+ %lists:foldl(fun({{Task_Id0, Proc_Id0}=V, _Time0}, ok) ->
|
|
% send_to_rabbitmq({call_timer, Proc_Id0, Task_Id0}),
|
|
% send_to_rabbitmq({call_timer, Proc_Id0, Task_Id0}),
|
|
- ets:delete(events_table, V),
|
|
|
|
|
|
+ % ets:delete(events_table, V),
|
|
%io:format("tick: ~p ~p~n", [Task_Id0, Proc_Id0]),
|
|
%io:format("tick: ~p ~p~n", [Task_Id0, Proc_Id0]),
|
|
- ok
|
|
|
|
- end, ok, L),
|
|
|
|
|
|
+ % ok
|
|
|
|
+ % end, ok, L),
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ ok = check_time_h(ets:first(events_time_table), false, erlang:system_time(second) ),
|
|
|
|
|
|
%erlang:send_after(1000, self(), check_time_process),
|
|
%erlang:send_after(1000, self(), check_time_process),
|
|
{noreply, State};
|
|
{noreply, State};
|
|
@@ -125,5 +195,31 @@ code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
{ok, State}.
|
|
|
|
|
|
|
|
|
|
|
|
+%check_time_h(Key, Prev_Key, Timestamp_Now)
|
|
|
|
+check_time_h('$end_of_table', Prev_Key, _Timestamp_Now) ->
|
|
|
|
+ case Prev_Key of
|
|
|
|
+ false -> ok;
|
|
|
|
+ _ ->
|
|
|
|
+ ets:delete_object(events_time_table, {Prev_Key}),
|
|
|
|
+ ok
|
|
|
|
+ end;
|
|
|
|
+check_time_h({Time, Task_Id, Proc_Id}=Key, Prev_Key, Timestamp_Now) ->
|
|
|
|
+ case Prev_Key of
|
|
|
|
+ false -> ok;
|
|
|
|
+ _ ->
|
|
|
|
+ ets:delete_object(events_time_table, {Prev_Key}),
|
|
|
|
+ ok
|
|
|
|
+ end,
|
|
|
|
+
|
|
|
|
+ if Time =< Timestamp_Now ->
|
|
|
|
+ % send_to_rabbitmq({call_timer, Proc_Id, Task_Id}),
|
|
|
|
+ ets:delete(events_table, {Task_Id, Proc_Id}),
|
|
|
|
+ check_time_h(ets:next(events_time_table, Key), Key, Timestamp_Now);
|
|
|
|
+
|
|
|
|
+ true -> ok
|
|
|
|
+ end.
|
|
|
|
+
|
|
check_ets_count() -> ets:info(events_table, 'size').
|
|
check_ets_count() -> ets:info(events_table, 'size').
|
|
|
|
+check_ets_count2() -> ets:info(events_time_table, 'size').
|
|
|
|
+
|
|
|
|
|