Skip to content

Commit

Permalink
Merge pull request #686 from benoitc/hotfix/gh683
Browse files Browse the repository at this point in the history
make checkout synchronous again
  • Loading branch information
benoitc authored May 20, 2021
2 parents 6e79b2b + 592a007 commit f876781
Showing 1 changed file with 16 additions and 32 deletions.
48 changes: 16 additions & 32 deletions src/hackney_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,31 +61,10 @@ start() ->
%% @doc fetch a socket from the pool
checkout(Host, Port, Transport, Client) ->
Requester = self(),
Ref = make_ref(),
Fun =
fun() ->
Result =
try
do_checkout(Requester, Host, Port, Transport, Client)
catch _:_ ->
{error, checkout_failure}
end,
case is_process_alive(Requester) of
true ->
Requester ! {checkout, Ref, Result};
false ->
case Result of
{ok, {_Name, ConnRef, Connection, Owner, Transport}, Socket} ->
gen_server:call(Owner, {checkin, ConnRef, Connection, Socket, Transport}, infinity);
_Error ->
ok
end
end
end,
_ = spawn(Fun),
receive
{checkout, Ref, Result} ->
Result
try
do_checkout(Requester, Host, Port, Transport, Client)
catch _:_ ->
{error, checkout_failure}
end.

do_checkout(Requester, Host, _Port, Transport, #client{options=Opts,
Expand All @@ -104,8 +83,6 @@ do_checkout(Requester, Host, _Port, Transport, #client{options=Opts,
?report_debug("reuse a connection", [{pool, PoolName}]),
_ = metrics:update_meter(Metrics, [hackney_pool, PoolName, take_rate], 1),
_ = metrics:increment_counter(Metrics, [hackney_pool, Host, reuse_connection]),


{ok, {PoolName, RequestRef, Connection, Owner, Transport}, Socket};
{error, no_socket, Owner} ->
?report_trace("no socket in the pool", [{pool, PoolName}]),
Expand Down Expand Up @@ -324,7 +301,7 @@ handle_call({checkout, Dest, Requester, RequestRef}, From, State) ->
end
end;
handle_call({checkin, Ref, Dest, Socket, Transport}, From, State) ->
gen_server:reply(From, ok),
reply_if_alive(From, ok),
Clients2 = case dict:find(Ref, State#state.clients) of
{ok, Dest} ->
dict:erase(Ref, State#state.clients);
Expand Down Expand Up @@ -412,7 +389,7 @@ dequeue(Dest, Ref, State) ->
_ = metrics:update_histogram(
State#state.metrics, [hackney_pool, State#state.name, queue_count], dict:size(Pending2)
),
gen_server:reply(From, {error, no_socket, self()}),
reply_if_alive(From, {error, no_socket, self()}),
State2 = State#state{queues = Queues2, clients = Clients2, pending=Pending2},
monitor_client(Dest, Ref2, State2)
end.
Expand Down Expand Up @@ -545,18 +522,17 @@ deliver_socket(Socket, Connection, State) ->
),
case hackney_connection:controlling_process(Connection, Socket, PidWaiter) of
ok ->
gen_server:reply(FromWaiter, {ok, Socket, self()}),
reply_if_alive(FromWaiter, {ok, Socket, self()}),
monitor_client(Connection, Ref, State#state{queues = Queues2, pending=Pending2});
_Error ->
% Something wrong, close the socket
_ = (catch hackney_connection:close(Connection, Socket)),
%% and let the waiter connect to a new one
gen_server:reply(FromWaiter, {error, no_socket, self()}),
reply_if_alive(FromWaiter, {error, no_socket, self()}),
State#state{queues = Queues2, pending = Pending2}
end
end.


add_pending(Ref, From, Connection, Requester, Pending) ->
dict:store(Ref, {From, Connection, Requester}, Pending).

Expand Down Expand Up @@ -635,3 +611,11 @@ handle_stats(State) ->
{in_use_count, dict:size(Clients)},
{free_count, dict:size(Sockets)},
{queue_count, dict:size(Pending)}].

reply_if_alive({Pid, _Tag}=To, Msg) ->
case is_process_alive(Pid) of
true ->
gen_server:reply(To, Msg);
false ->
ok
end.

0 comments on commit f876781

Please sign in to comment.