126 changes: 126 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,126 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Development Commands

### Installation and Setup
- `make install` - Install dependencies and set up the development environment
- `make clean` - Clean build artifacts and virtual environment
- `make clean install` - Clean install from scratch

### Code Quality
- `make lint` - Run linting with ruff and type checking with pyright
- `make autoformat` - Auto-format code with ruff (also runs the linter and fixes what it can)
- `make test` - Run unit tests with pytest (requires 92% coverage)

### Testing
- `make test` - Run unit tests with coverage reporting
- `make ftest NAME={service_type}` - Run functional tests for a specific connector
- `make ftrace NAME={service_type}` - Run functional tests with performance tracing

### Running the Service
- `make run` - Run the connector service in debug mode
- `.venv/bin/elastic-ingest --help` - View CLI options
- `make default-config SERVICE_TYPE={type}` - Generate default configuration

### Docker Operations
- `make docker-build` - Build Docker image
- `make docker-run` - Run Docker container
- `make agent-docker-build` - Build agent Docker image
- `make agent-docker-run` - Run agent Docker container

## Architecture Overview

This is the Elastic Connectors framework - a Python-based system for syncing data from various sources into Elasticsearch.

### Core Components

**Service Layer (`connectors/service_cli.py`)**
- Main entry point via `elastic-ingest` command
- Runs an asynchronous event loop that polls Elasticsearch for sync jobs
- Follows the Connector Protocol for communication with Kibana

**Source Framework (`connectors/source.py`)**
- `BaseDataSource` - Base class for all connector implementations
- Rich Configurable Fields (RCF) system for dynamic UI generation in Kibana
- Support for full sync, incremental sync, and document-level security

**Configuration (`connectors/config.py`)**
- YAML-based configuration with environment variable support
- Sources registry mapping connector names to their Python classes
- Default Elasticsearch connection and bulk operation settings

### Connector Sources

All connector implementations are in `connectors/sources/` and registered in `config.py`:
- MongoDB, MySQL, PostgreSQL, Oracle, MSSQL, Redis (databases)
- SharePoint Online/Server, OneDrive, Box, Dropbox, Google Drive (file storage)
- Jira, Confluence, ServiceNow, Salesforce, Slack, Teams (SaaS platforms)
- S3, Azure Blob Storage, Google Cloud Storage (cloud storage)
- GitHub, Notion, Gmail, Outlook, Zoom (other platforms)

### Key Patterns

**Async Architecture**
- All connector code must be non-blocking using asyncio
- Use `async`/`await` for I/O operations
- For CPU-bound work, use `run_in_executor` with threads/processes
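
A minimal sketch of the executor pattern; `parse_large_file` and `count_lines` are hypothetical helpers, not framework APIs:

```python
import asyncio
from functools import partial


def parse_large_file(path):
    # Blocking, CPU-heavy work that must not run on the event loop
    with open(path, "rb") as f:
        return f.read().count(b"\n")


async def count_lines(path):
    loop = asyncio.get_running_loop()
    # None selects the default ThreadPoolExecutor, keeping the loop responsive
    return await loop.run_in_executor(None, partial(parse_large_file, path))
```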

**Connector Implementation**
- Extend `BaseDataSource` class
- Implement `get_docs()` for full sync, `get_docs_incrementally()` for incremental sync
- Define `get_default_configuration()` for Rich Configurable Fields
- Use `self._logger` for logging to include sync job context
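
A hedged skeleton of this pattern (the class and its fields are illustrative; the real contract is `BaseDataSource` in `connectors/source.py`):

```python
from connectors.source import BaseDataSource


class MyDataSource(BaseDataSource):
    """Hypothetical example source, not a connector in this repo."""

    name = "My Service"
    service_type = "my_service"

    @classmethod
    def get_default_configuration(cls):
        # Rich Configurable Fields that Kibana renders as a form
        return {
            "host": {"label": "Host", "order": 1, "type": "str", "value": ""},
        }

    async def ping(self):
        return True  # cheap connectivity check used by the framework

    async def get_docs(self, filtering=None):
        # Full sync: yield (document, lazy_download) pairs
        yield {"_id": "1", "title": "hello"}, None
```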

**Document Processing**
- Full sync yields `(document, lazy_download)` pairs; incremental sync yields `(document, lazy_download, operation)` triples
- Operations: `index`, `update`, `delete`
- Content extraction via Tika for binary files
- Document IDs stored in memory for deletion detection
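
For example, an incremental sync generator might look like this (the `changed_items` change feed and cursor handling are illustrative, not framework APIs):

```python
async def get_docs_incrementally(self, sync_cursor, filtering=None):
    # changed_items is a hypothetical source-specific change feed
    async for item in self.changed_items(since=sync_cursor):
        doc = {"_id": item["id"], "title": item["title"]}
        if item.get("deleted"):
            yield doc, None, "delete"  # drop from the index
        else:
            yield doc, None, "index"   # create or overwrite
```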

**Sync Rules**
- Basic rules: framework-level filtering (enabled by default)
- Advanced rules: connector-specific filtering (implement `advanced_rules_validators()`)
- Document-level security via the `_allow_access_control` field
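
A hedged sketch of an advanced-rules validator; the class and method names follow `connectors/filtering/validation.py` as used by existing sources, so verify them against the current code:

```python
from connectors.filtering.validation import (
    AdvancedRulesValidator,
    SyncRuleValidationResult,
)


class MyAdvancedRulesValidator(AdvancedRulesValidator):
    def validate(self, advanced_rules):
        # Reject anything that is not a list of rule objects
        if not isinstance(advanced_rules, list):
            return SyncRuleValidationResult(
                SyncRuleValidationResult.ADVANCED_RULES,
                is_valid=False,
                validation_message="Advanced rules must be a list",
            )
        return SyncRuleValidationResult.valid_result(
            SyncRuleValidationResult.ADVANCED_RULES
        )
```

The connector then returns `[MyAdvancedRulesValidator()]` from its `advanced_rules_validators()` method.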

## Testing Requirements

### Unit Tests
- Located in `tests/` directory
- Minimum 92% test coverage required
- Run with `make test`
- Mock external services and APIs
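
As a sketch, a test for the mappings connector added in this PR can exercise its pure helper directly, assuming `DataSourceConfiguration` accepts a plain dict as in other source tests:

```python
from connectors.source import DataSourceConfiguration
from connectors.sources.elasticsearch_mappings import ElasticsearchMappingsDataSource


def make_source():
    # Offline construction: no connection is made until a request is issued
    return ElasticsearchMappingsDataSource(
        configuration=DataSourceConfiguration(
            {
                "host": "http://localhost:9200",
                "ssl": False,
                "ssl_verify": True,
                "include_system_indices": False,
            }
        )
    )


def test_extract_field_descriptions_recurses_into_nested_properties():
    source = make_source()
    properties = {
        "title": {"type": "text", "meta": {"description": "Document title"}},
        "author": {
            "properties": {
                "name": {"type": "keyword", "meta": {"description": "Author name"}}
            }
        },
    }
    assert source._extract_field_descriptions(properties) == [
        "Document title",
        "Author name",
    ]
```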

### Functional Tests
- Located in `tests/fixtures/` for each connector
- Requires Docker containers or real service credentials
- Must return >10k documents to test pagination
- Run with `make ftest NAME={service_type}`
- Includes performance monitoring and memory usage analysis

### Test Configuration
- `pytest.ini` configures async mode and warning filters
- `pyrightconfig.json` sets up type checking for Python 3.10+
- Coverage reports generated in HTML format

## Development Guidelines

**Adding New Connectors**
1. Create connector class in `connectors/sources/{name}.py`
2. Add mapping in `connectors/config.py` sources section
3. Create unit tests in `tests/sources/test_{name}.py`
4. Add dependencies to `requirements/framework.txt` (pinned versions)
5. Provide Docker test environment in `tests/fixtures/{name}/`
6. Implement required methods from `BaseDataSource`
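
Step 2's registry entry is a one-line mapping from `service_type` to `module:Class`; this is the shape of the `sources` mapping, with the entry this PR adds shown (see the `config.py` diff below):

```python
# connectors/config.py, inside _default_config()["sources"]; other entries elided
sources = {
    # ...
    "elasticsearch_mappings": (
        "connectors.sources.elasticsearch_mappings:ElasticsearchMappingsDataSource"
    ),
    # ...
}
```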

**Dependencies**
- Python 3.10+ required
- Install architecture-specific requirements: `requirements/{arch}.txt`
- Pin all dependency versions
- Document licenses for new dependencies

**Configuration**
- Copy `config.yml.example` to `config.yml` for local development
- Environment variables supported via EnvYAML
- Elasticsearch connection defaults to `localhost:9200` with `elastic/changeme`
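
A minimal sketch of loading the config the way the framework does; EnvYAML expands `${VAR}` references from the environment and supports dotted-path access:

```python
from envyaml import EnvYAML

config = EnvYAML("config.yml")
print(config["elasticsearch.host"])      # e.g. http://localhost:9200
print(config["elasticsearch.username"])  # elastic/changeme by default locally
```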
1 change: 1 addition & 0 deletions connectors/config.py
@@ -117,6 +117,7 @@ def _default_config():
"confluence": "connectors.sources.confluence:ConfluenceDataSource",
"dir": "connectors.sources.directory:DirectoryDataSource",
"dropbox": "connectors.sources.dropbox:DropboxDataSource",
"elasticsearch_mappings": "connectors.sources.elasticsearch_mappings:ElasticsearchMappingsDataSource",
"github": "connectors.sources.github:GitHubDataSource",
"gmail": "connectors.sources.gmail:GMailDataSource",
"google_cloud_storage": "connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource",
191 changes: 191 additions & 0 deletions connectors/sources/elasticsearch_mappings.py
@@ -0,0 +1,191 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""
Elasticsearch Mappings Connector

Connects to an Elasticsearch cluster and extracts index mappings as documents.
"""

import json

from connectors.es.client import ESClient
from connectors.source import BaseDataSource


class ElasticsearchMappingsDataSource(BaseDataSource):
"""Elasticsearch Mappings"""

name = "Elasticsearch Mappings"
service_type = "elasticsearch_mappings"

def __init__(self, configuration):
super().__init__(configuration=configuration)

# Build ES client configuration from our configuration
es_config = {
"host": self.configuration["host"],
"ssl": self.configuration.get("ssl", False),
"verify_certs": self.configuration.get("ssl_verify", True),
}

if self.configuration.get("username") and self.configuration.get("password"):
es_config["username"] = self.configuration["username"]
es_config["password"] = self.configuration["password"]

if self.configuration.get("api_key"):
es_config["api_key"] = self.configuration["api_key"]

self.es_client = ESClient(es_config)
self.include_system_indices = self.configuration.get(
"include_system_indices", False
)

@classmethod
def get_default_configuration(cls):
return {
"host": {
"label": "Elasticsearch Host",
"order": 1,
"type": "str",
"value": "http://localhost:9200",
},
"username": {
"label": "Username",
"order": 2,
"type": "str",
"value": "",
"required": False,
},
"password": {
"label": "Password",
"order": 3,
"type": "str",
"value": "",
"required": False,
"sensitive": True,
},
"api_key": {
"label": "API Key",
"order": 4,
"type": "str",
"value": "",
"required": False,
"sensitive": True,
},
"ssl": {
"label": "Enable SSL",
"order": 5,
"type": "bool",
"value": False,
},
"ssl_verify": {
"label": "Verify SSL certificate",
"order": 6,
"type": "bool",
"value": True,
},
"include_system_indices": {
"label": "Include system indices (starting with .)",
"order": 7,
"type": "bool",
"value": False,
},
}

    async def ping(self):
        """Test connection to Elasticsearch"""
        try:
            # ESClient.ping() returns a bool; pass it through rather than
            # reporting success whenever no exception was raised
            return await self.es_client.ping()
        except Exception as e:
            self._logger.error(f"Failed to ping Elasticsearch: {e}")
            return False

async def close(self):
"""Close the ES client"""
if hasattr(self, "es_client"):
await self.es_client.close()

async def _get_indices(self):
"""Get list of indices from Elasticsearch"""
try:
# Use _cat/indices API to get index list
response = await self.es_client.client.cat.indices(format="json", h="index")
indices = [item["index"] for item in response]

# Filter out system indices if not requested
if not self.include_system_indices:
indices = [idx for idx in indices if not idx.startswith(".")]

return indices
except Exception as e:
self._logger.error(f"Error getting indices: {e}")
return []

async def _get_index_mapping(self, index_name):
"""Get mapping for a specific index"""
try:
response = await self.es_client.client.indices.get_mapping(index=index_name)
return response.get(index_name, {}).get("mappings", {})
except Exception as e:
self._logger.warning(f"Error getting mapping for {index_name}: {e}")
return {}

async def get_docs(self, filtering=None):
"""Yield documents containing index mappings"""
indices = await self._get_indices()
self._logger.info(f"Found {len(indices)} indices to process")

for index_name in indices:
self._logger.debug(f"Processing mapping for index: {index_name}")

mapping = await self._get_index_mapping(index_name)

# Create document with mapping information
doc = {
"_id": index_name, # Use index name as document ID
"index_name": index_name,
"mapping": json.dumps(mapping), # Convert mapping to JSON string
"source_cluster": self.es_client.configured_host,
}

# Extract _meta properties and add them as separate fields
if mapping and "_meta" in mapping:
for meta_key, meta_value in mapping["_meta"].items():
doc[f"meta_{meta_key}"] = meta_value

# Extract field descriptions from meta.description properties
field_descriptions = []
if mapping and "properties" in mapping:
field_descriptions = self._extract_field_descriptions(
mapping["properties"]
)

if field_descriptions:
doc["field_descriptions"] = field_descriptions

# No lazy download function needed for this connector
yield doc, None

def _extract_field_descriptions(self, properties):
"""Extract field descriptions from meta.description properties"""
descriptions = []

for _field_name, field_config in properties.items():
# Check if this field has a meta.description
if isinstance(field_config, dict) and "meta" in field_config:
meta = field_config["meta"]
if isinstance(meta, dict) and "description" in meta:
descriptions.append(meta["description"])

# Recursively check nested properties
if isinstance(field_config, dict) and "properties" in field_config:
nested_descriptions = self._extract_field_descriptions(
field_config["properties"]
)
descriptions.extend(nested_descriptions)

return descriptions