Conversation

@filintod filintod commented Nov 8, 2025

This is a split from asyncio PR #13, removing the changes that are not related to asyncio.

@filintod filintod requested a review from a team as a code owner November 8, 2025 17:41
self._stub = stubs.TaskHubSidecarServiceStub(channel)
self._logger = shared.get_logger("client", log_handler, log_formatter)

def __enter__(self):
Author

add context manager option for clean closing
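A minimal, illustrative sketch of the context-manager support this comment refers to (the class name and attributes are hypothetical, not the PR's actual code): `__exit__` guarantees the underlying gRPC channel is closed even when the with-body raises.

```python
class ClosableClient:
    """Illustrative client wrapper; `channel` stands in for a gRPC channel."""

    def __init__(self, channel):
        self._channel = channel

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self._channel.close()  # clean closing on both normal and error paths
        return False  # never suppress exceptions from the with-body
```

Usage would then be `with ClosableClient(channel) as c: ...`, with no need to call close explicitly.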

# gRPC timeout mapping (pytest unit tests may pass None explicitly)
grpc_timeout = None if (timeout is None or timeout == 0) else timeout

# If timeout is None or 0, skip pre-checks/polling and call server-side wait directly
Author

Improves resource consumption on the server side, which might also lag behind the client side.

pass


class NonRetryableError(Exception):
Author

This is a new helper, present in Temporal but not in our SDK, that lets us define errors as non-retryable so activities don't attempt a retry when one is raised.
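A sketch of how such an error might be used; the retry loop below is a hypothetical stand-in for the SDK's activity executor, not its actual implementation.

```python
class NonRetryableError(Exception):
    """Raise from an activity to fail the workflow without further attempts."""


def run_with_retries(activity, max_attempts=3):
    # Hypothetical executor: retries generic failures, but surfaces a
    # NonRetryableError immediately without re-attempting the activity.
    for attempt in range(1, max_attempts + 1):
        try:
            return activity()
        except NonRetryableError:
            raise  # fail the workflow immediately; no retry
        except Exception:
            if attempt == max_attempts:
                raise  # retries exhausted
```

Ordering matters here: the `except NonRetryableError` clause must come before the generic `except Exception`, since it is a subclass.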

            next_delay_f, self._retry_policy.max_retry_interval.total_seconds()
        )
-           return timedelta(seconds=next_delay_f)
+       return timedelta(seconds=next_delay_f)
Author

This fixes a bug with retry: the logic on line 400 above, `if datetime.utcnow() < retry_expiration:`, means that we should retry, but because the return was badly indented, if for some reason max_retry_interval was not None this was not working.
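A minimal sketch of this kind of indentation bug (names are illustrative, and the guard direction is inferred from the de-indented return in the diff above, not taken from the PR's actual code): a return nested under the `max_retry_interval` guard only fires when that branch is taken, so the other path silently yields None, which callers read as "do not retry".

```python
from datetime import timedelta


def next_delay_buggy(next_delay_f, max_retry_interval):
    if max_retry_interval is not None:
        next_delay_f = min(next_delay_f, max_retry_interval.total_seconds())
        return timedelta(seconds=next_delay_f)  # over-indented: only this branch returns
    # implicit None here: the caller sees "no delay" and skips the retry


def next_delay_fixed(next_delay_f, max_retry_interval):
    if max_retry_interval is not None:
        next_delay_f = min(next_delay_f, max_retry_interval.total_seconds())
    return timedelta(seconds=next_delay_f)  # de-indented: always returns a delay
```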

Author
@filintod filintod Nov 9, 2025


This is also kind of mentioned in one of the gotchas in dapr/python-sdk#836. I found this bug beforehand; the other items there are gotchas or unexplained behavior.

Author

Added some info in the README to cover the gotchas, but we might need to add it to python-sdk too.

@@ -0,0 +1,16 @@
apiVersion: dapr.io/v1alpha1
Author
@filintod filintod Nov 9, 2025

Needed for e2e tests with Dapr, which should substitute the durabletask-go tests with a Dapr setup.

@filintod (Author)

@acroca ptal

@filintod filintod changed the title from "add new deterministic functions, non-retryable errors, and shutdown h…" to "add non-retryable errors, and shutdown helpers" Nov 10, 2025
Comment on lines 218 to 220
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
req, timeout=grpc_timeout
)
Member


I don't understand this. grpc_timeout is set to None in both the 0 and None cases, but if I understand correctly, when timeout is None we wait forever, whereas a timeout of 0 won't wait at all, right?

Author


Well, the current behavior has not changed: a timeout of 0 still means wait forever. That kind of makes sense; why would you call this function just to not wait?
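The mapping under discussion can be sketched as a standalone function (the helper name is illustrative; the expression matches the line quoted earlier in the thread):

```python
def to_grpc_timeout(timeout):
    # Both None and 0 translate to "no gRPC deadline", i.e. wait forever;
    # any positive value is passed through as the per-call timeout.
    return None if (timeout is None or timeout == 0) else timeout
```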

Comment on lines 230 to 234
if current_state and current_state.runtime_status in [
OrchestrationStatus.COMPLETED,
OrchestrationStatus.FAILED,
OrchestrationStatus.TERMINATED,
]:
Member


From https://github.com/dapr/durabletask-go/blob/7f28b2408db77ed48b1b03ecc71624fc456ccca3/api/orchestration.go#L196-L201, CANCELLED is also a condition for a workflow to be considered in a terminal state.
But what's the reason for this check? Why not just call WaitForInstanceCompletion? You are still sending a call to the runtime to get the current state.

Author


It is maybe a premature optimization, but in Python, for instances that close quickly, using polls avoids tying up the server with longer-running streaming: https://grpc.io/docs/guides/performance/#python
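A sketch of the poll-first strategy being defended here. The helper names are hypothetical: `get_status` stands in for the unary get-instance call and `open_stream` for the server-side WaitForInstanceCompletion stream. Note it includes CANCELED among the terminal states, per the reviewer's point about durabletask-go.

```python
import time

# Terminal states per the review discussion (CANCELED included).
TERMINAL_STATES = {"COMPLETED", "FAILED", "TERMINATED", "CANCELED"}


def wait_for_completion(get_status, open_stream, poll_attempts=3, interval=0.0):
    # Cheap unary polls first: quick completions never tie up a
    # server-side stream (cf. the gRPC Python performance guide).
    for _ in range(poll_attempts):
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(interval)
    # Still running after the polls: fall back to the long-lived
    # server-side wait.
    return open_stream()
```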

Comment on lines 547 to 548
if isinstance(t, str):
if t:
Member


can't we check it all at once?

Suggested change
if isinstance(t, str):
if t:
if isinstance(t, str) and len(t)>0:

self._channel_options = channel_options
self._stop_timeout = stop_timeout
# Track in-flight activity executions for graceful draining
import threading as _threading
Member


Move this import to the top of the file 🙏

current_reader_thread.start()
loop = asyncio.get_running_loop()
while not self._shutdown.is_set():
try:
Member


I don't see why this try was removed. If I understand correctly, the exceptions that were captured here will now be captured outside of the while, right? Why is this preferred now?

Author
@filintod filintod Nov 11, 2025


Mainly to reduce extra logging/indentation (`self._logger.warning(f"Error in work item stream: {e}")`). I could put it back; I think it was just bothering me with the extra duplicated messages that were not helping me.

Author


There's not really much between this point and the other try; maybe GetWorkItems.

"""
end: Optional[float] = None
if timeout is not None:
import time as _t
Member


Move all the imports to the top please

@filintod filintod requested a review from acroca November 11, 2025 14:34
@filintod (Author)

@acroca ptal

Signed-off-by: Filinto Duran <[email protected]>
mock_channel.close.side_effect = Exception("close failed")
mock_get_channel.return_value = mock_channel

from durabletask import client
Member


🙏

Comment on lines 603 to 623
except grpc.RpcError as rpc_error: # type: ignore
# Treat common shutdown/termination races as benign to avoid noisy logs
code = rpc_error.code() # type: ignore
details = str(rpc_error)
benign = code in {
grpc.StatusCode.CANCELLED,
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.UNKNOWN,
} and (
"unknown instance ID/task ID combo" in details
or "Channel closed" in details
or "Locally cancelled by application" in details
)
if self._shutdown.is_set() or benign:
self._logger.debug(
f"Ignoring activity completion delivery error during shutdown/benign condition: {rpc_error}"
)
else:
self._logger.exception(
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {rpc_error}"
)
Member


Can we combine this logic with the other one that looks very similar?
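One way the duplicated checks could be factored into a shared helper (a hypothetical sketch; the status codes are shown as plain names rather than `grpc.StatusCode` members to keep it dependency-free):

```python
# Codes and message fragments treated as benign shutdown/termination races,
# matching the sets in the quoted except-block above.
BENIGN_CODES = {"CANCELLED", "UNAVAILABLE", "UNKNOWN"}
BENIGN_DETAIL_SUBSTRINGS = (
    "unknown instance ID/task ID combo",
    "Channel closed",
    "Locally cancelled by application",
)


def is_benign_rpc_error(code_name, details, shutting_down):
    # Benign errors get logged at debug level instead of as exceptions.
    if shutting_down:
        return True
    return code_name in BENIGN_CODES and any(
        s in details for s in BENIGN_DETAIL_SUBSTRINGS
    )
```

Both call sites would then reduce to an `if is_benign_rpc_error(...)` branch choosing between `logger.debug` and `logger.exception`.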


self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options)
# Readiness flag set once the worker has an active stream to the sidecar
self._ready = Event()
Member


Is this _ready necessary?

actions = result.actions
complete_action = get_and_validate_single_complete_orchestration_action(actions)
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED
assert complete_action.failureDetails.errorMessage.__contains__("Activity task #1 failed: boom")
Member


It'd be good to test that the activity has been called exactly once, to make sure it is not retrying.
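A sketch of the kind of assertion being requested, using unittest.mock to count invocations (the tiny executor below is a stand-in, not the suite's actual fixture):

```python
from unittest.mock import Mock

# Hypothetical activity that always fails.
activity = Mock(side_effect=RuntimeError("boom"))


def run_once_no_retry(act):
    # Stand-in executor: a non-retryable failure is not re-attempted.
    try:
        act()
    except RuntimeError:
        pass


run_once_no_retry(activity)
activity.assert_called_once()  # raises AssertionError if any retry happened
```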

Author


Actually, this test only checks the event-processing pipeline, making sure that when you send a non-retryable error into the event loop to be processed, it fails the workflow. There is a test after it that checks that when a retryable error is raised, a new timer is created (line 1526, test_activity_generic_exception_is_retryable).

Signed-off-by: Filinto Duran <[email protected]>
@filintod (Author)

@acroca ptal

@filintod filintod requested a review from acroca November 13, 2025 16:59
@acroca acroca merged commit 9e8b34b into dapr:main Nov 14, 2025
7 checks passed