diff --git a/docs/concepts/caching.rst b/docs/concepts/caching.rst
index 4030ed57e..e8a27f80c 100644
--- a/docs/concepts/caching.rst
+++ b/docs/concepts/caching.rst
@@ -2,18 +2,9 @@
 Caching
 ========
-In Hamilton, the term *caching* broadly refers to "reusing results from previous executions to skip redundant computation". When caching is enabled, node results are stored in a **result store** and where they can be retrieved by subsequent runs.
+Caching enables storing execution results to be reused in later executions, effectively skipping redundant computations. This speeds up execution and saves resources (computation, API credits, GPU time, etc.), and has applications in both development and production.
 
-By default, results are stored based on the **code** that defines the node and its input **data** by default. Consequently, running a node with the "same code + same input data" multiple times should lead to a single execution.
-
-.. important::
-
-    Caching being a new core feature, please reach out via GitHub or Slack for requests or issues.
-
-Basics
-------
-
-To get started, add ``.with_cache()`` to the ``Builder()``. This will create the subdirectory ``hamilton_cache/`` in the current directory. Calling ``Driver.execute()`` will execute the dataflow as usual, but it will also store **metadata** and **results** under ``hamilton_cache/``. When calling ``.execute()`` a second time, the ``Driver`` will use the **metadata** to determine if **results** can be loaded, effectively skipping execution!
+To get started, simply add ``.with_cache()`` to your ``Builder()`` to enable caching.
 
 .. code-block:: python
 
@@ -28,6 +19,54 @@ To get started, add ``.with_cache()`` to the ``Builder()``. This will create the
     )
 
     dr.execute([...])
+    dr.execute([...])
+
+
+The first execution will store **metadata** and **results** under the subdirectory ``hamilton_cache/``, in the current directory. The next execution will skip computation by leveraging the cache!
+
+
+.. important::
+
+    Caching is a new core feature; please reach out via GitHub or Slack for requests or issues.
+
+
+Setting the cache path
+------------------------
+
+By default, the **metadata** and **results** are stored under a new subdirectory ``hamilton_cache/``. The location can be set via ``.with_cache(path=...)``.
+
+
+By project
+~~~~~~~~~~~~~~
+Centralizing your cache by project is useful when you have nodes that are reused across multiple dataflows (e.g., training and inference ML pipelines, feature engineering).
+
+
+.. code-block:: python
+
+    from hamilton import driver
+    import training
+    import inference
+
+    cache_path = "/path/to/project/hamilton_cache"
+
+    train_dr = driver.Builder().with_modules(training).with_cache(path=cache_path).build()
+    # ...
+    predict_dr = driver.Builder().with_modules(inference).with_cache(path=cache_path).build()
+
+
+Globally
+~~~~~~~~~~
+
+The primary benefit of using a global cache is easier storage management. Since the metadata and the results for *all* your Hamilton dataflows are in one place, it can be easier to clean up disk space.
+
+.. code-block:: python
+
+    from hamilton import driver
+    import my_dataflow
+
+    # set the cache under the user's global directory
+    cache_path = "~/.hamilton_cache"
+    dr = driver.Builder().with_modules(my_dataflow).with_cache(path=cache_path).build()
 
 .. _cache-result-format:
 
@@ -388,14 +427,6 @@ A useful pattern is using the ``Driver.cache`` state or `structured logs `_ since these API are currently unstable.
-
-
 Code version
 --------------
@@ -473,13 +504,9 @@ Additional types can be supported by registering a hashing function via the modu
 Roadmap
 -----------
-Caching is a significant Hamilton feature and there are plans to expand it. Here are some ideas and areas for development. Feel free comment on them or make other suggestions via Slack or GitHub!
+Caching is a significant Hamilton feature and there are plans to expand it. Here are some ideas and areas for development. Feel free to comment on them or make other suggestions via `Slack `_ or GitHub!
 
 - **async support**: Support caching with ``AsyncDriver``. This requires a significant amount of code, but the core logic shouldn't change much.
-
 - **cache eviction**: Allow to set up a max storage (in size or number of items) or time-based policy to delete data from the metadata and result stores. This would help with managing the cache size.
-
-- **more store backends**: The initial release includes backend supported by the Python standard library (SQLite metadata and file-based results). Could support more backends via `fsspec
-`_ (AWS, Azure, GCP, Databricks, etc.)
-
+- **more store backends**: The initial release includes backends supported by the Python standard library (SQLite metadata and file-based results). Could support more backends via `fsspec `_ (AWS, Azure, GCP, Databricks, etc.)
 
 - **support more types**: Include specialized hashing functions for complex objects from popular libraries. This can be done through Hamilton extensions.
diff --git a/docs/index.md b/docs/index.md
index 181175116..fbe8ae77a 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -34,7 +34,6 @@ reference/graph-adapters/index
 reference/lifecycle-hooks/index
 reference/result-builders/index
 reference/io/index
-reference/stores/index
 reference/dataflows/index
 reference/disabling-telemetry.md
 ```
diff --git a/docs/reference/caching/caching-logic.rst b/docs/reference/caching/caching-logic.rst
index 7d5978370..912ff1a8b 100644
--- a/docs/reference/caching/caching-logic.rst
+++ b/docs/reference/caching/caching-logic.rst
@@ -5,7 +5,7 @@ Caching logic
 Caching Behavior
 ----------------
 
-.. autoclass:: hamilton.lifecycle.caching.CachingBehavior
+.. autoclass:: hamilton.caching.adapter.CachingBehavior
     :exclude-members: __init__, from_string
     :members:
 
@@ -15,9 +15,7 @@ Caching Behavior
 ``@cache`` decorator
 ----------------------
 
-.. autofunction:: hamilton.function_modifiers.metadata.cache
-
-.. autoclass:: hamilton.function_modifiers.metadata.Cache
+.. autoclass:: hamilton.function_modifiers.metadata.cache
     :special-members: __init__
     :private-members:
     :members:
 
@@ -31,13 +29,13 @@ Caching Behavior
 Logging
 -----------
 
-.. autoclass:: hamilton.lifecycle.caching.CachingEvent
+.. autoclass:: hamilton.caching.adapter.CachingEvent
     :special-members: __init__
     :members:
     :inherited-members:
 
-.. autoclass:: hamilton.lifecycle.caching.CachingEventType
+.. autoclass:: hamilton.caching.adapter.CachingEventType
     :special-members: __init__
     :members:
     :inherited-members:
 
@@ -46,7 +44,7 @@ Logging
 Adapter
 --------
 
-.. autoclass:: hamilton.lifecycle.caching.SmartCacheAdapter
+.. autoclass:: hamilton.caching.adapter.SmartCacheAdapter
     :special-members: __init__
     :members:
     :inherited-members:
diff --git a/docs/reference/caching/data-versioning.rst b/docs/reference/caching/data-versioning.rst
index 86438df23..567c5eb6b 100644
--- a/docs/reference/caching/data-versioning.rst
+++ b/docs/reference/caching/data-versioning.rst
@@ -2,5 +2,5 @@
 Data versioning
 =================
 
-.. automodule:: hamilton.io.fingerprinting
+.. automodule:: hamilton.caching.fingerprinting
     :members:
diff --git a/docs/reference/caching/index.rst b/docs/reference/caching/index.rst
index 777a83374..dd0803d02 100644
--- a/docs/reference/caching/index.rst
+++ b/docs/reference/caching/index.rst
@@ -10,3 +10,4 @@ Reference
 
    caching-logic
    data-versioning
+   stores
diff --git a/docs/reference/caching/stores.rst b/docs/reference/caching/stores.rst
new file mode 100644
index 000000000..7e1a83d22
--- /dev/null
+++ b/docs/reference/caching/stores.rst
@@ -0,0 +1,21 @@
+=========
+Stores
+=========
+
+stores.base
+-----------
+
+.. automodule:: hamilton.caching.stores.base
+    :members:
+
+stores.file
+-----------
+
+.. automodule:: hamilton.caching.stores.file
+    :members:
+
+stores.sqlite
+-------------
+
+.. automodule:: hamilton.caching.stores.sqlite
+    :members:
diff --git a/docs/reference/stores/base.rst b/docs/reference/stores/base.rst
deleted file mode 100644
index 40375fcaa..000000000
--- a/docs/reference/stores/base.rst
+++ /dev/null
@@ -1,6 +0,0 @@
-=========
-Base
-=========
-
-.. automodule:: hamilton.stores.base
-    :members:
diff --git a/docs/reference/stores/file.rst b/docs/reference/stores/file.rst
deleted file mode 100644
index b3c5e4062..000000000
--- a/docs/reference/stores/file.rst
+++ /dev/null
@@ -1,6 +0,0 @@
-=========
-File
-=========
-
-.. automodule:: hamilton.stores.file
-    :members:
diff --git a/docs/reference/stores/index.rst b/docs/reference/stores/index.rst
deleted file mode 100644
index b58100019..000000000
--- a/docs/reference/stores/index.rst
+++ /dev/null
@@ -1,13 +0,0 @@
-==============
-Stores
-==============
-
-Reference
---------
-
-.. toctree::
-   :maxdepth: 2
-
-   base
-   file
-   sqlite
diff --git a/docs/reference/stores/sqlite.rst b/docs/reference/stores/sqlite.rst
deleted file mode 100644
index 1b56b65bb..000000000
--- a/docs/reference/stores/sqlite.rst
+++ /dev/null
@@ -1,6 +0,0 @@
-=========
-SQLite
-=========
-
-.. automodule:: hamilton.stores.sqlite
-    :members:
diff --git a/hamilton/caching/adapter.py b/hamilton/caching/adapter.py
index 6fb0858ed..a3b6c6f27 100644
--- a/hamilton/caching/adapter.py
+++ b/hamilton/caching/adapter.py
@@ -905,7 +905,7 @@ def _process_input(self, run_id: str, node_name: str, value: Any) -> None:
         To enable caching, input values must be versioned. Since inputs have no associated code,
         set a constant "code version" ``f"{node_name}__input"`` that uniquely identifies this input.
         """
-        data_version = fingerprinting.hash_value(value)
+        data_version = self.version_data(value)
         self.code_versions[run_id][node_name] = f"{node_name}__input"
         self.data_versions[run_id][node_name] = data_version
         self._log_event(
@@ -924,7 +924,7 @@ def _process_override(self, run_id: str, node_name: str, value: Any) -> None:
         code and data versions for overrides are not stored because their value is user provided
         and isn't necessarily tied to the code.
         """
-        data_version = fingerprinting.hash_value(value)
+        data_version = self.version_data(value)
         self.data_versions[run_id][node_name] = data_version
         self._log_event(
             run_id=run_id,
@@ -993,13 +993,13 @@ def pre_node_execute(
         Collecting ``data_version`` for upstream dependencies requires handling special cases
         when task-based execution is used:
 
-        - If the current node is ``COLLECT`` , the dependency annotated with ``Collect[]`` needs to
-        be versioned item by item instead of versioning the full container. This is because the
- - If the current node is ``INSIDE`` and the dependency is ``EXPAND``, this means the - ``kwargs`` dictionary contains a single item. We need to version this individual item because - it will not be available from "inside" the branch for some executors (multiprocessing, multithreading) - because they lose access to the data_versions of ``OUTSIDE`` nodes stored in ``self.data_versions``. + - If the current node is ``COLLECT`` , the dependency annotated with ``Collect[]`` needs to + be versioned item by item instead of versioning the full container. This is because the + collect order is inconsistent. + - If the current node is ``INSIDE`` and the dependency is ``EXPAND``, this means the + ``kwargs`` dictionary contains a single item. We need to version this individual item because + it will not be available from "inside" the branch for some executors (multiprocessing, multithreading) + because they lose access to the data_versions of ``OUTSIDE`` nodes stored in ``self.data_versions``. """ node_name = node_.name @@ -1036,20 +1036,20 @@ def pre_node_execute( if dep_name == collected_name: # the collected value should be hashed based on the items, not the container - items_data_versions = [fingerprinting.hash_value(item) for item in dep_value] + items_data_versions = [self.version_data(item) for item in dep_value] dep_data_version = fingerprinting.hash_sequence(sorted(items_data_versions)) elif dep_role == NodeRoleInTaskExecution.EXPAND: # if the dependency is `EXPAND`, the kwarg received is a single item yielded by the iterator # rather than the full iterable. We must version it directly, similar to a top-level input - dep_data_version = fingerprinting.hash_value(dep_value) + dep_data_version = self.version_data(dep_value) else: tasks_data_versions = self._get_memory_data_version( run_id=run_id, node_name=dep_name, task_id=None ) if tasks_data_versions is SENTINEL: - tasks_data_versions = fingerprinting.hash_value(dep_value) + dep_data_version = self.version_data(dep_value) elif isinstance(tasks_data_versions, dict): dep_data_version = tasks_data_versions.get(task_id) else: @@ -1102,7 +1102,7 @@ def do_node_execute( CachingBehavior.IGNORE, ): cache_key = self.get_cache_key(run_id=run_id, node_name=node_name, task_id=task_id) - data_version = fingerprinting.hash_value(result) + data_version = self.version_data(result) self._set_memory_metadata( run_id=run_id, node_name=node_name, task_id=task_id, data_version=data_version ) @@ -1170,7 +1170,7 @@ def do_node_execute( node_kwargs=node_kwargs, task_id=task_id, ) - data_version = fingerprinting.hash_value(result) + data_version = self.version_data(result) self._set_memory_metadata( run_id=run_id, node_name=node_name, task_id=task_id, data_version=data_version ) diff --git a/hamilton/caching/fingerprinting.py b/hamilton/caching/fingerprinting.py index 0517e71b2..448bbb53b 100644 --- a/hamilton/caching/fingerprinting.py +++ b/hamilton/caching/fingerprinting.py @@ -3,6 +3,7 @@ import hashlib import logging from collections.abc import Mapping, Sequence, Set +from types import NoneType from typing import Dict from hamilton.experimental import h_databackends @@ -10,7 +11,9 @@ logger = logging.getLogger(__name__) -MAX_DEPTH = 2 +MAX_DEPTH = 4 +UNHASHABLE = "" +NONE_HASH = "" def set_max_depth(depth: int) -> None: @@ -43,14 +46,21 @@ def hash_value(obj, *args, depth=0, **kwargs) -> str: depth += 1 return hash_value(obj.__dict__, depth=depth) - logger.warning( - f"Currently versioning object of type `{type(obj)}` and hiting recursion depth {depth}. 
" - f"To avoid data version collisions, register a data versioning function for type `{type(obj)}` " - "or increase the module constant `hamilton.io.fingeprinting.MAX_DEPTH`. " - "See the Hamilton documentation Concepts page about caching for details." - ) - hash_object = hashlib.md5("".encode()) - return _compact_hash(hash_object.digest()) + if depth >= MAX_DEPTH: + logger.warning( + f"Currently versioning object of type `{type(obj)}` and hiting recursion depth {depth}. " + f"To avoid data version collisions, register a data versioning function for type `{type(obj)}` " + "or increase the module constant `hamilton.io.fingeprinting.MAX_DEPTH`. " + "See the Hamilton documentation Concepts page about caching for details." + ) + + return UNHASHABLE + + +@hash_value.register(NoneType) +def hash_none(obj, *args, **kwargs) -> str: + """Hash for None is """ + return NONE_HASH @hash_value.register(str) diff --git a/tests/caching/test_fingerprinting.py b/tests/caching/test_fingerprinting.py index a8620ae94..2a58cb913 100644 --- a/tests/caching/test_fingerprinting.py +++ b/tests/caching/test_fingerprinting.py @@ -35,6 +35,11 @@ def test_hash_sequence(obj, expected_hash): assert fingerprint == expected_hash +def test_hash_none(): + fingerprint = fingerprinting.hash_value(None) + assert fingerprint == "" + + def test_hash_equals_for_different_sequence_types(): list_obj = [0, True, "hello-world"] tuple_obj = (0, True, "hello-world") @@ -47,7 +52,7 @@ def test_hash_equals_for_different_sequence_types(): def test_hash_ordered_mapping(): obj = {0: True, "key": "value", 17.0: None} - expected_hash = "a6kiZ3pD0g9vOp1XD_CViVJ9fHYM3ct_oItyJQ==" + expected_hash = "1zH9TfTu0-nlWXXXYo0vigFFSQajWXov2w4AZQ==" fingerprint = fingerprinting.hash_mapping(obj, ignore_order=False) assert fingerprint == expected_hash @@ -62,7 +67,7 @@ def test_hash_mapping_where_order_matters(): def test_hash_unordered_mapping(): obj = {0: True, "key": "value", 17.0: None} - expected_hash = "4BCjST4ftDLuBsuNTMgIOOkCy5pV79fCERP9hw==" + expected_hash = "uw0dfSAEgE9nOK3bHgmJ4TR3-VFRqOAoogdRmw==" fingerprint = fingerprinting.hash_mapping(obj, ignore_order=True) assert fingerprint == expected_hash @@ -77,6 +82,6 @@ def test_hash_mapping_where_order_doesnt_matter(): def test_hash_set(): obj = {0, True, "key", "value", 17.0, None} - expected_hash = "4sA1r4wny7AvoG1wzEN6nHdQjpE2V-AodJ9dEQ==" + expected_hash = "dKyAE-ob4_GD-Mb5Lu2R-VJAxGctY4L8JDwc2g==" fingerprint = fingerprinting.hash_set(obj) assert fingerprint == expected_hash diff --git a/tests/caching/test_metadata_store.py b/tests/caching/test_metadata_store.py index 7a6668fa2..80afb6394 100644 --- a/tests/caching/test_metadata_store.py +++ b/tests/caching/test_metadata_store.py @@ -28,10 +28,10 @@ def test_initialize_empty(metadata_store): def test_not_empty_after_set(metadata_store): code_version = "FOO-1" data_version = "foo-a" - context_key = create_cache_key(code_version=code_version, dep_data_versions={}) + cache_key = create_cache_key(code_version=code_version, dep_data_versions={}) metadata_store.set( - context_key=context_key, + cache_key=cache_key, node_name="foo", code_version=code_version, data_version=data_version, @@ -45,9 +45,9 @@ def test_not_empty_after_set(metadata_store): def test_set_doesnt_produce_duplicates(metadata_store): code_version = "FOO-1" data_version = "foo-a" - context_key = create_cache_key(code_version=code_version, dep_data_versions={}) + cache_key = create_cache_key(code_version=code_version, dep_data_versions={}) metadata_store.set( - 
context_key=context_key, + cache_key=cache_key, node_name="foo", code_version=code_version, data_version=data_version, @@ -56,7 +56,7 @@ def test_set_doesnt_produce_duplicates(metadata_store): assert metadata_store.size == 1 metadata_store.set( - context_key=context_key, + cache_key=cache_key, node_name="foo", code_version=code_version, data_version=data_version, @@ -67,8 +67,8 @@ def test_set_doesnt_produce_duplicates(metadata_store): @pytest.mark.parametrize("metadata_store", [SQLiteMetadataStore], indirect=True) def test_get_miss_returns_none(metadata_store): - context_key = create_cache_key(code_version="FOO-1", dep_data_versions={"bar": "bar-a"}) - data_version = metadata_store.get(context_key=context_key) + cache_key = create_cache_key(code_version="FOO-1", dep_data_versions={"bar": "bar-a"}) + data_version = metadata_store.get(cache_key=cache_key) assert data_version is None @@ -76,14 +76,14 @@ def test_get_miss_returns_none(metadata_store): def test_set_get_without_dependencies(metadata_store): code_version = "FOO-1" data_version = "foo-a" - context_key = create_cache_key(code_version=code_version, dep_data_versions={}) + cache_key = create_cache_key(code_version=code_version, dep_data_versions={}) metadata_store.set( - context_key=context_key, + cache_key=cache_key, node_name="foo", code_version=code_version, data_version=data_version, run_id="...", ) - retrieved_data_version = metadata_store.get(context_key=context_key) + retrieved_data_version = metadata_store.get(cache_key=cache_key) assert retrieved_data_version == data_version
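
To illustrate the extension point mentioned in the Roadmap and exercised above for ``NoneType``, here is a minimal, hypothetical sketch of registering a hashing function for a user-defined type. It assumes ``hash_value`` keeps the ``register()`` decorator API shown in this diff and that the module lives at ``hamilton.caching.fingerprinting`` as introduced here; ``MyConfig`` and ``hash_my_config`` are illustrative names only and are not part of the change.

.. code-block:: python

    # Hypothetical sketch: register a fingerprinting function for a custom type,
    # mirroring the ``@hash_value.register(NoneType)`` pattern added in this diff.
    # ``MyConfig`` and ``hash_my_config`` are made-up names, not part of Hamilton.
    import dataclasses

    from hamilton.caching import fingerprinting


    @dataclasses.dataclass
    class MyConfig:
        name: str
        threshold: float


    @fingerprinting.hash_value.register(MyConfig)
    def hash_my_config(obj, *args, **kwargs) -> str:
        # Delegate to the existing hashing of mappings by versioning the dataclass fields.
        return fingerprinting.hash_value(dataclasses.asdict(obj))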