Commit

fixed tests and documentation build
zilto authored and zilto committed Sep 25, 2024
1 parent dbe5a3c commit 998ca73
Showing 14 changed files with 131 additions and 101 deletions.
77 changes: 52 additions & 25 deletions docs/concepts/caching.rst
@@ -2,18 +2,9 @@
Caching
========

In Hamilton, the term *caching* broadly refers to "reusing results from previous executions to skip redundant computation". When caching is enabled, node results are stored in a **result store**, where they can be retrieved by subsequent runs.
Caching stores execution results so they can be reused in later executions, effectively skipping redundant computation. This speeds up execution and saves resources (compute, API credits, GPU time, etc.), both in development and in production.

By default, results are stored based on the **code** that defines the node and its input **data**. Consequently, running a node with the "same code + same input data" multiple times should lead to a single execution.

.. important::

Caching is a new core feature; please reach out via GitHub or Slack with requests or issues.

Basics
-------

To get started, add ``.with_cache()`` to the ``Builder()``. This will create the subdirectory ``hamilton_cache/`` in the current directory. Calling ``Driver.execute()`` will execute the dataflow as usual, but it will also store **metadata** and **results** under ``hamilton_cache/``. When calling ``.execute()`` a second time, the ``Driver`` will use the **metadata** to determine if **results** can be loaded, effectively skipping execution!
To get started, simply add ``.with_cache()`` to your ``Builder()`` to enable caching.

.. code-block:: python
@@ -28,6 +19,54 @@ To get started, add ``.with_cache()`` to the ``Builder()``. This will create the
)
dr.execute([...])
dr.execute([...])
The first execution will store **metadata** and **results** under the subdirectory ``hamilton_cache/``, in the current directory. The next execution will skip computation by leveraging the cache!
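
For instance, a complete minimal setup might look like the following sketch (``my_dataflow`` and the requested node name are hypothetical):

.. code-block:: python

    from hamilton import driver
    import my_dataflow  # hypothetical module containing Hamilton functions

    dr = (
        driver.Builder()
        .with_modules(my_dataflow)
        .with_cache()
        .build()
    )
    dr.execute(["some_node"])  # first run: executes and stores metadata + results
    dr.execute(["some_node"])  # second run: loads the result from hamilton_cache/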


.. important::

Caching is a new core feature; please reach out via GitHub or Slack with requests or issues.


Setting the cache path
------------------------

By default, the **metadata** and **results** are stored under a new subdirectory ``hamilton_cache/``. The location can be set via ``.with_cache(path=...)``.


By project
~~~~~~~~~~~~~~
Centralizing your cache by project is useful when you have nodes that are reused across multiple dataflows (e.g., training and inference ML pipelines, feature engineering).


.. code-block:: python

    from hamilton import driver
    import training
    import inference

    cache_path = "/path/to/project/hamilton_cache"
    train_dr = driver.Builder().with_modules(training).with_cache(path=cache_path).build()
    # ...
    predict_dr = driver.Builder().with_modules(inference).with_cache(path=cache_path).build()

Globally
~~~~~~~~~~

The primary benefit of using a global cache is easier storage management. Since the metadata and results for *all* your Hamilton dataflows are in one place, it is easier to clean up disk space.

.. code-block:: python

    from hamilton import driver
    import my_dataflow

    # set the cache under the user's global directory
    cache_path = "~/.hamilton_cache"
    dr = driver.Builder().with_modules(my_dataflow).with_cache(path=cache_path).build()

.. _cache-result-format:
@@ -388,14 +427,6 @@ A useful pattern is using the ``Driver.cache`` state or `structured logs <cachin
stored_result = dr.cache.result_store(data_version)
Additional ``result_store`` and ``metadata_store`` backends
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The initial release of the caching feature included an ``sqlite``-backed metadata store and a ``shelve``-backed result store because both only depend on the standard library and work great for a single machine.

Support for ``AWS S3`` as a **result store** is on the roadmap. If you're interested in implementing a specific backend, please reach out via `Slack <https://join.slack.com/t/hamilton-opensource/shared_invite/zt-2niepkra8-DGKGf_tTYhXuJWBTXtIs4g>`_ since these APIs are currently unstable.


Code version
--------------

Expand Down Expand Up @@ -473,13 +504,9 @@ Additional types can be supported by registering a hashing function via the modu
Roadmap
-----------

Caching is a significant Hamilton feature and there are plans to expand it. Here are some ideas and areas for development. Feel free to comment on them or make other suggestions via Slack or GitHub!
Caching is a significant Hamilton feature and there are plans to expand it. Here are some ideas and areas for development. Feel free to comment on them or make other suggestions via `Slack <https://join.slack.com/t/hamilton-opensource/shared_invite/zt-2niepkra8-DGKGf_tTYhXuJWBTXtIs4g>`_ or GitHub!

- **async support**: Support caching with ``AsyncDriver``. This requires a significant amount of code, but the core logic shouldn't change much.

- **cache eviction**: Allow setting a maximum storage size (in bytes or number of items) or a time-based policy for deleting data from the metadata and result stores. This would help with managing the cache size.

- **more store backends**: The initial release includes backends supported by the Python standard library (SQLite metadata and file-based results). Could support more backends via `fsspec
<https://filesystem-spec.readthedocs.io/en/latest/?badge=latest>`_ (AWS, Azure, GCP, Databricks, etc.)

- **more store backends**: The initial release includes backends supported by the Python standard library (SQLite metadata and file-based results). Could support more backends via `fsspec <https://filesystem-spec.readthedocs.io/en/latest/?badge=latest>`_ (AWS, Azure, GCP, Databricks, etc.)
- **support more types**: Include specialized hashing functions for complex objects from popular libraries. This can be done through Hamilton extensions.
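
As a sketch of what registering such a hashing function can look like with the current ``hamilton.caching.fingerprinting`` module (``MyCustomType`` and its fields are hypothetical):

.. code-block:: python

    from dataclasses import dataclass

    from hamilton.caching import fingerprinting

    @dataclass
    class MyCustomType:  # hypothetical user-defined type
        name: str
        value: int

    @fingerprinting.hash_value.register(MyCustomType)
    def hash_my_custom_type(obj, *args, **kwargs) -> str:
        # delegate to the existing mapping hasher via a stable dict view
        return fingerprinting.hash_value({"name": obj.name, "value": obj.value})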
1 change: 0 additions & 1 deletion docs/index.md
@@ -34,7 +34,6 @@ reference/graph-adapters/index
reference/lifecycle-hooks/index
reference/result-builders/index
reference/io/index
reference/stores/index
reference/dataflows/index
reference/disabling-telemetry.md
```
12 changes: 5 additions & 7 deletions docs/reference/caching/caching-logic.rst
@@ -5,7 +5,7 @@ Caching logic
Caching Behavior
----------------

.. autoclass:: hamilton.lifecycle.caching.CachingBehavior
.. autoclass:: hamilton.caching.adapter.CachingBehavior
:exclude-members: __init__, from_string
:members:

@@ -15,9 +15,7 @@ Caching Behavior
``@cache`` decorator
----------------------

.. autofunction:: hamilton.function_modifiers.metadata.cache

.. autoclass:: hamilton.function_modifiers.metadata.Cache
.. autoclass:: hamilton.function_modifiers.metadata.cache
:special-members: __init__
:private-members:
:members:
Expand All @@ -31,13 +29,13 @@ Caching Behavior
Logging
-----------

.. autoclass:: hamilton.lifecycle.caching.CachingEvent
.. autoclass:: hamilton.caching.adapter.CachingEvent
:special-members: __init__
:members:
:inherited-members:


.. autoclass:: hamilton.lifecycle.caching.CachingEventType
.. autoclass:: hamilton.caching.adapter.CachingEventType
:special-members: __init__
:members:
:inherited-members:
@@ -46,7 +44,7 @@ Logging
Adapter
--------

.. autoclass:: hamilton.lifecycle.caching.SmartCacheAdapter
.. autoclass:: hamilton.caching.adapter.SmartCacheAdapter
:special-members: __init__
:members:
:inherited-members:
2 changes: 1 addition & 1 deletion docs/reference/caching/data-versioning.rst
@@ -2,5 +2,5 @@
Data versioning
=================

.. automodule:: hamilton.io.fingerprinting
.. automodule:: hamilton.caching.fingerprinting
:members:
1 change: 1 addition & 0 deletions docs/reference/caching/index.rst
@@ -10,3 +10,4 @@ Reference

caching-logic
data-versioning
stores
21 changes: 21 additions & 0 deletions docs/reference/caching/stores.rst
@@ -0,0 +1,21 @@
=========
Stores
=========

stores.base
-----------

.. automodule:: hamilton.caching.stores.base
:members:

stores.file
-----------

.. automodule:: hamilton.caching.stores.file
:members:

stores.sqlite
-------------

.. automodule:: hamilton.caching.stores.sqlite
:members:
6 changes: 0 additions & 6 deletions docs/reference/stores/base.rst

This file was deleted.

6 changes: 0 additions & 6 deletions docs/reference/stores/file.rst

This file was deleted.

13 changes: 0 additions & 13 deletions docs/reference/stores/index.rst

This file was deleted.

6 changes: 0 additions & 6 deletions docs/reference/stores/sqlite.rst

This file was deleted.

28 changes: 14 additions & 14 deletions hamilton/caching/adapter.py
@@ -905,7 +905,7 @@ def _process_input(self, run_id: str, node_name: str, value: Any) -> None:
To enable caching, input values must be versioned. Since inputs have no associated code,
set a constant "code version" ``f"{node_name}__input"`` that uniquely identifies this input.
"""
data_version = fingerprinting.hash_value(value)
data_version = self.version_data(value)
self.code_versions[run_id][node_name] = f"{node_name}__input"
self.data_versions[run_id][node_name] = data_version
self._log_event(
@@ -924,7 +924,7 @@ def _process_override(self, run_id: str, node_name: str, value: Any) -> None:
code and data versions for overrides are not stored because their value is user provided
and isn't necessarily tied to the code.
"""
data_version = fingerprinting.hash_value(value)
data_version = self.version_data(value)
self.data_versions[run_id][node_name] = data_version
self._log_event(
run_id=run_id,
@@ -993,13 +993,13 @@ def pre_node_execute(
Collecting ``data_version`` for upstream dependencies requires handling special cases when
task-based execution is used:
- If the current node is ``COLLECT``, the dependency annotated with ``Collect[]`` needs to
be versioned item by item instead of versioning the full container. This is because the
collect order is inconsistent.
- If the current node is ``INSIDE`` and the dependency is ``EXPAND``, this means the
``kwargs`` dictionary contains a single item. We need to version this individual item because
it will not be available from "inside" the branch for some executors (multiprocessing, multithreading)
because they lose access to the data_versions of ``OUTSIDE`` nodes stored in ``self.data_versions``.
- If the current node is ``COLLECT``, the dependency annotated with ``Collect[]`` needs to
be versioned item by item instead of versioning the full container. This is because the
collect order is inconsistent.
- If the current node is ``INSIDE`` and the dependency is ``EXPAND``, this means the
``kwargs`` dictionary contains a single item. We need to version this individual item because
it will not be available from "inside" the branch for some executors (multiprocessing, multithreading)
because they lose access to the data_versions of ``OUTSIDE`` nodes stored in ``self.data_versions``.
"""
node_name = node_.name
@@ -1036,20 +1036,20 @@

if dep_name == collected_name:
# the collected value should be hashed based on the items, not the container
items_data_versions = [fingerprinting.hash_value(item) for item in dep_value]
items_data_versions = [self.version_data(item) for item in dep_value]
dep_data_version = fingerprinting.hash_sequence(sorted(items_data_versions))

elif dep_role == NodeRoleInTaskExecution.EXPAND:
# if the dependency is `EXPAND`, the kwarg received is a single item yielded by the iterator
# rather than the full iterable. We must version it directly, similar to a top-level input
dep_data_version = fingerprinting.hash_value(dep_value)
dep_data_version = self.version_data(dep_value)

else:
tasks_data_versions = self._get_memory_data_version(
run_id=run_id, node_name=dep_name, task_id=None
)
if tasks_data_versions is SENTINEL:
tasks_data_versions = fingerprinting.hash_value(dep_value)
dep_data_version = self.version_data(dep_value)
elif isinstance(tasks_data_versions, dict):
dep_data_version = tasks_data_versions.get(task_id)
else:
@@ -1102,7 +1102,7 @@ def do_node_execute(
CachingBehavior.IGNORE,
):
cache_key = self.get_cache_key(run_id=run_id, node_name=node_name, task_id=task_id)
data_version = fingerprinting.hash_value(result)
data_version = self.version_data(result)
self._set_memory_metadata(
run_id=run_id, node_name=node_name, task_id=task_id, data_version=data_version
)
@@ -1170,7 +1170,7 @@ def do_node_execute(
node_kwargs=node_kwargs,
task_id=task_id,
)
data_version = fingerprinting.hash_value(result)
data_version = self.version_data(result)
self._set_memory_metadata(
run_id=run_id, node_name=node_name, task_id=task_id, data_version=data_version
)
28 changes: 19 additions & 9 deletions hamilton/caching/fingerprinting.py
@@ -3,14 +3,17 @@
import hashlib
import logging
from collections.abc import Mapping, Sequence, Set
from types import NoneType
from typing import Dict

from hamilton.experimental import h_databackends

logger = logging.getLogger(__name__)


MAX_DEPTH = 2
MAX_DEPTH = 4
UNHASHABLE = "<unhashable>"
NONE_HASH = "<none>"


def set_max_depth(depth: int) -> None:
@@ -43,14 +46,21 @@ def hash_value(obj, *args, depth=0, **kwargs) -> str:
depth += 1
return hash_value(obj.__dict__, depth=depth)

logger.warning(
f"Currently versioning object of type `{type(obj)}` and hitting recursion depth {depth}. "
f"To avoid data version collisions, register a data versioning function for type `{type(obj)}` "
"or increase the module constant `hamilton.io.fingerprinting.MAX_DEPTH`. "
"See the Hamilton documentation Concepts page about caching for details."
)
hash_object = hashlib.md5("<unhashable>".encode())
return _compact_hash(hash_object.digest())
if depth >= MAX_DEPTH:
logger.warning(
f"Currently versioning object of type `{type(obj)}` and hitting recursion depth {depth}. "
f"To avoid data version collisions, register a data versioning function for type `{type(obj)}` "
"or increase the module constant `hamilton.caching.fingerprinting.MAX_DEPTH`. "
"See the Hamilton documentation Concepts page about caching for details."
)

return UNHASHABLE


@hash_value.register(NoneType)
def hash_none(obj, *args, **kwargs) -> str:
"""Hash for None is <none>"""
return NONE_HASH


@hash_value.register(str)
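For reference, the recursion limit in the warning above can be raised via ``set_max_depth`` from the same module before executing a dataflow; a minimal sketch (the depth value is illustrative):

.. code-block:: python

    from hamilton.caching import fingerprinting

    # allow deeper attribute-by-attribute versioning before falling back
    # to the UNHASHABLE constant
    fingerprinting.set_max_depth(8)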
11 changes: 8 additions & 3 deletions tests/caching/test_fingerprinting.py
@@ -35,6 +35,11 @@ def test_hash_sequence(obj, expected_hash):
assert fingerprint == expected_hash


def test_hash_none():
fingerprint = fingerprinting.hash_value(None)
assert fingerprint == "<none>"


def test_hash_equals_for_different_sequence_types():
list_obj = [0, True, "hello-world"]
tuple_obj = (0, True, "hello-world")
@@ -47,7 +52,7 @@ def test_hash_equals_for_different_sequence_types():

def test_hash_ordered_mapping():
obj = {0: True, "key": "value", 17.0: None}
expected_hash = "a6kiZ3pD0g9vOp1XD_CViVJ9fHYM3ct_oItyJQ=="
expected_hash = "1zH9TfTu0-nlWXXXYo0vigFFSQajWXov2w4AZQ=="
fingerprint = fingerprinting.hash_mapping(obj, ignore_order=False)
assert fingerprint == expected_hash

@@ -62,7 +67,7 @@ def test_hash_mapping_where_order_matters():

def test_hash_unordered_mapping():
obj = {0: True, "key": "value", 17.0: None}
expected_hash = "4BCjST4ftDLuBsuNTMgIOOkCy5pV79fCERP9hw=="
expected_hash = "uw0dfSAEgE9nOK3bHgmJ4TR3-VFRqOAoogdRmw=="
fingerprint = fingerprinting.hash_mapping(obj, ignore_order=True)
assert fingerprint == expected_hash

@@ -77,6 +82,6 @@ def test_hash_mapping_where_order_doesnt_matter():

def test_hash_set():
obj = {0, True, "key", "value", 17.0, None}
expected_hash = "4sA1r4wny7AvoG1wzEN6nHdQjpE2V-AodJ9dEQ=="
expected_hash = "dKyAE-ob4_GD-Mb5Lu2R-VJAxGctY4L8JDwc2g=="
fingerprint = fingerprinting.hash_set(obj)
assert fingerprint == expected_hash