123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- -module(epgsql_pool_worker).
- -behaviour(gen_server).
- -export([start_link/1, stop/1]).
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
- -include("epgsql_pool.hrl").
- -include("otp_types.hrl").
- -record(state, {pool_name :: atom(),
- connection :: #epgsql_connection{} | undefined,
- keep_alive_query_ref :: reference() | undefined, % ref to async keep-alive query to DB
- send_keep_alive_timer :: reference(), % timer to send keep-alive query to DB
- no_reply_keep_alive_timer :: reference() % timer to wait for reply from DB
- }).
- %% Module API
- -spec start_link(epgsql_pool:pool_name()) -> gs_start_link_reply().
- start_link(PoolName0) ->
- PoolName = epgsql_pool_utils:pool_name_to_atom(PoolName0),
- gen_server:start_link(?MODULE, PoolName, []).
- -spec stop(pid()) -> ok.
- stop(Pid) ->
- gen_server:call(Pid, stop).
- %%% gen_server API
- -spec init(gs_args()) -> gs_init_reply().
- init(PoolName) ->
- process_flag(trap_exit, true),
- herd_rand:init_crypto(),
- self() ! open_connection,
- {ok, #state{pool_name = PoolName,
- send_keep_alive_timer = make_ref(), % no need to check for undefined in cancel_timer
- no_reply_keep_alive_timer = make_ref()}}.
- -spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
- handle_call(stop, _From, #state{connection = Connection,
- send_keep_alive_timer = Send_KA_Timer,
- no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
- Connection2 = epgsql_pool_utils:close_connection(Connection),
- erlang:cancel_timer(Send_KA_Timer),
- erlang:cancel_timer(NR_KA_Timer),
- {stop, normal, ok, State#state{connection = Connection2}};
- handle_call(_, _From, #state{connection = undefined} = State) ->
- {reply, {error, no_connection}, State};
- handle_call(_, _From, #state{connection = #epgsql_connection{sock = undefined}} = State) ->
- {reply, {error, no_connection}, State};
- handle_call(get_sock, _From,
- #state{connection = #epgsql_connection{sock = Sock}} = State) ->
- {reply, Sock, State};
- handle_call({equery, Stmt, Params}, _From,
- #state{connection = #epgsql_connection{sock = Sock}} = State) ->
- Reply = case process_info(Sock, status) of
- undefined -> {error, no_connection};
- {status, _} -> epgsql:equery(Sock, Stmt, Params)
- end,
- {reply, Reply, State};
- handle_call({squery, Stmt}, _From,
- #state{connection = #epgsql_connection{sock = Sock}} = State) ->
- Reply = case process_info(Sock, status) of
- undefined -> {error, no_connection};
- {status, _} -> epgsql:squery(Sock, Stmt)
- end,
- {reply, Reply, State};
- handle_call(cancel, _From, #state{connection = #epgsql_connection{sock = Sock}} = State) ->
- epgsql:cancel(Sock),
- {reply, ok, State};
- handle_call(Message, _From, State) ->
- error_logger:error_msg("unknown call ~p in ~p ~n", [Message, ?MODULE]),
- {reply, ok, State}.
- -spec handle_cast(gs_request(), gs_state()) -> gs_cast_reply().
- handle_cast(Message, State) ->
- error_logger:error_msg("unknown cast ~p in ~p ~n", [Message, ?MODULE]),
- {noreply, State}.
- -spec handle_info(gs_request(), gs_state()) -> gs_info_reply().
- handle_info(open_connection, #state{pool_name = PoolName, connection = Connection,
- send_keep_alive_timer = Send_KA_Timer} = State) ->
- case epgsql_pool_utils:open_connection(PoolName, Connection) of
- {ok, Connection2} ->
- {ok, KeepAliveTimeout} = application:get_env(epgsql_pool, keep_alive_timeout),
- erlang:cancel_timer(Send_KA_Timer),
- Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
- {noreply, State#state{connection = Connection2, send_keep_alive_timer = Send_KA_Timer2}};
- {error, Reason, Connection3} ->
- error_logger:error_msg("Pool:~p, Worker:~p could not to connect to DB:~p", [PoolName, self(), Reason]),
- Connection4 = epgsql_pool_utils:reconnect(Connection3),
- {noreply, State#state{connection = Connection4}}
- end;
- handle_info(keep_alive, #state{connection = #epgsql_connection{sock = undefined}} = State) ->
- do_nothing,
- {noreply, State};
- handle_info(keep_alive, #state{connection = #epgsql_connection{sock = Sock},
- no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
- %% send async keep-alive query to DB
- KA_Ref = epgsqli:squery(Sock, "SELECT 1"),
- {ok, QueryTimeout} = application:get_env(epgsql_pool, query_timeout),
- erlang:cancel_timer(NR_KA_Timer),
- NR_KA_Timer2 = erlang:send_after(QueryTimeout, self(), no_reply_to_keep_alive),
- {noreply, State#state{keep_alive_query_ref = KA_Ref, no_reply_keep_alive_timer = NR_KA_Timer2}};
- handle_info({_Pid, Ref, done}, #state{keep_alive_query_ref = Ref,
- send_keep_alive_timer = Send_KA_Timer,
- no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
- %% got reply to asycn keep-alive query from DB
- {ok, KeepAliveTimeout} = application:get_env(epgsql_pool, keep_alive_timeout),
- erlang:cancel_timer(Send_KA_Timer),
- erlang:cancel_timer(NR_KA_Timer),
- Send_KA_Timer2 = erlang:send_after(KeepAliveTimeout, self(), keep_alive),
- {noreply, State#state{send_keep_alive_timer = Send_KA_Timer2}};
- handle_info({_Pid, Ref, _Reply}, #state{keep_alive_query_ref = Ref} = State) ->
- do_nothing,
- {noreply, State};
- handle_info(no_reply_to_keep_alive, #state{connection = Connection,
- send_keep_alive_timer = Send_KA_Timer,
- no_reply_keep_alive_timer = NR_KA_Timer} = State) ->
- %% no reply to asycn keep-alive query from DB
- error_logger:error_msg("DB Connection, no_reply_to_keep_alive"),
- erlang:cancel_timer(Send_KA_Timer),
- erlang:cancel_timer(NR_KA_Timer),
- Connection2 = epgsql_pool_utils:close_connection(Connection),
- Connection3 = epgsql_pool_utils:reconnect(Connection2),
- {noreply, State#state{connection = Connection3}};
- handle_info({'EXIT', _Sock, normal},
- #state{connection = #epgsql_connection{sock = undefined}} = State) ->
- do_nothing,
- {noreply, State};
- handle_info({'EXIT', Sock, Reason},
- #state{connection = #epgsql_connection{sock = Sock} = Connection} = State) ->
- error_logger:error_msg("DB Connection ~p~nEXIT with reason:~p", [Connection, Reason]),
- Connection2 = epgsql_pool_utils:close_connection(Connection),
- Connection3 = epgsql_pool_utils:reconnect(Connection2),
- {noreply, State#state{connection = Connection3}};
- handle_info({'EXIT', _Sock, econnrefused},
- #state{connection = #epgsql_connection{sock = undefined}} = State) ->
- %% reconnect is already running, do nothing
- {noreply, State};
- handle_info(Message, State) ->
- error_logger:error_msg("unknown info ~p in ~p ~n", [Message, ?MODULE]),
- {noreply, State}.
- -spec terminate(terminate_reason(), gs_state()) -> ok.
- terminate(_Reason, _State) ->
- ok.
- -spec code_change(term(), term(), term()) -> gs_code_change_reply().
- code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
|