epgsql_pool_worker.erl 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. -module(epgsql_pool_worker).
  2. -behaviour(gen_server).
  3. -export([start_link/1, stop/1]).
  4. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  5. -include("epgsql_pool.hrl").
  6. -include("otp_types.hrl").
  7. -record(state, {pool_name :: atom(),
  8. connection :: #epgsql_connection{} | undefined,
  9. keep_alive_query_ref :: reference() | undefined, % ref to async keep-alive query to DB
  10. send_keep_alive_timer :: reference(), % timer to send keep-alive query to DB
  11. no_reply_keep_alive_timer :: reference() % timer to wait for reply from DB
  12. }).
  13. %% Module API
  14. -spec start_link(epgsql_pool:pool_name()) -> gs_start_link_reply().
  15. start_link(PoolName0) ->
  16. PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
  17. gen_server:start_link(?MODULE, PoolName, []).
  18. -spec stop(pid()) -> ok.
  19. stop(Pid) ->
  20. gen_server:call(Pid, stop).
  21. %%% gen_server API
  22. -spec init(gs_args()) -> gs_init_reply().
  23. init(PoolName) ->
  24. process_flag(trap_exit, true),
  25. herd_rand:init_crypto(),
  26. self() ! open_connection,
  27. {ok, #state{pool_name = PoolName,
  28. send_keep_alive_timer = make_ref(), % no need to check for undefined in cancel_timer
  29. no_reply_keep_alive_timer = make_ref()}}.
  30. -spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
  31. handle_call(stop, _From, #state{connection = Connection,
  32. send_keep_alive_timer = Send_KA_Timer,
  33. no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
  34. Connection2 = epgsql_pool_utils:close_connection(Connection),
  35. erlang:cancel_timer(Send_KA_Timer),
  36. erlang:cancel_timer(NR_KA_Timer),
  37. {stop, normal, ok, State#state{connection = Connection2}};
  38. handle_call(_, _From, #state{connection = undefined} = State) ->
  39. {reply, {error, no_connection}, State};
  40. handle_call(_, _From, #state{connection = #epgsql_connection{sock = undefined}} = State) ->
  41. {reply, {error, no_connection}, State};
  42. handle_call(get_sock, _From,
  43. #state{connection = #epgsql_connection{sock = Sock}} = State) ->
  44. {reply, Sock, State};
  45. handle_call({equery, Stmt, Params}, _From,
  46. #state{connection = #epgsql_connection{sock = Sock}} = State) ->
  47. Reply = case process_info(Sock, status) of
  48. undefined -> {error, no_connection};
  49. {status, _} -> epgsql:equery(Sock, Stmt, Params)
  50. end,
  51. {reply, Reply, State};
  52. handle_call({squery, Stmt}, _From,
  53. #state{connection = #epgsql_connection{sock = Sock}} = State) ->
  54. Reply = case process_info(Sock, status) of
  55. undefined -> {error, no_connection};
  56. {status, _} -> epgsql:squery(Sock, Stmt)
  57. end,
  58. {reply, Reply, State};
  59. handle_call(cancel, _From, #state{connection = #epgsql_connection{sock = Sock}} = State) ->
  60. epgsql:cancel(Sock),
  61. {reply, ok, State};
  62. handle_call(Message, _From, State) ->
  63. error_logger:error_msg("unknown call ~p in ~p ~n", [Message, ?MODULE]),
  64. {reply, ok, State}.
  65. -spec handle_cast(gs_request(), gs_state()) -> gs_cast_reply().
  66. handle_cast(Message, State) ->
  67. error_logger:error_msg("unknown cast ~p in ~p ~n", [Message, ?MODULE]),
  68. {noreply, State}.
  69. -spec handle_info(gs_request(), gs_state()) -> gs_info_reply().
  70. handle_info(open_connection, #state{pool_name = PoolName, connection = Connection,
  71. send_keep_alive_timer = Send_KA_Timer} = State) ->
  72. case epgsql_pool_utils:open_connection(PoolName, Connection) of
  73. {ok, Connection2} ->
  74. {ok, KeepAliveTimeout} = application:get_env(epgsql_pool, keep_alive_timeout),
  75. erlang:cancel_timer(Send_KA_Timer),
  76. Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
  77. {noreply, State#state{connection = Connection2, send_keep_alive_timer = Send_KA_Timer2}};
  78. {error, Reason, Connection3} ->
  79. error_logger:error_msg("Pool:~p, Worker:~p could not to connect to DB:~p", [PoolName, self(), Reason]),
  80. Connection4 = epgsql_pool_utils:reconnect(Connection3),
  81. {noreply, State#state{connection = Connection4}}
  82. end;
  83. handle_info(keep_alive, #state{connection = #epgsql_connection{sock = undefined}} = State) ->
  84. do_nothing,
  85. {noreply, State};
  86. handle_info(keep_alive, #state{connection = #epgsql_connection{sock = Sock},
  87. no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
  88. %% send async keep-alive query to DB
  89. KA_Ref = epgsqli:squery(Sock, "SELECT 1"),
  90. {ok, QueryTimeout} = application:get_env(epgsql_pool, query_timeout),
  91. erlang:cancel_timer(NR_KA_Timer),
  92. NR_KA_Timer2 = erlang:send_after(QueryTimeout, self(), no_reply_to_keep_alive),
  93. {noreply, State#state{keep_alive_query_ref = KA_Ref, no_reply_keep_alive_timer = NR_KA_Timer2}};
  94. handle_info({_Pid, Ref, done}, #state{keep_alive_query_ref = Ref,
  95. send_keep_alive_timer = Send_KA_Timer,
  96. no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
  97. %% got reply to asycn keep-alive query from DB
  98. {ok, KeepAliveTimeout} = application:get_env(epgsql_pool, keep_alive_timeout),
  99. erlang:cancel_timer(Send_KA_Timer),
  100. erlang:cancel_timer(NR_KA_Timer),
  101. Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
  102. {noreply, State#state{send_keep_alive_timer = Send_KA_Timer2}};
  103. handle_info({_Pid, Ref, _Reply}, #state{keep_alive_query_ref = Ref} = State) ->
  104. do_nothing,
  105. {noreply, State};
  106. handle_info(no_reply_to_keep_alive, #state{connection = Connection,
  107. send_keep_alive_timer = Send_KA_Timer,
  108. no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
  109. %% no reply to asycn keep-alive query from DB
  110. error_logger:error_msg("DB Connection, no_reply_to_keep_alive"),
  111. erlang:cancel_timer(Send_KA_Timer),
  112. erlang:cancel_timer(NR_KA_Timer),
  113. Connection2 = epgsql_pool_utils:close_connection(Connection),
  114. Connection3 = epgsql_pool_utils:reconnect(Connection2),
  115. {noreply, State#state{connection = Connection3}};
  116. handle_info({'EXIT', _Sock, normal},
  117. #state{connection = #epgsql_connection{sock = undefined}} = State) ->
  118. do_nothing,
  119. {noreply, State};
  120. handle_info({'EXIT', Sock, Reason},
  121. #state{connection = #epgsql_connection{sock = Sock} = Connection} = State) ->
  122. error_logger:error_msg("DB Connection ~p~nEXIT with reason:~p", [Connection, Reason]),
  123. Connection2 = epgsql_pool_utils:close_connection(Connection),
  124. Connection3 = epgsql_pool_utils:reconnect(Connection2),
  125. {noreply, State#state{connection = Connection3}};
  126. handle_info({'EXIT', _Sock, econnrefused},
  127. #state{connection = #epgsql_connection{sock = undefined}} = State) ->
  128. %% reconnect is already running, do nothing
  129. {noreply, State};
  130. handle_info(Message, State) ->
  131. error_logger:error_msg("unknown info ~p in ~p ~n", [Message, ?MODULE]),
  132. {noreply, State}.
  133. -spec terminate(terminate_reason(), gs_state()) -> ok.
  134. terminate(_Reason, _State) ->
  135. ok.
  136. -spec code_change(term(), term(), term()) -> gs_code_change_reply().
  137. code_change(_OldVsn, State, _Extra) ->
  138. {ok, State}.