epgsql_pool_worker.erl 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  %%% Pool worker: a gen_server that holds one DB connection
  %%% (an #epgsql_connection{} record), runs queries on it on behalf of
  %%% callers, sends periodic keep-alive queries over that connection and
  %%% asks epgsql_pool_utils to reconnect when the connection is lost.
  -module(epgsql_pool_worker).
  -behaviour(gen_server).

  %% Public API
  -export([start_link/1,
           stop/1]).

  %% gen_server callbacks
  -export([init/1,
           handle_call/3,
           handle_cast/2,
           handle_info/2,
           terminate/2,
           code_change/3]).

  -include("epgsql_pool.hrl").
  -include("otp_types.hrl").
  %% Worker state carried through every gen_server callback.
  -record(state, {
      %% pool this worker belongs to (the atom handed to init/1)
      pool_name :: atom(),
      %% current DB connection; stays undefined until the first
      %% open_connection message has been handled
      connection :: #epgsql_connection{} | undefined,
      %% ref to async keep-alive query to DB
      keep_alive_query_ref :: reference() | undefined,
      %% timer to send keep-alive query to DB
      send_keep_alive_timer :: reference(),
      %% timer to wait for reply from DB
      no_reply_keep_alive_timer :: reference()
  }).
  13. %% Module API
  %% @doc Starts a worker linked to the caller.
  %% The pool name is first normalised with
  %% epgsql_pool_utils:pool_name_to_atom/1 and then handed to init/1 as the
  %% only start argument. The worker is not registered under a name.
  -spec start_link(epgsql_pool:pool_name()) -> gs_start_link_reply().
  start_link(PoolName0) ->
      gen_server:start_link(?MODULE,
                            epgsql_pool_utils:pool_name_to_atom(PoolName0),
                            []).
  %% @doc Synchronously asks the worker `Pid' to shut down.
  %% Sends the `stop' call and returns the worker's reply; the `stop' clause
  %% of handle_call/3 answers `ok' and terminates the process normally.
  %% The default gen_server:call/2 timeout applies.
  -spec stop(pid()) -> ok.
  stop(Pid) ->
      gen_server:call(Pid, stop).
  21. %%% gen_server API
  %% @doc gen_server init callback.
  %% Traps exits so that the death of a linked process is delivered as an
  %% 'EXIT' message to handle_info/2, calls herd_rand:init_crypto/0 and
  %% defers opening the DB connection by posting `open_connection' to
  %% itself, so init/1 returns without touching the DB.
  -spec init(gs_args()) -> gs_init_reply().
  init(PoolName) ->
      erlang:process_flag(trap_exit, true),
      herd_rand:init_crypto(),
      self() ! open_connection,
      %% Both timer fields start as fresh, never-armed references so that
      %% erlang:cancel_timer/1 can be called on them without checking for
      %% undefined.
      SendTimerPlaceholder = make_ref(),
      NoReplyTimerPlaceholder = make_ref(),
      InitialState = #state{pool_name = PoolName,
                            send_keep_alive_timer = SendTimerPlaceholder,
                            no_reply_keep_alive_timer = NoReplyTimerPlaceholder},
      {ok, InitialState}.
  %% @doc gen_server call callback.
  %%
  %% Requests:
  %%   stop                        - close the connection, cancel both
  %%                                 keep-alive timers, reply `ok' and stop
  %%                                 with reason `normal'
  %%   {set_async_receiver, Pid}   - register Pid as notice receiver on the
  %%                                 socket, reply `ok'
  %%   get_sock                    - reply with the socket
  %%   {equery, Stmt, Params}      - run an extended query, reply its result
  %%   {squery, Stmt}              - run a simple query, reply its result
  %%   cancel                      - cancel the running query, reply `ok'
  %%
  %% Every request except `stop' is answered with `{error, no_connection}'
  %% while there is no connection record or its socket is undefined. These
  %% guard clauses are placed before all socket-using clauses so that no
  %% clause ever hands `undefined' to epgsql as a socket.
  %% Unknown requests on a live connection are logged and answered `ok'.
  -spec handle_call(gs_request(), gs_from(), gs_state()) -> gs_call_reply().
  handle_call(stop, _From, #state{connection = Connection,
                                  send_keep_alive_timer = Send_KA_Timer,
                                  no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
      Connection2 = epgsql_pool_utils:close_connection(Connection),
      erlang:cancel_timer(Send_KA_Timer),
      erlang:cancel_timer(NR_KA_Timer),
      {stop, normal, ok, State#state{connection = Connection2}};

  handle_call(_, _From, #state{connection = undefined} = State) ->
      {reply, {error, no_connection}, State};

  handle_call(_, _From, #state{connection = #epgsql_connection{sock = undefined}} = State) ->
      {reply, {error, no_connection}, State};

  handle_call({set_async_receiver, ReceiverPid}, _From,
              #state{connection = #epgsql_connection{sock = Sock}} = State) ->
      epgsql:set_notice_receiver(Sock, ReceiverPid),
      {reply, ok, State};

  handle_call(get_sock, _From,
              #state{connection = #epgsql_connection{sock = Sock}} = State) ->
      {reply, Sock, State};

  handle_call({equery, Stmt, Params}, _From,
              #state{connection = #epgsql_connection{sock = Sock}} = State) ->
      Reply = query_live_sock(Sock, fun() -> epgsql:equery(Sock, Stmt, Params) end),
      {reply, Reply, State};

  handle_call({squery, Stmt}, _From,
              #state{connection = #epgsql_connection{sock = Sock}} = State) ->
      Reply = query_live_sock(Sock, fun() -> epgsql:squery(Sock, Stmt) end),
      {reply, Reply, State};

  handle_call(cancel, _From, #state{connection = #epgsql_connection{sock = Sock}} = State) ->
      epgsql:cancel(Sock),
      {reply, ok, State};

  handle_call(Message, _From, State) ->
      error_logger:error_msg("unknown call ~p in ~p ~n", [Message, ?MODULE]),
      {reply, ok, State}.

  %% Runs Query only if the socket process still exists; otherwise returns
  %% `{error, no_connection}' without touching the socket.
  -spec query_live_sock(pid(), fun(() -> Reply)) -> Reply | {error, no_connection}
      when Reply :: term().
  query_live_sock(Sock, Query) ->
      case process_info(Sock, status) of
          undefined -> {error, no_connection};
          {status, _} -> Query()
      end.
  %% @doc gen_server cast callback.
  %% The worker defines no casts: whatever arrives is reported through
  %% error_logger and the state is returned untouched.
  -spec handle_cast(gs_request(), gs_state()) -> gs_cast_reply().
  handle_cast(Message, State) ->
      LogArgs = [Message, ?MODULE],
      error_logger:error_msg("unknown cast ~p in ~p ~n", LogArgs),
      {noreply, State}.
  %% @doc gen_server info callback.
  %%
  %% Messages:
  %%   open_connection         - try to open the DB connection; on success arm
  %%                             the send-keep-alive timer and post on_connect,
  %%                             on failure log and ask for a reconnect
  %%   on_connect              - notify the configured connect_listener
  %%   on_disconnect           - notify the configured disconnect_listener
  %%   keep_alive              - send an async "SELECT 1" and arm the no-reply
  %%                             timer (ignored while the socket is undefined)
  %%   {_Pid, Ref, done}       - the keep-alive query finished: cancel both
  %%                             timers and schedule the next keep_alive
  %%   {_Pid, Ref, _Reply}     - other replies to the keep-alive query, ignored
  %%   no_reply_to_keep_alive  - no answer within query_timeout: drop the
  %%                             connection and reconnect
  %%   {'EXIT', Sock, Reason}  - the socket process died: drop the connection
  %%                             and reconnect
  %% Anything else is logged as unknown.
  %%
  %% Timers are always cancelled through cancel_and_flush_timer/2, because
  %% erlang:cancel_timer/1 alone leaves an already delivered timeout message
  %% in the mailbox; such a stale no_reply_to_keep_alive would otherwise tear
  %% down a healthy connection.
  -spec handle_info(gs_request(), gs_state()) -> gs_info_reply().
  handle_info(open_connection, #state{pool_name = PoolName, connection = Connection,
                                      send_keep_alive_timer = Send_KA_Timer} = State) ->
      case epgsql_pool_utils:open_connection(PoolName, Connection) of
          {ok, Connection2} ->
              {ok, KeepAliveTimeout} = application:get_env(epgsql_pool, keep_alive_timeout),
              cancel_and_flush_timer(Send_KA_Timer, keep_alive),
              Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
              self() ! on_connect,
              {noreply, State#state{connection = Connection2,
                                    send_keep_alive_timer = Send_KA_Timer2}};
          {error, Reason, Connection3} ->
              error_logger:error_msg("Pool:~p, Worker:~p could not to connect to DB:~p",
                                     [PoolName, self(), Reason]),
              %% NOTE(review): reconnect/1 presumably schedules another
              %% open_connection message - confirm in epgsql_pool_utils.
              Connection4 = epgsql_pool_utils:reconnect(Connection3),
              {noreply, State#state{connection = Connection4}}
      end;

  handle_info(on_connect, #state{pool_name = PoolName} = State) ->
      notify_listener(connect_listener, {epgsql_connect, PoolName, self()}),
      {noreply, State};

  handle_info(on_disconnect, #state{pool_name = PoolName} = State) ->
      notify_listener(disconnect_listener, {epgsql_disconnect, PoolName}),
      {noreply, State};

  handle_info(keep_alive, #state{connection = #epgsql_connection{sock = undefined}} = State) ->
      %% no socket to ping; the next successful open_connection re-arms the timer
      {noreply, State};

  handle_info(keep_alive, #state{connection = #epgsql_connection{sock = Sock},
                                 no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
      %% send async keep-alive query to DB
      KA_Ref = epgsqli:squery(Sock, "SELECT 1"),
      {ok, QueryTimeout} = application:get_env(epgsql_pool, query_timeout),
      cancel_and_flush_timer(NR_KA_Timer, no_reply_to_keep_alive),
      NR_KA_Timer2 = erlang:send_after(QueryTimeout, self(), no_reply_to_keep_alive),
      {noreply, State#state{keep_alive_query_ref = KA_Ref,
                            no_reply_keep_alive_timer = NR_KA_Timer2}};

  handle_info({_Pid, Ref, done}, #state{keep_alive_query_ref = Ref,
                                        send_keep_alive_timer = Send_KA_Timer,
                                        no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
      %% got reply to async keep-alive query from DB
      {ok, KeepAliveTimeout} = application:get_env(epgsql_pool, keep_alive_timeout),
      cancel_and_flush_timer(Send_KA_Timer, keep_alive),
      %% the no-reply timer may have fired just before this reply was handled;
      %% flushing removes that stale timeout so the live connection is kept
      cancel_and_flush_timer(NR_KA_Timer, no_reply_to_keep_alive),
      Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
      {noreply, State#state{send_keep_alive_timer = Send_KA_Timer2}};

  handle_info({_Pid, Ref, _Reply}, #state{keep_alive_query_ref = Ref} = State) ->
      %% intermediate messages of the keep-alive query; only `done' matters
      {noreply, State};

  handle_info(no_reply_to_keep_alive, State) ->
      %% no reply to async keep-alive query from DB
      error_logger:error_msg("DB Connection, no_reply_to_keep_alive"),
      {noreply, drop_connection(State)};

  handle_info({'EXIT', _Sock, normal},
              #state{connection = #epgsql_connection{sock = undefined}} = State) ->
      {noreply, State};

  handle_info({'EXIT', Sock, Reason},
              #state{connection = #epgsql_connection{sock = Sock} = Connection} = State) ->
      %% NOTE(review): this prints the whole connection record; if the record
      %% carries credentials they end up in the log - confirm and redact.
      error_logger:error_msg("DB Connection ~p~nEXIT with reason:~p", [Connection, Reason]),
      {noreply, drop_connection(State)};

  handle_info({'EXIT', _Sock, econnrefused},
              #state{connection = #epgsql_connection{sock = undefined}} = State) ->
      %% reconnect is already running, do nothing
      {noreply, State};

  handle_info(Message, State) ->
      error_logger:error_msg("unknown info ~p in ~p ~n", [Message, ?MODULE]),
      {noreply, State}.

  %% Sends Message to the process configured under EnvKey in the epgsql_pool
  %% application environment; does nothing when the key is unset or undefined.
  -spec notify_listener(atom(), term()) -> ok.
  notify_listener(EnvKey, Message) ->
      case application:get_env(epgsql_pool, EnvKey) of
          {ok, Listener} when Listener =/= undefined ->
              Listener ! Message,
              ok;
          _ ->
              ok
      end.

  %% Common disconnect path: stops both keep-alive timers, closes the
  %% connection, asks for a reconnect and posts on_disconnect to self.
  %% Returns the state holding the connection returned by reconnect/1.
  %% Cancelling the no-reply timer here matters for the 'EXIT' path too:
  %% open_connection only cancels the send timer, so a still pending no-reply
  %% timeout would otherwise hit the re-opened connection.
  drop_connection(#state{connection = Connection,
                         send_keep_alive_timer = Send_KA_Timer,
                         no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
      cancel_and_flush_timer(Send_KA_Timer, keep_alive),
      cancel_and_flush_timer(NR_KA_Timer, no_reply_to_keep_alive),
      Connection2 = epgsql_pool_utils:close_connection(Connection),
      Connection3 = epgsql_pool_utils:reconnect(Connection2),
      self() ! on_disconnect,
      State#state{connection = Connection3}.

  %% Cancels a timer created with erlang:send_after/3 and, when the timer is
  %% no longer active (cancel_timer/1 returned false), removes one pending
  %% copy of its timeout Message from the mailbox if there is one.
  -spec cancel_and_flush_timer(reference(), term()) -> ok.
  cancel_and_flush_timer(TimerRef, Message) ->
      case erlang:cancel_timer(TimerRef) of
          false ->
              receive
                  Message -> ok
              after 0 ->
                  ok
              end;
          _TimeLeft ->
              ok
      end.
  %% @doc gen_server terminate callback.
  %% Performs no cleanup of its own and always returns `ok'; the `stop' call
  %% closes the connection before the process terminates.
  -spec terminate(terminate_reason(), gs_state()) -> ok.
  terminate(_Why, _LastState) ->
      ok.
  %% @doc gen_server code_change callback.
  %% No state migration is needed: the current state is returned unchanged
  %% regardless of the old version or the extra data.
  -spec code_change(term(), term(), term()) -> gs_code_change_reply().
  code_change(_FromVsn, CurrentState, _UpgradeData) ->
      {ok, CurrentState}.