Browse Source

trying with db

221V 3 years ago
parent
commit
9fc7d4cd33
10 changed files with 204 additions and 41 deletions
  1. 2 1
      .gitignore
  2. 9 0
      Emakefile
  3. 3 1
      Makefile
  4. 43 0
      README.md
  5. 2 2
      apps/myapp/ebin/myapp.app
  6. 16 15
      apps/myapp/src/myapp_events3.erl
  7. 22 0
      apps/myapp/src/myapp_sup.erl
  8. 48 0
      apps/myapp/src/pgs.erl
  9. 48 21
      apps/myapp/src/test.erl
  10. 11 1
      sys.config

+ 2 - 1
.gitignore

@@ -1,2 +1,3 @@
 /apps/myapp/ebin/*.beam
 /apps/myapp/ebin/*.beam
-
+/deps/*
+!/deps/ebin/*.app

+ 9 - 0
Emakefile

@@ -1,2 +1,11 @@
 
 
+{["deps/epgsql/src/*", "deps/epgsql/src/commands/*", "deps/epgsql/src/datatypes/*"],
+  [{i, "deps/epgsql/include"}, {outdir, "deps/ebin"}]}.
+
+{"deps/pooler/src/*", [{i, "deps/pooler/src"}, {outdir, "deps/ebin"}]}.
+
+{"deps/herd/src/*", [{outdir, "deps/ebin"}]}.
+
+{"deps/epgsql_pool/src/*", [{i, "deps/epgsql_pool/include"}, {outdir, "deps/ebin"}]}.
+
 {"apps/myapp/src/*", [debug_info, {outdir, "apps/myapp/ebin"}]}.
 {"apps/myapp/src/*", [debug_info, {outdir, "apps/myapp/ebin"}]}.

+ 3 - 1
Makefile

@@ -3,12 +3,14 @@ default: compile
 
 
 clean:
 clean:
 	rm apps/myapp/ebin/*.beam
 	rm apps/myapp/ebin/*.beam
+	rm deps/ebin/*.beam
 
 
 compile:
 compile:
 	ERL_LIBS="apps:deps" erl -make;
 	ERL_LIBS="apps:deps" erl -make;
 
 
 start:
 start:
-	ERL_LIBS="apps:deps" erl +pc unicode -args_file vm.args -config sys.config -eval 'application:start(myapp)'
+#	ERL_LIBS="apps:deps" erl +pc unicode -args_file vm.args -config sys.config -eval 'application:start(myapp)'
+	ERL_LIBS="apps:deps" erl +pc unicode -args_file vm.args -config sys.config -eval 'application:ensure_all_started(myapp)'
 
 
 
 
 .PHONY: compile clean start
 .PHONY: compile clean start

+ 43 - 0
README.md

@@ -142,3 +142,46 @@ ok
 "the end"
 "the end"
 ```
 ```
 
 
+---
+
+postgresql 14
+```
+sudo -u postgres psql
+CREATE DATABASE test_database;
+CREATE USER test_user WITH password 'qwerty';
+GRANT ALL ON DATABASE test_database TO test_user;
+\q
+```
+```
+sudo vim /etc/postgresql/14/main/postgresql.conf
+
+max_connections = 200
+
+sudo service postgresql restart
+```
+```
+CREATE TABLE "events_table" (
+  "id" bigserial,
+  "task_id" varchar(255) NOT NULL,
+  "proc_id" integer NOT NULL,
+  "time" integer NOT NULL,
+  "deleted" smallint DEFAULT NULL,
+  --active: NULL, deleted: 1
+  PRIMARY KEY ("id")
+);
+CREATE UNIQUE INDEX events_table_ids_uniq ON events_table (task_id, proc_id);
+ALTER TABLE events_table ADD CONSTRAINT events_table_ids_uniq UNIQUE USING INDEX events_table_ids_uniq;
+CREATE INDEX events_table_deleted_idx ON events_table (deleted);
+
+-- truncate events_table;
+-- ALTER SEQUENCE events_table_id_seq RESTART WITH 1;
+```
+
+### deps
+```
+https://github.com/epgsql/epgsql/tree/4.5.0
+https://github.com/seth/pooler/tree/9c28fb479f9329e2a1644565a632bc222780f1b7
+https://github.com/wgnet/herd/tree/1.3.4
+https://github.com/wgnet/epgsql_pool/tree/1.4.4
+```
+

+ 2 - 2
apps/myapp/ebin/myapp.app

@@ -3,8 +3,8 @@
   {description, "test erlang application"},
   {description, "test erlang application"},
   {vsn, "0.0.1"},
   {vsn, "0.0.1"},
   {registered, [myapp_sup]},
   {registered, [myapp_sup]},
-  {applications, [kernel, stdlib]},
+  {applications, [kernel, stdlib, crypto, asn1, public_key, ssl, epgsql, pooler, epgsql_pool]},
-  {modules, [myapp, myapp_sup, myapp_events, myapp_events2, myapp_events3, test]},
+  {modules, [myapp, myapp_sup, myapp_events, myapp_events2, myapp_events3, pgs, test]},
   {mod, { myapp, []}},
   {mod, { myapp, []}},
   {env, []}
   {env, []}
  ]}.
  ]}.

+ 16 - 15
apps/myapp/src/myapp_events3.erl

@@ -20,16 +20,16 @@ when ?IS_STRING(Task_Id), is_integer(Proc_Id), ((Type =:= static) orelse (Type =
       Time
       Time
   end,
   end,
   % додаємо запис в СУБД
   % додаємо запис в СУБД
-  % postgres("INSERT INTO events_table (task_id, proc_id, time) VALUES ($1, $2, $3)"
+  1 = pgs:in_up_del("INSERT INTO events_table (task_id, proc_id, time) VALUES ($1, $2, $3)"
-  %   " ON CONFLICT (unique_key) UPDATE SET time = $4", [Task_Id, Proc_Id, Time2, Time2]),
+    " ON CONFLICT ON CONSTRAINT events_table_ids_uniq DO UPDATE SET time = $4, deleted = NULL",
+    [Task_Id, Proc_Id, Time2, Time2]),
   
   
   ok.
   ok.
 
 
 delete_task(Task_Id, Proc_Id)
 delete_task(Task_Id, Proc_Id)
 when ?IS_STRING(Task_Id), is_integer(Proc_Id) ->
 when ?IS_STRING(Task_Id), is_integer(Proc_Id) ->
   % позначаємо запис видаленим в СУБД
   % позначаємо запис видаленим в СУБД
-  % postgres("UPDATE events_table SET deleted = 1"
+  1 = pgs:in_up_del("UPDATE events_table SET deleted = 1 WHERE task_id = $1 AND proc_id = $2", [Task_Id, Proc_Id]),
-  %   " WHERE task_id = $1, proc_id = $2 LIMIT 1", [Task_Id, Proc_Id]),
   
   
   % видаляємо зі State (якщо є)
   % видаляємо зі State (якщо є)
   gen_server:cast(?MODULE, {delete_task, {Task_Id, Proc_Id}}),
   gen_server:cast(?MODULE, {delete_task, {Task_Id, Proc_Id}}),
@@ -51,10 +51,10 @@ handle_call(_Req, _From, State) ->
   {reply, not_handled, State}.
   {reply, not_handled, State}.
 
 
 
 
-handle_cast({delete_task, {Task_Id, Proc_Id}=V}, State) ->
+handle_cast({delete_task, {_Task_Id, _Proc_Id}=V}, State) ->
   % видаляємо зі State (якщо є)
   % видаляємо зі State (якщо є)
   ets:delete(events_table, V),
   ets:delete(events_table, V),
-  io:format("deleted: ~p ~p~n", [Task_Id, Proc_Id]),
+  %io:format("deleted: ~p ~p~n", [Task_Id, Proc_Id]),
   {noreply, State};
   {noreply, State};
 
 
 
 
@@ -72,12 +72,13 @@ handle_info(check_time_process, State) ->
   
   
   lists:foldl(fun({{Task_Id0, Proc_Id0}=V, _Time0}, ok) ->
   lists:foldl(fun({{Task_Id0, Proc_Id0}=V, _Time0}, ok) ->
      % send_to_rabbitmq({call_timer, Proc_Id0, Task_Id0}),
      % send_to_rabbitmq({call_timer, Proc_Id0, Task_Id0}),
+     
      % позначаємо запис видаленим в СУБД
      % позначаємо запис видаленим в СУБД
-     % postgres("UPDATE events_table SET deleted = 1"
+     1 = pgs:in_up_del("UPDATE events_table SET deleted = 1"
-     %   " WHERE task_id = $1, proc_id = $2 LIMIT 1", [Task_Id0, Proc_Id0]),
+       " WHERE task_id = $1, proc_id = $2 LIMIT 1", [Task_Id0, Proc_Id0]),
      
      
      ets:delete(events_table, V),
      ets:delete(events_table, V),
-     io:format("tick: ~p ~p~n", [Task_Id0, Proc_Id0]),
+     %io:format("tick: ~p ~p~n", [Task_Id0, Proc_Id0]),
      ok
      ok
     end, ok, L),
     end, ok, L),
   
   
@@ -99,14 +100,14 @@ handle_info(load_data_from_db, State) ->
   N0 = proplists:get_value('size', ets:info(events_table)),
   N0 = proplists:get_value('size', ets:info(events_table)),
   N = ?N_COUNT - N0,
   N = ?N_COUNT - N0,
   % отримуємо записи з СУБД
   % отримуємо записи з СУБД
-  % L = postgres("SELECT task_id, proc_id, time FROM events_table"
+  L = pgs:select("SELECT task_id, proc_id, time FROM events_table"
-  %       " WHERE deleted IS NULL ORDER BY time DESC LIMIT $1", [N]),
+    " WHERE deleted IS NULL ORDER BY time DESC LIMIT $1", [N]),
   
   
   % додаємо отримані записи для обробки в ets
   % додаємо отримані записи для обробки в ets
-  %lists:foldl(fun({{_Task_Id0, _Proc_Id0}, _Time0}=V, ok) ->
+  lists:foldl(fun({Task_Id0, Proc_Id0, Time0}, ok) ->
-  %   ets:insert(events_table, V),
+     ets:insert(events_table, {{Task_Id0, Proc_Id0}, Time0}),
-  %   ok
+     ok
-  %  end, ok, L),
+    end, ok, L),
   
   
   
   
   %erlang:send_after(10, self(), check_time_process),
   %erlang:send_after(10, self(), check_time_process),

+ 22 - 0
apps/myapp/src/myapp_sup.erl

@@ -7,9 +7,31 @@
 start_link() ->
 start_link() ->
   supervisor:start_link({local, ?MODULE}, ?MODULE, []).
   supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 
+
+start_pool() ->
+  Params = #{host => application:get_env(myapp, pgs_host, "undefined"),
+    port => application:get_env(myapp, pgs_port, 1),
+    username => application:get_env(myapp, pgs_user, "undefined"),
+    password => application:get_env(myapp, pgs_pass, "undefined"),
+    database => application:get_env(myapp, pgs_db, "undefined")},
+  
+  case epgsql_pool:start(my_pool, 100, 100, Params) of
+    {ok, _} ->
+      io:format("~p~n", ["pg_pool start !!"]),
+      ok;
+    Z ->
+      io:format("Pool start err: ~p~n~p~n", ["err db connect", Z]),
+      err
+  end.
+
+
 init(_Args) ->
 init(_Args) ->
   ets:new(events_table, [set, named_table, { keypos, 1 }, public, {write_concurrency, true}]),
   ets:new(events_table, [set, named_table, { keypos, 1 }, public, {write_concurrency, true}]),
   
   
+  {ok, _} = application:ensure_all_started(epgsql_pool),
+  
+  spawn(fun() -> timer:sleep(1000), start_pool() end),
+  
   Procs = [
   Procs = [
     #{id => worker_1,
     #{id => worker_1,
       start => {myapp_events, start_link, []},
       start => {myapp_events, start_link, []},

+ 48 - 0
apps/myapp/src/pgs.erl

@@ -0,0 +1,48 @@
+-module(pgs).
+-compile([export_all, nowarn_export_all]).
+
+
+transaction(Fun) ->
+  case epgsql_pool:transaction(my_pool, Fun) of
+    ok ->
+      ok;
+    Error ->
+      io:format("transaction error: ~p~n in tr fun: ~p~n", [Error, Fun]),
+      Error
+  end.
+
+
+transaction_q(Worker, Q, A) ->
+  epgsql_pool:query(Worker, Q, A).
+
+
+select(Q,A) ->
+  case epgsql_pool:query(my_pool, Q, A) of
+    {ok,_,R} ->
+      R;
+    {error,E} ->
+      io:format("~p~n", [E]),
+      {error,E}
+  end.
+
+
+in_up_del(Q,A) ->
+  case epgsql_pool:query(my_pool, Q, A) of
+    {ok,R} ->
+      R;
+    {error,E} ->
+      io:format("~p~n", [E]),
+      {error,E}
+  end.
+
+
+returning(Q,A) ->
+  case epgsql_pool:query(my_pool, Q, A) of
+    {ok,1,_,R} ->
+      R;
+    {error,E} ->
+      io:format("~p~n", [E]),
+      {error,E}
+  end.
+
+

+ 48 - 21
apps/myapp/src/test.erl

@@ -1,7 +1,9 @@
 -module(test).
 -module(test).
 
 
--export([test/0, test2/0,
+-export([test/0,
-  testn/1, testn_b/1]).
+  %test2/0,
+  testn/1, testn_b/1,
+  test3/1]).%, test3_b/1]).
 
 
 
 
 test() ->
 test() ->
@@ -25,25 +27,25 @@ test() ->
   ok.
   ok.
 
 
 
 
-test2() ->
+%test2() ->
-  Timestamp_Now = erlang:system_time(second),
+%  Timestamp_Now = erlang:system_time(second),
-  L = lists:seq(1, 15),
+%  L = lists:seq(1, 15),
-  [myapp_events2:add_task(integer_to_list(X), X, dynamic, X + 10) || X <- L, X rem 2 =:= 0],
+%  [myapp_events2:add_task(integer_to_list(X), X, dynamic, X + 10) || X <- L, X rem 2 =:= 0],
-  [myapp_events2:add_task(integer_to_list(X), X, static, Timestamp_Now + X + 5) || X <- L, X rem 2 =:= 1],
+%  [myapp_events2:add_task(integer_to_list(X), X, static, Timestamp_Now + X + 5) || X <- L, X rem 2 =:= 1],
-  
+%  
-  spawn(fun() ->
+%  spawn(fun() ->
-    timer:sleep(3000),
+%    timer:sleep(3000),
-    
+%    
-    myapp_events2:add_task("12", 12, dynamic, 50),
+%    myapp_events2:add_task("12", 12, dynamic, 50),
-    myapp_events2:add_task("13", 13, static, Timestamp_Now + 80),
+%    myapp_events2:add_task("13", 13, static, Timestamp_Now + 80),
-    
+%    
-    myapp_events2:delete_task("9", 9),
+%    myapp_events2:delete_task("9", 9),
-    myapp_events2:delete_task("14", 14),
+%    myapp_events2:delete_task("14", 14),
-    
+%    
-    ok
+%    ok
-  end),
+%  end),
-  
+%  
-  ok.
+%  ok.
 
 
 
 
 testn(0) -> io:format("~p~n",["the end"]);
 testn(0) -> io:format("~p~n",["the end"]);
@@ -100,4 +102,29 @@ testn_h2(_) ->
   ok.
   ok.
 
 
 
 
+test3(0) -> io:format("~p~n",["the end"]);
+test3(T) -> % T -- умовно секунди - кількість циклів по 20_000 нових заявок та 20_000 видалень
+  spawn(fun() ->
+    timer:sleep(1000),
+    test3(T - 1)
+  end),
+  
+  spawn(fun() ->
+    T1 = erlang:system_time(millisecond),
+    test3_h(T rem 2 =:= 0),
+    T2 = erlang:system_time(millisecond),
+    io:format("~p~n",[T2 - T1]),
+    ok
+  end),
+  ok.
+
+test3_h(true) ->
+  [myapp_events3:add_task(integer_to_list(X), X, dynamic, 5) || X <- lists:seq(1, 20_000)],
+  [myapp_events3:delete_task(integer_to_list(X), X) || X <- lists:seq(10_001, 30_000)],
+  ok;
+test3_h(_) ->
+  Timestamp_Now = erlang:system_time(second),
+  [myapp_events3:add_task(integer_to_list(X) ++ ".", X, static, Timestamp_Now + 5) || X <- lists:seq(20_001, 40_000)],
+  [myapp_events3:delete_task(integer_to_list(X) ++ ".", X) || X <- lists:seq(30_001, 50_000)],
+  ok.
 
 

+ 11 - 1
sys.config

@@ -1,3 +1,13 @@
-[
+[{myapp, [
+  {pgs_host, "localhost"},
+  {pgs_port, 5432},
+  {pgs_user, "test_user"},
+  {pgs_pass, "qwerty"},
+  {pgs_db, "test_database"} ]},
+ 
+ {epgsql_pool, [
+  {connection_timeout, 10000},
+  {query_timeout, 3000},
+  {transaction_timeout, 10000} ]}
 ].
 ].