Skip to content

Commit

Permalink
[PP-2169] Add support for PALACE_CELERY_RESULT_BACKEND_TRANSPORT_OPTI…
Browse files Browse the repository at this point in the history
…ONS_GLOBAL_KEYPREFIX environment variable.
  • Loading branch information
dbernstein committed Feb 25, 2025
1 parent 7b097c0 commit c89a6fa
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
28 changes: 22 additions & 6 deletions src/palace/manager/service/celery/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class CeleryConfiguration(ServiceConfiguration):
# Broker options for both Redis and SQS
broker_transport_options_visibility_timeout: int = 3600 # 1 hour
result_expires: int = 3600 # 1 hour (default is 1 day)
result_backend_transport_options_global_keyprefix: str = "palace-"
task_acks_late: bool = True
task_reject_on_worker_lost: bool = True
task_remote_tracebacks: bool = True
Expand Down Expand Up @@ -68,12 +69,27 @@ def model_dump(
results = super().model_dump(**kwargs)
if merge_options:
result_keys = results.copy().keys()
broker_transport_options = {}
broker_transport_options: dict[str, Any] = {}
result_backend_transport_options: dict[str, Any] = {}
for key in result_keys:
if key.startswith("broker_transport_options_"):
value = results.pop(key)
broker_transport_options[
key.replace("broker_transport_options_", "")
] = value
self.extract_keys_by_prefix(
broker_transport_options, results, "broker_transport_options_", key
)
self.extract_keys_by_prefix(
result_backend_transport_options,
results,
"result_backend_transport_options_",
key,
)
results["broker_transport_options"] = broker_transport_options
results["result_backend_transport_options"] = (
result_backend_transport_options
)
return results

def extract_keys_by_prefix(
self, new_dict: dict[str, Any], results: dict[str, Any], prefix: str, key: str
) -> None:
if key.startswith(prefix):
value = results.pop(key)
new_dict[key.replace(prefix, "")] = value
14 changes: 14 additions & 0 deletions tests/manager/service/celery/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@ def test_dict_no_merge(
"PALACE_CELERY_BROKER_TRANSPORT_OPTIONS_QUEUE_ORDER_STRATEGY", "y"
)

monkeypatch.setenv(
"PALACE_CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS_GLOBAL_KEYPREFIX", "z"
)

config = celery_configuration()
result = config.model_dump(merge_options=False)
assert "broker_url" in result
assert "result_backend" in result
assert result.get("broker_transport_options_global_keyprefix") == "x"
assert result.get("broker_transport_options_queue_order_strategy") == "y"
assert result.get("result_backend_transport_options_global_keyprefix") == "z"
assert "broker_transport_options" not in result
assert "result_backend_transport_options" not in result

def test_dict_merge(
self, celery_configuration: CeleryConfFixture, monkeypatch: pytest.MonkeyPatch
Expand All @@ -46,6 +52,10 @@ def test_dict_merge(
"PALACE_CELERY_BROKER_TRANSPORT_OPTIONS_QUEUE_ORDER_STRATEGY", "y"
)

monkeypatch.setenv(
"PALACE_CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS_GLOBAL_KEYPREFIX", "z"
)

config = celery_configuration()
result = config.model_dump()
assert "broker_url" in result
Expand All @@ -56,3 +66,7 @@ def test_dict_merge(
assert options.get("queue_order_strategy") == "y"
assert "broker_transport_options_global_keyprefix" not in result
assert "broker_transport_options_queue_order_strategy" not in result

options = result["result_backend_transport_options"]
assert options.get("global_keyprefix") == "z"
assert "result_backend_transport_options_global_keyprefix" not in result

0 comments on commit c89a6fa

Please sign in to comment.