Commit ae80e4b

feat: better reconnect gherkins

Signed-off-by: Simon Schrottner <[email protected]>

1 parent 38cf9b9 · commit ae80e4b

File tree

7 files changed: +161 -109 lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py

Lines changed: 7 additions & 7 deletions

@@ -26,7 +26,7 @@ class CacheType(Enum):
 DEFAULT_RESOLVER_TYPE = ResolverType.RPC
 DEFAULT_RETRY_BACKOFF = 1000
 DEFAULT_RETRY_BACKOFF_MAX = 120000
-DEFAULT_RETRY_GRACE_ATTEMPTS = 5
+DEFAULT_RETRY_GRACE_PERIOD = 5
 DEFAULT_STREAM_DEADLINE = 600000
 DEFAULT_TLS = False
@@ -41,7 +41,7 @@ class CacheType(Enum):
 ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
 ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
 ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
-ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS"
+ENV_VAR_RETRY_GRACE_PERIOD = "FLAGD_RETRY_GRACE_PERIOD"
 ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
 ENV_VAR_TLS = "FLAGD_TLS"
@@ -81,7 +81,7 @@ def __init__( # noqa: PLR0913
         offline_poll_interval_ms: typing.Optional[int] = None,
         retry_backoff_ms: typing.Optional[int] = None,
         retry_backoff_max_ms: typing.Optional[int] = None,
-        retry_grace_attempts: typing.Optional[int] = None,
+        retry_grace_period: typing.Optional[int] = None,
         deadline_ms: typing.Optional[int] = None,
         stream_deadline_ms: typing.Optional[int] = None,
         keep_alive_time: typing.Optional[int] = None,
@@ -115,14 +115,14 @@ def __init__( # noqa: PLR0913
             else retry_backoff_max_ms
         )
 
-        self.retry_grace_attempts: int = (
+        self.retry_grace_period: int = (
             int(
                 env_or_default(
-                    ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int
+                    ENV_VAR_RETRY_GRACE_PERIOD, DEFAULT_RETRY_GRACE_PERIOD, cast=int
                 )
             )
-            if retry_grace_attempts is None
-            else retry_grace_attempts
+            if retry_grace_period is None
+            else retry_grace_period
         )
 
         self.resolver = (
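For context, the renamed setting can be driven either by the constructor argument or by the new environment variable. A minimal sketch, assuming the class defined in config.py is importable as Config (the class name is not visible in this diff); the variable name FLAGD_RETRY_GRACE_PERIOD and the default of 5 come from the constants above:

import os

from openfeature.contrib.provider.flagd.config import Config  # assumed import path

# Option 1: environment variable (read when no explicit argument is passed)
os.environ["FLAGD_RETRY_GRACE_PERIOD"] = "10"
config = Config()
print(config.retry_grace_period)  # 10

# Option 2: an explicit argument wins over the environment and the default of 5
config = Config(retry_grace_period=3)
print(config.retry_grace_period)  # 3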

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 2 additions & 2 deletions

@@ -53,7 +53,7 @@ def __init__( # noqa: PLR0913
         cache_type: typing.Optional[CacheType] = None,
         max_cache_size: typing.Optional[int] = None,
         retry_backoff_max_ms: typing.Optional[int] = None,
-        retry_grace_attempts: typing.Optional[int] = None,
+        retry_grace_period: typing.Optional[int] = None,
     ):
         """
         Create an instance of the FlagdProvider
@@ -84,7 +84,7 @@ def __init__( # noqa: PLR0913
             deadline_ms=deadline,
             retry_backoff_ms=retry_backoff_ms,
             retry_backoff_max_ms=retry_backoff_max_ms,
-            retry_grace_attempts=retry_grace_attempts,
+            retry_grace_period=retry_grace_period,
             resolver=resolver_type,
             offline_flag_source_path=offline_flag_source_path,
             stream_deadline_ms=stream_deadline_ms,
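At the provider level the new keyword is simply forwarded to the config object. A minimal usage sketch, assuming the package's usual FlagdProvider import and the OpenFeature Python SDK's api.set_provider; the flag key and the five-second value are illustrative only:

from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider

# Grace period (in seconds) before a lost sync stream is reported as ERROR
# instead of STALE; forwarded internally as retry_grace_period.
api.set_provider(FlagdProvider(retry_grace_period=5))

client = api.get_client()
print(client.get_boolean_value("my-flag", False))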

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 101 additions & 73 deletions

@@ -1,3 +1,4 @@
+import json
 import logging
 import threading
 import time
@@ -7,6 +8,7 @@
 from cachebox import BaseCacheImpl, LRUCache
 from google.protobuf.json_format import MessageToDict
 from google.protobuf.struct_pb2 import Struct
+from grpc import ChannelConnectivity
 
 from openfeature.evaluation_context import EvaluationContext
 from openfeature.event import ProviderEventDetails
@@ -47,6 +49,7 @@ def __init__(
             [ProviderEventDetails], None
         ],
     ):
+        self.active = False
         self.config = config
         self.emit_provider_ready = emit_provider_ready
         self.emit_provider_error = emit_provider_error
@@ -57,26 +60,50 @@ def __init__(
             if self.config.cache == CacheType.LRU
             else None
         )
-        self.stub, self.channel = self._create_stub()
-        self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
-        self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
-        self.retry_grace_attempts = config.retry_grace_attempts
+
+        retry_backoff_seconds = config.retry_backoff_ms * 0.001
+        retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
+        self.retry_grace_period = config.retry_grace_period
         self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
         self.deadline = config.deadline_ms * 0.001
         self.connected = False
-
-    def _create_stub(
-        self,
-    ) -> typing.Tuple[evaluation_pb2_grpc.ServiceStub, grpc.Channel]:
-        config = self.config
         channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
-        channel = channel_factory(
+        service_config = {
+            "methodConfig": [
+                {
+                    "name": [
+                        {
+                            "service": "flagd.evaluation.v1.Service",
+                            "method": "EventStream",
+                        }
+                    ],
+                    "retryPolicy": {
+                        "maxAttempts": 50000,  # Max value for a 32-bit integer
+                        "initialBackoff": f"{retry_backoff_seconds}s",  # Initial backoff delay
+                        "maxBackoff": f"{retry_backoff_max_seconds}s",  # Maximum backoff delay
+                        "backoffMultiplier": 2,  # Exponential backoff multiplier
+                        "retryableStatusCodes": [
+                            "UNAVAILABLE",
+                            "UNKNOWN",
+                        ],  # Retry on these statuses
+                    },
+                }
+            ],
+        }
+
+        # Create the channel with the service config
+        options = [
+            ("grpc.service_config", json.dumps(service_config)),
+            ("grpc.keepalive_time_ms", config.keep_alive_time),
+        ]
+        self.channel = channel_factory(
             f"{config.host}:{config.port}",
-            options=(("grpc.keepalive_time_ms", config.keep_alive_time),),
+            options=options,
        )
-        stub = evaluation_pb2_grpc.ServiceStub(channel)
+        self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
 
-        return stub, channel
+        self.thread: typing.Optional[threading.Thread] = None
+        self.timer: typing.Optional[threading.Timer] = None
 
     def initialize(self, evaluation_context: EvaluationContext) -> None:
         self.connect()
@@ -89,11 +116,12 @@ def shutdown(self) -> None:
 
     def connect(self) -> None:
         self.active = True
-        self.thread = threading.Thread(
-            target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread"
-        )
-        self.thread.start()
 
+        # Run monitoring in a separate thread
+        self.monitor_thread = threading.Thread(
+            target=self.monitor, daemon=True, name="FlagdGrpcServiceMonitorThread"
+        )
+        self.monitor_thread.start()
         ## block until ready or deadline reached
         timeout = self.deadline + time.time()
         while not self.connected and time.time() < timeout:
@@ -105,81 +133,81 @@ def connect(self) -> None:
                 "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
             )
 
+    def monitor(self):
+        def state_change_callback(new_state: ChannelConnectivity):
+            logger.debug(f"gRPC state change: {new_state}")
+            if new_state == ChannelConnectivity.READY:
+                if not self.thread or not self.thread.is_alive():
+                    self.thread = threading.Thread(
+                        target=self.listen,
+                        daemon=True,
+                        name="FlagdGrpcServiceWorkerThread",
+                    )
+                    self.thread.start()
+
+                if self.timer and self.timer.is_alive():
+                    logger.debug("gRPC error timer expired")
+                    self.timer.cancel()
+
+            elif new_state == ChannelConnectivity.TRANSIENT_FAILURE:
+                # this is the failed reonnect attempt so we are going into stale
+                self.emit_provider_stale(
+                    ProviderEventDetails(
+                        message="gRPC sync disconnected, reconnecting",
+                    )
+                )
+                # adding a timer, so we can emit the error event after time
+                self.timer = threading.Timer(self.retry_grace_period, self.emit_error)
+
+                logger.debug("gRPC error timer started")
+                self.timer.start()
+                self.connected = False
+
+        self.channel.subscribe(state_change_callback, try_to_connect=True)
+
+    def emit_error(self) -> None:
+        logger.debug("gRPC error emitted")
+        if self.cache:
+            self.cache.clear()
+        self.emit_provider_error(
+            ProviderEventDetails(
+                message="gRPC sync disconnected, reconnecting",
+                error_code=ErrorCode.GENERAL,
+            )
+        )
+
     def listen(self) -> None:
-        retry_delay = self.retry_backoff_seconds
+        logger.info("gRPC starting listener thread")
         call_args = (
             {"timeout": self.streamline_deadline_seconds}
             if self.streamline_deadline_seconds > 0
             else {}
         )
-        retry_counter = 0
-        while self.active:
-            request = evaluation_pb2.EventStreamRequest()
+        request = evaluation_pb2.EventStreamRequest()
 
+        # defining a never ending loop to recreate the stream
+        while self.active:
             try:
-                logger.debug("Setting up gRPC sync flags connection")
+                logger.info("Setting up gRPC sync flags connection")
                 for message in self.stub.EventStream(request, **call_args):
                     if message.type == "provider_ready":
-                        if not self.connected:
-                            self.emit_provider_ready(
-                                ProviderEventDetails(
-                                    message="gRPC sync connection established"
-                                )
+                        self.connected = True
+                        self.emit_provider_ready(
+                            ProviderEventDetails(
+                                message="gRPC sync connection established"
                             )
-                            self.connected = True
-                            retry_counter = 0
-                            # reset retry delay after successsful read
-                            retry_delay = self.retry_backoff_seconds
-
+                        )
                     elif message.type == "configuration_change":
                         data = MessageToDict(message)["data"]
                         self.handle_changed_flags(data)
-
-                if not self.active:
-                    logger.info("Terminating gRPC sync thread")
-                    return
-            except grpc.RpcError as e:
-                logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
-                # re-create the stub if there's a connection issue - otherwise reconnect does not work as expected
-                self.stub, self.channel = self._create_stub()
+            except grpc.RpcError as e:  # noqa: PERF203
+                # although it seems like this error log is not interesting, without it, the retry is not working as expected
+                logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
             except ParseError:
                 logger.exception(
                     f"Could not parse flag data using flagd syntax: {message=}"
                 )
 
-            self.connected = False
-            self.on_connection_error(retry_counter, retry_delay)
-
-            retry_delay = self.handle_retry(retry_counter, retry_delay)
-
-            retry_counter = retry_counter + 1
-
-    def handle_retry(self, retry_counter: int, retry_delay: float) -> float:
-        if retry_counter == 0:
-            logger.info("gRPC sync disconnected, reconnecting immediately")
-        else:
-            logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
-            time.sleep(retry_delay)
-        retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)
-        return retry_delay
-
-    def on_connection_error(self, retry_counter: int, retry_delay: float) -> None:
-        if retry_counter == self.retry_grace_attempts:
-            if self.cache:
-                self.cache.clear()
-            self.emit_provider_error(
-                ProviderEventDetails(
-                    message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
-                    error_code=ErrorCode.GENERAL,
-                )
-            )
-        elif retry_counter == 1:
-            self.emit_provider_stale(
-                ProviderEventDetails(
-                    message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
-                )
-            )
-
     def handle_changed_flags(self, data: typing.Any) -> None:
         changed_flags = list(data["flags"].keys())
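The rewritten resolver leans on two stock gRPC features instead of the hand-rolled retry loop it removes: a channel-level retry policy passed as a JSON service config, and a connectivity subscription that drives the STALE-then-ERROR transition through a threading.Timer grace period. The standalone sketch below shows the same pattern outside the provider; the host and port, the guard against stacking timers, and the print placeholders are assumptions for illustration, not the provider's code:

import json
import threading

import grpc
from grpc import ChannelConnectivity

# Channel-level retry policy, mirroring the service config built in __init__ above.
service_config = {
    "methodConfig": [
        {
            "name": [
                {"service": "flagd.evaluation.v1.Service", "method": "EventStream"}
            ],
            "retryPolicy": {
                "maxAttempts": 50000,
                "initialBackoff": "1s",
                "maxBackoff": "120s",
                "backoffMultiplier": 2,
                "retryableStatusCodes": ["UNAVAILABLE", "UNKNOWN"],
            },
        }
    ]
}

channel = grpc.insecure_channel(
    "localhost:8013",  # host and port are assumptions for this sketch
    options=[("grpc.service_config", json.dumps(service_config))],
)

grace_period_seconds = 5  # plays the role of retry_grace_period
error_timer = None


def on_state_change(state: ChannelConnectivity) -> None:
    global error_timer
    if state == ChannelConnectivity.READY:
        if error_timer is not None:
            error_timer.cancel()  # reconnected within the grace period
            error_timer = None
    elif state == ChannelConnectivity.TRANSIENT_FAILURE and error_timer is None:
        print("provider STALE: sync stream lost, reconnecting")  # stand-in for emit_provider_stale
        error_timer = threading.Timer(
            grace_period_seconds,
            lambda: print("provider ERROR: grace period expired"),  # stand-in for emit_error
        )
        error_timer.start()


# try_to_connect=True keeps the channel dialing, which re-triggers the callback
# on every connectivity state transition.
channel.subscribe(on_state_change, try_to_connect=True)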

Lines changed: 0 additions & 21 deletions

@@ -1,8 +1,5 @@
 import typing
 
-import pytest
-from testcontainers.core.container import DockerContainer
-from tests.e2e.flagd_container import FlagdContainer
 from tests.e2e.steps import *  # noqa: F403
 
 JsonPrimitive = typing.Union[str, bool, float, int]
@@ -18,21 +15,3 @@ def pytest_collection_modifyitems(config):
     # this seems to not work with python 3.8
     if hasattr(config.option, "markexpr") and config.option.markexpr == "":
         config.option.markexpr = marker
-
-
-@pytest.fixture(autouse=True, scope="module")
-def setup(request, port, image):
-    container: DockerContainer = FlagdContainer(
-        image=image,
-        port=port,
-    )
-    # Setup code
-    c = container.start()
-
-    def fin():
-        c.stop()
-
-    # Teardown code
-    request.addfinalizer(fin)
-
-    return c.get_exposed_port(port)

0 commit comments
