
Commit 1c194d0

feedback

Signed-off-by: Filinto Duran <[email protected]>

1 parent c01d8b3

File tree: 3 files changed, +45 −66 lines


durabletask/worker.py

Lines changed: 29 additions & 51 deletions
@@ -253,8 +253,6 @@ def __init__(
         self._interceptors = None

         self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options)
-        # Readiness flag set once the worker has an active stream to the sidecar
-        self._ready = Event()

     @property
     def concurrency_options(self) -> ConcurrencyOptions:
@@ -357,8 +355,6 @@ def invalidate_connection():
                 pass
             current_channel = None
             current_stub = None
-            # No longer ready if connection is gone
-            self._ready.clear()

         def should_invalidate_connection(rpc_error):
             error_code = rpc_error.code()  # type: ignore
@@ -398,8 +394,6 @@ def should_invalidate_connection(rpc_error):
                 self._logger.info(
                     f"Successfully connected to {self._host_address}. Waiting for work items..."
                 )
-                # Signal readiness once stream is established
-                self._ready.set()

                 # Use a thread to read from the blocking gRPC stream and forward to asyncio
                 import queue
@@ -508,14 +502,34 @@ def stop(self):
         self._async_worker_manager.shutdown()
         self._logger.info("Worker shutdown completed")
         self._is_running = False
-        self._ready.clear()

-    def wait_for_ready(self, timeout: Optional[float] = None) -> bool:
-        """Block until the worker has an active connection to the sidecar.
-
-        Returns True if the worker became ready within the timeout; otherwise False.
-        """
-        return self._ready.wait(timeout)
+    def _handle_grpc_execution_error(self, rpc_error: grpc.RpcError, request_type: str):
+        """Handle a gRPC execution error during shutdown or another benign condition."""
+        # During shutdown or if the instance was terminated, the channel may be closed
+        # or the instance may no longer be recognized by the sidecar. Treat these as
+        # benign to reduce noisy logging when shutting down.
+        details = str(rpc_error).lower()
+        benign_errors = {
+            grpc.StatusCode.CANCELLED,
+            grpc.StatusCode.UNAVAILABLE,
+            grpc.StatusCode.UNKNOWN,
+        }
+        if (
+            self._shutdown.is_set()
+            and rpc_error.code() in benign_errors
+            or (
+                "unknown instance id/task id combo" in details
+                or "channel closed" in details
+                or "locally cancelled by application" in details
+            )
+        ):
+            self._logger.debug(
+                f"Ignoring gRPC {request_type} execution error during shutdown/benign condition: {rpc_error}"
+            )
+        else:
+            self._logger.exception(
+                f"Failed to deliver gRPC {request_type} response to sidecar: {rpc_error}"
+            )

     def _execute_orchestrator(
         self,
@@ -551,24 +565,7 @@ def _execute_orchestrator(
             try:
                 stub.CompleteOrchestratorTask(res)
             except grpc.RpcError as rpc_error:  # type: ignore
-                # During shutdown or if the instance was terminated, the channel may be closed
-                # or the instance may no longer be recognized by the sidecar. Treat these as benign
-                # to reduce noisy logging when shutting down.
-                code = rpc_error.code()  # type: ignore
-                details = str(rpc_error)
-                benign = (
-                    code in {grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE}
-                    or "unknown instance ID/task ID combo" in details
-                    or "Channel closed" in details
-                )
-                if self._shutdown.is_set() or benign:
-                    self._logger.debug(
-                        f"Ignoring orchestrator completion delivery error during shutdown/benign condition: {rpc_error}"
-                    )
-                else:
-                    self._logger.exception(
-                        f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {rpc_error}"
-                    )
+                self._handle_grpc_execution_error(rpc_error, "orchestrator")
             except Exception as ex:
                 self._logger.exception(
                     f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}"
@@ -601,26 +598,7 @@ def _execute_activity(
             try:
                 stub.CompleteActivityTask(res)
             except grpc.RpcError as rpc_error:  # type: ignore
-                # Treat common shutdown/termination races as benign to avoid noisy logs
-                code = rpc_error.code()  # type: ignore
-                details = str(rpc_error)
-                benign = code in {
-                    grpc.StatusCode.CANCELLED,
-                    grpc.StatusCode.UNAVAILABLE,
-                    grpc.StatusCode.UNKNOWN,
-                } and (
-                    "unknown instance ID/task ID combo" in details
-                    or "Channel closed" in details
-                    or "Locally cancelled by application" in details
-                )
-                if self._shutdown.is_set() or benign:
-                    self._logger.debug(
-                        f"Ignoring activity completion delivery error during shutdown/benign condition: {rpc_error}"
-                    )
-                else:
-                    self._logger.exception(
-                        f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {rpc_error}"
-                    )
+                self._handle_grpc_execution_error(rpc_error, "activity")
             except Exception as ex:
                 self._logger.exception(
                     f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"

tests/durabletask/test_client.py

Lines changed: 13 additions & 2 deletions
@@ -1,5 +1,6 @@
 from unittest.mock import MagicMock, patch

+from durabletask import client
 from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
 from durabletask.internal.shared import get_default_host_address, get_grpc_channel

@@ -149,8 +150,18 @@ def test_taskhub_client_close_handles_exceptions():
         mock_channel.close.side_effect = Exception("close failed")
         mock_get_channel.return_value = mock_channel

-        from durabletask import client
-
         task_hub_client = client.TaskHubGrpcClient()
         # Should not raise exception
         task_hub_client.close()
+
+
+def test_taskhub_client_close_closes_channel_handles_exceptions():
+    """Test that close() closes the channel and handles exceptions gracefully."""
+    with patch("durabletask.internal.shared.get_grpc_channel") as mock_get_channel:
+        mock_channel = MagicMock()
+        mock_channel.close.side_effect = Exception("close failed")
+        mock_get_channel.return_value = mock_channel
+
+        task_hub_client = client.TaskHubGrpcClient()
+        task_hub_client.close()
+        mock_channel.close.assert_called_once()
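The first test already covers the exception path; the new test additionally asserts that the underlying channel's close() is invoked exactly once. It can be run on its own with pytest's node-id syntax, for example:

pytest tests/durabletask/test_client.py::test_taskhub_client_close_closes_channel_handles_exceptions -q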

tests/durabletask/test_orchestration_e2e.py

Lines changed: 3 additions & 13 deletions
@@ -58,7 +58,6 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
     with worker.TaskHubGrpcWorker(channel_options=channel_options) as w:
         w.add_orchestrator(empty_orchestrator)
         w.start()
-        w.wait_for_ready(timeout=10)

         # set a custom max send length option
         c = client.TaskHubGrpcClient(channel_options=channel_options)
@@ -98,7 +97,6 @@ def sequence(ctx: task.OrchestrationContext, start_val: int):
         w.add_orchestrator(sequence)
         w.add_activity(plus_one)
         w.start()
-        w.wait_for_ready(timeout=10)

         with client.TaskHubGrpcClient() as task_hub_client:
             id = task_hub_client.schedule_new_orchestration(sequence, input=1)
@@ -143,7 +141,6 @@ def orchestrator(ctx: task.OrchestrationContext, input: int):
         w.add_activity(throw)
         w.add_activity(increment_counter)
         w.start()
-        w.wait_for_ready(timeout=10)

         with client.TaskHubGrpcClient() as task_hub_client:
             id = task_hub_client.schedule_new_orchestration(orchestrator, input=1)
@@ -186,7 +183,6 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
         w.add_orchestrator(orchestrator_child)
         w.add_orchestrator(parent_orchestrator)
         w.start()
-        w.wait_for_ready(timeout=10)

         with client.TaskHubGrpcClient() as task_hub_client:
             id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=10)
@@ -209,7 +205,6 @@ def orchestrator(ctx: task.OrchestrationContext, _):
     with worker.TaskHubGrpcWorker(stop_timeout=2.0) as w:
         w.add_orchestrator(orchestrator)
         w.start()
-        w.wait_for_ready(timeout=10)

         # Start the orchestration and immediately raise events to it.
         task_hub_client = client.TaskHubGrpcClient()
@@ -239,7 +234,6 @@ def orchestrator(ctx: task.OrchestrationContext, _):
     with worker.TaskHubGrpcWorker(stop_timeout=2.0) as w:
         w.add_orchestrator(orchestrator)
         w.start()
-        w.wait_for_ready(timeout=10)

         # Start the orchestration and immediately raise events to it.
         with client.TaskHubGrpcClient() as task_hub_client:
@@ -265,7 +259,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
     with worker.TaskHubGrpcWorker(stop_timeout=2.0) as w:
         w.add_orchestrator(orchestrator)
         w.start()
-        w.wait_for_ready(timeout=10)
+
         with client.TaskHubGrpcClient() as task_hub_client:
             id = task_hub_client.schedule_new_orchestration(orchestrator)
             state = task_hub_client.wait_for_orchestration_start(id, timeout=30)
@@ -304,7 +298,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
     with worker.TaskHubGrpcWorker(stop_timeout=2.0) as w:
         w.add_orchestrator(orchestrator)
         w.start()
-        w.wait_for_ready(timeout=10)
+
         with client.TaskHubGrpcClient() as task_hub_client:
             id = task_hub_client.schedule_new_orchestration(orchestrator)
             state = task_hub_client.wait_for_orchestration_start(id, timeout=30)
@@ -348,7 +342,7 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
         w.add_orchestrator(orchestrator_child)
         w.add_orchestrator(parent_orchestrator)
         w.start()
-        w.wait_for_ready(timeout=10)
+
         with client.TaskHubGrpcClient() as task_hub_client:
             instance_id = task_hub_client.schedule_new_orchestration(
                 parent_orchestrator, input=5
@@ -397,7 +391,6 @@ def orchestrator(ctx: task.OrchestrationContext, input: int):
     with worker.TaskHubGrpcWorker(stop_timeout=2.0) as w:
         w.add_orchestrator(orchestrator)
         w.start()
-        w.wait_for_ready(timeout=10)

         task_hub_client = client.TaskHubGrpcClient()
         id = task_hub_client.schedule_new_orchestration(orchestrator, input=0)
@@ -498,7 +491,6 @@ def throw_activity_with_retry(ctx: task.ActivityContext, _):
         w.add_orchestrator(child_orchestrator_with_retry)
         w.add_activity(throw_activity_with_retry)
         w.start()
-        w.wait_for_ready(timeout=10)

         task_hub_client = client.TaskHubGrpcClient()
         id = task_hub_client.schedule_new_orchestration(parent_orchestrator_with_retry)
@@ -529,7 +521,6 @@ def orchestrator_with_non_retryable(ctx: task.OrchestrationContext, _):
         w.add_orchestrator(orchestrator_with_non_retryable)
         w.add_activity(throw_non_retryable)
         w.start()
-        w.wait_for_ready(timeout=10)

         task_hub_client = client.TaskHubGrpcClient()
         id = task_hub_client.schedule_new_orchestration(orchestrator_with_non_retryable)
@@ -568,7 +559,6 @@ def throw_activity(ctx: task.ActivityContext, _):
         w.add_orchestrator(mock_orchestrator)
         w.add_activity(throw_activity)
         w.start()
-        w.wait_for_ready(timeout=10)

         task_hub_client = client.TaskHubGrpcClient()
         id = task_hub_client.schedule_new_orchestration(mock_orchestrator)
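With wait_for_ready removed, these tests synchronize on the client side instead: wait_for_orchestration_start blocks until the scheduled orchestration is actually running, which implicitly covers worker startup. A minimal sketch of that pattern, assuming a durabletask-compatible sidecar is reachable at the default address (my_orchestrator is a placeholder, not a name from this commit):

from durabletask import client, task, worker

def my_orchestrator(ctx: task.OrchestrationContext, _):
    # Placeholder orchestrator body for illustration.
    return "done"

with worker.TaskHubGrpcWorker() as w:
    w.add_orchestrator(my_orchestrator)
    w.start()
    with client.TaskHubGrpcClient() as c:
        instance_id = c.schedule_new_orchestration(my_orchestrator)
        # Blocks until the orchestration has started, i.e. the worker is
        # connected and processing work items, replacing the explicit gate.
        state = c.wait_for_orchestration_start(instance_id, timeout=30)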
