diff --git a/connectors/es/index.py b/connectors/es/index.py
index e27858df9..42fdbbd6c 100644
--- a/connectors/es/index.py
+++ b/connectors/es/index.py
@@ -34,6 +34,22 @@ async def connector_check_in(self, connector_id):
             headers={"accept": "application/json"},
         )
 
+    async def connector_update_error(self, connector_id, error):
+        await self.client.perform_request(
+            "PUT",
+            f"/_connector/{connector_id}/_error",
+            headers={"accept": "application/json", "Content-Type": "application/json"},
+            body={"error": error},
+        )
+
+    async def connector_update_last_sync_info(self, connector_id, last_sync_info):
+        await self.client.perform_request(
+            "PUT",
+            f"/_connector/{connector_id}/_last_sync",
+            headers={"accept": "application/json", "Content-Type": "application/json"},
+            body=last_sync_info,
+        )
+
     async def connector_update_filtering_draft_validation(
         self, connector_id, validation_result
     ):
@@ -98,6 +114,20 @@ async def connector_check_in(self, connector_id):
             partial(self._api_wrapper.connector_check_in, connector_id)
         )
 
+    async def connector_update_error(self, connector_id, error):
+        await self._retrier.execute_with_retry(
+            partial(self._api_wrapper.connector_update_error, connector_id, error)
+        )
+
+    async def connector_update_last_sync_info(self, connector_id, last_sync_info):
+        await self._retrier.execute_with_retry(
+            partial(
+                self._api_wrapper.connector_update_last_sync_info,
+                connector_id,
+                last_sync_info,
+            )
+        )
+
     async def connector_update_filtering_draft_validation(
         self, connector_id, validation_result
     ):
diff --git a/connectors/protocol/connectors.py b/connectors/protocol/connectors.py
index be86f6cf3..1ee3d7a5d 100644
--- a/connectors/protocol/connectors.py
+++ b/connectors/protocol/connectors.py
@@ -707,12 +707,17 @@ def next_sync(self, job_type, now):
         return next_run(scheduling_property.get("interval"), now)
 
     async def _update_datetime(self, field, new_ts):
-        await self.index.update(
-            doc_id=self.id,
-            doc={field: iso_utc(new_ts)},
-            if_seq_no=self._seq_no,
-            if_primary_term=self._primary_term,
-        )
+        if self.index.feature_use_connectors_api:
+            await self.index.api.connector_update_last_sync_info(
+                connector_id=self.id, last_sync_info={field: iso_utc(new_ts)}
+            )
+        else:
+            await self.index.update(
+                doc_id=self.id,
+                doc={field: iso_utc(new_ts)},
+                if_seq_no=self._seq_no,
+                if_primary_term=self._primary_term,
+            )
 
     async def update_last_sync_scheduled_at_by_job_type(self, job_type, new_ts):
         match job_type:
@@ -745,24 +750,37 @@ async def sync_starts(self, job_type):
                 msg = f"Unknown job type: {job_type}"
                 raise ValueError(msg)
 
-        doc = {
-            "status": Status.CONNECTED.value,
-            "error": None,
-        } | last_sync_information
+        if self.index.feature_use_connectors_api:
+            await self.index.api.connector_update_error(
+                connector_id=self.id, error=None
+            )
+            await self.index.api.connector_update_last_sync_info(
+                connector_id=self.id, last_sync_info=last_sync_information
+            )
+        else:
+            doc = {
+                "status": Status.CONNECTED.value,
+                "error": None,
+            } | last_sync_information
 
-        await self.index.update(
-            doc_id=self.id,
-            doc=doc,
-            if_seq_no=self._seq_no,
-            if_primary_term=self._primary_term,
-        )
+            await self.index.update(
+                doc_id=self.id,
+                doc=doc,
+                if_seq_no=self._seq_no,
+                if_primary_term=self._primary_term,
+            )
 
     async def error(self, error):
-        doc = {
-            "status": Status.ERROR.value,
-            "error": str(error),
-        }
-        await self.index.update(doc_id=self.id, doc=doc)
+        if self.index.feature_use_connectors_api:
+            await self.index.api.connector_update_error(
+                connector_id=self.id, error=error
+            )
+        else:
+            doc = {
+                "status": Status.ERROR.value,
+                "error": str(error),
+            }
+            await self.index.update(doc_id=self.id, doc=doc)
 
     async def sync_done(self, job, cursor=None):
         job_status = JobStatus.ERROR if job is None else job.status
@@ -801,8 +819,6 @@ async def sync_done(self, job, cursor=None):
 
         doc = {
             "last_synced": iso_utc(),
-            "status": connector_status.value,
-            "error": job_error,
         } | last_sync_information
 
         # only update sync cursor after a successful content sync job
@@ -813,7 +829,16 @@
             doc["last_indexed_document_count"] = job.indexed_document_count
             doc["last_deleted_document_count"] = job.deleted_document_count
 
-        await self.index.update(doc_id=self.id, doc=doc)
+        if self.index.feature_use_connectors_api:
+            await self.index.api.connector_update_error(
+                connector_id=self.id, error=job_error
+            )
+            await self.index.api.connector_update_last_sync_info(
+                connector_id=self.id, last_sync_info=last_sync_information
+            )
+        else:
+            doc = doc | {"status": connector_status.value, "error": job_error}
+            await self.index.update(doc_id=self.id, doc=doc)
 
     @with_concurrency_control()
     async def prepare(self, config, sources):
diff --git a/connectors/utils.py b/connectors/utils.py
index b7bc06d32..16a891110 100644
--- a/connectors/utils.py
+++ b/connectors/utils.py
@@ -457,7 +457,7 @@ def _callback(self, task):
             )
         elif task.exception():
             logger.error(
-                f"Exception found for task {task.get_name()}: {task.exception()}",
+                f"Exception found for task {task.get_name()}: {task.exception()} {task}"
             )
 
     def _add_task(self, coroutine, name=None):
diff --git a/tests/protocol/test_connectors.py b/tests/protocol/test_connectors.py
index 0289992e6..54b4f03b4 100644
--- a/tests/protocol/test_connectors.py
+++ b/tests/protocol/test_connectors.py
@@ -444,6 +444,7 @@ async def test_sync_starts(job_type, expected_doc_source_update):
     connector_doc = {"_id": doc_id, "_seq_no": seq_no, "_primary_term": primary_term}
     index = Mock()
     index.update = AsyncMock()
+    index.feature_use_connectors_api = False
 
     connector = Connector(elastic_index=index, doc_source=connector_doc)
     await connector.sync_starts(job_type)
@@ -455,12 +456,61 @@
     )
 
 
+@pytest.mark.parametrize(
+    "job_type, last_sync_info, status, error",
+    [
+        (
+            JobType.FULL,
+            {
+                "last_sync_status": JobStatus.IN_PROGRESS.value,
+                "last_sync_error": None,
+            },
+            Status.CONNECTED.value,
+            None,
+        ),
+        (
+            JobType.INCREMENTAL,
+            {"last_sync_status": JobStatus.IN_PROGRESS.value, "last_sync_error": None},
+            Status.CONNECTED.value,
+            None,
+        ),
+        (
+            JobType.ACCESS_CONTROL,
+            {
+                "last_access_control_sync_status": JobStatus.IN_PROGRESS.value,
+                "last_access_control_sync_error": None,
+            },
+            Status.CONNECTED.value,
+            None,
+        ),
+    ],
+)
+@pytest.mark.asyncio
+async def test_sync_starts_with_connector_api(job_type, last_sync_info, status, error):
+    doc_id = "1"
+    connector_doc = {"_id": doc_id}
+    index = Mock()
+    index.api.connector_update_error = AsyncMock()
+    index.api.connector_update_last_sync_info = AsyncMock()
+    index.feature_use_connectors_api = True
+
+    connector = Connector(elastic_index=index, doc_source=connector_doc)
+    await connector.sync_starts(job_type)
+    index.api.connector_update_last_sync_info.assert_called_with(
+        connector_id=connector.id, last_sync_info=last_sync_info
+    )
+    index.api.connector_update_error.assert_called_with(
+        connector_id=connector.id, error=error
+    )
+
+
 @pytest.mark.asyncio
 async def test_connector_error():
     connector_doc = {"_id": "1"}
     error = "something wrong"
     index = Mock()
     index.update = AsyncMock(return_value=1)
+    index.feature_use_connectors_api = False
     expected_doc_source_update = {
         "status": Status.ERROR.value,
         "error": error,
@@ -471,6 +521,20 @@ async def test_connector_error():
     index.update.assert_called_with(doc_id=connector.id, doc=expected_doc_source_update)
 
 
+@pytest.mark.asyncio
+async def test_connector_error_with_connector_api():
+    connector_doc = {"_id": "1"}
+    error = "something wrong"
+    index = Mock()
+    index.api.connector_update_error = AsyncMock()
+    index.feature_use_connectors_api = True
+    connector = Connector(elastic_index=index, doc_source=connector_doc)
+    await connector.error(error)
+    index.api.connector_update_error.assert_called_with(
+        connector_id=connector.id, error=error
+    )
+
+
 def mock_job(
     status=JobStatus.COMPLETED,
     job_type=JobType.FULL,
@@ -627,12 +691,165 @@ async def test_sync_done(job, expected_doc_source_update):
     connector_doc = {"_id": "1"}
     index = Mock()
     index.update = AsyncMock(return_value=1)
+    index.feature_use_connectors_api = False
 
     connector = Connector(elastic_index=index, doc_source=connector_doc)
     await connector.sync_done(job=job, cursor=SYNC_CURSOR)
 
     index.update.assert_called_with(doc_id=connector.id, doc=expected_doc_source_update)
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "job, last_sync_info, error, status",
+    [
+        (
+            None,
+            {
+                "last_access_control_sync_error": JOB_NOT_FOUND_ERROR,
+                "last_access_control_sync_status": JobStatus.ERROR.value,
+                "last_sync_error": JOB_NOT_FOUND_ERROR,
+                "last_sync_status": JobStatus.ERROR.value,
+                "last_synced": ANY,
+                "last_indexed_document_count": 0,
+                "last_deleted_document_count": 0,
+            },
+            JOB_NOT_FOUND_ERROR,
+            Status.ERROR.value,
+        ),
+        (
+            mock_job(
+                status=JobStatus.ERROR, job_type=JobType.FULL, error="something wrong"
+            ),
+            {
+                "last_sync_status": JobStatus.ERROR.value,
+                "last_synced": ANY,
+                "last_sync_error": "something wrong",
+                "last_indexed_document_count": 0,
+                "last_deleted_document_count": 0,
+            },
+            "something wrong",
+            Status.ERROR.value,
+        ),
+        (
+            mock_job(status=JobStatus.CANCELED, job_type=JobType.FULL),
+            {
+                "last_sync_status": JobStatus.CANCELED.value,
+                "last_synced": ANY,
+                "last_sync_error": None,
+                "last_indexed_document_count": 0,
+                "last_deleted_document_count": 0,
+            },
+            None,
+            Status.CONNECTED.value,
+        ),
+        (
+            mock_job(
+                status=JobStatus.SUSPENDED, job_type=JobType.FULL, terminated=False
+            ),
+            {
+                "last_sync_status": JobStatus.SUSPENDED.value,
+                "last_synced": ANY,
+                "last_sync_error": None,
+            },
+            None,
+            Status.CONNECTED.value,
+        ),
+        (
+            mock_job(job_type=JobType.FULL),
+            {
+                "last_sync_status": JobStatus.COMPLETED.value,
+                "last_synced": ANY,
+                "last_sync_error": None,
+                "sync_cursor": SYNC_CURSOR,
+                "last_indexed_document_count": 0,
+                "last_deleted_document_count": 0,
+            },
+            None,
+            Status.CONNECTED.value,
+        ),
+        (
+            mock_job(job_type=JobType.FULL),
+            {
+                "last_sync_status": JobStatus.COMPLETED.value,
+                "last_synced": ANY,
+                "last_sync_error": None,
+                "sync_cursor": SYNC_CURSOR,
+                "last_indexed_document_count": 0,
+                "last_deleted_document_count": 0,
+            },
+            None,
+            Status.CONNECTED.value,
+        ),
+        (
+            mock_job(job_type=JobType.ACCESS_CONTROL),
+            {
+                "last_access_control_sync_status": JobStatus.COMPLETED.value,
+                "last_synced": ANY,
+                "last_access_control_sync_error": None,
+                "last_indexed_document_count": 0,
+                "last_deleted_document_count": 0,
+            },
+            None,
+            Status.CONNECTED.value,
+        ),
+        (
+            mock_job(
+                status=JobStatus.ERROR,
+                job_type=JobType.ACCESS_CONTROL,
+                error="something wrong",
+            ),
+            {
+                "last_access_control_sync_status": JobStatus.ERROR.value,
+                "last_synced": ANY,
+                "last_access_control_sync_error": "something wrong",
+                "last_indexed_document_count": 0,
+                "last_deleted_document_count": 0,
+            },
+            "something wrong",
+            Status.ERROR.value,
+        ),
+        (
+            mock_job(
+                status=JobStatus.SUSPENDED,
+                job_type=JobType.ACCESS_CONTROL,
+                terminated=False,
+            ),
+            {
+                "last_access_control_sync_status": JobStatus.SUSPENDED.value,
+                "last_synced": ANY,
+                "last_access_control_sync_error": None,
+            },
+            None,
+            Status.CONNECTED.value,
+        ),
+        (
+            mock_job(status=JobStatus.CANCELED, job_type=JobType.ACCESS_CONTROL),
+            {
+                "last_access_control_sync_status": JobStatus.CANCELED.value,
+                "last_synced": ANY,
+                "last_access_control_sync_error": None,
+                "last_indexed_document_count": 0,
+                "last_deleted_document_count": 0,
+            },
+            None,
+            Status.CONNECTED.value,
+        ),
+    ],
+)
+async def test_sync_done_with_connector_api(job, last_sync_info, error, status):
+    connector_doc = {"_id": "1"}
+    index = Mock()
+    index.feature_use_connectors_api = True
+    index.api.connector_update_error = AsyncMock()
+    index.api.connector_update_last_sync_info = AsyncMock()
+    connector = Connector(elastic_index=index, doc_source=connector_doc)
+    await connector.sync_done(job=job, cursor=SYNC_CURSOR)
+    index.api.connector_update_last_sync_info.assert_called_with(
+        connector_id=connector.id, last_sync_info=last_sync_info
+    )
+    index.api.connector_update_error.assert_called_with(connector_id=connector.id, error=error)
+
+
 mock_next_run = iso_utc()
 
 
@@ -1454,6 +1671,7 @@ async def test_connector_update_last_sync_scheduled_at_by_job_type(
     }
     index = Mock()
     index.update = AsyncMock()
+    index.feature_use_connectors_api = False
 
     connector = Connector(elastic_index=index, doc_source=connector_doc)
     await connector.update_last_sync_scheduled_at_by_job_type(job_type, new_ts)
@@ -1465,6 +1683,36 @@
     )
 
 
+@pytest.mark.parametrize(
+    "job_type, date_field_to_update",
+    [
+        (JobType.FULL, "last_sync_scheduled_at"),
+        (JobType.INCREMENTAL, "last_incremental_sync_scheduled_at"),
+        (JobType.ACCESS_CONTROL, "last_access_control_sync_scheduled_at"),
+    ],
+)
+@pytest.mark.asyncio
+async def test_connector_update_last_sync_scheduled_at_by_job_type_with_connector_api(
+    job_type, date_field_to_update
+):
+    doc_id = "2"
+    new_ts = datetime.now(timezone.utc) + timedelta(seconds=30)
+    connector_doc = {
+        "_id": doc_id,
+        "_source": {},
+    }
+    index = Mock()
+    index.update = AsyncMock()
+    index.api.connector_update_last_sync_info = AsyncMock()
+    index.feature_use_connectors_api = True
+    connector = Connector(elastic_index=index, doc_source=connector_doc)
+    await connector.update_last_sync_scheduled_at_by_job_type(job_type, new_ts)
+
+    index.api.connector_update_last_sync_info.assert_awaited_once_with(
+        connector_id=doc_id, last_sync_info={date_field_to_update: new_ts.isoformat()}
+    )
+
+
 @pytest.mark.asyncio
 async def test_connector_validate_filtering_not_edited():
     index = Mock()