Browse Source

v3 - work with DB description

221V 3 years ago
parent
commit
e1e4ea20ef

+ 1 - 1
apps/myapp/ebin/myapp.app

@@ -4,7 +4,7 @@
   {vsn, "0.0.1"},
   {vsn, "0.0.1"},
   {registered, [myapp_sup]},
   {registered, [myapp_sup]},
   {applications, [kernel, stdlib]},
   {applications, [kernel, stdlib]},
-  {modules, [myapp, myapp_sup, myapp_events, myapp_events2, test]},
+  {modules, [myapp, myapp_sup, myapp_events, myapp_events2, myapp_events3, test]},
   {mod, { myapp, []}},
   {mod, { myapp, []}},
   {env, []}
   {env, []}
  ]}.
  ]}.

+ 2 - 1
apps/myapp/src/myapp_events2.erl

@@ -33,7 +33,8 @@ start_link() ->
 
 
 
 
 init([]) ->
 init([]) ->
-  erlang:send_after(100, self(), check_time_process),
+  %erlang:send_after(100, self(), check_time_process),
+  self() ! check_time_process,
   State = [],
   State = [],
   {ok, State}.
   {ok, State}.
 
 

+ 124 - 0
apps/myapp/src/myapp_events3.erl

@@ -0,0 +1,124 @@
+-module(myapp_events3).
+-behaviour(gen_server).
+
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-export([add_task/4, delete_task/2]).
+
+-define(N_COUNT, 20_000).
+-define(IS_STRING(S), (is_list(S) andalso S /= [] andalso is_integer(hd(S)))).
+
+
+add_task(Task_Id, Proc_Id, Type, Time)
+when ?IS_STRING(Task_Id), is_integer(Proc_Id), ((Type =:= static) orelse (Type =:= dynamic)), is_integer(Time) ->
+  Time2 = case Type of
+    dynamic ->
+      erlang:system_time(second) + Time; % Timestamp_Now + Time
+    _ ->
+      % static
+      Time
+  end,
+  % додаємо запис в СУБД
+  % postgres("INSERT INTO events_table (task_id, proc_id, time) VALUES ($1, $2, $3)"
+  %   " ON CONFLICT (unique_key) UPDATE SET time = $4", [Task_Id, Proc_Id, Time2, Time2]),
+  
+  ok.
+
+delete_task(Task_Id, Proc_Id)
+when ?IS_STRING(Task_Id), is_integer(Proc_Id) ->
+  % позначаємо запис видаленим в СУБД
+  % postgres("UPDATE events_table SET deleted = 1"
+  %   " WHERE task_id = $1, proc_id = $2 LIMIT 1", [Task_Id, Proc_Id]),
+  
+  % видаляємо зі State (якщо є)
+  gen_server:cast(?MODULE, {delete_task, {Task_Id, Proc_Id}}),
+  ok.
+
+
+start_link() ->
+  gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init([]) ->
+  erlang:send_after(10, self(), load_data_from_db),
+  State = [],
+  {ok, State}.
+
+
+handle_call(_Req, _From, State) ->
+  {reply, not_handled, State}.
+
+
+handle_cast({delete_task, {Task_Id, Proc_Id}=V}, State) ->
+  % видаляємо зі State (якщо є)
+  ets:delete(events_table, V),
+  io:format("deleted: ~p ~p~n", [Task_Id, Proc_Id]),
+  {noreply, State};
+
+
+handle_cast(_Req, State) ->
+  {noreply, State}.
+
+
+handle_info(check_time_process, State) ->
+  Timestamp_Now = erlang:system_time(second),
+  
+  %ets:fun2ms(fun({{A,B},C}) when C =< Timestamp_Now -> {{A,B},C} end).
+  %[{{{'$1','$2'},'$3'}, [{'=<','$3',{const,1642507256}}], [{{{{'$1','$2'}},'$3'}}]}]
+  
+  L = ets:select(events_table, [{{{'$1','$2'},'$3'}, [{'=<','$3',{const, Timestamp_Now}}], [{{{{'$1','$2'}},'$3'}}]}] ),
+  
+  lists:foldl(fun({{Task_Id0, Proc_Id0}=V, _Time0}, ok) ->
+     % send_to_rabbitmq({call_timer, Proc_Id0, Task_Id0}),
+     % позначаємо запис видаленим в СУБД
+     % postgres("UPDATE events_table SET deleted = 1"
+     %   " WHERE task_id = $1, proc_id = $2 LIMIT 1", [Task_Id0, Proc_Id0]),
+     
+     ets:delete(events_table, V),
+     io:format("tick: ~p ~p~n", [Task_Id0, Proc_Id0]),
+     ok
+    end, ok, L),
+  
+  erlang:send_after(10, self(), check_time_process),
+  {noreply, State};
+
+
+handle_info(load_data_from_db, State) ->
+  %ets:fun2ms(fun({{A,B},C}) -> true end).
+  %[{{{'$1','$2'},'$3'},[],[true]}]
+  
+  %ets:select_count(events_table, [{{{'$1','$2'},'$3'},[],[true]}]).
+  %proplists:get_value('size', ets:info(events_table)).
+  %ets:foldl(fun({{_,_},_}, A) -> A + 1;(_, A) -> A end, 0, events_table).
+  
+  N0 = proplists:get_value('size', ets:info(events_table)),
+  N = ?N_COUNT - N0,
+  % отримуємо записи з СУБД
+  % L = postgres("SELECT task_id, proc_id, time FROM events_table"
+  %       " WHERE deleted IS NULL ORDER BY time DESC LIMIT $1", [N]),
+  
+  % додаємо отримані записи для обробки в ets
+  %lists:foldl(fun({{_Task_Id0, _Proc_Id0}, _Time0}=V, ok) ->
+  %   ets:insert(events_table, V),
+  %   ok
+  %  end, ok, L),
+  
+  
+  %erlang:send_after(10, self(), check_time_process),
+  self() ! check_time_process,
+  erlang:send_after(1000, self(), load_data_from_db),
+  {noreply, State};
+
+
+handle_info(_Request, State) ->
+  {noreply, State}.
+
+
+terminate(_Reason, _State) ->
+  ok.
+
+code_change(_OldVsn, State, _Extra) -> 
+  {ok, State}.
+
+

+ 1 - 1
apps/myapp/src/myapp_sup.erl

@@ -8,7 +8,7 @@ start_link() ->
   supervisor:start_link({local, ?MODULE}, ?MODULE, []).
   supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 
 init(_Args) ->
 init(_Args) ->
-  ets:new(events_table, [set, named_table, { keypos, 1 }, public]),
+  ets:new(events_table, [set, named_table, { keypos, 1 }, public, {write_concurrency, true}]),
   
   
   Procs = [
   Procs = [
     #{id => worker_1,
     #{id => worker_1,