Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[report]
omit = connectors/quartz.py,connectors/conftest.py,tests/*
omit = connectors/quartz.py,connectors/conftest.py,tests/*,connectors/agent/*,connectors/cli/*
2 changes: 1 addition & 1 deletion Dockerfile.ftest.wolfi
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.elastic.co/wolfi/python:3.11-dev@sha256:6d21dd09efd41b21f4abeab6291bc93af86d1a438225eeac98bdea85f6617b22
FROM docker.elastic.co/wolfi/python:3.11-dev@sha256:7f67246b4646373e777d1e91d22cc0acb27b74969adcd2992e20c1e61c8759a4
USER root
COPY . /connectors
WORKDIR /connectors
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.wolfi
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.elastic.co/wolfi/python:3.11-dev@sha256:6d21dd09efd41b21f4abeab6291bc93af86d1a438225eeac98bdea85f6617b22
FROM docker.elastic.co/wolfi/python:3.11-dev@sha256:7f67246b4646373e777d1e91d22cc0acb27b74969adcd2992e20c1e61c8759a4
USER root
COPY . /app
WORKDIR /app
Expand Down
Empty file removed connectors/agent/README.md
Empty file.
5 changes: 5 additions & 0 deletions connectors/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
5 changes: 5 additions & 0 deletions connectors/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
57 changes: 43 additions & 14 deletions connectors/es/management_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,30 +63,29 @@ async def create_content_index(self, search_index_name, language_code):
)
)

async def ensure_content_index_mappings(self, index, mappings):
async def ensure_content_index_mappings(self, index_name, index, desired_mappings):
# open = Match open, non-hidden indices. Also matches any non-hidden data stream.
# Content indices are always non-hidden.
response = await self._retrier.execute_with_retry(
partial(self.client.indices.get_mapping, index=index)
)

existing_mappings = response[index].get("mappings", {})
existing_mappings = index.get("mappings", {})
if len(existing_mappings) == 0:
if mappings:
if desired_mappings:
logger.info(
"Index %s has no mappings or it's empty. Adding mappings...", index
)
try:
await self._retrier.execute_with_retry(
partial(
self.client.indices.put_mapping,
index=index,
dynamic=mappings.get("dynamic", False),
dynamic_templates=mappings.get("dynamic_templates", []),
properties=mappings.get("properties", {}),
index=index_name,
dynamic=desired_mappings.get("dynamic", False),
dynamic_templates=desired_mappings.get(
"dynamic_templates", []
),
properties=desired_mappings.get("properties", {}),
)
)
logger.info("Successfully added mappings for index %s", index)
logger.info("Successfully added mappings for index %s", index_name)
except Exception as e:
logger.warning(
f"Could not create mappings for index {index}, encountered error {e}"
Expand All @@ -97,7 +96,7 @@ async def ensure_content_index_mappings(self, index, mappings):
)
else:
logger.debug(
"Index %s already has mappings, skipping mappings creation", index
"Index %s already has mappings, skipping mappings creation", index_name
)

async def ensure_content_index_settings(
Expand Down Expand Up @@ -225,15 +224,45 @@ async def index_exists(self, index_name):
partial(self.client.indices.exists, index=index_name)
)

async def get_index(self, index_name, ignore_unavailable=False):
return await self._retrier.execute_with_retry(
async def get_index_or_alias(self, index_name, ignore_unavailable=False):
"""
Get index definition (mappings and settings) by its name or its alias.
"""
# Example structure of response:
# {
# "my-data-index": {
# "aliases": {"search-my-data-index": {}},
# "mappings": { ... },
# "settings": { ... },
# }
# }
get_index_response = await self._retrier.execute_with_retry(
partial(
self.client.indices.get,
index=index_name,
ignore_unavailable=ignore_unavailable,
)
)

if index_name in get_index_response:
logger.debug(f"Got index by its name: {index_name}")
return get_index_response[index_name]

for (
existing_index_name,
existing_index_definition,
) in get_index_response.items():
if "aliases" not in existing_index_definition:
continue

if index_name in existing_index_definition["aliases"]:
logger.debug(
f"Got index {existing_index_name} by its alias {index_name}"
)
return existing_index_definition

return None

async def upsert(self, _id, index_name, doc):
return await self._retrier.execute_with_retry(
partial(
Expand Down
9 changes: 5 additions & 4 deletions connectors/es/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,16 +836,17 @@ async def has_active_license_enabled(self, license_):
# TODO: think how to make it not a proxy method to the client
return await self.es_management_client.has_active_license_enabled(license_)

def extract_index_or_alias(self, get_index_response, expected_index_name):
return None

async def prepare_content_index(self, index_name, language_code=None):
"""Creates the index, given a mapping/settings if it does not exist."""
self._logger.debug(f"Checking index {index_name}")

result = await self.es_management_client.get_index(
index = await self.es_management_client.get_index_or_alias(
index_name, ignore_unavailable=True
)

index = result.get(index_name, None)

mappings = Mappings.default_text_fields_mappings(is_connectors_index=True)

if index:
Expand All @@ -859,7 +860,7 @@ async def prepare_content_index(self, index_name, language_code=None):
)

await self.es_management_client.ensure_content_index_mappings(
index_name, mappings
index_name=index_name, index=index, desired_mappings=mappings
)
else:
# Create a new index
Expand Down
5 changes: 5 additions & 0 deletions connectors/sources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
13 changes: 7 additions & 6 deletions connectors/sync_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
from elasticsearch import (
AuthorizationException as ElasticAuthorizationException,
)
from elasticsearch import (
NotFoundError as ElasticNotFoundError,
)
from elasticsearch import NotFoundError as ElasticNotFoundError

from connectors.config import DataSourceFrameworkConfig
from connectors.es.client import License, with_concurrency_control
Expand Down Expand Up @@ -178,6 +176,7 @@ async def execute(self):
if (
self.connector.native
and self.connector.features.native_connector_api_keys_enabled()
and self.service_config.get("_use_native_connector_api_keys", True)
):
# Update the config so native connectors can use API key authentication during sync
await self._update_native_connector_authentication()
Expand Down Expand Up @@ -380,9 +379,11 @@ async def _sync_done(self, sync_status, sync_error=None):
sync_cursor = (
None
if not self.data_provider # If we failed before initializing the data provider, we don't need to change the cursor
else self.data_provider.sync_cursor()
if self.sync_job.is_content_sync()
else None
else (
self.data_provider.sync_cursor()
if self.sync_job.is_content_sync()
else None
)
)
await self.connector.sync_done(
self.sync_job if await self.reload_sync_job() else None,
Expand Down
5 changes: 4 additions & 1 deletion tests/agent/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@


@patch("connectors.agent.cli.ConnectorsAgentComponent", return_value=AsyncMock())
def test_main(patch_component):
def test_main_responds_to_sigterm(patch_component):
async def kill():
await asyncio.sleep(0.2)
os.kill(os.getpid(), signal.SIGTERM)

loop = asyncio.new_event_loop()
loop.create_task(kill())

# No asserts here.
# main() will block forever unless it's killed with a signal
# This test succeeds if it exits, if it hangs it'll be killed by a timeout
main()

loop.close()
84 changes: 72 additions & 12 deletions tests/es/test_management_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,9 @@ async def test_ensure_content_index_mappings_when_mappings_exist(
mappings = {}
existing_mappings_response = {index_name: {"mappings": ["something"]}}

es_management_client.client.indices.get_mapping = AsyncMock(
return_value=existing_mappings_response
await es_management_client.ensure_content_index_mappings(
index_name, existing_mappings_response, mappings
)

await es_management_client.ensure_content_index_mappings(index_name, mappings)
es_management_client.client.indices.put_mapping.assert_not_called()

@pytest.mark.asyncio
Expand All @@ -85,11 +83,9 @@ async def test_ensure_content_index_mappings_when_mappings_do_not_exist(
}
existing_mappings_response = {index_name: {"mappings": {}}}

es_management_client.client.indices.get_mapping = AsyncMock(
return_value=existing_mappings_response
await es_management_client.ensure_content_index_mappings(
index_name, existing_mappings_response, mappings
)

await es_management_client.ensure_content_index_mappings(index_name, mappings)
es_management_client.client.indices.put_mapping.assert_awaited_with(
index=index_name,
dynamic=mappings["dynamic"],
Expand All @@ -105,11 +101,9 @@ async def test_ensure_content_index_mappings_when_mappings_do_not_exist_but_no_m
mappings = None
existing_mappings_response = {index_name: {"mappings": {}}}

es_management_client.client.indices.get_mapping = AsyncMock(
return_value=existing_mappings_response
await es_management_client.ensure_content_index_mappings(
index_name, existing_mappings_response, mappings
)

await es_management_client.ensure_content_index_mappings(index_name, mappings)
es_management_client.client.indices.put_mapping.assert_not_called()

@pytest.mark.asyncio
Expand Down Expand Up @@ -367,3 +361,69 @@ async def test_create_connector_secret(self, es_management_client, mock_response
body={"value": secret_value},
headers={"accept": "application/json", "content-type": "application/json"},
)

@pytest.mark.asyncio
async def test_extract_index_or_alias_with_index(self, es_management_client):
response = {
"shapo-online": {
"aliases": {"search-shapo-online": {}},
"mappings": {},
"settings": {},
}
}

es_management_client.client.indices.get = AsyncMock(return_value=response)

index = await es_management_client.get_index_or_alias("shapo-online")

assert index == response["shapo-online"]

@pytest.mark.asyncio
async def test_extract_index_or_alias_with_alias(self, es_management_client):
response = {
"shapo-online": {
"aliases": {"search-shapo-online": {}},
"mappings": {},
"settings": {},
}
}

es_management_client.client.indices.get = AsyncMock(return_value=response)

index = await es_management_client.get_index_or_alias("search-shapo-online")

assert index == response["shapo-online"]

@pytest.mark.asyncio
async def test_extract_index_or_alias_when_none_present(self, es_management_client):
response = {
"shapo-online": {
"aliases": {"search-shapo-online": {}},
"mappings": {},
"settings": {},
}
}

es_management_client.client.indices.get = AsyncMock(return_value=response)

index = await es_management_client.get_index_or_alias("nonexistent")

assert index is None

@pytest.mark.asyncio
async def test_get_index_or_alias(self, es_management_client, mock_responses):
secret_id = "secret-id"
secret_value = "my-secret"

es_management_client.client.perform_request = AsyncMock(
return_value={"id": secret_id}
)

returned_id = await es_management_client.create_connector_secret(secret_value)
assert returned_id == secret_id
es_management_client.client.perform_request.assert_awaited_with(
"POST",
"/_connector/_secret",
body={"value": secret_value},
headers={"accept": "application/json", "content-type": "application/json"},
)
5 changes: 4 additions & 1 deletion tests/test_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ async def test_prepare_content_index(mock_responses):

await es.close()

put_mapping_mock.assert_called_with(index_name, mappings)
put_mapping_mock.assert_called_with(
index_name=index_name, index=response[index_name], desired_mappings=mappings
)


def set_responses(mock_responses, ts=None):
Expand Down Expand Up @@ -1481,6 +1483,7 @@ async def test_cancel_sync(extractor_task_done, sink_task_done, force_cancel):
es._sink.force_cancel.assert_not_called()


@pytest.mark.asyncio
async def test_extractor_run_when_mem_full_is_raised():
docs_from_source = [
{"_id": 1},
Expand Down
Loading