Browse Source

Merge pull request #244 from seriyps/pipelined-and-mixed-api-tests

Pipelined and mixed api tests
Sergey Prokhorov 4 years ago
parent
commit
be1c05d320
5 changed files with 143 additions and 20 deletions
  1. 1 1
      Makefile
  2. 3 1
      README.md
  3. 127 10
      test/epgsql_SUITE.erl
  4. 6 4
      test/epgsql_cast.erl
  5. 6 4
      test/epgsql_incremental.erl

+ 1 - 1
Makefile

@@ -21,7 +21,7 @@ src/epgsql_errcodes.erl:
 	./generate_errcodes_src.sh > src/epgsql_errcodes.erl
 
 common-test:
-	$(REBAR) ct -v -c
+	$(REBAR) ct --readable true -c
 
 eunit:
 	$(REBAR) eunit -c

+ 3 - 1
README.md

@@ -37,8 +37,10 @@ of the protocol feature that allows faster execution.
   - **epgsql** maintains backwards compatibility with the original driver API
   - **epgsqla** delivers complete results as regular erlang messages
   - **epgsqli** delivers results as messages incrementally (row by row)
+  All API interfaces can be used with the same connection: eg, connection opened with `epgsql`
+  can be queried with `epgsql` / `epgsqla` / `epgsqli` in any combinations.
 - internal queue of client requests, so you don't need to wait for the response
-  to send the next request
+  to send the next request (pipelining)
 - single process to hold driver state and receive socket data
 - execution of several parsed statements as a batch
 - binding timestamps in `erlang:now()` format

+ 127 - 10
test/epgsql_SUITE.erl

@@ -31,7 +31,7 @@ all() ->
     [{group, M} || M <- modules()].
 
 groups() ->
-    Groups = [
+    SubGroups = [
         {connect, [parrallel], [
             connect,
             connect_with_application_name,
@@ -73,8 +73,13 @@ groups() ->
             custom_types,
             custom_null
         ]},
+        {pipelining, [parallel], [
+            pipelined_prepared_query,
+            pipelined_parse_batch_execute
+        ]},
         {generic, [parallel], [
-            with_transaction
+            with_transaction,
+            mixed_api
         ]}
     ],
 
@@ -136,12 +141,10 @@ groups() ->
         set_notice_receiver,
         get_cmd_status
     ],
-    Groups ++ [case Module of
-                   epgsql ->
-                       {Module, [], [{group, generic} | Tests]};
-                   _ ->
-                       {Module, [], Tests}
-               end || Module <- modules()].
+    SubGroups ++
+        [{epgsql, [], [{group, generic} | Tests]},
+         {epgsql_cast, [], [{group, pipelining} | Tests]},
+         {epgsql_incremental, [], Tests}].
 
 end_per_suite(_Config) ->
     ok.
@@ -436,11 +439,13 @@ connect_to_closed_port(Config) ->
 prepared_query(Config) ->
     Module = ?config(module, Config),
     epgsql_ct:with_connection(Config, fun(C) ->
-        {ok, _} = Module:parse(C, "inc", "select $1+1", []),
+        {ok, Stmt} = Module:parse(C, "inc", "select $1+1", []),
         {ok, Cols, [{5}]} = Module:prepared_query(C, "inc", [4]),
         {ok, Cols, [{2}]} = Module:prepared_query(C, "inc", [1]),
         {ok, Cols, [{23}]} = Module:prepared_query(C, "inc", [22]),
-        {error, _} = Module:prepared_query(C, "non_existent_query", [4])
+        {ok, Cols, [{34}]} = Module:prepared_query(C, Stmt, [33]),
+        {error, #error{codename = invalid_sql_statement_name}} =
+            Module:prepared_query(C, "non_existent_query", [4])
     end).
 
 select(Config) ->
@@ -1478,10 +1483,122 @@ with_transaction(Config) ->
                    C, fun(_) -> error(my_err) end, []))
       end, []).
 
+%% @doc Mixing all 3 API interfaces with same connection
+mixed_api(Config) ->
+    epgsql = ?config(module, Config),
+    epgsql_ct:with_connection(
+      Config,
+      fun(C) ->
+              {ok, Stmt} = epgsql:parse(
+                             C, "SELECT id, $1::text AS val FROM generate_series(1, 5) AS t(id)"),
+              ABindRef = epgsqla:bind(C, Stmt, "a_portal", [<<"epgsqla">>]),
+              IBindRef = epgsqli:bind(C, Stmt, "i_portal", [<<"epgsqli">>]),
+              AExecute1Ref = epgsqla:execute(C, Stmt, "a_portal", 3),
+              IExecute1Ref = epgsqli:execute(C, Stmt, "i_portal", 3),
+              ?assertEqual({partial, [{4, <<"epgsqla">>}]},
+                           epgsql:execute(C, Stmt, "a_portal", 1)),
+              ?assertEqual({partial, [{4, <<"epgsqli">>}]},
+                           epgsql:execute(C, Stmt, "i_portal", 1)),
+              %% by the time epgsql:execute returns, we should already have all the asynchronous
+              %% responses in our message queue (epgsql:execute uses selective receive),
+              %% but let's try to run some more finalizers.
+              %% Note: we are calling epgsqla on i_portal and epgsqli on a_portal!
+              AExecute2Ref = epgsqla:execute(C, Stmt, "i_portal", 0),
+              IExecute2Ref = epgsqli:execute(C, Stmt, "a_portal", 0),
+              ok = epgsql:close(C, Stmt),
+              ?assertEqual(
+                 [{C, ABindRef, ok},
+                  {C, IBindRef, ok},
+                  {C, AExecute1Ref, {partial, [{1, <<"epgsqla">>},
+                                               {2, <<"epgsqla">>},
+                                               {3, <<"epgsqla">>}
+                                              ]}},
+                  {C, IExecute1Ref, {data, {1, <<"epgsqli">>}}},
+                  {C, IExecute1Ref, {data, {2, <<"epgsqli">>}}},
+                  {C, IExecute1Ref, {data, {3, <<"epgsqli">>}}},
+                  {C, IExecute1Ref, suspended},
+                  {C, AExecute2Ref, {ok, [{5, <<"epgsqli">>}]}},
+                  {C, IExecute2Ref, {data, {5, <<"epgsqla">>}}},
+                  {C, IExecute2Ref, {complete, select}}],
+                 receive_for_conn(C, 10, 1000))
+      end).
+
+pipelined_prepared_query(Config) ->
+    epgsql_cast = ?config(module, Config),
+    epgsql_ct:with_connection(
+      Config,
+      fun(C) ->
+              {ok, #statement{types = Types} = Stmt} =
+                  epgsql_cast:parse(C, "SELECT $1::integer as c1, 'hello' as c2"),
+              Refs = [{epgsqla:prepared_query(C, Stmt, lists:zip(Types, [I])), I}
+                      || I <- lists:seq(1, 10)],
+              Timer = erlang:send_after(5000, self(), timeout),
+              [receive
+                   {C, Ref, {ok, Columns, Rows}} ->
+                       ?assertMatch([#column{name = <<"c1">>, type = int4},
+                                     #column{name = <<"c2">>, type = text}], Columns),
+                       ?assertEqual([{I, <<"hello">>}], Rows);
+                   Other ->
+                       %% We expect responses in the same order as we send requests
+                       error({unexpected_message, Other})
+               end || {Ref, I} <- Refs],
+              erlang:cancel_timer(Timer)
+      end).
+
+pipelined_parse_batch_execute(Config) ->
+    epgsql_cast = ?config(module, Config),
+    epgsql_ct:with_connection(
+      Config,
+      fun(C) ->
+              ParseRefs =
+                  [begin
+                       Name = io_lib:format("stmt_~w", [I]),
+                       {epgsqla:parse(C, Name,
+                                      io_lib:format("SELECT $1 AS in, ~w00 AS out", [I]),
+                                      [int4]),
+                        I}
+                   end || I <- lists:seq(1, 5)],
+              Timer = erlang:send_after(5000, self(), timeout),
+              Batch =
+                  [receive
+                       {C, Ref, {ok, #statement{columns = Cols} = Stmt}} ->
+                           ?assertMatch([#column{name = <<"in">>, type = int4},
+                                         #column{name = <<"out">>}],
+                                        Cols),
+                           {Stmt, [I]};
+                       Other ->
+                           error({unexpected_message, Other})
+                   end || {Ref, I} <- ParseRefs],
+              ?assertMatch([{ok, [{1, 100}]},
+                            {ok, [{2, 200}]},
+                            {ok, [{3, 300}]},
+                            {ok, [{4, 400}]},
+                            {ok, [{5, 500}]}],
+                           epgsql:execute_batch(C, Batch)),
+              CloseRefs = [epgsqla:close(C, Stmt) || {Stmt, _} <- Batch],
+              [receive
+                   {C, Ref, ok} ->
+                       ok;
+                   Other ->
+                       error({unexpected_message, Other})
+               end || Ref <- CloseRefs],
+              erlang:cancel_timer(Timer)
+      end).
 %% =============================================================================
 %% Internal functions
 %% ============================================================================
 
+receive_for_conn(_, 0, _) -> [];
+receive_for_conn(C, N, Timeout) ->
+    receive
+        {C, _, _} = Msg ->
+            [Msg | receive_for_conn(C, N - 1, Timeout)];
+        Other ->
+            error({unexpected_msg, Other})
+    after Timeout ->
+            error({timeout, {remaining_msgs, N}})
+    end.
+
 get_type_col(Type) ->
     "c_" ++ atom_to_list(Type).
 

+ 6 - 4
test/epgsql_cast.erl

@@ -76,12 +76,14 @@ equery(C, Sql, Parameters) ->
             Error
     end.
 
+prepared_query(C, #statement{types = Types} = Stmt, Parameters) ->
+    TypedParameters = lists:zip(Types, Parameters),
+    Ref = epgsqla:prepared_query(C, Stmt, TypedParameters),
+    receive_result(C, Ref);
 prepared_query(C, Name, Parameters) ->
     case describe(C, statement, Name) of
-        {ok, #statement{types = Types} = S} ->
-            Typed_Parameters = lists:zip(Types, Parameters),
-            Ref = epgsqla:prepared_query(C, S, Typed_Parameters),
-            receive_result(C, Ref);
+        {ok, S} ->
+            prepared_query(C, S, Parameters);
         Error ->
             Error
     end.

+ 6 - 4
test/epgsql_incremental.erl

@@ -76,12 +76,14 @@ equery(C, Sql, Parameters) ->
             Error
     end.
 
+prepared_query(C, #statement{types = Types} = Stmt, Parameters) ->
+    TypedParameters = lists:zip(Types, Parameters),
+    Ref = epgsqli:prepared_query(C, Stmt, TypedParameters),
+    receive_result(C, Ref, undefined);
 prepared_query(C, Name, Parameters) ->
     case describe(C, statement, Name) of
-        {ok, #statement{types = Types} = S} ->
-            Typed_Parameters = lists:zip(Types, Parameters),
-            Ref = epgsqli:prepared_query(C, S, Typed_Parameters),
-            receive_result(C, Ref, undefined);
+        {ok, S} ->
+            prepared_query(C, S, Parameters);
         Error ->
             Error
     end.