Skip to content

Commit 9c89fe4

Browse files
committed
feat(flagd): add graceful attempts
Signed-off-by: Simon Schrottner <[email protected]>
1 parent c74d6ad commit 9c89fe4

File tree

7 files changed

+80
-39
lines changed

7 files changed

+80
-39
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import dataclasses
12
import os
23
import typing
34
from enum import Enum
@@ -19,10 +20,13 @@ class CacheType(Enum):
1920
DEFAULT_HOST = "localhost"
2021
DEFAULT_KEEP_ALIVE = 0
2122
DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None
23+
DEFAULT_OFFLINE_POLL_MS = 5000
2224
DEFAULT_PORT_IN_PROCESS = 8015
2325
DEFAULT_PORT_RPC = 8013
2426
DEFAULT_RESOLVER_TYPE = ResolverType.RPC
2527
DEFAULT_RETRY_BACKOFF = 1000
28+
DEFAULT_RETRY_BACKOFF_MAX = 120000
29+
DEFAULT_RETRY_GRACE_ATTEMPTS = 5
2630
DEFAULT_STREAM_DEADLINE = 600000
2731
DEFAULT_TLS = False
2832

@@ -32,9 +36,12 @@ class CacheType(Enum):
3236
ENV_VAR_HOST = "FLAGD_HOST"
3337
ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
3438
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
39+
ENV_VAR_OFFLINE_POLL_MS = "FLAGD_OFFLINE_POLL_MS"
3540
ENV_VAR_PORT = "FLAGD_PORT"
3641
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
3742
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
43+
ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
44+
ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS"
3845
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
3946
ENV_VAR_TLS = "FLAGD_TLS"
4047

@@ -62,6 +69,7 @@ def env_or_default(
6269
return val if cast is None else cast(val)
6370

6471

72+
@dataclasses.dataclass
6573
class Config:
6674
def __init__( # noqa: PLR0913
6775
self,
@@ -70,7 +78,10 @@ def __init__( # noqa: PLR0913
7078
tls: typing.Optional[bool] = None,
7179
resolver_type: typing.Optional[ResolverType] = None,
7280
offline_flag_source_path: typing.Optional[str] = None,
81+
offline_poll_ms: typing.Optional[int] = None,
7382
retry_backoff_ms: typing.Optional[int] = None,
83+
retry_backoff_max_ms: typing.Optional[int] = None,
84+
retry_grace_attempts: typing.Optional[int] = None,
7485
deadline: typing.Optional[int] = None,
7586
stream_deadline_ms: typing.Optional[int] = None,
7687
keep_alive: typing.Optional[int] = None,
@@ -94,6 +105,25 @@ def __init__( # noqa: PLR0913
94105
if retry_backoff_ms is None
95106
else retry_backoff_ms
96107
)
108+
self.retry_backoff_max_ms: int = (
109+
int(
110+
env_or_default(
111+
ENV_VAR_RETRY_BACKOFF_MAX_MS, DEFAULT_RETRY_BACKOFF_MAX, cast=int
112+
)
113+
)
114+
if retry_backoff_max_ms is None
115+
else retry_backoff_max_ms
116+
)
117+
118+
self.retry_grace_attempts: int = (
119+
int(
120+
env_or_default(
121+
ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int
122+
)
123+
)
124+
if retry_grace_attempts is None
125+
else retry_grace_attempts
126+
)
97127

98128
self.resolver_type = (
99129
env_or_default(
@@ -123,6 +153,16 @@ def __init__( # noqa: PLR0913
123153
else offline_flag_source_path
124154
)
125155

156+
self.offline_poll_ms: int = (
157+
int(
158+
env_or_default(
159+
ENV_VAR_OFFLINE_POLL_MS, DEFAULT_OFFLINE_POLL_MS, cast=int
160+
)
161+
)
162+
if offline_poll_ms is None
163+
else offline_poll_ms
164+
)
165+
126166
self.deadline: int = (
127167
int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int))
128168
if deadline is None

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ def __init__( # noqa: PLR0913
5252
keep_alive_time: typing.Optional[int] = None,
5353
cache_type: typing.Optional[CacheType] = None,
5454
max_cache_size: typing.Optional[int] = None,
55+
retry_backoff_max_ms: typing.Optional[int] = None,
56+
retry_grace_attempts: typing.Optional[int] = None,
5557
):
5658
"""
5759
Create an instance of the FlagdProvider
@@ -81,6 +83,8 @@ def __init__( # noqa: PLR0913
8183
tls=tls,
8284
deadline=deadline,
8385
retry_backoff_ms=retry_backoff_ms,
86+
retry_backoff_max_ms=retry_backoff_max_ms,
87+
retry_grace_attempts=retry_grace_attempts,
8488
resolver_type=resolver_type,
8589
offline_flag_source_path=offline_flag_source_path,
8690
stream_deadline_ms=stream_deadline_ms,
@@ -97,6 +101,7 @@ def setup_resolver(self) -> AbstractResolver:
97101
self.config,
98102
self.emit_provider_ready,
99103
self.emit_provider_error,
104+
self.emit_provider_stale,
100105
self.emit_provider_configuration_changed,
101106
)
102107
elif self.config.resolver_type == ResolverType.IN_PROCESS:

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,20 @@
3737

3838

3939
class GrpcResolver:
40-
MAX_BACK_OFF = 120
41-
4240
def __init__(
4341
self,
4442
config: Config,
4543
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
4644
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
45+
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
4746
emit_provider_configuration_changed: typing.Callable[
4847
[ProviderEventDetails], None
4948
],
5049
):
5150
self.config = config
5251
self.emit_provider_ready = emit_provider_ready
5352
self.emit_provider_error = emit_provider_error
53+
self.emit_provider_stale = emit_provider_stale
5454
self.emit_provider_configuration_changed = emit_provider_configuration_changed
5555
self.cache: typing.Optional[BaseCacheImpl] = (
5656
LRUCache(maxsize=self.config.max_cache_size)
@@ -59,6 +59,8 @@ def __init__(
5959
)
6060
self.stub, self.channel = self._create_stub()
6161
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
62+
self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001
63+
self.retry_grace_attempts = config.retry_grace_attempts
6264
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
6365
self.deadline = config.deadline * 0.001
6466
self.connected = False
@@ -74,9 +76,6 @@ def _create_stub(
7476
)
7577
stub = evaluation_pb2_grpc.ServiceStub(channel)
7678

77-
if self.cache:
78-
self.cache.clear()
79-
8079
return stub, channel
8180

8281
def initialize(self, evaluation_context: EvaluationContext) -> None:
@@ -113,8 +112,10 @@ def listen(self) -> None:
113112
if self.streamline_deadline_seconds > 0
114113
else {}
115114
)
115+
retry_counter = 0
116116
while self.active:
117117
request = evaluation_pb2.EventStreamRequest()
118+
118119
try:
119120
logger.debug("Setting up gRPC sync flags connection")
120121
for message in self.stub.EventStream(request, **call_args):
@@ -126,6 +127,7 @@ def listen(self) -> None:
126127
)
127128
)
128129
self.connected = True
130+
retry_counter = 0
129131
# reset retry delay after successsful read
130132
retry_delay = self.retry_backoff_seconds
131133

@@ -146,15 +148,37 @@ def listen(self) -> None:
146148
)
147149

148150
self.connected = False
151+
self.handle_error(retry_counter, retry_delay)
152+
153+
retry_delay = self.handle_retry(retry_counter, retry_delay)
154+
155+
retry_counter = retry_counter + 1
156+
157+
def handle_retry(self, retry_counter: int, retry_delay: float) -> float:
158+
if retry_counter == 0:
159+
logger.info("gRPC sync disconnected, reconnecting immediately")
160+
else:
161+
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
162+
time.sleep(retry_delay)
163+
retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)
164+
return retry_delay
165+
166+
def handle_error(self, retry_counter: int, retry_delay: float) -> None:
167+
if retry_counter == self.retry_grace_attempts:
168+
if self.cache:
169+
self.cache.clear()
149170
self.emit_provider_error(
150171
ProviderEventDetails(
151172
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
152173
error_code=ErrorCode.GENERAL,
153174
)
154175
)
155-
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
156-
time.sleep(retry_delay)
157-
retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF)
176+
elif retry_counter == 1:
177+
self.emit_provider_stale(
178+
ProviderEventDetails(
179+
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
180+
)
181+
)
158182

159183
def handle_changed_flags(self, data: typing.Any) -> None:
160184
changed_flags = list(data["flags"].keys())

providers/openfeature-provider-flagd/tests/e2e/events.feature

Lines changed: 0 additions & 29 deletions
This file was deleted.

providers/openfeature-provider-flagd/tests/e2e/steps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ def assert_handlers(
591591
)
592592
)
593593
def assert_handler_run(event_type: ProviderEvent, event_handles):
594-
assert_handlers(event_handles, event_type, max_wait=6)
594+
assert_handlers(event_handles, event_type, max_wait=30)
595595

596596

597597
@then(

providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ def image():
2727

2828
scenarios(
2929
f"{TEST_HARNESS_PATH}/gherkin/flagd-reconnect.feature",
30+
f"{TEST_HARNESS_PATH}/gherkin/events.feature",
3031
)

0 commit comments

Comments
 (0)