epgsql_pool_worker.erl 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. -module(epgsql_pool_worker).
  2. -behaviour(gen_server).
  3. -export([start_link/1, stop/1]).
  4. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  5. -include("epgsql_pool.hrl").
  6. -include("otp_types.hrl").
  7. -record(state, {pool_name :: atom(),
  8. connection :: #epgsql_connection{},
  9. keep_alive_query_ref :: reference(), % ref to async keep-alive query to DB
  10. send_keep_alive_timer :: reference(), % timer to send keep-alive query to DB
  11. no_reply_keep_alive_timer :: reference() % timer to wait for reply from DB
  12. }).
  13. %% Module API
  14. -spec start_link(epgsql_pool:pool_name()) -> gs_start_link_reply().
  15. start_link(PoolName0) ->
  16. PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
  17. gen_server:start_link(?MODULE, PoolName, []).
  18. -spec stop(pid()) -> ok.
  19. stop(Pid) ->
  20. gen_server:call(Pid, stop).
  21. %%% gen_server API
  22. -spec init(gs_args()) -> gs_init_reply().
  23. init(PoolName) ->
  24. process_flag(trap_exit, true),
  25. <<A:32, B:32, C:32>> = crypto:rand_bytes(12),
  26. random:seed({A,B,C}),
  27. self() ! open_connection,
  28. {ok, #state{pool_name = PoolName,
  29. send_keep_alive_timer = make_ref(), % no need to check for undefined in cancel_timer
  30. no_reply_keep_alive_timer = make_ref()}}.
  31. -spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
  32. handle_call(stop, _From, #state{connection = Connection,
  33. send_keep_alive_timer = Send_KA_Timer,
  34. no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
  35. Connection2 = epgsql_pool_utils:close_connection(Connection),
  36. erlang:cancel_timer(Send_KA_Timer),
  37. erlang:cancel_timer(NR_KA_Timer),
  38. {stop, normal, ok, State#state{connection = Connection2}};
  39. handle_call(_, _From, #state{connection = undefined} = State) ->
  40. {reply, {error, no_connection}, State};
  41. handle_call(_, _From, #state{connection = #epgsql_connection{sock = undefined}} = State) ->
  42. {reply, {error, reconnecting}, State};
  43. handle_call({equery, Stmt, Params}, _From,
  44. #state{connection = #epgsql_connection{sock = Sock}} = State) ->
  45. Reply = case process_info(Sock, status) of
  46. undefined -> {error, reconnecting};
  47. {status, _} -> epgsql:equery(Sock, Stmt, Params)
  48. end,
  49. {reply, Reply, State};
  50. handle_call({squery, Stmt}, _From,
  51. #state{connection = #epgsql_connection{sock = Sock}} = State) ->
  52. Reply = case process_info(Sock, status) of
  53. undefined -> {error, reconnecting};
  54. {status, _} -> epgsql:squery(Sock, Stmt)
  55. end,
  56. {reply, Reply, State};
  57. handle_call(cancel, _From, #state{connection = #epgsql_connection{sock = Sock}} = State) ->
  58. epgsql:cancel(Sock),
  59. {reply, ok, State};
  60. handle_call(Message, _From, State) ->
  61. error_logger:error_msg("unknown call ~p in ~p ~n", [Message, ?MODULE]),
  62. {reply, ok, State}.
  63. -spec handle_cast(gs_request(), gs_state()) -> gs_cast_reply().
  64. handle_cast(Message, State) ->
  65. error_logger:error_msg("unknown cast ~p in ~p ~n", [Message, ?MODULE]),
  66. {noreply, State}.
  67. -spec handle_info(gs_request(), gs_state()) -> gs_info_reply().
  68. handle_info(open_connection, #state{pool_name = PoolName, connection = Connection,
  69. send_keep_alive_timer = Send_KA_Timer} = State) ->
  70. case epgsql_pool_utils:open_connection(PoolName, Connection) of
  71. {ok, Connection2} ->
  72. KeepAliveTimeout = epgsql_pool_settings:get(keep_alive_timeout),
  73. erlang:cancel_timer(Send_KA_Timer),
  74. Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
  75. {noreply, State#state{connection = Connection2, send_keep_alive_timer = Send_KA_Timer2}};
  76. {error, Reason, Connection3} ->
  77. error_logger:error_msg("Pool:~p, Worker:~p could not to connect to DB:~p", [PoolName, self(), Reason]),
  78. Connection4 = epgsql_pool_utils:reconnect(Connection3),
  79. {noreply, State#state{connection = Connection4}}
  80. end;
  81. handle_info(keep_alive, #state{connection = #epgsql_connection{sock = undefined}} = State) ->
  82. do_nothing,
  83. {noreply, State};
  84. handle_info(keep_alive, #state{connection = #epgsql_connection{sock = Sock},
  85. no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
  86. %% send async keep-alive query to DB
  87. KA_Ref = epgsqli:squery(Sock, <<"SELECT 1">>),
  88. KeepAliveTimeout = epgsql_pool_settings:get(keep_alive_timeout),
  89. erlang:cancel_timer(NR_KA_Timer),
  90. NR_KA_Timer2 = erlang:send_after(KeepAliveTimeout * 2, self(), no_reply_to_keep_alive),
  91. {noreply, State#state{keep_alive_query_ref = KA_Ref, no_reply_keep_alive_timer = NR_KA_Timer2}};
  92. handle_info({_Pid, Ref, done}, #state{keep_alive_query_ref = Ref,
  93. send_keep_alive_timer = Send_KA_Timer,
  94. no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
  95. %% got reply to asycn keep-alive query from DB
  96. KeepAliveTimeout = epgsql_pool_settings:get(keep_alive_timeout),
  97. erlang:cancel_timer(Send_KA_Timer),
  98. erlang:cancel_timer(NR_KA_Timer),
  99. Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
  100. {noreply, State#state{send_keep_alive_timer = Send_KA_Timer2}};
  101. handle_info({_Pid, Ref, _Reply}, #state{keep_alive_query_ref = Ref} = State) ->
  102. do_nothing,
  103. {noreply, State};
  104. handle_info(no_reply_to_keep_alive, #state{connection = Connection,
  105. send_keep_alive_timer = Send_KA_Timer,
  106. no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
  107. %% no reply to asycn keep-alive query from DB
  108. error_logger:error_msg("DB Connection, no_reply_to_keep_alive"),
  109. erlang:cancel_timer(Send_KA_Timer),
  110. erlang:cancel_timer(NR_KA_Timer),
  111. Connection2 = epgsql_pool_utils:close_connection(Connection),
  112. Connection3 = epgsql_pool_utils:reconnect(Connection2),
  113. {noreply, State#state{connection = Connection3}};
  114. handle_info({'EXIT', _Sock, normal},
  115. #state{connection = #epgsql_connection{sock = undefined}} = State) ->
  116. do_nothing,
  117. {noreply, State};
  118. handle_info({'EXIT', Sock, Reason},
  119. #state{connection = #epgsql_connection{sock = Sock} = Connection} = State) ->
  120. error_logger:error_msg("DB Connection ~p EXIT with reason: ~p", [Sock, Reason]),
  121. Connection2 = epgsql_pool_utils:close_connection(Connection),
  122. Connection3 = epgsql_pool_utils:reconnect(Connection2),
  123. {noreply, State#state{connection = Connection3}};
  124. handle_info(Message, State) ->
  125. error_logger:error_msg("unknown info ~p in ~p ~n", [Message, ?MODULE]),
  126. {noreply, State}.
  127. -spec terminate(terminate_reason(), gs_state()) -> ok.
  128. terminate(_Reason, _State) ->
  129. ok.
  130. -spec code_change(term(), term(), term()) -> gs_code_change_reply().
  131. code_change(_OldVsn, State, _Extra) ->
  132. {ok, State}.