Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GracefulErrorAdapter fails with Parallelizable #1009

Closed
JamesArruda opened this issue Jul 3, 2024 · 6 comments
Closed

GracefulErrorAdapter fails with Parallelizable #1009

JamesArruda opened this issue Jul 3, 2024 · 6 comments
Labels
triage label for issues that need to be triaged.

Comments

@JamesArruda
Copy link

GracefulErrorAdapter does not know to return a list of the sentinel value for Parallelizable nodes (ones that are of type EXPAND).

Current behavior

Stack Traces

The error can be reproduced with the code in the steps to replicate.

Traceback (most recent call last):
  File "c:\git\github\hamilton\tests\test_parallel_error.py", line 42, in <module>
    ans = dr.execute(
          ^^^^^^^^^^^
  File "c:\git\github\hamilton\hamilton\driver.py", line 566, in execute
    raise e
  File "c:\git\github\hamilton\hamilton\driver.py", line 556, in execute
    outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\git\github\hamilton\hamilton\driver.py", line 685, in raw_execute
    raise e
  File "c:\git\github\hamilton\hamilton\driver.py", line 674, in raw_execute
    results = self.graph_executor.execute(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\git\github\hamilton\hamilton\driver.py", line 232, in execute
    executors.run_graph_to_completion(execution_state, self.execution_manager)
  File "c:\git\github\hamilton\hamilton\execution\executors.py", line 399, in run_graph_to_completion
    execution_state.update_task_state(task_name, state, result)
  File "c:\git\github\hamilton\hamilton\execution\state.py", line 363, in update_task_state
    self.write_task_results(task_to_update, results)
  File "c:\git\github\hamilton\hamilton\execution\state.py", line 323, in write_task_results
    result_for_expansion = list(results_to_write.pop(result_name_for_expansion))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not iterable

Steps to replicate behavior

from hamilton.htypes import Collect, Parallelizable
from hamilton import driver
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.lifecycle import GracefulErrorAdapter


def distro(n: int, allow_fail: bool) -> Parallelizable[int]:
    for x in range(n):
        if x > 4 and allow_fail:
            raise Exception("bad")
        yield x * 3


def some_math(distro: int) -> float:
    if distro > 15:
        raise Exception("No no no")
    return distro * 2.0


def distro_gather(some_math: Collect[float]) -> list[float]:
    ans = [x for x in some_math]
    return ans 



if __name__ == "__main__":
    local_executor = SynchronousLocalTaskExecutor()
    import __main__
    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_modules(__main__)
        .with_remote_executor(local_executor)
        .with_adapters(
            GracefulErrorAdapter(
                error_to_catch=Exception,
                sentinel_value=None
            )
        )
        .build()
    )
    ans = dr.execute(
        ["distro_gather"],
        inputs={
            "n": 10,
            "allow_fail": True,
        }
    )
    print(ans)

Toggling allow_fail shows that the GracefulError works nicely within a parallel block, but not for the entry point.

Library & System Information

Windows 10 Pro
Python 3.12.3
Hamilton 1.69.0 (7fd5e16)

Expected behavior

The node should fail safely and press on.

Possible Solution

The cause is straightforward: hamilton expects an iterable back from an EXPAND node (really from an EXPAND_UNORDERED group, I think). The GracefulErrorAdapter doesn't know to do that currently.

The solution I was attempting to pursue was to inject more node data into the run_to_execute_node method of the adapter. However, the base class doesn't allow an override of do_node_execute such that I could pass that information down into the adapter (such as from node_).

My first workaround was to modify the adapter to look for the "expand-" at the start of the task_id value that's passed in, but that catches all nodes within the parallelizeable block, not just the first one. Currently I am tagging the parallelizable nodes and looking for the tag in the do_node_execute to return [self.sentinel_value], which avoid the error.

@JamesArruda JamesArruda added the triage label for issues that need to be triaged. label Jul 3, 2024
@elijahbenizzy
Copy link
Collaborator

Ahh, yes, good point. Thanks! That's a reasonable workaround -- would you mind sharing the code for it? Otherwise we can take a look to see how we might cascade it forwards with these expectations.

That said, it's a little iffy on requirements. What would you expect to happen (I think I know which one I'd like...):

  1. The first three get executed, the rest get dropped
  2. Nothing gets executed, the parallelizable failed
  3. It returns an empty list

(2) is not crazy, but (1) is a good UX IMO. And I think might be easy to implement...

@JamesArruda
Copy link
Author

I modified the adapter to have:

    def run_to_execute_node(
        self, *, node_callable: Any, node_kwargs: Dict[str, Any], node_tags: dict[str, Any], **future_kwargs: Any
    ) -> Any:
        """Executes a node. If the node fails, returns the sentinel value."""
        for key, value in node_kwargs.items():
            if value == self.sentinel_value:  # == versus is
                return self.sentinel_value  # cascade it through
        try:
            return node_callable(**node_kwargs)
        except self.error_to_catch:
            if node_tags.get("for_graceful", "") == "parallel":
                return [self.sentinel_value]
            return self.sentinel_value

And had:

from hamilton.function_modifiers import tag

@tag(for_graceful="parallel")
def distro(n: int, allow_fail: bool) -> Parallelizable[int]:
    for x in range(n):
        if x > 4 and allow_fail:
            raise Exception("bad")
        yield x * 3

I would prefer (1). If the function fails before the yield/return portion, then I'd expect (2). Or for it to act as if it gets one run through the parallelizable block with the sentinel. As long as the collect block got some kind of list of sentinels/actual results.

I'm working on an expansion of the adapter that'll grab traceback and other metadata and pass that forward as a specific object class (as the sentinel) so I know everything will make it to the end no matter what (and I can introspect and write my logs, etc. based on that). Because of that, anything that fails safely forward and passes the sentinel down is preferred.

@elijahbenizzy
Copy link
Collaborator

elijahbenizzy commented Jul 3, 2024

Ok, great, would love it if you contributed that back! Yes, agreed on (1), and your edge-cases.

Easy enough implementation (might have to do a little more surgery, but shouldn't be too hard):

  1. If the node is an Parallelizable type then it outputs an item for every non-failed one (E.G. if index 0,1,2,5 succed and 3/4 fail it would be [res_1, res_2, SENTINEL, SENTINEL, res_5]
  2. Collect should filter out the sentinel values (or cascade them through, TBD...). I think filtering out makes sense but the code has to be resilient to list shortening. Also filtering wouldn't work with your "collector" approach which I really like.
  3. If it fails before the generator is started it should probably have an empty list. Collect will then be able to use the fact that its empty to bypass the rest. Could also work with the single list you have.
  4. If it fails inside a Parallelizable it should be the same as failing during the generator

Note that with (1) it could also fail for every step after any errors (in the generator), assuming that the first error cuts out control flow. IMO that's cleaner -- you don't want to be running code after an exception. So the example from (1) would be [res_1, res_2].

The ideal solution would be to check for the sentinel type inside the framework but I think that's a bit challenging and doesn't change too much.

Re: implementation -- you seem to be building something exciting -- do you want to contribute back with these changes? I've uploaded a PR that provides the deeper framework-level changes you'll need -- figure that will help you get started. If you're going to build the adapter for your own use-case we can probably find a way to generalize it once you're happy together.

#1010

@JamesArruda
Copy link
Author

Thank you very much for the PR upload, can't wait to dig in.

I'll definitely be putting a PR when I have it moderately functional!.

@elijahbenizzy
Copy link
Collaborator

Thank you very much for the PR upload, can't wait to dig in.

I'll definitely be putting a PR when I have it moderately functional!.

You got it! I don't doubt you'll be able to figure this out but feel free to reach out if you want help navigating -- slack is probably the easiest to reach us.

@zilto
Copy link
Collaborator

zilto commented Jul 11, 2024

Closing since PR was merged

@zilto zilto closed this as completed Jul 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
triage label for issues that need to be triaged.
Projects
None yet
Development

No branches or pull requests

3 participants