Skip to content

Commit ae3c2df

Browse files
committed
wire deterministic to main running context, increase coverage on worker tests, add reduce stop timeout for task runner
Signed-off-by: Filinto Duran <[email protected]>
1 parent 366b2f6 commit ae3c2df

File tree

8 files changed

+480
-115
lines changed

8 files changed

+480
-115
lines changed

Makefile

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ test-unit:
77
test-e2e:
88
pytest -m e2e --verbose
99

10+
coverage-clean:
11+
rm -f .coverage .coverage.* coverage.xml
12+
13+
coverage-all: coverage-clean
14+
pytest -m "not e2e" --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml
15+
pytest -m e2e --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml --cov-append
16+
1017
install:
1118
python3 -m pip install .
1219

@@ -18,4 +25,4 @@ gen-proto:
1825
python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
1926
rm durabletask/internal/*.proto
2027

21-
.PHONY: init test-unit test-e2e gen-proto install
28+
.PHONY: init test-unit test-e2e coverage-clean coverage-unit coverage-e2e coverage-all gen-proto install

durabletask/deterministic.py

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import uuid
2626
from collections.abc import Sequence
2727
from dataclasses import dataclass
28-
from datetime import datetime
29-
from typing import Optional, Protocol, TypeVar, runtime_checkable
28+
from datetime import datetime, timedelta
29+
from typing import Optional, TypeVar
3030

3131

3232
@dataclass
@@ -99,17 +99,6 @@ def deterministic_uuid_v5(instance_id: str, current_datetime: datetime, counter:
9999
return uuid.uuid5(namespace, name)
100100

101101

102-
@runtime_checkable
103-
class DeterministicContextProtocol(Protocol):
104-
"""Protocol for contexts that provide deterministic operations."""
105-
106-
@property
107-
def instance_id(self) -> str: ...
108-
109-
@property
110-
def current_utc_datetime(self) -> datetime: ...
111-
112-
113102
class DeterministicContextMixin:
114103
"""
115104
Mixin providing deterministic helpers for workflow contexts.
@@ -121,25 +110,25 @@ class DeterministicContextMixin:
121110
"""
122111

123112
def __init__(self, *args, **kwargs):
124-
"""Initialize the mixin with a UUID counter."""
113+
"""Initialize the mixin with UUID and timestamp counters."""
125114
super().__init__(*args, **kwargs)
126115
# Counter for deterministic UUID generation (matches .NET newGuidCounter)
127116
# This counter resets to 0 on each replay, ensuring determinism
128117
self._uuid_counter: int = 0
118+
# Counter for deterministic timestamp sequencing (resets on replay)
119+
self._timestamp_counter: int = 0
129120

130121
def now(self) -> datetime:
131-
"""Return orchestration time (deterministic UTC)."""
132-
value = self.current_utc_datetime # type: ignore[attr-defined]
133-
assert isinstance(value, datetime)
134-
return value
122+
"""Alias for deterministic current_utc_datetime."""
123+
return self.current_utc_datetime # type: ignore[attr-defined]
135124

136125
def random(self) -> random.Random:
137126
"""Return a PRNG seeded deterministically from instance id and orchestration time."""
138127
rnd = deterministic_random(
139128
self.instance_id, # type: ignore[attr-defined]
140129
self.current_utc_datetime, # type: ignore[attr-defined]
141130
)
142-
# Mark as deterministic for sandbox detector whitelisting of bound methods
131+
# Mark as deterministic for asyncio sandbox detector whitelisting of bound methods (randint, random)
143132
try:
144133
rnd._dt_deterministic = True
145134
except Exception:
@@ -201,3 +190,35 @@ def random_choice(self, sequence: Sequence[T]) -> T:
201190
raise IndexError("Cannot choose from empty sequence")
202191
rnd = self.random()
203192
return rnd.choice(sequence)
193+
194+
def now_with_sequence(self) -> datetime:
195+
"""
196+
Return deterministic timestamp with microsecond increment per call.
197+
198+
Each call returns: current_utc_datetime + (counter * 1 microsecond)
199+
200+
This provides ordered, unique timestamps for tracing/telemetry while maintaining
201+
determinism across replays. The counter resets to 0 on each replay (similar to
202+
_uuid_counter pattern).
203+
204+
Perfect for preserving event ordering within a workflow without requiring activities.
205+
206+
Returns:
207+
datetime: Deterministic timestamp that increments on each call
208+
209+
Example:
210+
```python
211+
def workflow(ctx):
212+
t1 = ctx.now_with_sequence() # 2024-01-01 12:00:00.000000
213+
result = yield ctx.call_activity(some_activity, input="data")
214+
t2 = ctx.now_with_sequence() # 2024-01-01 12:00:00.000001
215+
# t1 < t2, preserving order for telemetry
216+
```
217+
"""
218+
offset = timedelta(microseconds=self._timestamp_counter)
219+
self._timestamp_counter += 1
220+
return self.current_utc_datetime + offset # type: ignore[attr-defined]
221+
222+
def current_utc_datetime_with_sequence(self):
223+
"""Alias for now_with_sequence for API parity with other SDKs."""
224+
return self.now_with_sequence()

durabletask/worker.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import durabletask.internal.orchestrator_service_pb2 as pb
2020
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
2121
import durabletask.internal.shared as shared
22-
from durabletask import task
22+
from durabletask import deterministic, task
2323
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
2424

2525
TInput = TypeVar("TInput")
@@ -159,6 +159,8 @@ class TaskHubGrpcWorker:
159159
interceptors to apply to the channel. Defaults to None.
160160
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration for
161161
controlling worker concurrency limits. If None, default settings are used.
162+
stop_timeout (float, optional): Maximum time in seconds to wait for the worker thread
163+
to stop when calling stop(). Defaults to 30.0. Useful to set lower values in tests.
162164
163165
Attributes:
164166
concurrency_options (ConcurrencyOptions): The current concurrency configuration.
@@ -224,6 +226,7 @@ def __init__(
224226
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
225227
concurrency_options: Optional[ConcurrencyOptions] = None,
226228
channel_options: Optional[Sequence[tuple[str, Any]]] = None,
229+
stop_timeout: float = 30.0,
227230
):
228231
self._registry = _Registry()
229232
self._host_address = host_address if host_address else shared.get_default_host_address()
@@ -232,6 +235,7 @@ def __init__(
232235
self._is_running = False
233236
self._secure_channel = secure_channel
234237
self._channel_options = channel_options
238+
self._stop_timeout = stop_timeout
235239
# Track in-flight activity executions for graceful draining
236240
import threading as _threading
237241

@@ -512,7 +516,7 @@ def stop(self):
512516
if self._response_stream is not None:
513517
self._response_stream.cancel()
514518
if self._runLoop is not None:
515-
self._runLoop.join(timeout=30)
519+
self._runLoop.join(timeout=self._stop_timeout)
516520
self._async_worker_manager.shutdown()
517521
self._logger.info("Worker shutdown completed")
518522
self._is_running = False
@@ -659,11 +663,12 @@ def _execute_activity(
659663
)
660664

661665

662-
class _RuntimeOrchestrationContext(task.OrchestrationContext):
666+
class _RuntimeOrchestrationContext(task.OrchestrationContext, deterministic.DeterministicContextMixin):
663667
_generator: Optional[Generator[task.Task, Any, Any]]
664668
_previous_task: Optional[task.Task]
665669

666670
def __init__(self, instance_id: str):
671+
super().__init__()
667672
self._generator = None
668673
self._is_replaying = True
669674
self._is_suspended = False

0 commit comments

Comments
 (0)