Skip to content

Commit

Permalink
Turn off result persistence by default (#14300)
Browse files Browse the repository at this point in the history
Co-authored-by: Chris White <[email protected]>
  • Loading branch information
desertaxle and cicdw committed Jun 24, 2024
1 parent 275f05b commit b75631a
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 120 deletions.
2 changes: 1 addition & 1 deletion docs/3.0rc/api-ref/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -21746,7 +21746,7 @@
"PREFECT_RESULTS_PERSIST_BY_DEFAULT": {
"type": "boolean",
"title": "Prefect Results Persist By Default",
"default": true
"default": false
},
"PREFECT_TASKS_REFRESH_CACHE": {
"type": "boolean",
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ def default_cloud_ui_url(settings, value):

PREFECT_RESULTS_PERSIST_BY_DEFAULT = Setting(
bool,
default=True,
default=False,
)
"""
The default setting for persisting results when not otherwise specified. If enabled,
Expand Down
15 changes: 13 additions & 2 deletions src/prefect/task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from concurrent.futures import ThreadPoolExecutor
from contextlib import AsyncExitStack
from contextvars import copy_context
from typing import List, Optional
from typing import Optional
from uuid import UUID

import anyio
Expand All @@ -20,6 +20,7 @@

from prefect import Task
from prefect._internal.concurrency.api import create_call, from_sync
from prefect.cache_policies import DEFAULT, NONE
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import TaskRun
from prefect.client.subscriptions import Subscription
Expand All @@ -32,6 +33,7 @@
)
from prefect.states import Pending
from prefect.task_engine import run_task_async, run_task_sync
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import asyncnullcontext, sync_compatible
from prefect.utilities.engine import emit_task_run_state_change_event, propose_state
from prefect.utilities.processutils import _register_signal
Expand Down Expand Up @@ -76,7 +78,16 @@ def __init__(
*tasks: Task,
limit: Optional[int] = 10,
):
self.tasks: List[Task] = list(tasks)
self.tasks = []
for t in tasks:
if isinstance(t, Task):
if t.cache_policy in [None, NONE, NotSet]:
self.tasks.append(
t.with_options(persist_result=True, cache_policy=DEFAULT)
)
else:
self.tasks.append(t.with_options(persist_result=True))

self.task_keys = set(t.task_key for t in tasks if isinstance(t, Task))

self._started_at: Optional[pendulum.DateTime] = None
Expand Down
6 changes: 4 additions & 2 deletions src/prefect/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ def with_options(
cache_key_fn: Optional[
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
] = None,
task_run_name: Optional[Union[Callable[[], str], str]] = None,
task_run_name: Optional[Union[Callable[[], str], str, Type[NotSet]]] = NotSet,
cache_expiration: Optional[datetime.timedelta] = None,
retries: Union[int, Type[NotSet]] = NotSet,
retry_delay_seconds: Union[
Expand Down Expand Up @@ -594,7 +594,9 @@ def with_options(
else self.cache_policy,
cache_key_fn=cache_key_fn or self.cache_key_fn,
cache_expiration=cache_expiration or self.cache_expiration,
task_run_name=task_run_name,
task_run_name=task_run_name
if task_run_name is not NotSet
else self.task_run_name,
retries=retries if retries is not NotSet else self.retries,
retry_delay_seconds=(
retry_delay_seconds
Expand Down
2 changes: 1 addition & 1 deletion tests/results/test_flow_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ async def foo():
async def test_root_flow_default_remote_storage_saves_correct_result(tmp_path):
await LocalFileSystem(basepath=tmp_path).save("my-result-storage")

@task(result_storage_key="my-result.pkl")
@task(result_storage_key="my-result.pkl", persist_result=True)
async def bar():
return {"foo": "bar"}

Expand Down
71 changes: 34 additions & 37 deletions tests/results/test_result_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def foo():
return get_run_context().result_factory

result_factory = foo()
assert result_factory.persist_result is True
assert result_factory.persist_result is False
assert result_factory.cache_result_in_memory is True
assert result_factory.serializer == DEFAULT_SERIALIZER()
assert_blocks_equal(result_factory.storage_block, DEFAULT_STORAGE())
Expand Down Expand Up @@ -118,9 +118,8 @@ def foo():
assert result_factory.storage_block_id is not None


@pytest.mark.parametrize("options", [{"cache_result_in_memory": False}])
def test_root_flow_persists_results_when_flow_uses_feature(options):
@flow(**options)
def test_root_flow_persists_results_when_flow_uses_feature():
@flow(cache_result_in_memory=False, persist_result=True)
def foo():
return get_run_context().result_factory

Expand All @@ -131,11 +130,10 @@ def foo():
assert isinstance(result_factory.storage_block_id, uuid.UUID)


@pytest.mark.parametrize("options", [{"cache_result_in_memory": False}])
def test_root_flow_can_opt_out_of_persistence_when_flow_uses_feature(options):
def test_root_flow_can_opt_out_of_persistence_when_flow_uses_feature():
result_factory = None

@flow(**options, persist_result=False)
@flow(cache_result_in_memory=False, persist_result=False)
def foo():
nonlocal result_factory
result_factory = get_run_context().result_factory
Expand Down Expand Up @@ -228,7 +226,7 @@ async def test_root_flow_custom_storage_by_instance_unsaved(prefect_client, tmp_
storage = LocalFileSystem(basepath=tmp_path / "test")

@flow(
result_storage=storage, cache_result_in_memory=False
result_storage=storage, cache_result_in_memory=False, persist_result=True
) # use a feature that requires persistence
def foo():
return get_run_context().result_factory
Expand Down Expand Up @@ -446,12 +444,14 @@ def bar():
async def test_child_flow_custom_storage_by_instance_unsaved(prefect_client, tmp_path):
storage = LocalFileSystem(basepath=tmp_path / "test")

@flow(cache_result_in_memory=False) # use a feature that requires persistence
@flow(
cache_result_in_memory=False, persist_result=True
) # use a feature that requires persistence
def foo():
print(f"In parent, persist={get_run_context().result_factory.persist_result}")
return get_run_context().result_factory, bar()

@flow(result_storage=storage, cache_result_in_memory=False)
@flow(result_storage=storage, cache_result_in_memory=False, persist_result=True)
def bar():
print(f"In child, persist={get_run_context().result_factory.persist_result}")
return get_run_context().result_factory
Expand Down Expand Up @@ -490,7 +490,7 @@ def bar():
return get_run_context().result_factory

_, task_factory = foo()
assert task_factory.persist_result
assert task_factory.persist_result is False
assert task_factory.serializer == DEFAULT_SERIALIZER()
assert_blocks_equal(task_factory.storage_block, DEFAULT_STORAGE())
assert task_factory.storage_block_id is not None
Expand All @@ -508,22 +508,23 @@ def bar():


def test_task_default_persist_result_can_be_overriden_by_setting():
@flow
def foo():
return get_run_context().result_factory, bar()
with temporary_settings({PREFECT_RESULTS_PERSIST_BY_DEFAULT: True}):

@task
def bar():
return get_run_context().result_factory
@flow
def foo():
return get_run_context().result_factory, bar()

@task
def bar():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_PERSIST_BY_DEFAULT: True}):
_, task_factory = foo()

assert task_factory.persist_result is True


def test_task_custom_persist_setting():
@flow
def test_nested_flow_custom_persist_setting():
@flow(persist_result=True)
def foo():
return get_run_context().result_factory, bar()

Expand Down Expand Up @@ -555,7 +556,6 @@ def bar():

flow_factory = foo()
assert flow_factory.cache_result_in_memory is True
assert task_factory.persist_result # Persistence on unless explicitly turned off
assert task_factory.cache_result_in_memory is toggle
assert task_factory.serializer == DEFAULT_SERIALIZER()
assert_blocks_equal(task_factory.storage_block, DEFAULT_STORAGE())
Expand Down Expand Up @@ -620,11 +620,11 @@ async def test_task_inherits_custom_storage(tmp_path):
storage = LocalFileSystem(basepath=tmp_path / "test")
storage_id = await storage.save("test")

@flow(result_storage="local-file-system/test")
@flow(result_storage="local-file-system/test", persist_result=True)
def foo():
return get_run_context().result_factory, bar()

@task
@task(persist_result=True)
def bar():
return get_run_context().result_factory

Expand Down Expand Up @@ -652,15 +652,15 @@ def bar():
assert task_factory.storage_block_id is not None


async def test_task_custom_storage(tmp_path):
async def test_nested_flow_custom_storage(tmp_path):
storage = LocalFileSystem(basepath=tmp_path / "test")
storage_id = await storage.save("test")

@flow()
@flow(persist_result=True)
def foo():
return get_run_context().result_factory, bar()

@flow(result_storage="local-file-system/test")
@flow(result_storage="local-file-system/test", persist_result=True)
def bar():
return get_run_context().result_factory

Expand All @@ -675,11 +675,11 @@ def bar():
async def test_task_custom_storage_by_instance_unsaved(prefect_client, tmp_path):
storage = LocalFileSystem(basepath=tmp_path / "test")

@flow(cache_result_in_memory=False)
@flow(cache_result_in_memory=False, persist_result=True)
def foo():
return get_run_context().result_factory, bar()

@flow(result_storage=storage, cache_result_in_memory=False)
@flow(result_storage=storage, cache_result_in_memory=False, persist_result=True)
def bar():
return get_run_context().result_factory

Expand Down Expand Up @@ -742,13 +742,10 @@ async def _verify_default_storage_creation_without_persistence(
assert result_factory.storage_block_id is not None


@pytest.mark.parametrize(
"options", [{"persist_result": True}, {"cache_result_in_memory": False}]
)
async def test_default_storage_creation_for_flow_with_persistence_features(
prefect_client, options
prefect_client,
):
@flow(**options)
@flow(persist_result=True)
def foo():
return get_run_context().result_factory

Expand All @@ -770,11 +767,11 @@ def foo():
async def test_default_storage_creation_for_task_with_persistence_features(
prefect_client,
):
@task
@task(persist_result=True)
def my_task_1():
return get_run_context().result_factory

@flow(retries=2)
@flow(retries=2, persist_result=True)
def my_flow_1():
return my_task_1()

Expand All @@ -783,11 +780,11 @@ def my_flow_1():
prefect_client, result_factory
)

@task(cache_key_fn=lambda *_: "always")
@task(cache_key_fn=lambda *_: "always", persist_result=True)
def my_task_2():
return get_run_context().result_factory

@flow
@flow(persist_result=True)
def my_flow_2():
return my_task_2()

Expand Down
14 changes: 7 additions & 7 deletions tests/results/test_result_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,39 @@ async def test_async_result_warnings_are_not_raised_by_engine():

task_run_count = flow_run_count = subflow_run_count = 0

@task(retries=3)
@task(persist_result=True, retries=3)
async def my_task():
nonlocal task_run_count
task_run_count += 1
if task_run_count < 3:
raise ValueError()
return 1

@task(cache_key_fn=lambda *_: "test")
@task(persist_result=True, cache_key_fn=lambda *_: "test")
def foo():
return 1

@task(cache_key_fn=lambda *_: "test")
@task(persist_result=True, cache_key_fn=lambda *_: "test")
def bar():
return 2

@flow
@flow(persist_result=True)
def subflow():
return 1

@flow
@flow(persist_result=True)
async def async_subflow():
return 1

@flow(retries=3)
@flow(retries=3, persist_result=True)
async def retry_subflow():
nonlocal subflow_run_count
subflow_run_count += 1
if subflow_run_count < 3:
raise ValueError()
return 1

@flow(retries=3)
@flow(retries=3, persist_result=True)
async def my_flow():
a = await my_task()

Expand Down
6 changes: 3 additions & 3 deletions tests/results/test_task_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def test_task_persisted_result_due_to_flow_feature(prefect_client, options
def foo():
return bar(return_state=True)

@task
@task(persist_result=True)
def bar():
return 1

Expand All @@ -41,7 +41,7 @@ async def test_task_persisted_result_due_to_task_feature(prefect_client, options
def foo():
return bar(return_state=True)

@task(**options)
@task(**options, persist_result=True)
def bar():
return 1

Expand Down Expand Up @@ -316,7 +316,7 @@ async def test_task_result_with_null_return(prefect_client):
def foo():
return bar(return_state=True)

@task
@task(persist_result=True)
def bar():
return None

Expand Down
8 changes: 4 additions & 4 deletions tests/test_flow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,13 +447,13 @@ async def test_flow_retry_with_error_in_flow_and_successful_task(self):
task_run_count = 0
flow_run_count = 0

@task
@task(persist_result=True)
async def my_task():
nonlocal task_run_count
task_run_count += 1
return "hello"

@flow(retries=1)
@flow(retries=1, persist_result=True)
async def foo():
nonlocal flow_run_count
flow_run_count += 1
Expand Down Expand Up @@ -607,13 +607,13 @@ async def test_flow_retry_with_error_in_flow_and_one_successful_child_flow(self)
child_run_count = 0
flow_run_count = 0

@flow
@flow(persist_result=True)
async def child_flow():
nonlocal child_run_count
child_run_count += 1
return "hello"

@flow(retries=1)
@flow(retries=1, persist_result=True)
async def parent_flow():
nonlocal flow_run_count
flow_run_count += 1
Expand Down
Loading

0 comments on commit b75631a

Please sign in to comment.