Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,7 @@ async def start_workflow(
send_raw_bytes: bool = False,
) -> StartWorkflowResponse:
"""Starts a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
workflow_component (str): the name of the workflow component
Expand Down Expand Up @@ -1469,6 +1470,7 @@ async def start_workflow(

async def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflowResponse:
"""Gets information on a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1510,6 +1512,7 @@ async def get_workflow(self, instance_id: str, workflow_component: str) -> GetWo

async def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Terminates a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance, e.g.
Expand Down Expand Up @@ -1547,6 +1550,7 @@ async def raise_workflow_event(
send_raw_bytes: bool = False,
) -> DaprResponse:
"""Raises an event on a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1610,6 +1614,7 @@ async def raise_workflow_event(

async def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Pause a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1642,6 +1647,7 @@ async def pause_workflow(self, instance_id: str, workflow_component: str) -> Dap

async def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Resumes a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1673,6 +1679,7 @@ async def resume_workflow(self, instance_id: str, workflow_component: str) -> Da

async def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Purges a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down
7 changes: 7 additions & 0 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,7 @@ def start_workflow(
send_raw_bytes: bool = False,
) -> StartWorkflowResponse:
"""Starts a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
workflow_component (str): the name of the workflow component
Expand Down Expand Up @@ -1466,6 +1467,7 @@ def start_workflow(

def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflowResponse:
"""Gets information on a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1507,6 +1509,7 @@ def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflow

def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Terminates a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance, e.g.
Expand Down Expand Up @@ -1545,6 +1548,7 @@ def raise_workflow_event(
send_raw_bytes: bool = False,
) -> DaprResponse:
"""Raises an event on a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1609,6 +1613,7 @@ def raise_workflow_event(

def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Pause a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1641,6 +1646,7 @@ def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprRespo

def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Resumes a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1672,6 +1678,7 @@ def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResp

def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Purges a workflow.
Deprecated: use dapr-ext-workflow instead

Args:
instance_id (str): the ID of the workflow instance,
Expand Down
6 changes: 3 additions & 3 deletions examples/demo_workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

This document describes how to register a workflow and activities inside it and start running it.
It demonstrates the following APIs:
- **start_workflow**: Start an instance of a workflow
- **get_workflow**: Get information on a single workflow
- **schedule_new_workflow**: Start an instance of a workflow
- **get_workflow_state**: Get information on a single workflow
- **terminate_workflow**: Terminate or stop a particular instance of a workflow
- **raise_event**: Raise an event on a workflow
- **raise_workflow_event**: Raise an event on a workflow
- **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed
- **resume_workflow**: Resumes a paused workflow instance
- **purge_workflow**: Removes all metadata related to a specific workflow instance from the state store
Expand Down
75 changes: 34 additions & 41 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from datetime import timedelta
from time import sleep
from dapr.ext.workflow import (
DaprWorkflowClient,
WorkflowRuntime,
DaprWorkflowContext,
WorkflowActivityContext,
Expand Down Expand Up @@ -106,7 +107,7 @@ def act_for_child_wf(ctx: WorkflowActivityContext, inp):


def main():
with DaprClient() as d:
with DaprWorkflowClient() as wfc:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is changing what this example (which also runs as an integration test) does!

The point was that you can use the regular Dapr SDK to start a workflow as well - you do not have to use the workflow SDK for that. We will want to retain this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is equivalent to dapr/dotnet-sdk#1344 and issues like this have been created in all other SDKs, the workflow functions in the DaprClient are now deprecated and we shouldn't advice users to use them. Thats why changing the examples.

Copy link
Contributor

@elena-kolevska elena-kolevska Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this example is also an integration test, as Bernd said, I'd prefer adding a deprecation notice and keeping it until we completely remove support for workflows in the DaprClient.

We have other examples/tests for thedapr-ext-workflow client in the https://github.com/dapr/python-sdk/tree/main/examples/workflow directory.
So I'd suggest checking this file for overlaps with whatever is already in there and moving it to that dir.

If we're doing this deprecation work in all clients, we should also go through the docs and remove all references to DaprClient in the context of workflows and only use dapr-ext-workflow. As a matter of fact, someone just opened an issue for that a few days ago: dapr/docs#4410
FYI @hhunter-ms

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the point of changing this is that we no longer want to encourage users to use the DaprClient to interact with Workflows.

examples usually are the first point of contact for users, so intentionally leaving a deprecated example just because it acts as a test is a mistake in my opinion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but since we already have examples of the correct SDK to use here https://github.com/dapr/python-sdk/tree/main/examples/workflow

I'll revert the changes made to this file

Copy link
Contributor

@hhunter-ms hhunter-ms Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@famarting @elena-kolevska will the Python workflow quickstart also be updated to dapr-ext-workflow instead of DaprClient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, I didnt realize of that, well, all quickstarts in that repo using the equivalent to the DaprClient will also need to be updated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update, the quickstarts were updated here.

workflow_runtime = WorkflowRuntime()
workflow_runtime.register_workflow(hello_world_wf)
workflow_runtime.register_workflow(child_retryable_wf)
Expand All @@ -119,14 +120,11 @@ def main():
sleep(2)

print('==========Start Counter Increase as per Input:==========')
start_resp = d.start_workflow(
instance_id = wfc.schedule_new_workflow(
instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name,
input=input_data,
workflow_options=workflow_options,
)
print(f'start_resp {start_resp.instance_id}')
workflow=hello_world_wf,
input=input_data)
print(f'start_resp {instance_id}')

# Sleep for a while to let the workflow run
sleep(12)
Expand All @@ -135,75 +133,70 @@ def main():
assert child_orchestrator_string == '1aa2bb3cc'

# Pause Test
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
wfc.pause_workflow(instance_id=instance_id)
get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
print(f'Get response from {workflow_name} after pause call: {get_response.runtime_status}')

# Resume Test
d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
wfc.resume_workflow(instance_id=instance_id)
get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
print(f'Get response from {workflow_name} after resume call: {get_response.runtime_status}')

sleep(1)
# Raise event
d.raise_workflow_event(
instance_id=child_instance_id,
workflow_component=workflow_component,
wfc.raise_workflow_event(
instance_id=instance_id,
event_name=event_name,
event_data=event_data,
data=event_data,
)

sleep(5)
# Purge Test
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
# // TODO IMPLEMENT PURGE
# d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
except Exception as err:
# TODO temporary print
print(f'got error {err}')
if non_existent_id_error in err._message:
print('Instance Successfully Purged')

# Kick off another workflow for termination purposes
# This will also test using the same instance ID on a new workflow after
# the old instance was purged
start_resp = d.start_workflow(
instance_id = wfc.schedule_new_workflow(
instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name,
input=input_data,
workflow_options=workflow_options,
workflow=hello_world_wf,
input=input_data
)
print(f'start_resp {start_resp.instance_id}')
print(f'start_resp {instance_id}')

sleep(5)
# Terminate Test
d.terminate_workflow(instance_id=instance_id, workflow_component=workflow_component)
wfc.terminate_workflow(instance_id=instance_id)
sleep(1)
get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
print(
f'Get response from {workflow_name} '
f'after terminate call: {get_response.runtime_status}'
)
child_get_response = d.get_workflow(
instance_id=child_instance_id, workflow_component=workflow_component
child_get_response = wfc.get_workflow_state(
instance_id=child_instance_id, fetch_payloads=True
)
print(
f'Get response from {child_workflow_name} '
f'after terminate call: {child_get_response.runtime_status}'
)

# Purge Test
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
if non_existent_id_error in err._message:
print('Instance Successfully Purged')
# TODO IMPLEMENT PURGE
# d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
# try:
# d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
# except DaprInternalError as err:
# if non_existent_id_error in err._message:
# print('Instance Successfully Purged')

workflow_runtime.shutdown()

Expand Down
2 changes: 1 addition & 1 deletion examples/demo_workflow/demo_workflow/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
dapr-ext-workflow-dev>=0.0.1rc1.dev
dapr-ext-workflow-dev>=0.4.1rc1.dev
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not modify this in the future. We want to run examples against older versions for backwards compatibility as well. It's fine here given the SDK is not 1.0 yet.

2 changes: 2 additions & 0 deletions ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,5 @@ def test_client_functions(self):

actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId)
assert actual_resume_result == mock_resume_result

# TODO add purge support
2 changes: 1 addition & 1 deletion tests/clients/test_dapr_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ def test_unlock_input_validation(self):
# Tests for workflow
#

def test_workflow(self):
def test_workflow_deprecated(self):
dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')
# Sane parameters
workflow_name = 'test_workflow'
Expand Down