[Entity Store] Add support to Cross Cluster Search #254779
romulets merged 12 commits into elastic:main
Conversation
Pull request overview
This PR adds Cross Cluster Search (CCS) support to the Entity Store, enabling extraction of entity data from remote indices in addition to local ones. Remote indices are queried in parallel with local extraction, and partial entities from CCS are written to the updates data stream for merging in the next extraction run. Additionally, it fixes the extraction window persistence by storing lastSearchTimestamp as lastExecutionTimestamp.
Changes:
- Split index patterns into local and remote (CCS), running extraction paths in parallel
- CCS extraction writes partial entities to updates stream with timestamps in the extraction window
- Fixed `lastExecutionTimestamp` to use `lastSearchTimestamp` (the `toDateISO` of the completed window) instead of "now"
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| factories.ts | Instantiates CcsLogsExtractionClient and passes it to LogsExtractionClient via dependencies object |
| upsert_bulk.ts | Updates upsertEntitiesBulk call to use options object instead of boolean parameter |
| request_context_factory.ts | Creates CcsLogsExtractionClient and injects it into LogsExtractionClient |
| logs_extraction_client.ts | Adds CCS parallel extraction logic, splits index patterns, fixes lastExecutionTimestamp to use lastSearchTimestamp |
| logs_extraction_client.test.ts | Updates tests for constructor changes and adds CCS extraction verification |
| logs_extraction_query_builder.ts | Adds buildCcsLogsExtractionEsqlQuery for CCS-only extraction without LOOKUP JOIN |
| logs_extraction_query_builder.test.ts | Adds tests for CCS query builder |
| logs_extraction_query_builder.test.ts.snap | Snapshot tests for CCS ESQL queries |
| crud_client/utils.ts | Adds timestampGenerator parameter for custom timestamp generation during upsert |
| crud_client/utils.test.ts | Tests for flat document handling with force=true |
| crud_client/index.ts | Changes upsertEntitiesBulk to accept options object with timestampGenerator |
| ccs_logs_extraction_client.ts | New client for CCS extraction that writes partial entities to updates stream |
| ccs_logs_extraction_client.test.ts | Comprehensive tests for CCS extraction client |
| asset_manager.ts | Updates getIndexPatterns call to getLocalIndexPatterns |
```ts
  abortController: opts?.abortController,
});

const [mainResult, ccsResult] = await Promise.all([mainPromise, ccsPromise]);
```
One concern that comes to mind: we're currently using the same timeframe window for both `remoteExtraction` and `localExtraction`. Consider the following scenario:

- The local loop finishes with a last log timestamp of X + 1.
- The remote loop finishes with a last log timestamp of X + 2.

Now we have two possible approaches for the next iteration:

- **Option 1:** use X + 1 as the starting point for the next run. → This would likely result in duplicate document collection on the remote cluster.
- **Option 2:** use X + 2 as the starting point for the next run. → This would likely result in missing logs on the local cluster.

What am I missing here?
We explored this problem in the tech beat yesterday. It's a known issue: currently the CCS extraction is best effort, with the main extraction always being the leading window setting.
We decided to explore better resilience for this at a later point, but this already allows us to test with a CCS environment.
💔 Build Failed
cc @romulets
kubasobon left a comment
LGTM 🚀 Great way to iterate fast
## Summary
Adds support for extracting entity data from **remote (CCS) indices** in
addition to local indices. Remote indices are queried in parallel with
local extraction; partial entities from CCS are written to the
**updates** data stream so the **next run** merges them into the latest
index. Also fixes persistence of the extraction window by storing
`lastSearchTimestamp` as `lastExecutionTimestamp`.
**Note: this only works when all clusters are on >9.4.0.**
## How the CCS solution works
Main extraction uses ESQL with a **LOOKUP JOIN** against the latest
index, which **does not support cross-cluster indices**. So we split
index patterns into **local** and **remote**, run two paths in parallel,
and let the existing “updates → next run” flow merge remote data.
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Single extraction run │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Index patterns (data view) │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Split by CCS │──────┬──────────────────────────────────────────────┐ │
│ └──────────────┘ │ │ │
│ ▼ ▼ │
│ ┌──────────────────┐ ┌───────────────────┐ │
│ │ Local patterns │ │ Remote patterns │ │
│ │ (e.g. logs-*) │ │ (e.g. remote:logs)│ │
│ └────────┬─────────┘ └────────┬──────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Main extraction │ │ CCS extraction │ │
│ │ ESQL + LOOKUP │ │ ESQL (no LOOKUP) │ │
│ │ → latest index │ │ → updates stream │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ Next extraction run │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Main extraction reads: [ updates stream, local patterns, ... ] │
│ → Picks up partial entities written by CCS in the previous run │
│ → LOOKUP JOIN + merge → latest index │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
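As a rough illustration of the split above: remote (CCS) patterns use the `cluster:index` syntax, so a colon is enough to tell them apart from local patterns. This is a minimal sketch; `splitIndexPatterns` is a hypothetical name, not necessarily the helper used in this PR:

```ts
// Hypothetical sketch: CCS (remote) patterns use the "cluster:index"
// syntax, e.g. "remote:logs-*", while local patterns have no colon.
const splitIndexPatterns = (patterns: string[]) => ({
  local: patterns.filter((p) => !p.includes(':')),
  remote: patterns.filter((p) => p.includes(':')),
});

const { local, remote } = splitIndexPatterns(['logs-*', 'remote:logs-*']);
// local  -> ['logs-*']
// remote -> ['remote:logs-*']
```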
1. **Step 1 (current run)**
- CCS extraction reads only **remote** index patterns.
- Runs ESQL aggregation (same logic as main, but no LOOKUP).
  - Writes results as partial entities to the **updates** data stream
    (with `@timestamp` in the extraction window, e.g. `toDateISO` + a small
    random offset to reduce collisions; see the sketch after this list).
2. **Step 2 (next run)**
- Main extraction uses index patterns that **include the updates data
stream** (and local indices).
- Documents written by CCS in step 1 are now in that stream.
- Main extraction runs ESQL + LOOKUP JOIN against the latest index and
merges everything (local + updates, including those CCS partials) into
latest.
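A minimal sketch of the timestamp placement from step 1, assuming a generator shape like the `timestampGenerator` option this PR adds to `upsertEntitiesBulk`; the exact signature here is a guess:

```ts
// Hypothetical sketch: stamp each CCS partial entity near the window's
// toDateISO, plus a small random offset so concurrent writes are less
// likely to collide on the exact same @timestamp.
const makeTimestampGenerator = (toDateISO: string) => (): string => {
  const jitterMs = Math.floor(Math.random() * 1000); // up to ~1s of jitter
  return new Date(Date.parse(toDateISO) + jitterMs).toISOString();
};

const timestampGenerator = makeTimestampGenerator('2025-01-01T00:00:00.000Z');
timestampGenerator(); // e.g. '2025-01-01T00:00:00.412Z'
```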
No change is required to the main extraction logic; it already reads
from updates. We only add a parallel path that **feeds** updates from
remote indices so the next run sees them.
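Tying it together, the parallel run looks roughly like this. Only the `Promise.all` line mirrors the actual diff shown in the review conversation above; the surrounding names and shapes are assumptions:

```ts
interface ExtractionClient {
  extract(opts: {
    indexPatterns: string[];
    fromDateISO: string;
    toDateISO: string;
    abortController?: AbortController;
  }): Promise<unknown>;
}

// Both paths share the same extraction window and abort signal; CCS is
// best effort, with the main extraction owning the window state.
async function runBothPaths(
  main: ExtractionClient, // ESQL + LOOKUP JOIN -> latest index
  ccs: ExtractionClient, // ESQL (no LOOKUP)   -> updates stream
  local: string[], // e.g. ['logs-*']
  remote: string[], // e.g. ['remote:logs-*']
  fromDateISO: string,
  toDateISO: string,
  abortController?: AbortController
) {
  const mainPromise = main.extract({ indexPatterns: local, fromDateISO, toDateISO, abortController });
  const ccsPromise = ccs.extract({ indexPatterns: remote, fromDateISO, toDateISO, abortController });
  // Mirrors the diff in the review conversation above:
  const [mainResult, ccsResult] = await Promise.all([mainPromise, ccsPromise]);
  return { mainResult, ccsResult };
}
```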
---
## Fix: `lastExecutionTimestamp` from `lastSearchTimestamp`
The **next** extraction window is computed as:
- `fromDate` = `lastExecutionTimestamp` (minus delay), or lookback if
not set
- `toDate` = now − delay
So **`lastExecutionTimestamp` must be the end of the window we just
finished searching** (i.e. the `toDateISO` of that run). If we stored
something else (e.g. “now” at update time), the next run would use the
wrong `fromDate` and we could **skip** a segment of data or create
overlapping windows.
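In code form, the window derivation is roughly the following; a minimal sketch with assumed names, mirroring the bullet points above:

```ts
// fromDate = lastExecutionTimestamp - delay, or now - lookback when unset;
// toDate   = now - delay. The persisted lastExecutionTimestamp must
// therefore be the previous run's toDateISO for the windows to tile
// without gaps or overlaps.
const computeNextWindow = (
  lastExecutionTimestamp: string | undefined,
  delayMs: number,
  lookbackMs: number
) => {
  const now = Date.now();
  const from = lastExecutionTimestamp
    ? Date.parse(lastExecutionTimestamp) - delayMs
    : now - lookbackMs;
  const to = now - delayMs;
  return {
    fromDateISO: new Date(from).toISOString(),
    toDateISO: new Date(to).toISOString(),
  };
};
```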
**Change:** After a successful extraction we now set:
```ts
lastExecutionTimestamp: lastSearchTimestamp || moment().utc().toISOString()
```
where `lastSearchTimestamp` is the **toDateISO** of the window we just
completed. The next run therefore starts from the correct point and no
segment is dropped.
## Testing manually
1. Start an ECH deployment on 9.4-SNAPSHOT
2. Go to Stack Management > API Keys and generate a new `Cross-Cluster`
API key.
   - Save the provided credentials
3. Start Kibana and Elasticsearch locally
4. Add the stored credentials to the local deployment by running this
command in your CLI: `.es/9.4.0/bin/elasticsearch-keystore add
cluster.remote.${REMOTE_CLUSTER_NAME}.credentials`. This command will
prompt you to add the credential.
5. Reload security settings from Kibana Dev Tools: `POST
/_nodes/reload_secure_settings`
6. Go to the cloud console of your deployment; under Security, at the
bottom of the page, copy the proxy address
![Proxy address location in the cloud console](https://github.com/user-attachments/assets/19ce4142-c184-466a-a1e1-a91ecdbec18f)
7. Register a new cluster with the proxy address
```
PUT _cluster/settings
{
"persistent": {
"cluster.remote.${REMOTE_CLUSTER_NAME}.mode": "proxy",
"cluster.remote.${REMOTE_CLUSTER_NAME}.proxy_address": "${PROXY_ADDRESS}"
}
}
```
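Before moving on, you can optionally confirm the remote cluster is reachable. This verification step is an addition to the instructions above; `_remote/info` is the standard Elasticsearch API for inspecting remote cluster connections:

```
GET _remote/info
```

The response should list `${REMOTE_CLUSTER_NAME}` with `"connected": true`.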
8. Add data to the remote cluster and observe it being ingested in your
environment!
**Verified**: I created 2 Elastic Cloud projects and connected one into another using the "Testing manually" section in this PR's description and https://www.elastic.co/docs/deploy-manage/remote-clusters/ec-remote-cluster-same-ess. I installed Entity Store, adding the …

Proof:
- Image: Doctored entity with …
- Image: Request to …