diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..2e5c0c50d --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,126 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Development Commands + +### Installation and Setup +- `make install` - Install dependencies and set up the development environment +- `make clean` - Clean build artifacts and virtual environment +- `make clean install` - Clean install from scratch + +### Code Quality +- `make lint` - Run linting with ruff and type checking with pyright +- `make autoformat` - Auto-format code with ruff (runs the linter as part of this, and fixes what it can) +- `make test` - Run unit tests with pytest (requires 92% coverage) + +### Testing +- `make test` - Run unit tests with coverage reporting +- `make ftest NAME={service_type}` - Run functional tests for a specific connector +- `make ftrace NAME={service_type}` - Run functional tests with performance tracing + +### Running the Service +- `make run` - Run the connector service in debug mode +- `.venv/bin/elastic-ingest --help` - View CLI options +- `make default-config SERVICE_TYPE={type}` - Generate default configuration + +### Docker Operations +- `make docker-build` - Build Docker image +- `make docker-run` - Run Docker container +- `make agent-docker-build` - Build agent Docker image +- `make agent-docker-run` - Run agent Docker container + +## Architecture Overview + +This is the Elastic Connectors framework - a Python-based system for syncing data from various sources into Elasticsearch. 
+ +### Core Components + +**Service Layer (`connectors/service_cli.py`)** +- Main entry point via `elastic-ingest` command +- Runs an asynchronous event loop that polls Elasticsearch for sync jobs +- Follows the Connector Protocol for communication with Kibana + +**Source Framework (`connectors/source.py`)** +- `BaseDataSource` - Base class for all connector implementations +- Rich Configurable Fields (RCF) system for dynamic UI generation in Kibana +- Support for full sync, incremental sync, and document-level security + +**Configuration (`connectors/config.py`)** +- YAML-based configuration with environment variable support +- Sources registry mapping connector names to their Python classes +- Default Elasticsearch connection and bulk operation settings + +### Connector Sources + +All connector implementations are in `connectors/sources/` and registered in `config.py`: +- MongoDB, MySQL, PostgreSQL, Oracle, MSSQL, Redis (databases) +- SharePoint Online/Server, OneDrive, Box, Dropbox, Google Drive (file storage) +- Jira, Confluence, ServiceNow, Salesforce, Slack, Teams (SaaS platforms) +- S3, Azure Blob Storage, Google Cloud Storage (cloud storage) +- GitHub, Notion, Gmail, Outlook, Zoom (other platforms) + +### Key Patterns + +**Async Architecture** +- All connector code must be non-blocking using asyncio +- Use `async`/`await` for I/O operations +- For CPU-bound work, use `run_in_executor` with threads/processes + +**Connector Implementation** +- Extend `BaseDataSource` class +- Implement `get_docs()` for full sync, `get_docs_incrementally()` for incremental sync +- Define `get_default_configuration()` for Rich Configurable Fields +- Use `self._logger` for logging to include sync job context + +**Document Processing** +- Documents yielded as tuples: `(document, lazy_download, operation)` +- Operations: `index`, `update`, `delete` +- Content extraction via Tika for binary files +- Document IDs stored in memory for deletion detection + +**Sync Rules** +- Basic 
rules: framework-level filtering (enabled by default) +- Advanced rules: connector-specific filtering (implement `advanced_rules_validators()`) +- Document-level security via `_allow_permissions` field + +## Testing Requirements + +### Unit Tests +- Located in `tests/` directory +- Minimum 92% test coverage required +- Run with `make test` +- Mock external services and APIs + +### Functional Tests +- Located in `tests/fixtures/` for each connector +- Requires Docker containers or real service credentials +- Must return >10k documents to test pagination +- Run with `make ftest NAME={connector_name}` +- Includes performance monitoring and memory usage analysis + +### Test Configuration +- `pytest.ini` configures async mode and warning filters +- `pyrightconfig.json` sets up type checking for Python 3.10+ +- Coverage reports generated in HTML format + +## Development Guidelines + +**Adding New Connectors** +1. Create connector class in `connectors/sources/{name}.py` +2. Add mapping in `connectors/config.py` sources section +3. Create unit tests in `tests/sources/test_{name}.py` +4. Add dependencies to `requirements/framework.txt` (pinned versions) +5. Provide Docker test environment in `tests/fixtures/{name}/` +6. 
Implement required methods from `BaseDataSource` + +**Dependencies** +- Python 3.10+ required +- Install architecture-specific requirements: `requirements/{arch}.txt` +- Pin all dependency versions +- Document licenses for new dependencies + +**Configuration** +- Copy `config.yml.example` to `config.yml` for local development +- Environment variables supported via EnvYAML +- Elasticsearch connection defaults to `localhost:9200` with `elastic/changeme` \ No newline at end of file diff --git a/connectors/config.py b/connectors/config.py index c55ad7454..12b098b70 100644 --- a/connectors/config.py +++ b/connectors/config.py @@ -117,6 +117,7 @@ def _default_config(): "confluence": "connectors.sources.confluence:ConfluenceDataSource", "dir": "connectors.sources.directory:DirectoryDataSource", "dropbox": "connectors.sources.dropbox:DropboxDataSource", + "elasticsearch_mappings": "connectors.sources.elasticsearch_mappings:ElasticsearchMappingsDataSource", "github": "connectors.sources.github:GitHubDataSource", "gmail": "connectors.sources.gmail:GMailDataSource", "google_cloud_storage": "connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource", diff --git a/connectors/sources/elasticsearch_mappings.py b/connectors/sources/elasticsearch_mappings.py new file mode 100644 index 000000000..89673d7bf --- /dev/null +++ b/connectors/sources/elasticsearch_mappings.py @@ -0,0 +1,191 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License 2.0; +# you may not use this file except in compliance with the Elastic License 2.0. +# +""" +Elasticsearch Mappings Connector + +Connects to an Elasticsearch cluster and extracts index mappings as documents. 
+""" + +import json + +from connectors.es.client import ESClient +from connectors.source import BaseDataSource + + +class ElasticsearchMappingsDataSource(BaseDataSource): + """Elasticsearch Mappings""" + + name = "Elasticsearch Mappings" + service_type = "elasticsearch_mappings" + + def __init__(self, configuration): + super().__init__(configuration=configuration) + + # Build ES client configuration from our configuration + es_config = { + "host": self.configuration["host"], + "ssl": self.configuration.get("ssl", False), + "verify_certs": self.configuration.get("ssl_verify", True), + } + + if self.configuration.get("username") and self.configuration.get("password"): + es_config["username"] = self.configuration["username"] + es_config["password"] = self.configuration["password"] + + if self.configuration.get("api_key"): + es_config["api_key"] = self.configuration["api_key"] + + self.es_client = ESClient(es_config) + self.include_system_indices = self.configuration.get( + "include_system_indices", False + ) + + @classmethod + def get_default_configuration(cls): + return { + "host": { + "label": "Elasticsearch Host", + "order": 1, + "type": "str", + "value": "http://localhost:9200", + }, + "username": { + "label": "Username", + "order": 2, + "type": "str", + "value": "", + "required": False, + }, + "password": { + "label": "Password", + "order": 3, + "type": "str", + "value": "", + "required": False, + "sensitive": True, + }, + "api_key": { + "label": "API Key", + "order": 4, + "type": "str", + "value": "", + "required": False, + "sensitive": True, + }, + "ssl": { + "label": "Enable SSL", + "order": 5, + "type": "bool", + "value": False, + }, + "ssl_verify": { + "label": "Verify SSL certificate", + "order": 6, + "type": "bool", + "value": True, + }, + "include_system_indices": { + "label": "Include system indices (starting with .)", + "order": 7, + "type": "bool", + "value": False, + }, + } + + async def ping(self): + """Test connection to Elasticsearch""" + try: + 
response = await self.es_client.ping() + return response is not None + except Exception as e: + self._logger.error(f"Failed to ping Elasticsearch: {e}") + return False + + async def close(self): + """Close the ES client""" + if hasattr(self, "es_client"): + await self.es_client.close() + + async def _get_indices(self): + """Get list of indices from Elasticsearch""" + try: + # Use _cat/indices API to get index list + response = await self.es_client.client.cat.indices(format="json", h="index") + indices = [item["index"] for item in response] + + # Filter out system indices if not requested + if not self.include_system_indices: + indices = [idx for idx in indices if not idx.startswith(".")] + + return indices + except Exception as e: + self._logger.error(f"Error getting indices: {e}") + return [] + + async def _get_index_mapping(self, index_name): + """Get mapping for a specific index""" + try: + response = await self.es_client.client.indices.get_mapping(index=index_name) + return response.get(index_name, {}).get("mappings", {}) + except Exception as e: + self._logger.warning(f"Error getting mapping for {index_name}: {e}") + return {} + + async def get_docs(self, filtering=None): + """Yield documents containing index mappings""" + indices = await self._get_indices() + self._logger.info(f"Found {len(indices)} indices to process") + + for index_name in indices: + self._logger.debug(f"Processing mapping for index: {index_name}") + + mapping = await self._get_index_mapping(index_name) + + # Create document with mapping information + doc = { + "_id": index_name, # Use index name as document ID + "index_name": index_name, + "mapping": json.dumps(mapping), # Convert mapping to JSON string + "source_cluster": self.es_client.configured_host, + } + + # Extract _meta properties and add them as separate fields + if mapping and "_meta" in mapping: + for meta_key, meta_value in mapping["_meta"].items(): + doc[f"meta_{meta_key}"] = meta_value + + # Extract field descriptions from 
meta.description properties + field_descriptions = [] + if mapping and "properties" in mapping: + field_descriptions = self._extract_field_descriptions( + mapping["properties"] + ) + + if field_descriptions: + doc["field_descriptions"] = field_descriptions + + # No lazy download function needed for this connector + yield doc, None + + def _extract_field_descriptions(self, properties): + """Extract field descriptions from meta.description properties""" + descriptions = [] + + for _field_name, field_config in properties.items(): + # Check if this field has a meta.description + if isinstance(field_config, dict) and "meta" in field_config: + meta = field_config["meta"] + if isinstance(meta, dict) and "description" in meta: + descriptions.append(meta["description"]) + + # Recursively check nested properties + if isinstance(field_config, dict) and "properties" in field_config: + nested_descriptions = self._extract_field_descriptions( + field_config["properties"] + ) + descriptions.extend(nested_descriptions) + + return descriptions diff --git a/tests/sources/test_elasticsearch_mappings.py b/tests/sources/test_elasticsearch_mappings.py new file mode 100644 index 000000000..1daf72acb --- /dev/null +++ b/tests/sources/test_elasticsearch_mappings.py @@ -0,0 +1,255 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License 2.0; +# you may not use this file except in compliance with the Elastic License 2.0. 
#
import json

from unittest.mock import AsyncMock

import pytest

from connectors.sources.elasticsearch_mappings import ElasticsearchMappingsDataSource
from tests.sources.support import assert_basics, create_source


@pytest.mark.asyncio
async def test_basics():
    """Framework smoke test: default config exposes the expected host."""
    await assert_basics(
        ElasticsearchMappingsDataSource, "host", "http://localhost:9200"
    )


@pytest.mark.asyncio
async def test_ping_success():
    """A truthy ping response maps to True."""
    async with create_source(ElasticsearchMappingsDataSource) as source:
        source.es_client.ping = AsyncMock(return_value={"name": "test"})
        result = await source.ping()
        assert result is True


@pytest.mark.asyncio
async def test_ping_failure():
    """A None ping response maps to False."""
    async with create_source(ElasticsearchMappingsDataSource) as source:
        source.es_client.ping = AsyncMock(return_value=None)
        result = await source.ping()
        assert result is False


@pytest.mark.asyncio
async def test_get_docs():
    """Full sync yields one doc per non-system index with a JSON-string mapping."""
    async with create_source(ElasticsearchMappingsDataSource) as source:
        # Mock ES client methods
        source.es_client.client.cat.indices = AsyncMock(
            return_value=[
                {"index": "test-index-1"},
                {"index": "test-index-2"},
                {"index": ".system-index"},  # should be filtered out by default
            ]
        )

        source.es_client.client.indices.get_mapping = AsyncMock(
            side_effect=[
                {
                    "test-index-1": {
                        "mappings": {
                            "properties": {
                                "field1": {"type": "text"},
                                "field2": {"type": "keyword"},
                                "nested_field": {
                                    "type": "nested",
                                    "properties": {"inner_field": {"type": "text"}},
                                },
                            }
                        }
                    }
                },
                {
                    "test-index-2": {
                        "mappings": {"properties": {"simple_field": {"type": "text"}}}
                    }
                },
            ]
        )

        documents = []
        async for doc, _lazy_download in source.get_docs():
            documents.append(doc)
            assert _lazy_download is None  # No lazy download for this connector

        assert len(documents) == 2

        # Check first document
        doc1 = documents[0]
        assert doc1["_id"] == "test-index-1"
        assert doc1["index_name"] == "test-index-1"
        assert "mapping" in doc1
        # mapping should be a JSON string, not an object
        mapping_obj = json.loads(doc1["mapping"])
        assert "properties" in mapping_obj
        assert doc1["source_cluster"] == "http://localhost:9200"

        # Check second document
        doc2 = documents[1]
        assert doc2["_id"] == "test-index-2"
        assert doc2["index_name"] == "test-index-2"


@pytest.mark.asyncio
async def test_get_docs_with_system_indices():
    """With include_system_indices=True, dot-prefixed indices are synced too."""
    async with create_source(
        ElasticsearchMappingsDataSource, include_system_indices=True
    ) as source:
        # Mock ES client methods
        source.es_client.client.cat.indices = AsyncMock(
            return_value=[{"index": "test-index"}, {"index": ".kibana"}]
        )

        source.es_client.client.indices.get_mapping = AsyncMock(
            side_effect=[
                {
                    "test-index": {
                        "mappings": {"properties": {"field1": {"type": "text"}}}
                    }
                },
                {
                    ".kibana": {
                        "mappings": {"properties": {"config": {"type": "object"}}}
                    }
                },
            ]
        )

        documents = []
        async for doc, _lazy_download in source.get_docs():
            documents.append(doc)

        assert len(documents) == 2
        index_names = {doc["index_name"] for doc in documents}
        assert "test-index" in index_names
        assert ".kibana" in index_names


@pytest.mark.asyncio
async def test_get_docs_with_auth():
    """Basic-auth configuration does not change document extraction."""
    async with create_source(
        ElasticsearchMappingsDataSource, username="elastic", password="changeme"
    ) as source:
        # Mock ES client methods
        source.es_client.client.cat.indices = AsyncMock(
            return_value=[{"index": "test-index"}]
        )

        source.es_client.client.indices.get_mapping = AsyncMock(
            return_value={
                "test-index": {"mappings": {"properties": {"field1": {"type": "text"}}}}
            }
        )

        documents = []
        async for doc, _lazy_download in source.get_docs():
            documents.append(doc)

        assert len(documents) == 1
        assert documents[0]["index_name"] == "test-index"


@pytest.mark.asyncio
async def test_get_docs_with_meta_and_field_descriptions():
    """_meta entries become meta_* fields; meta.description values are collected."""
    async with create_source(ElasticsearchMappingsDataSource) as source:
        # Mock ES client methods
        source.es_client.client.cat.indices = AsyncMock(
            return_value=[{"index": "test-index-with-meta"}]
        )

        source.es_client.client.indices.get_mapping = AsyncMock(
            return_value={
                "test-index-with-meta": {
                    "mappings": {
                        "_meta": {
                            "version": "1.0",
                            "author": "test-user",
                            "description": "Test index with metadata",
                        },
                        "properties": {
                            "field1": {
                                "type": "text",
                                "meta": {"description": "First field description"},
                            },
                            "field2": {
                                "type": "keyword",
                                "meta": {"description": "Second field description"},
                            },
                            "nested_field": {
                                "type": "nested",
                                "properties": {
                                    "inner_field": {
                                        "type": "text",
                                        "meta": {
                                            "description": "Nested field description"
                                        },
                                    }
                                },
                            },
                        },
                    }
                }
            }
        )

        documents = []
        async for doc, _lazy_download in source.get_docs():
            documents.append(doc)

        assert len(documents) == 1
        doc = documents[0]

        # Check _meta properties are extracted
        assert doc["meta_version"] == "1.0"
        assert doc["meta_author"] == "test-user"
        assert doc["meta_description"] == "Test index with metadata"

        # Check field descriptions are extracted
        assert "field_descriptions" in doc
        field_descriptions = doc["field_descriptions"]
        assert len(field_descriptions) == 3
        assert "First field description" in field_descriptions
        assert "Second field description" in field_descriptions
        assert "Nested field description" in field_descriptions


@pytest.mark.asyncio
async def test_get_docs_empty_mapping():
    """An index with an empty mapping still produces a (minimal) document."""
    async with create_source(ElasticsearchMappingsDataSource) as source:
        # Mock ES client methods
        source.es_client.client.cat.indices = AsyncMock(
            return_value=[{"index": "empty-index"}]
        )

        source.es_client.client.indices.get_mapping = AsyncMock(
            return_value={"empty-index": {"mappings": {}}}
        )

        documents = []
        async for doc, _lazy_download in source.get_docs():
            documents.append(doc)

        assert len(documents) == 1
        doc = documents[0]
        assert doc["_id"] == "empty-index"
        assert doc["index_name"] == "empty-index"


@pytest.mark.asyncio
async def test_changed():
    """changed() defaults to True (always re-sync)."""
    async with create_source(ElasticsearchMappingsDataSource) as source:
        result = await source.changed()
        assert result is True


@pytest.mark.asyncio
async def test_close():
    """close() delegates to the underlying ES client exactly once."""
    async with create_source(ElasticsearchMappingsDataSource) as source:
        source.es_client.close = AsyncMock()
        await source.close()
        source.es_client.close.assert_called_once()