Merged

Changes from 3 commits
48 changes: 48 additions & 0 deletions docs/docs/module_guides/workflow/index.md
@@ -632,6 +632,54 @@ result = await handler
handler = w.run(ctx=handler.ctx)
result = await handler
```
## Resources

Resources are external dependencies that you can inject into the steps of a workflow.
Here is a simple example:

```python
from typing import Annotated

from llama_index.core.llms import ChatMessage
from llama_index.core.memory import Memory
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.core.workflow.resource import Resource


def get_memory(*args, **kwargs) -> Memory:
    return Memory.from_defaults("user_id_123", token_limit=60000)


class SecondEvent(Event):
    msg: str


class WorkflowWithResource(Workflow):
    @step
    def first_step(
        self,
        ev: StartEvent,  # `msg` is supplied at runtime, e.g. w.run(msg="Hello!")
        memory: Annotated[Memory, Resource(get_memory)],
    ) -> SecondEvent:
        print("Memory before step 1", memory)
        memory.put(ChatMessage.from_str(role="user", content=ev.msg))
        print("Memory after step 1", memory)
        return SecondEvent(msg="This is an input for step 2")

    @step
    def second_step(
        self, ev: SecondEvent, memory: Annotated[Memory, Resource(get_memory)]
    ) -> StopEvent:
        print("Memory before step 2", memory)
        memory.put(ChatMessage.from_str(role="user", content=ev.msg))
        print("Memory after step 2", memory)
        return StopEvent(result="Messages put into memory")
```

The type of a resource must be declared with `Annotated`, and `Resource` is responsible for providing the actual instance at runtime by invoking the factory function `get_memory`. The return type of `get_memory` must be consistent with the type declared in the annotation, a `Memory` object in the example above.
Resources are shared among the steps of a workflow, and `Resource` invokes the factory function only once. If this is not the desired behavior, pass `cache=False` to `Resource`: a different resource object is then injected into each step, and the factory function is invoked once per step.
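The caching behavior can be pictured in plain Python. The sketch below is purely conceptual and is not LlamaIndex's actual implementation; `CachingFactory` is a hypothetical name standing in for `Resource`:

```python
from typing import Callable, Optional


class CachingFactory:
    """Conceptual stand-in for Resource: wraps a factory function and,
    unless caching is disabled, reuses the first object it creates."""

    def __init__(self, factory: Callable[[], list], cache: bool = True):
        self.factory = factory
        self.cache = cache
        self._instance: Optional[list] = None

    def resolve(self) -> list:
        if self.cache and self._instance is not None:
            return self._instance  # same object injected into every step
        instance = self.factory()
        if self.cache:
            self._instance = instance
        return instance


shared = CachingFactory(list)  # cache=True: one object for all steps
assert shared.resolve() is shared.resolve()

fresh = CachingFactory(list, cache=False)  # a new object per step
assert fresh.resolve() is not fresh.resolve()
```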

## Checkpointing Workflows

94 changes: 0 additions & 94 deletions docs/docs/understanding/workflows/nested.md

This file was deleted.

226 changes: 226 additions & 0 deletions docs/docs/understanding/workflows/resources.md
@@ -0,0 +1,226 @@
# Resources

Resources are a component of workflows that let us inject external dependencies into our steps, such as memory, LLMs, query engines, or chat history.

Resources are a powerful way of binding components to our steps that we would otherwise need to wire up by hand every time. Most importantly, resources are **stateful**: they maintain their state across different steps unless otherwise specified.

## Using Stateful Resources

To use resources in our code, we first import `Resource` from the `resource` submodule:

```python
from llama_index.core.workflow.resource import Resource
from llama_index.core.workflow import (
Event,
step,
StartEvent,
StopEvent,
Workflow,
)
```

The `Resource` function works as a wrapper for another function that, when executed, returns an object of a specified type. This is the usage pattern:

```python
from typing import Annotated
from llama_index.core.memory import Memory


def get_memory(*args, **kwargs) -> Memory:
return Memory.from_defaults("user_id_123", token_limit=60000)


resource = Annotated[Memory, Resource(get_memory)]
```
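Mechanically, this pattern relies on the fact that metadata attached via `Annotated` can be read back at runtime. The following sketch shows the general discovery mechanism with a hypothetical `Injected` marker and `get_history` factory; it is a simplified illustration, not LlamaIndex code:

```python
from typing import Annotated, get_type_hints


class Injected:
    """Hypothetical marker carrying a factory, analogous to Resource."""

    def __init__(self, factory):
        self.factory = factory


def get_history() -> list:
    return []


def my_step(history: Annotated[list, Injected(get_history)]) -> None:
    history.append("hello")


# A runner can inspect the step's annotations and find the marker:
hints = get_type_hints(my_step, include_extras=True)
marker = hints["history"].__metadata__[0]
assert isinstance(marker, Injected)

# ...then build the dependency and call the step with it:
history = marker.factory()
my_step(history)
assert history == ["hello"]
```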

When a step of our workflow is equipped with this resource, the parameter to which the resource is assigned behaves as a memory component:

```python
import random

from typing import Union
from llama_index.core.llms import ChatMessage

RANDOM_MESSAGES = [
"Hello World!",
"Python is awesome!",
"Resources are great!",
]


class CustomStartEvent(StartEvent):
message: str


class SecondEvent(Event):
message: str


class ThirdEvent(Event):
message: str


class WorkflowWithMemory(Workflow):
@step
def first_step(
self,
ev: CustomStartEvent,
memory: Annotated[Memory, Resource(get_memory)],
) -> SecondEvent:
memory.put(
ChatMessage.from_str(
role="user", content="First step: " + ev.message
)
)
return SecondEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])

@step
def second_step(
self, ev: SecondEvent, memory: Annotated[Memory, Resource(get_memory)]
) -> Union[ThirdEvent, StopEvent]:
memory.put(
ChatMessage.from_str(
role="assistant", content="Second step: " + ev.message
)
)
if random.randint(0, 1) == 0:
return ThirdEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])
else:
messages = memory.get_all()
return StopEvent(result=messages)

@step
def third_step(
self, ev: ThirdEvent, memory: Annotated[Memory, Resource(get_memory)]
) -> StopEvent:
memory.put(
ChatMessage.from_str(
role="user", content="Third step: " + ev.message
)
)
messages = memory.get_all()
return StopEvent(result=messages)
```

As you can see, each step has access to the memory and writes to it; the memory is shared among the steps, which we can verify by running the workflow:

```python
wf = WorkflowWithMemory(disable_validation=True)


async def main():
messages = await wf.run(
start_event=CustomStartEvent(message="Happy birthday!")
)
for m in messages:
print(m.blocks[0].text)


if __name__ == "__main__":
import asyncio

asyncio.run(main())
```

A potential result for this might be:

```text
First step: Happy birthday!
Second step: Python is awesome!
Third step: Hello World!
```

This shows that each step added its message to a global memory, which is exactly what we were expecting!

It is important to note, though, that resources are preserved across steps of the same workflow instance, but not across different workflow instances. If we were to run two `WorkflowWithMemory` instances, their memories would be separate and independent:

```python
wf1 = WorkflowWithMemory(disable_validation=True)
wf2 = WorkflowWithMemory(disable_validation=True)


async def main():
messages1 = await wf1.run(
start_event=CustomStartEvent(message="Happy birthday!")
)
    messages2 = await wf2.run(
start_event=CustomStartEvent(message="Happy New Year!")
)
for m in messages1:
print(m.blocks[0].text)
print("===================")
for m in messages2:
print(m.blocks[0].text)


if __name__ == "__main__":
import asyncio

asyncio.run(main())
```

This is a possible output:

```text
First step: Happy birthday!
Second step: Resources are great!
===================
First step: Happy New Year!
Second step: Python is awesome!
```

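This per-instance isolation can be pictured as each workflow object holding its own cache of resolved resources. A minimal conceptual sketch, not library code:

```python
class WorkflowSketch:
    """Each instance keeps its own cache of resolved resources."""

    def __init__(self, factory):
        self.factory = factory
        self._cache = None

    def resource(self):
        if self._cache is None:
            self._cache = self.factory()
        return self._cache


wf_a = WorkflowSketch(list)
wf_b = WorkflowSketch(list)

# Within one instance the resource is shared...
assert wf_a.resource() is wf_a.resource()
# ...but two instances never share state.
assert wf_a.resource() is not wf_b.resource()
```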
## Using Stateless Resources

Resources can also be stateless, meaning that we can configure them *not* to be preserved across steps in the same run.

To do so, we just need to pass `cache=False` when instantiating `Resource`. Let's see this in a simple example, using a custom `Counter` class:

```python
from pydantic import BaseModel, Field


class Counter(BaseModel):
counter: int = Field(description="A simple counter", default=0)

def increment(self) -> None:
self.counter += 1


def get_counter() -> Counter:
return Counter()


class SecondEvent(Event):
count: int


class WorkflowWithCounter(Workflow):
@step
def first_step(
self,
ev: StartEvent,
counter: Annotated[Counter, Resource(get_counter, cache=False)],
) -> SecondEvent:
counter.increment()
return SecondEvent(count=counter.counter)

@step
def second_step(
self,
ev: SecondEvent,
counter: Annotated[Counter, Resource(get_counter, cache=False)],
) -> StopEvent:
print("Counter at first step: ", ev.count)
counter.increment()
print("Counter at second step: ", counter.counter)
return StopEvent(result="End of Workflow")
```

If we now run this workflow, we will get:

```text
Counter at first step: 1
Counter at second step: 1
```

Since `cache=False` gives each step its own freshly created `Counter`, each counter is incremented from 0 to 1 independently.

Now that we've mastered resources, let's take a look at [observability and debugging](./observability.md) in workflows.
2 changes: 1 addition & 1 deletion docs/docs/understanding/workflows/subclass.md
@@ -96,4 +96,4 @@ draw_all_possible_flows(CustomWorkflow, "custom_workflow.html")

![Custom workflow](subclass.png)

Next, let's look at another way to extend a workflow: [nested workflows](nested.md).
Next, let's look at another way to extend a workflow: [resources](resources.md).