Skip to content

Commit

Permalink
Merge branch 'release/v0.7.4' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
general-CbIC committed Jul 23, 2023
2 parents 7a9e709 + 1b5015b commit d2165ed
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 88 deletions.
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
erlang 25.3.2.1
elixir 1.15.0-otp-25
erlang 26.0.2
elixir 1.15.1-otp-26
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.7.4] - 2023-07-23

### Fixed

- Fixed a bug where a restarted worker was not automatically dispatched to pending callers ([Issue](https://github.com/general-CbIC/poolex/issues/53) / [PR](https://github.com/general-CbIC/poolex/pull/54)).

### Changed

- Upgraded [ex_doc](https://hex.pm/packages/ex_doc) from `0.29.4` to `0.30.3`

## [0.7.3] - 2023-06-21

### Fixed
Expand Down Expand Up @@ -183,7 +193,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Supported main interface `Poolex.run/3` with `:timeout` option.

[unreleased]: https://github.com/general-CbIC/poolex/compare/v0.7.3...HEAD
[unreleased]: https://github.com/general-CbIC/poolex/compare/v0.7.4...HEAD
[0.7.4]: https://github.com/general-CbIC/poolex/compare/v0.7.3...v0.7.4
[0.7.3]: https://github.com/general-CbIC/poolex/compare/v0.7.2...v0.7.3
[0.7.2]: https://github.com/general-CbIC/poolex/compare/v0.7.1...v0.7.2
[0.7.1]: https://github.com/general-CbIC/poolex/compare/v0.7.0...v0.7.1
Expand Down
13 changes: 7 additions & 6 deletions docs/guides/migration-from-poolboy.cheatmd
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

If you are using `:poolboy` and want to use `Poolex` instead, then you need to follow three simple steps.

## I. Install the `Poolex` dependency
## Installation steps

### I. Install the Poolex dependency

#### mix.exs

Expand All @@ -27,7 +29,7 @@ Well, you can also clean up installed dependencies locally and remove them from
mix deps.clean --unlock --unused
```

## II. Update child specs
### II. Update child specs

#### Your Application or Supervisor file

Expand All @@ -40,19 +42,18 @@ def init(_args) do
- size: 100,
- max_overflow: 50
- )
+ Poolex.child_spec(
+ {Poolex,
+ pool_id: :some_pool,
+ worker_module: MyApp.SomeWorker,
+ workers_count: 100,
+ max_overflow: 50
+ )
+ max_overflow: 50}
]

Supervisor.init(children, strategy: :one_for_one)
end
```

## III. Update call site
### III. Update call site

Use `run!/3` to leave the same behavior.
If you want a safe interface with error handling, then use `run/3`.
Expand Down
178 changes: 106 additions & 72 deletions lib/poolex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ defmodule Poolex do
```elixir
children = [
Poolex.child_spec(
{Poolex,
pool_id: :worker_pool,
worker_module: SomeWorker,
workers_count: 5
)
workers_count: 5}
]
Supervisor.start_link(children, strategy: :one_for_one)
Expand Down Expand Up @@ -321,7 +320,8 @@ defmodule Poolex do
defp start_worker(%State{} = state) do
DynamicSupervisor.start_child(state.supervisor, %{
id: make_ref(),
start: {state.worker_module, state.worker_start_fun, state.worker_args}
start: {state.worker_module, state.worker_start_fun, state.worker_args},
restart: :temporary
})
end

Expand All @@ -336,16 +336,11 @@ defmodule Poolex do
if state.overflow < state.max_overflow do
{:ok, new_worker} = start_worker(state)

Monitoring.add(state.monitor_id, new_worker, :temporary_worker)
Monitoring.add(state.monitor_id, new_worker, :worker)

new_state = %State{
state
| busy_workers_state:
BusyWorkers.add(state.busy_workers_impl, state.busy_workers_state, new_worker),
overflow: state.overflow + 1
}
state = add_worker_to_busy_workers(state, new_worker)

{:reply, {:ok, new_worker}, new_state}
{:reply, {:ok, new_worker}, %State{state | overflow: state.overflow + 1}}
else
Monitoring.add(state.monitor_id, from_pid, :caller)

Expand All @@ -358,14 +353,9 @@ defmodule Poolex do
{idle_worker_pid, new_idle_workers_state} =
IdleWorkers.pop(state.idle_workers_impl, state.idle_workers_state)

new_busy_workers_state =
BusyWorkers.add(state.busy_workers_impl, state.busy_workers_state, idle_worker_pid)
state = add_worker_to_busy_workers(state, idle_worker_pid)

new_state = %State{
state
| idle_workers_state: new_idle_workers_state,
busy_workers_state: new_busy_workers_state
}
new_state = %State{state | idle_workers_state: new_idle_workers_state}

{:reply, {:ok, idle_worker_pid}, new_state}
end
Expand Down Expand Up @@ -407,23 +397,31 @@ defmodule Poolex do
end
end

@impl GenServer
def handle_info(
{:DOWN, monitoring_reference, _process, dead_process_pid, _reason},
%State{} = state
) do
case Monitoring.remove(state.monitor_id, monitoring_reference) do
:worker ->
{:noreply, handle_down_worker(state, dead_process_pid)}

:caller ->
{:noreply, handle_down_caller(state, dead_process_pid)}
end
end

@spec release_busy_worker(State.t(), worker()) :: State.t()
defp release_busy_worker(%State{} = state, worker) do
if BusyWorkers.member?(state.busy_workers_impl, state.busy_workers_state, worker) do
busy_workers_state =
BusyWorkers.remove(state.busy_workers_impl, state.busy_workers_state, worker)
state = remove_worker_from_busy_workers(state, worker)

if state.overflow > 0 do
stop_worker(state.supervisor, worker)

%State{state | busy_workers_state: busy_workers_state}
state
else
%State{
state
| busy_workers_state: busy_workers_state,
idle_workers_state:
IdleWorkers.add(state.idle_workers_impl, state.idle_workers_state, worker)
}
add_worker_to_idle_workers(state, worker)
end
else
state
Expand All @@ -440,61 +438,97 @@ defmodule Poolex do
%{state | waiting_callers_state: new_waiting_callers_state}
end

@impl GenServer
def handle_info(
{:DOWN, monitoring_reference, _process, dead_process_pid, _reason},
%State{} = state
) do
case Monitoring.remove(state.monitor_id, monitoring_reference) do
:temporary_worker ->
{:noreply,
%State{
state
| overflow: state.overflow - 1,
idle_workers_state:
IdleWorkers.remove(
state.idle_workers_impl,
state.idle_workers_state,
dead_process_pid
)
}}
@spec add_worker_to_busy_workers(State.t(), worker()) :: State.t()
defp add_worker_to_busy_workers(%State{} = state, worker) do
%State{
state
| busy_workers_state:
BusyWorkers.add(
state.busy_workers_impl,
state.busy_workers_state,
worker
)
}
end

:worker ->
{:ok, new_worker} = start_worker(state)
@spec add_worker_to_idle_workers(State.t(), worker()) :: State.t()
defp add_worker_to_idle_workers(%State{} = state, worker) do
%State{
state
| idle_workers_state:
IdleWorkers.add(
state.idle_workers_impl,
state.busy_workers_state,
worker
)
}
end

Monitoring.add(state.monitor_id, new_worker, :worker)
@spec remove_worker_from_busy_workers(State.t(), worker()) :: State.t()
defp remove_worker_from_busy_workers(%State{} = state, worker) do
%State{
state
| busy_workers_state:
BusyWorkers.remove(
state.busy_workers_impl,
state.busy_workers_state,
worker
)
}
end

temp_idle_workers_state =
IdleWorkers.remove(state.idle_workers_impl, state.idle_workers_state, dead_process_pid)
@spec remove_worker_from_idle_workers(State.t(), worker()) :: State.t()
defp remove_worker_from_idle_workers(%State{} = state, worker) do
%State{
state
| idle_workers_state:
IdleWorkers.remove(
state.idle_workers_impl,
state.idle_workers_state,
worker
)
}
end

new_idle_workers_state =
IdleWorkers.add(state.idle_workers_impl, temp_idle_workers_state, new_worker)
@spec handle_down_worker(State.t(), pid()) :: State.t()
defp handle_down_worker(%State{} = state, dead_process_pid) do
state =
state
|> remove_worker_from_idle_workers(dead_process_pid)
|> remove_worker_from_busy_workers(dead_process_pid)

state = %State{
state
| idle_workers_state: new_idle_workers_state,
busy_workers_state:
BusyWorkers.remove(
state.busy_workers_impl,
state.busy_workers_state,
dead_process_pid
)
}
if WaitingCallers.empty?(state.waiting_callers_impl, state.waiting_callers_state) do
if state.overflow > 0 do
%State{state | overflow: state.overflow - 1}
else
{:ok, new_worker} = start_worker(state)

{:noreply, state}
Monitoring.add(state.monitor_id, new_worker, :worker)

:caller ->
new_waiting_callers_state =
WaitingCallers.remove_by_pid(
state.waiting_callers_impl,
state.waiting_callers_state,
dead_process_pid
)
add_worker_to_idle_workers(state, new_worker)
end
else
{:ok, new_worker} = start_worker(state)
Monitoring.add(state.monitor_id, new_worker, :worker)

{:noreply, %{state | waiting_callers_state: new_waiting_callers_state}}
state
|> add_worker_to_busy_workers(new_worker)
|> provide_worker_to_waiting_caller(new_worker)
end
end

@spec handle_down_caller(State.t(), pid()) :: State.t()
defp handle_down_caller(%State{} = state, dead_process_pid) do
new_waiting_callers_state =
WaitingCallers.remove_by_pid(
state.waiting_callers_impl,
state.waiting_callers_state,
dead_process_pid
)

%State{state | waiting_callers_state: new_waiting_callers_state}
end

@impl GenServer
def terminate(reason, %State{} = state) do
DynamicSupervisor.stop(state.supervisor, reason)
Expand Down
2 changes: 1 addition & 1 deletion lib/poolex/monitoring.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Poolex.Monitoring do
@moduledoc false
@type monitor_id() :: atom() | reference()
@type kind_of_process() :: :worker | :caller | :temporary_worker
@type kind_of_process() :: :worker | :caller

@spec init(Poolex.pool_id()) :: {:ok, monitor_id()}
@doc false
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Poolex.MixProject do
package: package(),
source_url: "https://github.com/general-CbIC/poolex",
start_permanent: Mix.env() == :prod,
version: "0.7.3"
version: "0.7.4"
]
end

Expand Down
10 changes: 5 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
"dialyxir": {:hex, :dialyxir, "1.3.0", "fd1672f0922b7648ff9ce7b1b26fcf0ef56dda964a459892ad15f6b4410b5284", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "00b2a4bcd6aa8db9dcb0b38c1225b7277dca9bc370b6438715667071a304696f"},
"doctor": {:hex, :doctor, "0.21.0", "20ef89355c67778e206225fe74913e96141c4d001cb04efdeba1a2a9704f1ab5", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm", "a227831daa79784eb24cdeedfa403c46a4cb7d0eab0e31232ec654314447e4e0"},
"earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"},
"earmark_parser": {:hex, :earmark_parser, "1.4.33", "3c3fd9673bb5dcc9edc28dd90f50c87ce506d1f71b70e3de69aa8154bc695d44", [:mix], [], "hexpm", "2d526833729b59b9fdb85785078697c72ac5e5066350663e5be6a1182da61b8f"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_check": {:hex, :ex_check, "0.15.0", "074b94c02de11c37bba1ca82ae5cc4926e6ccee862e57a485b6ba60fca2d8dc1", [:mix], [], "hexpm", "33848031a0c7e4209c3b4369ce154019788b5219956220c35ca5474299fb6a0e"},
"ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
"ex_doc": {:hex, :ex_doc, "0.30.3", "bfca4d340e3b95f2eb26e72e4890da83e2b3a5c5b0e52607333bf5017284b063", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "fbc8702046c1d25edf79de376297e608ac78cdc3a29f075484773ad1718918b6"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_diff": {:hex, :makeup_diff, "0.1.0", "5be352b6aa6f07fa6a236e3efd7ba689a03f28fb5d35b7a0fa0a1e4a64f6d8bb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "186bad5bb433a8afeb16b01423950e440072284a4103034ca899180343b9b4ac"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
}
25 changes: 25 additions & 0 deletions test/poolex_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,31 @@ defmodule PoolexTest do
assert agent_pid != new_agent_pid
end

test "restart busy workers when pending callers" do
pool_name = start_pool(worker_module: SomeWorker, workers_count: 1)

# test_process = self()
launch_long_tasks(pool_name, 2)

debug_info = Poolex.get_debug_info(pool_name)
assert debug_info.busy_workers_count == 1
assert length(debug_info.waiting_callers) == 1

[busy_worker_pid] = debug_info.busy_workers_pids
Process.exit(busy_worker_pid, :kill)

# To be sure that DOWN message will be handed
:timer.sleep(1)

debug_info = Poolex.get_debug_info(pool_name)
assert debug_info.busy_workers_count == 1
assert Enum.empty?(debug_info.waiting_callers)

[new_worker_pid] = debug_info.busy_workers_pids

assert busy_worker_pid != new_worker_pid
end

test "works on callers" do
pool_name = start_pool(worker_module: SomeWorker, workers_count: 1)

Expand Down

0 comments on commit d2165ed

Please sign in to comment.