diff --git a/providers/openfeature-provider-flagd/openfeature/test-harness b/providers/openfeature-provider-flagd/openfeature/test-harness index e908fb7d..706a7e95 160000 --- a/providers/openfeature-provider-flagd/openfeature/test-harness +++ b/providers/openfeature-provider-flagd/openfeature/test-harness @@ -1 +1 @@ -Subproject commit e908fb7d19def6a4768ca90b02665075bbc1afbb +Subproject commit 706a7e951bb72a145523b38fe83060becc34c4d7 diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index bcd4da85..1eb37463 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -26,7 +26,7 @@ class CacheType(Enum): DEFAULT_RESOLVER_TYPE = ResolverType.RPC DEFAULT_RETRY_BACKOFF = 1000 DEFAULT_RETRY_BACKOFF_MAX = 120000 -DEFAULT_RETRY_GRACE_ATTEMPTS = 5 +DEFAULT_RETRY_GRACE_PERIOD_SECONDS = 5 DEFAULT_STREAM_DEADLINE = 600000 DEFAULT_TLS = False @@ -41,7 +41,7 @@ class CacheType(Enum): ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER" ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS" ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS" -ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS" +ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD" ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS" ENV_VAR_TLS = "FLAGD_TLS" @@ -81,7 +81,7 @@ def __init__( # noqa: PLR0913 offline_poll_interval_ms: typing.Optional[int] = None, retry_backoff_ms: typing.Optional[int] = None, retry_backoff_max_ms: typing.Optional[int] = None, - retry_grace_attempts: typing.Optional[int] = None, + retry_grace_period: typing.Optional[int] = None, deadline_ms: typing.Optional[int] = None, stream_deadline_ms: typing.Optional[int] = None, keep_alive_time: typing.Optional[int] = None, @@ -115,14 +115,16 @@ def __init__( # noqa: PLR0913 else retry_backoff_max_ms ) - self.retry_grace_attempts: int = ( + self.retry_grace_period: int = ( int( env_or_default( - ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int + ENV_VAR_RETRY_GRACE_PERIOD_SECONDS, + DEFAULT_RETRY_GRACE_PERIOD_SECONDS, + cast=int, ) ) - if retry_grace_attempts is None - else retry_grace_attempts + if retry_grace_period is None + else retry_grace_period ) self.resolver = ( diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 07e148e1..dd8beeab 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -53,7 +53,7 @@ def __init__( # noqa: PLR0913 cache_type: typing.Optional[CacheType] = None, max_cache_size: typing.Optional[int] = None, retry_backoff_max_ms: typing.Optional[int] = None, - retry_grace_attempts: typing.Optional[int] = None, + retry_grace_period: typing.Optional[int] = None, ): """ Create an instance of the FlagdProvider @@ -84,7 +84,7 @@ def __init__( # noqa: PLR0913 deadline_ms=deadline, retry_backoff_ms=retry_backoff_ms, retry_backoff_max_ms=retry_backoff_max_ms, - retry_grace_attempts=retry_grace_attempts, + retry_grace_period=retry_grace_period, resolver=resolver_type, offline_flag_source_path=offline_flag_source_path, stream_deadline_ms=stream_deadline_ms, diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index 8dfda518..5841912c 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -7,6 +7,7 @@ from cachebox import BaseCacheImpl, LRUCache from google.protobuf.json_format import MessageToDict from google.protobuf.struct_pb2 import Struct +from grpc import ChannelConnectivity from openfeature.evaluation_context import EvaluationContext from openfeature.event import ProviderEventDetails @@ -47,6 +48,7 @@ def __init__( [ProviderEventDetails], None ], ): + self.active = False self.config = config self.emit_provider_ready = emit_provider_ready self.emit_provider_error = emit_provider_error @@ -57,26 +59,30 @@ def __init__( if self.config.cache == CacheType.LRU else None ) - self.stub, self.channel = self._create_stub() - self.retry_backoff_seconds = config.retry_backoff_ms * 0.001 - self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001 - self.retry_grace_attempts = config.retry_grace_attempts + + self.retry_grace_period = config.retry_grace_period self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 self.deadline = config.deadline_ms * 0.001 self.connected = False - - def _create_stub( - self, - ) -> typing.Tuple[evaluation_pb2_grpc.ServiceStub, grpc.Channel]: - config = self.config channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel - channel = channel_factory( + + # Create the channel with the service config + options = [ + ("grpc.keepalive_time_ms", config.keep_alive_time), + ("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms), + ("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms), + ("grpc.min_reconnect_backoff_ms", config.deadline_ms), + ] + self.channel = channel_factory( f"{config.host}:{config.port}", - options=(("grpc.keepalive_time_ms", config.keep_alive_time),), + options=options, ) - stub = evaluation_pb2_grpc.ServiceStub(channel) + self.stub = evaluation_pb2_grpc.ServiceStub(self.channel) - return stub, channel + self.thread: typing.Optional[threading.Thread] = None + self.timer: typing.Optional[threading.Timer] = None + + self.start_time = time.time() def initialize(self, evaluation_context: EvaluationContext) -> None: self.connect() @@ -89,11 +95,12 @@ def shutdown(self) -> None: def connect(self) -> None: self.active = True - self.thread = threading.Thread( - target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread" - ) - self.thread.start() + # Run monitoring in a separate thread + self.monitor_thread = threading.Thread( + target=self.monitor, daemon=True, name="FlagdGrpcServiceMonitorThread" + ) + self.monitor_thread.start() ## block until ready or deadline reached timeout = self.deadline + time.time() while not self.connected and time.time() < timeout: @@ -105,32 +112,72 @@ def connect(self) -> None: "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations." ) + def monitor(self) -> None: + self.channel.subscribe(self._state_change_callback, try_to_connect=True) + + def _state_change_callback(self, new_state: ChannelConnectivity) -> None: + logger.debug(f"gRPC state change: {new_state}") + if new_state == ChannelConnectivity.READY: + if not self.thread or not self.thread.is_alive(): + self.thread = threading.Thread( + target=self.listen, + daemon=True, + name="FlagdGrpcServiceWorkerThread", + ) + self.thread.start() + + if self.timer and self.timer.is_alive(): + logger.debug("gRPC error timer expired") + self.timer.cancel() + + elif new_state == ChannelConnectivity.TRANSIENT_FAILURE: + # this is the failed reconnect attempt so we are going into stale + self.emit_provider_stale( + ProviderEventDetails( + message="gRPC sync disconnected, reconnecting", + ) + ) + self.start_time = time.time() + # adding a timer, so we can emit the error event after time + self.timer = threading.Timer(self.retry_grace_period, self.emit_error) + + logger.debug("gRPC error timer started") + self.timer.start() + self.connected = False + + def emit_error(self) -> None: + logger.debug("gRPC error emitted") + if self.cache: + self.cache.clear() + self.emit_provider_error( + ProviderEventDetails( + message="gRPC sync disconnected, reconnecting", + error_code=ErrorCode.GENERAL, + ) + ) + def listen(self) -> None: - retry_delay = self.retry_backoff_seconds + logger.debug("gRPC starting listener thread") call_args = ( {"timeout": self.streamline_deadline_seconds} if self.streamline_deadline_seconds > 0 else {} ) - retry_counter = 0 - while self.active: - request = evaluation_pb2.EventStreamRequest() + call_args["wait_for_ready"] = True + request = evaluation_pb2.EventStreamRequest() + # defining a never ending loop to recreate the stream + while self.active: try: logger.debug("Setting up gRPC sync flags connection") for message in self.stub.EventStream(request, **call_args): if message.type == "provider_ready": - if not self.connected: - self.emit_provider_ready( - ProviderEventDetails( - message="gRPC sync connection established" - ) + self.connected = True + self.emit_provider_ready( + ProviderEventDetails( + message="gRPC sync connection established" ) - self.connected = True - retry_counter = 0 - # reset retry delay after successsful read - retry_delay = self.retry_backoff_seconds - + ) elif message.type == "configuration_change": data = MessageToDict(message)["data"] self.handle_changed_flags(data) @@ -138,48 +185,14 @@ def listen(self) -> None: if not self.active: logger.info("Terminating gRPC sync thread") return - except grpc.RpcError as e: - logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") - # re-create the stub if there's a connection issue - otherwise reconnect does not work as expected - self.stub, self.channel = self._create_stub() + except grpc.RpcError as e: # noqa: PERF203 + # although it seems like this error log is not interesting, without it, the retry is not working as expected + logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}") except ParseError: logger.exception( f"Could not parse flag data using flagd syntax: {message=}" ) - self.connected = False - self.on_connection_error(retry_counter, retry_delay) - - retry_delay = self.handle_retry(retry_counter, retry_delay) - - retry_counter = retry_counter + 1 - - def handle_retry(self, retry_counter: int, retry_delay: float) -> float: - if retry_counter == 0: - logger.info("gRPC sync disconnected, reconnecting immediately") - else: - logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") - time.sleep(retry_delay) - retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds) - return retry_delay - - def on_connection_error(self, retry_counter: int, retry_delay: float) -> None: - if retry_counter == self.retry_grace_attempts: - if self.cache: - self.cache.clear() - self.emit_provider_error( - ProviderEventDetails( - message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", - error_code=ErrorCode.GENERAL, - ) - ) - elif retry_counter == 1: - self.emit_provider_stale( - ProviderEventDetails( - message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", - ) - ) - def handle_changed_flags(self, data: typing.Any) -> None: changed_flags = list(data["flags"].keys()) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py index 0918981f..74835f94 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py @@ -31,17 +31,19 @@ def __init__( self.last_modified = 0.0 self.flag_data: typing.Mapping[str, Flag] = {} self.load_data() + self.active = True self.thread = threading.Thread(target=self.refresh_file, daemon=True) self.thread.start() def shutdown(self) -> None: + self.active = False pass def get_flag(self, key: str) -> typing.Optional[Flag]: return self.flag_data.get(key) def refresh_file(self) -> None: - while True: + while self.active: time.sleep(self.poll_interval_seconds) logger.debug("checking for new flag store contents from file") last_modified = os.path.getmtime(self.file_path) diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index 077e6926..e80eb15b 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -1,8 +1,5 @@ import typing -import pytest -from testcontainers.core.container import DockerContainer -from tests.e2e.flagd_container import FlagdContainer from tests.e2e.steps import * # noqa: F403 JsonPrimitive = typing.Union[str, bool, float, int] @@ -18,21 +15,3 @@ def pytest_collection_modifyitems(config): # this seems to not work with python 3.8 if hasattr(config.option, "markexpr") and config.option.markexpr == "": config.option.markexpr = marker - - -@pytest.fixture(autouse=True, scope="module") -def setup(request, port, image): - container: DockerContainer = FlagdContainer( - image=image, - port=port, - ) - # Setup code - c = container.start() - - def fin(): - c.stop() - - # Teardown code - request.addfinalizer(fin) - - return c.get_exposed_port(port) diff --git a/providers/openfeature-provider-flagd/tests/e2e/steps.py b/providers/openfeature-provider-flagd/tests/e2e/steps.py index c81c8871..b1325aff 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/steps.py +++ b/providers/openfeature-provider-flagd/tests/e2e/steps.py @@ -1,10 +1,13 @@ import logging +import threading import time import typing import pytest from asserts import assert_equal, assert_in, assert_not_equal, assert_true from pytest_bdd import given, parsers, then, when +from testcontainers.core.container import DockerContainer +from tests.e2e.flagd_container import FlagdContainer from tests.e2e.parsers import to_bool, to_list from openfeature import api @@ -26,9 +29,21 @@ def evaluation_context() -> EvaluationContext: @given("a flagd provider is set", target_fixture="client") @given("a provider is registered", target_fixture="client") -def setup_provider(setup, resolver_type, client_name) -> OpenFeatureClient: +def setup_provider( + container: FlagdContainer, resolver_type, client_name, port +) -> OpenFeatureClient: + try: + container.get_exposed_port(port) + except: # noqa: E722 + container.start() + api.set_provider( - FlagdProvider(resolver_type=resolver_type, port=setup, timeout=1), + FlagdProvider( + resolver_type=resolver_type, + port=int(container.get_exposed_port(port)), + timeout=1, + retry_grace_period=3, + ), client_name, ) client = api.get_client(client_name) @@ -517,6 +532,12 @@ def error_handles() -> list: return [] +@given( + parsers.cfparse( + "a {event_type:ProviderEvent} handler is added", + extra_types={"ProviderEvent": ProviderEvent}, + ), +) @when( parsers.cfparse( "a {event_type:ProviderEvent} handler is added", @@ -631,7 +652,10 @@ def assert_disconnect_error( def assert_flag_changed(event_handles, key): handle = None for h in event_handles: - if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: + if ( + h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED + and key in h["event"].flags_changed + ): handle = h break @@ -650,10 +674,7 @@ def wait_for(pred, poll_sec=2, timeout_sec=10): @given("flagd is unavailable", target_fixture="client") def flagd_unavailable(resolver_type): api.set_provider( - FlagdProvider( - resolver_type=resolver_type, - port=99999, - ), + FlagdProvider(resolver_type=resolver_type, port=99999, retry_grace_period=2), "unavailable", ) return api.get_client("unavailable") @@ -668,3 +689,33 @@ def flagd_init(client: OpenFeatureClient, event_handles, error_handles): @then("an error should be indicated within the configured deadline") def flagd_error(error_handles): assert_handlers(error_handles, ProviderEvent.PROVIDER_ERROR) + + +@when(parsers.cfparse("the connection is lost for {seconds}s")) +def flagd_restart(seconds, container): + def starting(): + container.start() + + container.stop() + threading.Timer(int(seconds), starting).start() + + +@pytest.fixture(autouse=True, scope="module") +def container(request, port, image): + container: DockerContainer = FlagdContainer( + image=image, + port=port, + ) + # Setup code + container = container.start() + + def fin(): + try: + container.stop() + except: # noqa: E722 + logging.debug("container was not running anymore") + + # Teardown code + request.addfinalizer(fin) + + return container diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_config.py b/providers/openfeature-provider-flagd/tests/e2e/test_config.py index dc413aee..238112d2 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_config.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_config.py @@ -37,8 +37,13 @@ def convert_resolver_type(val: typing.Union[str, ResolverType]) -> ResolverType: } -@pytest.fixture(autouse=True, scope="module") -def setup(request): +@pytest.fixture(autouse=True) +def container(): + pass + + +@pytest.fixture(autouse=True) +def setup_provider(request): pass diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_in_process_file.py b/providers/openfeature-provider-flagd/tests/e2e/test_in_process_file.py index f73dc990..278bd1ea 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_in_process_file.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_in_process_file.py @@ -62,6 +62,11 @@ def resolver_type() -> ResolverType: return ResolverType.IN_PROCESS +@pytest.fixture(autouse=True) +def container(): + pass + + @pytest.fixture(autouse=True, scope="module") def setup(request, client_name, file_name, resolver_type): """nothing to boot"""