Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ const buildTestDefinitions = (): Section[] => {
{
title: 'ES|QL commands and functions usage',
tests: [
{
title: 'using LOOKUP JOIN',
question: `
The user is working with both the "records" and "threats" indices. "threats" has the field "source.ip", "threat_level", "threat_type". "records" has the field "source.ip", "action", "timestamp".

Generate a query returning the 10 logs where threat_level is "high" or "medium", ordered by timestamp from most recent to oldest,
show only the source.ip, action, threat_level, and threat_type fields.

You should use the LOOKUP JOIN function to answer this question.

The relevant fields are:
- source.ip: keyword
- action: keyword
- threat_level: keyword
- threat_type: keyword
- timestamp: datetime
`,
expected: `FROM records
| LOOKUP JOIN threats ON source.ip
| WHERE threat_level IN ("high", "medium")
| SORT timestamp
| KEEP source.ip, action, threat_level, threat_type
| LIMIT 10`,
},
{
title: 'using FLOOR and CEIL',
question: `
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ export async function extractDocEntries({
log: ToolingLog;
inferenceClient: ScriptInferenceClient;
}): Promise<ExtractionOutput> {
const path = `${builtDocsDir}/html/en/elasticsearch/reference/current/esql*.html`;
const files = await fastGlob(path);
const paths = ['esql*.html', '_lookup_join.html'].map(
(path) => `${builtDocsDir}/html/en/elasticsearch/reference/current/${path}`
);
const files = await fastGlob(paths);
if (!files.length) {
throw new Error(`No files found at path: ${path}`);
throw new Error(`No files found at paths: ${paths}`);
}

const output: ExtractionOutput = {
Expand Down Expand Up @@ -129,11 +131,24 @@ async function processFile({
limiter,
executePrompt,
});
} else if (basename === '_lookup_join.html') {
const $element = load(fileContent)('*');
const command: ExtractedCommandOrFunc = {
name: 'lookup-join',
markdownContent: await executePrompt(
convertToMarkdownPrompt({ htmlContent: getSimpleText($element) })
),
command: true,
};
output.commands.push(command);
} else if (contextArticles.includes(basename)) {
const $element = load(fileContent)('*');
output.pages.push({
sourceFile: basename,
name: basename === 'esql.html' ? 'overview' : basename.substring(5, basename.length - 5),
name:
basename === 'esql.html'
? 'overview'
: basename.replace(/^esql-/, '').replace(/\.html$/, ''),
content: getSimpleText($element),
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ export const generateDoc = async ({
})
);

const pageContentByName = (pageName: string) =>
extraction.pages.find((page) => page.name === pageName)!.content;
const pageContentByName = (pageName: string) => {
return extraction.pages.find((page) => page.name === pageName)?.content;
};

const pages: PageGeneration[] = [
{
Expand Down Expand Up @@ -100,10 +101,12 @@ export const generateDoc = async ({
await Promise.all(
pages.map(async (page) => {
return limiter(async () => {
const content = pageContentByName(page.sourceFile);
if (!content) return;
const pageContent = await callOutput(
createDocumentationPagePrompt({
documentation,
content: pageContentByName(page.sourceFile),
content,
specificInstructions: page.instructions,
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
const aliases: Record<string, string[]> = {
STATS: ['STATS_BY', 'BY', 'STATS...BY', 'STATS ... BY'],
OPERATORS: ['LIKE', 'RLIKE', 'IN'],
JOIN: ['LOOKUP JOIN'],
};

const getAliasMap = () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ export class EsqlDocumentBase {
}

getDocumentation(
keywords: string[],
rawKeywords: string[],
{
generateMissingKeywordDoc = true,
addSuggestions = true,
addOverview = true,
resolveAliases = true,
}: GetDocsOptions = {}
) {
keywords = keywords.map((raw) => {
let keyword = format(raw);
const keywords = rawKeywords.map((raw) => {
// LOOKUP JOIN has space so we want to retain as is
let keyword = raw.toLowerCase().includes('join') ? raw : format(raw);
if (resolveAliases) {
keyword = tryResolveAlias(keyword);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# LOOKUP JOIN

The `LOOKUP JOIN` command combines data from a query results table with matching records from a specified lookup index. It adds fields from the lookup index as new columns to the results table based on matching values in the join field. This is particularly useful for enriching or correlating data across multiple indices, such as logs, IPs, user IDs, or hosts.

## Syntax

`LOOKUP JOIN <lookup_index> ON <field_name>`

### Parameters

#### lookup_index

The name of the lookup index. This must be a specific index name—wildcards, aliases, and remote cluster references are not supported. Indices used for lookups must be configured with the `lookup` mode.

#### field_name

The field to join on. This field must exist in both the current query results and the lookup index. If the field contains multi-valued entries, those entries will not match anything, and the added fields will contain `null` for those rows.

If no rows match in the lookup index, the incoming row is retained, and `null` values are added. If multiple rows in the lookup index match, one row is added per match.

## Examples

### Example 1: Enriching Firewall Logs with Threat Data

This example demonstrates how to enrich firewall logs with threat data from a lookup index.

#### Sample Data Setup

##### Create the `threat_list` index

```esql
PUT threat_list
{
"settings": {
"index.mode": "lookup"
},
"mappings": {
"properties": {
"source.ip": { "type": "ip" },
"threat_level": { "type": "keyword" },
"threat_type": { "type": "keyword" },
"last_updated": { "type": "date" }
}
}
}
```

##### Create the `firewall_logs` index

```esql
PUT firewall_logs
{
"mappings": {
"properties": {
"timestamp": { "type": "date" },
"source.ip": { "type": "ip" },
"destination.ip": { "type": "ip" },
"action": { "type": "keyword" },
"bytes_transferred": { "type": "long" }
}
}
}
```

##### Add sample data to `threat_list`

```esql
POST threat_list/_bulk
{"index":{}}
{"source.ip":"203.0.113.5","threat_level":"high","threat_type":"C2_SERVER","last_updated":"2025-04-22"}
{"index":{}}
{"source.ip":"198.51.100.2","threat_level":"medium","threat_type":"SCANNER","last_updated":"2025-04-23"}
```

##### Add sample data to `firewall_logs`

```esql
POST firewall_logs/_bulk
{"index":{}}
{"timestamp":"2025-04-23T10:00:01Z","source.ip":"192.0.2.1","destination.ip":"10.0.0.100","action":"allow","bytes_transferred":1024}
{"index":{}}
{"timestamp":"2025-04-23T10:00:05Z","source.ip":"203.0.113.5","destination.ip":"10.0.0.55","action":"allow","bytes_transferred":2048}
{"index":{}}
{"timestamp":"2025-04-23T10:00:08Z","source.ip":"198.51.100.2","destination.ip":"10.0.0.200","action":"block","bytes_transferred":0}
{"index":{}}
{"timestamp":"2025-04-23T10:00:15Z","source.ip":"203.0.113.5","destination.ip":"10.0.0.44","action":"allow","bytes_transferred":4096}
{"index":{}}
{"timestamp":"2025-04-23T10:00:30Z","source.ip":"192.0.2.1","destination.ip":"10.0.0.100","action":"allow","bytes_transferred":512}
```

#### Query the Data

```esql
FROM firewall_logs
| LOOKUP JOIN threat_list ON source.ip
| WHERE threat_level IS NOT NULL
| SORT timestamp
| KEEP source.ip, action, threat_level, threat_type
| LIMIT 10
```

This query:
- Matches the `source.ip` field in `firewall_logs` with the `source.ip` field in `threat_list`.
- Filters rows to include only those with non-null `threat_level`.
- Sorts the results by `timestamp`.
- Keeps only the `source.ip`, `action`, `threat_level`, and `threat_type` fields.
- Limits the output to 10 rows.

#### Response

| source.ip | action | threat_type | threat_level |
|---------------|--------|-------------|--------------|
| 203.0.113.5 | allow | C2_SERVER | high |
| 198.51.100.2 | block | SCANNER | medium |
| 203.0.113.5 | allow | C2_SERVER | high |

In this example, the `source.ip` field from `firewall_logs` is matched with the `source.ip` field in `threat_list`, and the corresponding `threat_level` and `threat_type` fields are added to the output.

## Limitations

- Indices in `lookup` mode are always single-sharded.
- Cross-cluster search is not supported; both source and lookup indices must be local.
- Only equality-based matching is supported.
- `LOOKUP JOIN` can only use a single match field and a single index.
- Wildcards, aliases, datemath, and datastreams are not supported.
- The match field in `LOOKUP JOIN <lookup_index> ON <match_field>` must match an existing field in the query. Renames or evaluations may be required to achieve this.
- The query may circuit break if there are too many matching documents in the lookup index or if the documents are too large. `LOOKUP JOIN` processes data in batches of approximately 10,000 rows, which can require significant heap space for large matching documents.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ The following processing commands are available:
function and can group using grouping functions.
- SORT: sorts the row in a table by a column. Expressions are not supported.
- WHERE: Filters rows based on a boolean condition. WHERE supports the same functions as EVAL.
- LOOKUP JOIN: Joins data from a query results table with matching records from a specified lookup index.

## Functions and operators

### Join functions


### Grouping functions

BUCKET: Creates groups of values out of a datetime or numeric input
Expand Down Expand Up @@ -359,3 +363,15 @@ FROM personal_info
| KEEP user_name, birth
| SORT birth
```


**Look up and join the `source.ip` field in `firewall_logs` with the `source.ip` field in `threat_list`, filters rows to include only those with non-null `threat_level`, sorts the results by `timestamp`, and keeps only the `source.ip`, `action`, `threat_level`, and `threat_type` fields.**

```esql
FROM firewall_logs
| LOOKUP JOIN threat_list ON source.ip
| WHERE threat_level IS NOT NULL
| SORT timestamp
| KEEP source.ip, action, threat_level, threat_type
| LIMIT 10
```