Browse Source

Convert pg_sock() to opaque type.

Сергей Прохоров 7 years ago
parent
commit
7cc76e980f
2 changed files with 71 additions and 39 deletions
  1. 46 30
      src/epgsql_command.erl
  2. 25 9
      src/epgsql_sock.erl

+ 46 - 30
src/epgsql_command.erl

@@ -3,6 +3,7 @@
 %%% Copyright (C) 2017 - Sergey Prokhorov.  All rights reserved.
 
 -module(epgsql_command).
+-export([init/2, execute/3, handle_message/5]).
 
 -export_type([command/0]).
 
@@ -12,38 +13,53 @@
 %% Initialize command's state. Called when command is received by epgsql_sock process.
 -callback init(any()) -> state().
 
+-type execute_return() ::
+        {ok, epgsql_sock:pg_sock(), state()}
+      | {stop, Reason :: any(), Response :: any(), epgsql_sock:pg_sock()}.
 %% Execute command. It should send commands to socket.
 %% May be called many times if 'handle_message' will return 'requeue'.
--callback execute(epgsql_sock:pg_sock(), state()) -> Reply when
-      Reply :: {ok, epgsql_sock:pg_sock(), state()}
-             | {stop, Reason :: any(), Response :: any(), epgsql_sock:pg_sock()}.
+-callback execute(epgsql_sock:pg_sock(), state()) -> execute_return().
 
+-type handle_message_return() ::
+        {noaction, epgsql_sock:pg_sock()}
+        %% Do nothing; remember changed state
+      | {noaction, epgsql_sock:pg_sock(), state()}
+        %% Add result to resultset (eg, `{ok, Count}' `{ok, Cols, Rows}', `{error, #error{}}'
+        %% It may be returned many times for eg, `squery' with multiple
+        %% queries separated by ';'
+        %% See epgsql_sock:get_results/1
+      | {add_result, Data :: any(), Notification :: any(), epgsql_sock:pg_sock(), state()}
+        %% Add new row to current resultset;
+        %% See epgsql_sock:get_rows/1
+      | {add_row, tuple(), epgsql_sock:pg_sock(), state()}
+        %% Finish command execution, reply to the client and go to next command
+      | {finish, Result :: any(), Notification :: any(), epgsql_sock:pg_sock()}
+        %% Stop `epgsql_sock' process
+      | {stop, Reason :: any(), Response :: any(), epgsql_sock:pg_sock()}
+        %% Call 'execute' and reschedule command.
+        %% It's forbidden to call epgsql_sock:send from `handle_message'.
+        %% If you need to do so, you should set some flag in state and
+        %% reschedule command.
+        %% See `epgsql_cmd_connect' for reference.
+      | {requeue, epgsql_sock:pg_sock(), state()}
+        %% Protocol synchronization error (eg, unexpected packet)
+        %% Drop command queue and don't accept any command except 'sync'
+      | {sync_required, Why :: any()}
+        %% Unknown packet. Terminate `epgsql_sock' process
+      | unknown.
 %% Handle incoming packet
 -callback handle_message(Type :: byte(), Payload :: binary() | epgsql:query_error(),
-                         epgsql_sock:pg_sock(), state()) -> Reply when
-      Reply :: {noaction, epgsql_sock:pg_sock()}
-               %% Do nothing; remember changed state
-             | {noaction, epgsql_sock:pg_sock(), state()}
-               %% Add result to resultset (eg, `{ok, Count}' `{ok, Cols, Rows}', `{error, #error{}}'
-               %% It may be returned many times for eg, `squery' with multiple
-               %% queries separated by ';'
-               %% See epgsql_sock:get_results/1
-             | {add_result, Data :: any(), Notification :: any(), epgsql_sock:pg_sock(), state()}
-               %% Add new row to current resultset;
-               %% See epgsql_sock:get_rows/1
-             | {add_row, tuple(), epgsql_sock:pg_sock(), state()}
-               %% Finish command execution, reply to the client and go to next command
-             | {finish, Result :: any(), Notification :: any(), epgsql_sock:pg_sock()}
-               %% Stop `epgsql_sock' process
-             | {stop, Reason :: any(), Response :: any(), epgsql_sock:pg_sock()}
-               %% Call 'execute' and reschedule command.
-               %% It's forbidden to call epgsql_sock:send from `handle_message'.
-               %% If you need to do so, you should set some flag in state and
-               %% reschedule command.
-               %% See `epgsql_cmd_connect' for reference.
-             | {requeue, epgsql_sock:pg_sock(), state()}
-               %% Protocol synchronization error (eg, unexpected packet)
-               %% Drop command queue and don't accept any command except 'sync'
-             | {sync_required, Why :: any()}
-               %% Unknown packet. Terminate `epgsql_sock' process
-             | unknown.
+                         epgsql_sock:pg_sock(), state()) -> handle_message_return().
+
+-spec init(command(), any()) -> state().
+init(Command, Args) ->
+    Command:init(Args).
+
+-spec execute(command(), epgsql_sock:pg_sock(), state()) -> execute_return().
+execute(Command, PgSock, CmdState) ->
+    Command:execute(PgSock, CmdState).
+
+-spec handle_message(command(), Type :: byte(), Payload :: binary() | epgsql:query_error(),
+                     epgsql_sock:pg_sock(), state()) -> handle_message_return().
+handle_message(Command, Type, Payload, PgSock, State) ->
+    Command:handle_message(Type, Payload, PgSock, State).

+ 25 - 9
src/epgsql_sock.erl

@@ -118,14 +118,17 @@ cancel(S) ->
 %% send()
 %% send_many()
 
+-spec set_net_socket(gen_tcp | ssl, gen_tcp:socket() | ssl:sslsocket(), pg_sock()) -> pg_sock().
 set_net_socket(Mod, Socket, State) ->
     State1 = State#state{mod = Mod, sock = Socket},
     setopts(State1, [{active, true}]),
     State1.
 
+-spec init_replication_state(pg_sock()) -> pg_sock().
 init_replication_state(State) ->
     State#state{repl = #repl{}}.
 
+-spec set_attr(atom(), any(), pg_sock()) -> pg_sock().
 set_attr(backend, {_Pid, _Key} = Backend, State) ->
     State#state{backend = Backend};
 set_attr(async, Async, State) ->
@@ -140,21 +143,27 @@ set_attr(replication_state, Value, State) ->
     State#state{repl = Value}.
 
 %% XXX: be careful!
+-spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
 set_packet_handler(Handler, State) ->
     State#state{handler = Handler}.
 
+-spec get_codec(pg_sock()) -> epgsql_binary:codec().
 get_codec(#state{codec = Codec}) ->
     Codec.
 
+-spec get_replication_state(pg_sock()) -> #repl{}.
 get_replication_state(#state{repl = Repl}) ->
     Repl.
 
+-spec get_rows(pg_sock()) -> [tuple()].
 get_rows(#state{rows = Rows}) ->
     lists:reverse(Rows).
 
+-spec get_results(pg_sock()) -> [any()].
 get_results(#state{results = Results}) ->
     lists:reverse(Results).
 
+-spec get_parameter_internal(binary(), pg_sock()) -> binary() | undefined.
 get_parameter_internal(Name, #state{parameters = Parameters}) ->
     case lists:keysearch(Name, 1, Parameters) of
         {value, {Name, Value}} -> Value;
@@ -241,18 +250,18 @@ code_change(_OldVsn, State, _Extra) ->
 
 %% -- internal functions --
 
--spec command_new(transport(), epgsql_command:command(), any(), #state{}) ->
+-spec command_new(transport(), epgsql_command:command(), any(), pg_sock()) ->
                          Result when
-      Result :: {noreply, #state{}}
-              | {stop, Reason :: any(), #state{}}.
+      Result :: {noreply, pg_sock()}
+              | {stop, Reason :: any(), pg_sock()}.
 command_new(Transport, Command, Args, State) ->
-    CmdState = Command:init(Args),
+    CmdState = epgsql_command:init(Command, Args),
     command_exec(Transport, Command, CmdState, State).
 
--spec command_exec(transport(), epgsql_command:command(), any(), #state{}) ->
+-spec command_exec(transport(), epgsql_command:command(), any(), pg_sock()) ->
                           Result when
-      Result :: {noreply, #state{}}
-              | {stop, Reason :: any(), #state{}}.
+      Result :: {noreply, pg_sock()}
+              | {stop, Reason :: any(), pg_sock()}.
 command_exec(Transport, Command, _, State = #state{sync_required = true})
   when Command /= epgsql_cmd_sync ->
     {noreply,
@@ -260,7 +269,7 @@ command_exec(Transport, Command, _, State = #state{sync_required = true})
                         current_cmd_transport = Transport},
             {error, sync_required})};
 command_exec(Transport, Command, CmdState, State) ->
-    case Command:execute(State, CmdState) of
+    case epgsql_command:execute(Command, State, CmdState) of
         {ok, State1, CmdState1} ->
             {noreply, command_enqueue(Transport, Command, CmdState1, State1)};
         {stop, StopReason, Response, State1} ->
@@ -268,6 +277,7 @@ command_exec(Transport, Command, CmdState, State) ->
             {stop, StopReason, State1}
     end.
 
+-spec command_enqueue(transport(), epgsql_command:command(), epgsql_command:state(), pg_sock()) -> pg_sock().
 command_enqueue(Transport, Command, CmdState, #state{current_cmd = undefined} = State) ->
     State#state{current_cmd = Command,
                 current_cmd_state = CmdState,
@@ -277,10 +287,13 @@ command_enqueue(Transport, Command, CmdState, #state{queue = Q} = State) ->
     State#state{queue = queue:in({Command, CmdState, Transport}, Q),
                 complete_status = undefined}.
 
+-spec command_handle_message(byte(), binary() | epgsql:query_error(), pg_sock()) ->
+                                    {noreply, pg_sock()}
+                                  | {stop, any(), pg_sock()}.
 command_handle_message(Msg, Payload,
                        #state{current_cmd = Command,
                               current_cmd_state = CmdState} = State) ->
-    case Command:handle_message(Msg, Payload, State, CmdState) of
+    case epgsql_command:handle_message(Command, Msg, Payload, State, CmdState) of
         {add_row, Row, State1, CmdState1} ->
             {noreply, add_row(State1#state{current_cmd_state = CmdState1}, Row)};
         {add_result, Result, Notice, State1, CmdState1} ->
@@ -331,12 +344,15 @@ setopts(#state{mod = Mod, sock = Sock}, Opts) ->
         ssl     -> ssl:setopts(Sock, Opts)
     end.
 
+-spec send(pg_sock(), iodata()) -> ok | {error, any()}.
 send(#state{mod = Mod, sock = Sock}, Data) ->
     do_send(Mod, Sock, epgsql_wire:encode(Data)).
 
+-spec send(pg_sock(), byte(), iodata()) -> ok | {error, any()}.
 send(#state{mod = Mod, sock = Sock}, Type, Data) ->
     do_send(Mod, Sock, epgsql_wire:encode(Type, Data)).
 
+-spec send_multi(pg_sock(), [{byte(), iodata()}]) -> ok | {error, any()}.
 send_multi(#state{mod = Mod, sock = Sock}, List) ->
     do_send(Mod, Sock, lists:map(fun({Type, Data}) ->
         epgsql_wire:encode(Type, Data)