diff --git a/.coveragerc b/.coveragerc index ef34019bf..dc8009b3f 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,2 @@ [report] -omit = connectors/quartz.py,connectors/conftest.py,tests/* +omit = connectors/quartz.py,connectors/conftest.py,tests/*,connectors/agent/*,connectors/cli/* diff --git a/Dockerfile.ftest.wolfi b/Dockerfile.ftest.wolfi index 0a206010b..90e64a388 100644 --- a/Dockerfile.ftest.wolfi +++ b/Dockerfile.ftest.wolfi @@ -1,4 +1,4 @@ -FROM docker.elastic.co/wolfi/python:3.11-dev@sha256:6d21dd09efd41b21f4abeab6291bc93af86d1a438225eeac98bdea85f6617b22 +FROM docker.elastic.co/wolfi/python:3.11-dev@sha256:7f67246b4646373e777d1e91d22cc0acb27b74969adcd2992e20c1e61c8759a4 USER root COPY . /connectors WORKDIR /connectors diff --git a/Dockerfile.wolfi b/Dockerfile.wolfi index eef700e82..0f4867af8 100644 --- a/Dockerfile.wolfi +++ b/Dockerfile.wolfi @@ -1,4 +1,4 @@ -FROM docker.elastic.co/wolfi/python:3.11-dev@sha256:6d21dd09efd41b21f4abeab6291bc93af86d1a438225eeac98bdea85f6617b22 +FROM docker.elastic.co/wolfi/python:3.11-dev@sha256:7f67246b4646373e777d1e91d22cc0acb27b74969adcd2992e20c1e61c8759a4 USER root COPY . /app WORKDIR /app diff --git a/connectors/agent/README.md b/connectors/agent/README.md deleted file mode 100644 index e69de29bb..000000000 diff --git a/connectors/agent/__init__.py b/connectors/agent/__init__.py new file mode 100644 index 000000000..1fa99ac6a --- /dev/null +++ b/connectors/agent/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License 2.0; +# you may not use this file except in compliance with the Elastic License 2.0. +# diff --git a/connectors/cli/__init__.py b/connectors/cli/__init__.py new file mode 100644 index 000000000..1fa99ac6a --- /dev/null +++ b/connectors/cli/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License 2.0; +# you may not use this file except in compliance with the Elastic License 2.0. +# diff --git a/connectors/es/management_client.py b/connectors/es/management_client.py index d08bed014..73c45257a 100644 --- a/connectors/es/management_client.py +++ b/connectors/es/management_client.py @@ -63,16 +63,13 @@ async def create_content_index(self, search_index_name, language_code): ) ) - async def ensure_content_index_mappings(self, index, mappings): + async def ensure_content_index_mappings(self, index_name, index, desired_mappings): # open = Match open, non-hidden indices. Also matches any non-hidden data stream. # Content indices are always non-hidden. - response = await self._retrier.execute_with_retry( - partial(self.client.indices.get_mapping, index=index) - ) - existing_mappings = response[index].get("mappings", {}) + existing_mappings = index.get("mappings", {}) if len(existing_mappings) == 0: - if mappings: + if desired_mappings: logger.info( "Index %s has no mappings or it's empty. Adding mappings...", index ) @@ -80,13 +77,15 @@ async def ensure_content_index_mappings(self, index, mappings): await self._retrier.execute_with_retry( partial( self.client.indices.put_mapping, - index=index, - dynamic=mappings.get("dynamic", False), - dynamic_templates=mappings.get("dynamic_templates", []), - properties=mappings.get("properties", {}), + index=index_name, + dynamic=desired_mappings.get("dynamic", False), + dynamic_templates=desired_mappings.get( + "dynamic_templates", [] + ), + properties=desired_mappings.get("properties", {}), ) ) - logger.info("Successfully added mappings for index %s", index) + logger.info("Successfully added mappings for index %s", index_name) except Exception as e: logger.warning( f"Could not create mappings for index {index}, encountered error {e}" @@ -97,7 +96,7 @@ async def ensure_content_index_mappings(self, index, mappings): ) else: logger.debug( - "Index %s already has mappings, skipping mappings creation", index + "Index %s already has mappings, skipping mappings creation", index_name ) async def ensure_content_index_settings( @@ -225,8 +224,19 @@ async def index_exists(self, index_name): partial(self.client.indices.exists, index=index_name) ) - async def get_index(self, index_name, ignore_unavailable=False): - return await self._retrier.execute_with_retry( + async def get_index_or_alias(self, index_name, ignore_unavailable=False): + """ + Get index definition (mappings and settings) by its name or its alias. + """ + # Example structure of response: + # { + # "my-data-index": { + # "aliases": {"search-my-data-index": {}}, + # "mappings": { ... }, + # "settings": { ... }, + # } + # } + get_index_response = await self._retrier.execute_with_retry( partial( self.client.indices.get, index=index_name, @@ -234,6 +244,25 @@ async def get_index(self, index_name, ignore_unavailable=False): ) ) + if index_name in get_index_response: + logger.debug(f"Got index by its name: {index_name}") + return get_index_response[index_name] + + for ( + existing_index_name, + existing_index_definition, + ) in get_index_response.items(): + if "aliases" not in existing_index_definition: + continue + + if index_name in existing_index_definition["aliases"]: + logger.debug( + f"Got index {existing_index_name} by its alias {index_name}" + ) + return existing_index_definition + + return None + async def upsert(self, _id, index_name, doc): return await self._retrier.execute_with_retry( partial( diff --git a/connectors/es/sink.py b/connectors/es/sink.py index 1410b9fd2..876ed0e9d 100644 --- a/connectors/es/sink.py +++ b/connectors/es/sink.py @@ -836,16 +836,17 @@ async def has_active_license_enabled(self, license_): # TODO: think how to make it not a proxy method to the client return await self.es_management_client.has_active_license_enabled(license_) + def extract_index_or_alias(self, get_index_response, expected_index_name): + return None + async def prepare_content_index(self, index_name, language_code=None): """Creates the index, given a mapping/settings if it does not exist.""" self._logger.debug(f"Checking index {index_name}") - result = await self.es_management_client.get_index( + index = await self.es_management_client.get_index_or_alias( index_name, ignore_unavailable=True ) - index = result.get(index_name, None) - mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) if index: @@ -859,7 +860,7 @@ async def prepare_content_index(self, index_name, language_code=None): ) await self.es_management_client.ensure_content_index_mappings( - index_name, mappings + index_name=index_name, index=index, desired_mappings=mappings ) else: # Create a new index diff --git a/connectors/sources/__init__.py b/connectors/sources/__init__.py new file mode 100644 index 000000000..1fa99ac6a --- /dev/null +++ b/connectors/sources/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License 2.0; +# you may not use this file except in compliance with the Elastic License 2.0. +# diff --git a/connectors/sync_job_runner.py b/connectors/sync_job_runner.py index 4ebb27361..d1e8797f8 100644 --- a/connectors/sync_job_runner.py +++ b/connectors/sync_job_runner.py @@ -10,9 +10,7 @@ from elasticsearch import ( AuthorizationException as ElasticAuthorizationException, ) -from elasticsearch import ( - NotFoundError as ElasticNotFoundError, -) +from elasticsearch import NotFoundError as ElasticNotFoundError from connectors.config import DataSourceFrameworkConfig from connectors.es.client import License, with_concurrency_control @@ -178,6 +176,7 @@ async def execute(self): if ( self.connector.native and self.connector.features.native_connector_api_keys_enabled() + and self.service_config.get("_use_native_connector_api_keys", True) ): # Update the config so native connectors can use API key authentication during sync await self._update_native_connector_authentication() @@ -380,9 +379,11 @@ async def _sync_done(self, sync_status, sync_error=None): sync_cursor = ( None if not self.data_provider # If we failed before initializing the data provider, we don't need to change the cursor - else self.data_provider.sync_cursor() - if self.sync_job.is_content_sync() - else None + else ( + self.data_provider.sync_cursor() + if self.sync_job.is_content_sync() + else None + ) ) await self.connector.sync_done( self.sync_job if await self.reload_sync_job() else None, diff --git a/tests/agent/test_cli.py b/tests/agent/test_cli.py index ba8b1c6a4..ec1a6e41d 100644 --- a/tests/agent/test_cli.py +++ b/tests/agent/test_cli.py @@ -12,7 +12,7 @@ @patch("connectors.agent.cli.ConnectorsAgentComponent", return_value=AsyncMock()) -def test_main(patch_component): +def test_main_responds_to_sigterm(patch_component): async def kill(): await asyncio.sleep(0.2) os.kill(os.getpid(), signal.SIGTERM) @@ -20,6 +20,9 @@ async def kill(): loop = asyncio.new_event_loop() loop.create_task(kill()) + # No asserts here. + # main() will block forever unless it's killed with a signal + # This test succeeds if it exits, if it hangs it'll be killed by a timeout main() loop.close() diff --git a/tests/es/test_management_client.py b/tests/es/test_management_client.py index e3a211c83..e42e3e265 100644 --- a/tests/es/test_management_client.py +++ b/tests/es/test_management_client.py @@ -66,11 +66,9 @@ async def test_ensure_content_index_mappings_when_mappings_exist( mappings = {} existing_mappings_response = {index_name: {"mappings": ["something"]}} - es_management_client.client.indices.get_mapping = AsyncMock( - return_value=existing_mappings_response + await es_management_client.ensure_content_index_mappings( + index_name, existing_mappings_response, mappings ) - - await es_management_client.ensure_content_index_mappings(index_name, mappings) es_management_client.client.indices.put_mapping.assert_not_called() @pytest.mark.asyncio @@ -85,11 +83,9 @@ async def test_ensure_content_index_mappings_when_mappings_do_not_exist( } existing_mappings_response = {index_name: {"mappings": {}}} - es_management_client.client.indices.get_mapping = AsyncMock( - return_value=existing_mappings_response + await es_management_client.ensure_content_index_mappings( + index_name, existing_mappings_response, mappings ) - - await es_management_client.ensure_content_index_mappings(index_name, mappings) es_management_client.client.indices.put_mapping.assert_awaited_with( index=index_name, dynamic=mappings["dynamic"], @@ -105,11 +101,9 @@ async def test_ensure_content_index_mappings_when_mappings_do_not_exist_but_no_m mappings = None existing_mappings_response = {index_name: {"mappings": {}}} - es_management_client.client.indices.get_mapping = AsyncMock( - return_value=existing_mappings_response + await es_management_client.ensure_content_index_mappings( + index_name, existing_mappings_response, mappings ) - - await es_management_client.ensure_content_index_mappings(index_name, mappings) es_management_client.client.indices.put_mapping.assert_not_called() @pytest.mark.asyncio @@ -367,3 +361,69 @@ async def test_create_connector_secret(self, es_management_client, mock_response body={"value": secret_value}, headers={"accept": "application/json", "content-type": "application/json"}, ) + + @pytest.mark.asyncio + async def test_extract_index_or_alias_with_index(self, es_management_client): + response = { + "shapo-online": { + "aliases": {"search-shapo-online": {}}, + "mappings": {}, + "settings": {}, + } + } + + es_management_client.client.indices.get = AsyncMock(return_value=response) + + index = await es_management_client.get_index_or_alias("shapo-online") + + assert index == response["shapo-online"] + + @pytest.mark.asyncio + async def test_extract_index_or_alias_with_alias(self, es_management_client): + response = { + "shapo-online": { + "aliases": {"search-shapo-online": {}}, + "mappings": {}, + "settings": {}, + } + } + + es_management_client.client.indices.get = AsyncMock(return_value=response) + + index = await es_management_client.get_index_or_alias("search-shapo-online") + + assert index == response["shapo-online"] + + @pytest.mark.asyncio + async def test_extract_index_or_alias_when_none_present(self, es_management_client): + response = { + "shapo-online": { + "aliases": {"search-shapo-online": {}}, + "mappings": {}, + "settings": {}, + } + } + + es_management_client.client.indices.get = AsyncMock(return_value=response) + + index = await es_management_client.get_index_or_alias("nonexistent") + + assert index is None + + @pytest.mark.asyncio + async def test_get_index_or_alias(self, es_management_client, mock_responses): + secret_id = "secret-id" + secret_value = "my-secret" + + es_management_client.client.perform_request = AsyncMock( + return_value={"id": secret_id} + ) + + returned_id = await es_management_client.create_connector_secret(secret_value) + assert returned_id == secret_id + es_management_client.client.perform_request.assert_awaited_with( + "POST", + "/_connector/_secret", + body={"value": secret_value}, + headers={"accept": "application/json", "content-type": "application/json"}, + ) diff --git a/tests/test_sink.py b/tests/test_sink.py index cac3ea6be..5fab77114 100644 --- a/tests/test_sink.py +++ b/tests/test_sink.py @@ -230,7 +230,9 @@ async def test_prepare_content_index(mock_responses): await es.close() - put_mapping_mock.assert_called_with(index_name, mappings) + put_mapping_mock.assert_called_with( + index_name=index_name, index=response[index_name], desired_mappings=mappings + ) def set_responses(mock_responses, ts=None): @@ -1481,6 +1483,7 @@ async def test_cancel_sync(extractor_task_done, sink_task_done, force_cancel): es._sink.force_cancel.assert_not_called() +@pytest.mark.asyncio async def test_extractor_run_when_mem_full_is_raised(): docs_from_source = [ {"_id": 1}, diff --git a/tests/test_sync_job_runner.py b/tests/test_sync_job_runner.py index 5af4afade..2948e34d2 100644 --- a/tests/test_sync_job_runner.py +++ b/tests/test_sync_job_runner.py @@ -88,6 +88,8 @@ def create_runner( index_name=SEARCH_INDEX_NAME, sync_cursor=SYNC_CURSOR, connector=None, + service_config=None, + es_config=None, ): source_klass = Mock() data_provider = Mock() @@ -112,8 +114,8 @@ def create_runner( if not connector: connector = mock_connector() - es_config = {} - service_config = {} + es_config = es_config if es_config is not None else {} + service_config = service_config if service_config is not None else {} return SyncJobRunner( source_klass=source_klass, @@ -995,6 +997,112 @@ async def test_native_connector_sync_fails_when_api_key_secret_missing( ) +@patch( + "connectors.sync_job_runner.SyncJobRunner._update_native_connector_authentication", + AsyncMock(), +) +@pytest.mark.parametrize( + "job_type, sync_cursor", + [ + (JobType.FULL, SYNC_CURSOR), + (JobType.INCREMENTAL, SYNC_CURSOR), + (JobType.ACCESS_CONTROL, None), + ], +) +@pytest.mark.asyncio +async def test_native_sync_runs_with_secrets_disabled_when_no_permissions( + job_type, sync_cursor, sync_orchestrator_mock +): + ingestion_stats = { + "indexed_document_count": 25, + "indexed_document_volume": 30, + "deleted_document_count": 20, + } + sync_orchestrator_mock.ingestion_stats.return_value = ingestion_stats + + sync_job_runner = create_runner( + job_type=job_type, + sync_cursor=sync_cursor, + service_config={"_use_native_connector_api_keys": False}, + ) + + error_meta = Mock() + error_meta.status = 403 + sync_job_runner._update_native_connector_authentication.side_effect = ( + ElasticAuthorizationException(message=None, meta=error_meta, body={}) + ) + + await sync_job_runner.execute() + + ingestion_stats["total_document_count"] = TOTAL_DOCUMENT_COUNT + + sync_job_runner.connector.sync_starts.assert_awaited_with(job_type) + sync_job_runner.sync_job.claim.assert_awaited() + sync_job_runner._update_native_connector_authentication.assert_not_awaited() + sync_job_runner.sync_orchestrator.async_bulk.assert_awaited() + sync_job_runner.sync_job.done.assert_awaited_with(ingestion_stats=ingestion_stats) + sync_job_runner.sync_job.fail.assert_not_awaited() + sync_job_runner.sync_job.cancel.assert_not_awaited() + sync_job_runner.sync_job.suspend.assert_not_awaited() + sync_job_runner.connector.sync_done.assert_awaited_with( + sync_job_runner.sync_job, cursor=sync_cursor + ) + sync_job_runner.sync_orchestrator.cancel.assert_called_once() + + +@patch( + "connectors.sync_job_runner.SyncJobRunner._update_native_connector_authentication", + AsyncMock(), +) +@pytest.mark.parametrize( + "job_type, sync_cursor", + [ + (JobType.FULL, SYNC_CURSOR), + (JobType.INCREMENTAL, SYNC_CURSOR), + (JobType.ACCESS_CONTROL, None), + ], +) +@pytest.mark.asyncio +async def test_native_sync_fails_with_secrets_enabled_when_no_permissions( + job_type, sync_cursor, sync_orchestrator_mock, es_management_client_mock +): + expected_error = f"Connector is not authorized to access index [{SEARCH_INDEX_NAME}]. API key may need to be regenerated. Status code: [403]." + ingestion_stats = { + "indexed_document_count": 0, + "indexed_document_volume": 0, + "deleted_document_count": 0, + "total_document_count": TOTAL_DOCUMENT_COUNT, + } + sync_orchestrator_mock.ingestion_stats.return_value = ingestion_stats + + sync_job_runner = create_runner( + job_type=job_type, + sync_cursor=sync_cursor, + ) + + error_meta = Mock() + error_meta.status = 403 + sync_job_runner._update_native_connector_authentication.side_effect = ( + ElasticAuthorizationException(message=None, meta=error_meta, body={}) + ) + + await sync_job_runner.execute() + + sync_job_runner.sync_job.claim.assert_awaited() + sync_job_runner._update_native_connector_authentication.assert_awaited() + sync_job_runner.sync_job.fail.assert_awaited_with( + expected_error, ingestion_stats=ingestion_stats + ) + sync_job_runner.sync_job.done.assert_not_awaited() + sync_job_runner.sync_job.cancel.assert_not_awaited() + sync_job_runner.sync_job.suspend.assert_not_awaited() + + sync_job_runner.connector.sync_starts.assert_awaited_with(job_type) + sync_job_runner.connector.sync_done.assert_awaited_with( + sync_job_runner.sync_job, cursor=sync_cursor + ) + + @pytest.mark.parametrize( "job_type, sync_cursor", [