Skip to content
57 changes: 43 additions & 14 deletions connectors/es/management_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,30 +63,29 @@ async def create_content_index(self, search_index_name, language_code):
)
)

async def ensure_content_index_mappings(self, index, mappings):
async def ensure_content_index_mappings(self, index_name, index, desired_mappings):
# open = Match open, non-hidden indices. Also matches any non-hidden data stream.
# Content indices are always non-hidden.
response = await self._retrier.execute_with_retry(
partial(self.client.indices.get_mapping, index=index)
)

existing_mappings = response[index].get("mappings", {})
existing_mappings = index.get("mappings", {})
if len(existing_mappings) == 0:
if mappings:
if desired_mappings:
logger.info(
"Index %s has no mappings or it's empty. Adding mappings...", index
)
try:
await self._retrier.execute_with_retry(
partial(
self.client.indices.put_mapping,
index=index,
dynamic=mappings.get("dynamic", False),
dynamic_templates=mappings.get("dynamic_templates", []),
properties=mappings.get("properties", {}),
index=index_name,
dynamic=desired_mappings.get("dynamic", False),
dynamic_templates=desired_mappings.get(
"dynamic_templates", []
),
properties=desired_mappings.get("properties", {}),
)
)
logger.info("Successfully added mappings for index %s", index)
logger.info("Successfully added mappings for index %s", index_name)
except Exception as e:
logger.warning(
f"Could not create mappings for index {index}, encountered error {e}"
Expand All @@ -97,7 +96,7 @@ async def ensure_content_index_mappings(self, index, mappings):
)
else:
logger.debug(
"Index %s already has mappings, skipping mappings creation", index
"Index %s already has mappings, skipping mappings creation", index_name
)

async def ensure_content_index_settings(
Expand Down Expand Up @@ -225,15 +224,45 @@ async def index_exists(self, index_name):
partial(self.client.indices.exists, index=index_name)
)

async def get_index(self, index_name, ignore_unavailable=False):
return await self._retrier.execute_with_retry(
async def get_index_or_alias(self, index_name, ignore_unavailable=False):
    """Return the definition of the index known as ``index_name``.

    ``index_name`` may be either the concrete name of an index or one of
    its aliases. The lookup is a single ``indices.get`` call executed
    through the retrier; ``ignore_unavailable`` is forwarded to that call
    unchanged.

    The response is expected to be keyed by concrete index name, e.g.::

        {
            "my-data-index": {
                "aliases": {"search-my-data-index": {}},
                "mappings": { ... },
                "settings": { ... },
            }
        }

    Returns the matching definition (the value stored under the concrete
    index name), or ``None`` when the response holds neither a key nor an
    alias equal to ``index_name``.
    """
    fetch_definitions = partial(
        self.client.indices.get,
        index=index_name,
        ignore_unavailable=ignore_unavailable,
    )
    definitions = await self._retrier.execute_with_retry(fetch_definitions)

    # A direct hit on the concrete index name takes precedence over aliases.
    if index_name in definitions:
        logger.debug(f"Got index by its name: {index_name}")
        return definitions[index_name]

    # Otherwise the name may be an alias: the response is still keyed by the
    # concrete index name, so inspect the aliases of every returned entry and
    # return the first one that declares it.
    for concrete_name, definition in definitions.items():
        if "aliases" in definition and index_name in definition["aliases"]:
            logger.debug(f"Got index {concrete_name} by its alias {index_name}")
            return definition

    return None

async def upsert(self, _id, index_name, doc):
return await self._retrier.execute_with_retry(
partial(
Expand Down
9 changes: 5 additions & 4 deletions connectors/es/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,16 +836,17 @@ async def has_active_license_enabled(self, license_):
# TODO: think how to make it not a proxy method to the client
return await self.es_management_client.has_active_license_enabled(license_)

def extract_index_or_alias(self, get_index_response, expected_index_name):
    """Placeholder lookup that never resolves anything.

    Neither ``get_index_response`` nor ``expected_index_name`` is read; the
    result is ``None`` for every input.

    NOTE(review): this looks like a leftover stub (the management client has
    a ``get_index_or_alias`` method that performs a real lookup) — confirm
    whether it can be removed.
    """
    return

async def prepare_content_index(self, index_name, language_code=None):
"""Creates the index, given a mapping/settings if it does not exist."""
self._logger.debug(f"Checking index {index_name}")

result = await self.es_management_client.get_index(
index = await self.es_management_client.get_index_or_alias(
index_name, ignore_unavailable=True
)

index = result.get(index_name, None)

mappings = Mappings.default_text_fields_mappings(is_connectors_index=True)

if index:
Expand All @@ -859,7 +860,7 @@ async def prepare_content_index(self, index_name, language_code=None):
)

await self.es_management_client.ensure_content_index_mappings(
index_name, mappings
index_name=index_name, index=index, desired_mappings=mappings
)
else:
# Create a new index
Expand Down
84 changes: 72 additions & 12 deletions tests/es/test_management_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,9 @@ async def test_ensure_content_index_mappings_when_mappings_exist(
mappings = {}
existing_mappings_response = {index_name: {"mappings": ["something"]}}

es_management_client.client.indices.get_mapping = AsyncMock(
return_value=existing_mappings_response
await es_management_client.ensure_content_index_mappings(
index_name, existing_mappings_response, mappings
)

await es_management_client.ensure_content_index_mappings(index_name, mappings)
es_management_client.client.indices.put_mapping.assert_not_called()

@pytest.mark.asyncio
Expand All @@ -85,11 +83,9 @@ async def test_ensure_content_index_mappings_when_mappings_do_not_exist(
}
existing_mappings_response = {index_name: {"mappings": {}}}

es_management_client.client.indices.get_mapping = AsyncMock(
return_value=existing_mappings_response
await es_management_client.ensure_content_index_mappings(
index_name, existing_mappings_response, mappings
)

await es_management_client.ensure_content_index_mappings(index_name, mappings)
es_management_client.client.indices.put_mapping.assert_awaited_with(
index=index_name,
dynamic=mappings["dynamic"],
Expand All @@ -105,11 +101,9 @@ async def test_ensure_content_index_mappings_when_mappings_do_not_exist_but_no_m
mappings = None
existing_mappings_response = {index_name: {"mappings": {}}}

es_management_client.client.indices.get_mapping = AsyncMock(
return_value=existing_mappings_response
await es_management_client.ensure_content_index_mappings(
index_name, existing_mappings_response, mappings
)

await es_management_client.ensure_content_index_mappings(index_name, mappings)
es_management_client.client.indices.put_mapping.assert_not_called()

@pytest.mark.asyncio
Expand Down Expand Up @@ -367,3 +361,69 @@ async def test_create_connector_secret(self, es_management_client, mock_response
body={"value": secret_value},
headers={"accept": "application/json", "content-type": "application/json"},
)

@pytest.mark.asyncio
async def test_extract_index_or_alias_with_index(self, es_management_client):
    """Looking an index up by its own name yields that index's definition."""
    index_name = "shapo-online"
    definition = {
        "aliases": {"search-shapo-online": {}},
        "mappings": {},
        "settings": {},
    }
    # The stubbed `indices.get` response is keyed by the concrete index name.
    es_management_client.client.indices.get = AsyncMock(
        return_value={index_name: definition}
    )

    resolved = await es_management_client.get_index_or_alias(index_name)

    assert resolved == definition

@pytest.mark.asyncio
async def test_extract_index_or_alias_with_alias(self, es_management_client):
    """Looking an index up by one of its aliases yields the aliased index's definition."""
    index_name = "shapo-online"
    alias_name = "search-shapo-online"
    definition = {
        "aliases": {alias_name: {}},
        "mappings": {},
        "settings": {},
    }
    # The response stays keyed by the concrete name even though the lookup
    # below uses the alias.
    es_management_client.client.indices.get = AsyncMock(
        return_value={index_name: definition}
    )

    resolved = await es_management_client.get_index_or_alias(alias_name)

    assert resolved == definition

@pytest.mark.asyncio
async def test_extract_index_or_alias_when_none_present(self, es_management_client):
    """A name matching neither an index nor any alias resolves to ``None``."""
    unrelated_index = {
        "aliases": {"search-shapo-online": {}},
        "mappings": {},
        "settings": {},
    }
    es_management_client.client.indices.get = AsyncMock(
        return_value={"shapo-online": unrelated_index}
    )

    resolved = await es_management_client.get_index_or_alias("nonexistent")

    assert resolved is None

@pytest.mark.asyncio
async def test_get_index_or_alias(self, es_management_client, mock_responses):
    """``get_index_or_alias`` queries ``indices.get`` and returns the definition.

    The previous body was a copy of the ``create_connector_secret`` test and
    never called ``get_index_or_alias``. This version exercises the method
    named by the test and checks both the returned definition and the
    arguments forwarded to the Elasticsearch client.
    """
    index_name = "shapo-online"
    definition = {
        "aliases": {"search-shapo-online": {}},
        "mappings": {},
        "settings": {},
    }
    es_management_client.client.indices.get = AsyncMock(
        return_value={index_name: definition}
    )

    resolved = await es_management_client.get_index_or_alias(
        index_name, ignore_unavailable=True
    )

    assert resolved == definition
    # The index name and the `ignore_unavailable` flag must reach the client
    # unchanged, passed as keyword arguments.
    es_management_client.client.indices.get.assert_awaited_with(
        index=index_name,
        ignore_unavailable=True,
    )
5 changes: 4 additions & 1 deletion tests/test_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ async def test_prepare_content_index(mock_responses):

await es.close()

put_mapping_mock.assert_called_with(index_name, mappings)
put_mapping_mock.assert_called_with(
index_name=index_name, index=response[index_name], desired_mappings=mappings
)


def set_responses(mock_responses, ts=None):
Expand Down Expand Up @@ -1481,6 +1483,7 @@ async def test_cancel_sync(extractor_task_done, sink_task_done, force_cancel):
es._sink.force_cancel.assert_not_called()


@pytest.mark.asyncio
async def test_extractor_run_when_mem_full_is_raised():
docs_from_source = [
{"_id": 1},
Expand Down