From 6886dbc32c7a06eb779933889bbc979c7ae92ff9 Mon Sep 17 00:00:00 2001 From: Aidan McMahon-Smith Date: Tue, 17 Oct 2023 15:18:13 +0200 Subject: [PATCH] Add limit to unassociate [RHELDST-20725] We want to perform garbage collection in batches to reduce the amount of resources it consumes and avoid out of memory exceptions. Unit association criteria has a limit field which we could use for batching requests. --- pubtools/pulplib/_impl/client/client.py | 5 ++- pubtools/pulplib/_impl/fake/client.py | 6 ++-- .../pulplib/_impl/model/repository/base.py | 12 ++++++- tests/fake/test_fake_remove_content.py | 31 ++++++++++++++++++ tests/repository/test_remove_content.py | 32 +++++++++++++++++++ 5 files changed, 82 insertions(+), 4 deletions(-) diff --git a/pubtools/pulplib/_impl/client/client.py b/pubtools/pulplib/_impl/client/client.py index 33affacd..673189d5 100644 --- a/pubtools/pulplib/_impl/client/client.py +++ b/pubtools/pulplib/_impl/client/client.py @@ -775,7 +775,7 @@ def _do_associate(self, src_repo_id, dest_repo_id, criteria=None, raw_options=No self._do_request, method="POST", url=url, json=body ) - def _do_unassociate(self, repo_id, criteria=None): + def _do_unassociate(self, repo_id, criteria=None, limit=None): url = os.path.join( self._url, "pulp/api/v2/repositories/%s/actions/unassociate/" % repo_id ) @@ -800,6 +800,9 @@ def _do_unassociate(self, repo_id, criteria=None): else: body["criteria"]["filters"] = {"unit": pulp_search.filters} + if limit: + body["criteria"]["limit"] = limit + LOG.debug("Submitting %s unassociate: %s", url, body) return self._task_executor.submit( diff --git a/pubtools/pulplib/_impl/fake/client.py b/pubtools/pulplib/_impl/fake/client.py index 05489472..7990e5ff 100644 --- a/pubtools/pulplib/_impl/fake/client.py +++ b/pubtools/pulplib/_impl/fake/client.py @@ -404,7 +404,7 @@ def do_next_upload(checksum, size): return out - def _do_unassociate(self, repo_id, criteria=None): + def _do_unassociate(self, repo_id, criteria=None, limit=None): repo_f = self.get_repository(repo_id) if repo_f.exception(): return repo_f @@ -431,7 +431,9 @@ def _do_unassociate(self, repo_id, criteria=None): for unit_with_key in units_with_key: unit = unit_with_key["unit"] - if match_object(criteria, unit): + if match_object(criteria, unit) and ( + not limit or len(removed_units) < limit + ): removed_units.add(unit) else: kept_keys.add(unit_with_key["key"]) diff --git a/pubtools/pulplib/_impl/model/repository/base.py b/pubtools/pulplib/_impl/model/repository/base.py index 508a0ace..7c2a38b4 100644 --- a/pubtools/pulplib/_impl/model/repository/base.py +++ b/pubtools/pulplib/_impl/model/repository/base.py @@ -652,6 +652,10 @@ def remove_content(self, criteria=None, **kwargs): be removed. If criteria is omitted, all the content will be removed. + limit (None, int) + Limit the maximum number of units that will be disassociated by + pulp. + Returns: Future[list[:class:`~pubtools.pulplib.Task`]] A future which is resolved when content has been removed. @@ -696,7 +700,13 @@ def remove_content(self, criteria=None, **kwargs): Matcher.in_(type_ids), # Criteria.with_field_in is deprecated ) - return f_proxy(self._client._do_unassociate(self.id, criteria=criteria)) + return f_proxy( + self._client._do_unassociate( + self.id, + criteria=criteria, + limit=kwargs.get("limit"), + ) + ) @classmethod def from_data(cls, data): diff --git a/tests/fake/test_fake_remove_content.py b/tests/fake/test_fake_remove_content.py index d183ffe9..e14185d6 100644 --- a/tests/fake/test_fake_remove_content.py +++ b/tests/fake/test_fake_remove_content.py @@ -30,6 +30,37 @@ def test_can_remove_empty(): assert not task.units +def test_limited_remove_content(): + """repo.remove() can remove a limited number of units.""" + controller = FakeController() + client = controller.client + + rpm_units = [ + RpmUnit(name="gliba", version="1.0", release="1", arch="x86_64"), + RpmUnit(name="glibb", version="1.0", release="1", arch="x86_64"), + RpmUnit(name="glibc", version="1.0", release="1", arch="x86_64"), + RpmUnit(name="glibd", version="1.0", release="1", arch="x86_64"), + ] + + repo = YumRepository(id="repo1") + controller.insert_repository(repo) + controller.insert_units(repo, rpm_units) + + remove_rpms = client.get_repository("repo1").remove_content( + type_ids=["rpm"], limit=3 + ) + + assert len(remove_rpms) == 1 + task = remove_rpms[0] + + # It should have completed successfully + assert task.completed + assert task.succeeded + + # It should have removed (only) RPM units + assert len(task.units) == 3 + + def test_can_remove_content(): """repo.remove() succeeds and removes expected units inserted via controller.""" controller = FakeController() diff --git a/tests/repository/test_remove_content.py b/tests/repository/test_remove_content.py index 0dcb1edc..94157aab 100644 --- a/tests/repository/test_remove_content.py +++ b/tests/repository/test_remove_content.py @@ -251,6 +251,38 @@ def test_remove_with_criteria(fast_poller, requests_mocker, client): } +def test_remove_with_limit(fast_poller, requests_mocker, client): + """Remove succeeds when given a critria/filter for removal""" + repo = Repository(id="some-repo") + repo.__dict__["_client"] = client + + requests_mocker.post( + "https://pulp.example.com/pulp/api/v2/repositories/some-repo/actions/unassociate/", + [ + {"json": {"spawned_tasks": [{"task_id": "task1"}]}}, + ], + ) + + requests_mocker.post( + "https://pulp.example.com/pulp/api/v2/tasks/search/", + [ + {"json": [{"task_id": "task1", "state": "finished"}]}, + ], + ) + + assert repo.remove_content(type_ids=["type1", "type2"], limit=1).result() == [ + Task(id="task1", completed=True, succeeded=True) + ] + + # It should have included the limit in the post request + assert requests_mocker.request_history[0].json() == { + "criteria": { + "type_ids": ["type1", "type2"], + "limit": 1, + } + } + + def test_remove_fail_without_type_id(fast_poller, client): """Remove fails when a critria is provided without unit type""" repo = Repository(id="some-repo")