[ML]Fix latest transforms disregarding updates when sort and sync fields are non-monotonic#142856
[ML]Fix latest transforms disregarding updates when sort and sync fields are non-monotonic#142856valeriy42 merged 12 commits intoelastic:mainfrom
Conversation
|
Hi @valeriy42, I've created a changelog YAML for you. |
There was a problem hiding this comment.
Pull request overview
This PR fixes a bug in continuous latest transforms where documents could be incorrectly overwritten when the sort field and sync time field don't increase monotonically together. The fix implements a two-phase change detection mechanism similar to pivot transforms, ensuring the destination always contains the document with the highest sort value for each unique key.
Changes:
- Implemented two-phase change detection in
LatestChangeCollectorto correctly handle non-monotonic sort/sync field alignment - Updated
Latest.buildChangeCollectorto pass the unique key list to the change collector - Added comprehensive unit tests for the new change collector behavior
- Added integration test reproducing the non-monotonic scenario
- Added YAML REST tests validating preview and batch behavior with non-monotonic data
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| LatestChangeCollector.java | Implements two-phase change detection with composite aggregation and filtering logic |
| Latest.java | Passes unique key to change collector constructor |
| LatestChangeCollectorTests.java | Comprehensive unit tests for the new change collector implementation |
| TransformLatestRestIT.java | Integration test verifying correct behavior with non-monotonic sort/sync fields |
| transforms_latest.yml | YAML REST tests for preview and batch operations with non-monotonic data |
| 142856.yaml | Changelog entry documenting the bug fix |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
| } | ||
|
|
||
| public void testProcessSearchResponseClearsPreviousPageKeys() throws IOException { |
There was a problem hiding this comment.
The test name 'testProcessSearchResponseClearsPreviousPageKeys' is misleading. The test actually verifies that calling processSearchResponse replaces (not clears) the previous page's keys with new keys from the current page, as evidenced by the assertion checking for only 'page2-id'. Consider renaming to 'testProcessSearchResponseReplacesKeysFromPreviousPage' to better reflect the actual behavior.
| public void testProcessSearchResponseClearsPreviousPageKeys() throws IOException { | |
| public void testProcessSearchResponseReplacesKeysFromPreviousPage() throws IOException { |
|
Pinging @elastic/ml-core (Team:ML) |
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_latest.yml
Outdated
Show resolved
Hide resolved
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_latest.yml
Show resolved
Hide resolved
...src/main/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollector.java
Show resolved
Hide resolved
|
@prwhelan I addressed your comments. Please take another look. |
…lds are non-monotonic (elastic#142856) Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in `[lastCheckpoint, nextCheckpoint)`, making some documents invisible and causing `top_hits` to select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuring `top_hits` picks the latest document. Changes are within `LatestChangeCollector` with no impact on API or schema, ensuring correct behavior after upgrade. The new behavior aligns `latest` with the pattern used by `pivot` transforms. Pivot employs `CompositeBucketsChangeCollector`, which runs two phases via `TransformIndexer`: in **IDENTIFY_CHANGES**, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors like `TermsFieldCollector` and `DateHistogramFieldCollector`. In **APPLY_RESULTS**, it builds the pivot query, narrowing it with filters from these collectors. `Latest` now mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputing Performance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no `top_hits`), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests cover `LatestChangeCollector` (buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data. Fixes elastic#90643
…lds are non-monotonic (elastic#142856) Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in `[lastCheckpoint, nextCheckpoint)`, making some documents invisible and causing `top_hits` to select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuring `top_hits` picks the latest document. Changes are within `LatestChangeCollector` with no impact on API or schema, ensuring correct behavior after upgrade. The new behavior aligns `latest` with the pattern used by `pivot` transforms. Pivot employs `CompositeBucketsChangeCollector`, which runs two phases via `TransformIndexer`: in **IDENTIFY_CHANGES**, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors like `TermsFieldCollector` and `DateHistogramFieldCollector`. In **APPLY_RESULTS**, it builds the pivot query, narrowing it with filters from these collectors. `Latest` now mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputing Performance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no `top_hits`), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests cover `LatestChangeCollector` (buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data. Fixes elastic#90643
…lds are non-monotonic (elastic#142856) Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in `[lastCheckpoint, nextCheckpoint)`, making some documents invisible and causing `top_hits` to select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuring `top_hits` picks the latest document. Changes are within `LatestChangeCollector` with no impact on API or schema, ensuring correct behavior after upgrade. The new behavior aligns `latest` with the pattern used by `pivot` transforms. Pivot employs `CompositeBucketsChangeCollector`, which runs two phases via `TransformIndexer`: in **IDENTIFY_CHANGES**, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors like `TermsFieldCollector` and `DateHistogramFieldCollector`. In **APPLY_RESULTS**, it builds the pivot query, narrowing it with filters from these collectors. `Latest` now mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputing Performance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no `top_hits`), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests cover `LatestChangeCollector` (buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data. Fixes elastic#90643
…lds are non-monotonic (#142856) (#143215) Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in `[lastCheckpoint, nextCheckpoint)`, making some documents invisible and causing `top_hits` to select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuring `top_hits` picks the latest document. Changes are within `LatestChangeCollector` with no impact on API or schema, ensuring correct behavior after upgrade. The new behavior aligns `latest` with the pattern used by `pivot` transforms. Pivot employs `CompositeBucketsChangeCollector`, which runs two phases via `TransformIndexer`: in **IDENTIFY_CHANGES**, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors like `TermsFieldCollector` and `DateHistogramFieldCollector`. In **APPLY_RESULTS**, it builds the pivot query, narrowing it with filters from these collectors. `Latest` now mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputing Performance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no `top_hits`), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests cover `LatestChangeCollector` (buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data. Fixes #90643
…lds are non-monotonic (#142856) (#143213) Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in `[lastCheckpoint, nextCheckpoint)`, making some documents invisible and causing `top_hits` to select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuring `top_hits` picks the latest document. Changes are within `LatestChangeCollector` with no impact on API or schema, ensuring correct behavior after upgrade. The new behavior aligns `latest` with the pattern used by `pivot` transforms. Pivot employs `CompositeBucketsChangeCollector`, which runs two phases via `TransformIndexer`: in **IDENTIFY_CHANGES**, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors like `TermsFieldCollector` and `DateHistogramFieldCollector`. In **APPLY_RESULTS**, it builds the pivot query, narrowing it with filters from these collectors. `Latest` now mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputing Performance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no `top_hits`), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests cover `LatestChangeCollector` (buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data. Fixes #90643
…ync fields are non-monotonic (#142856) (#143214) * [ML]Fix latest transforms disregarding updates when sort and sync fields are non-monotonic (#142856) Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in `[lastCheckpoint, nextCheckpoint)`, making some documents invisible and causing `top_hits` to select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuring `top_hits` picks the latest document. Changes are within `LatestChangeCollector` with no impact on API or schema, ensuring correct behavior after upgrade. The new behavior aligns `latest` with the pattern used by `pivot` transforms. Pivot employs `CompositeBucketsChangeCollector`, which runs two phases via `TransformIndexer`: in **IDENTIFY_CHANGES**, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors like `TermsFieldCollector` and `DateHistogramFieldCollector`. In **APPLY_RESULTS**, it builds the pivot query, narrowing it with filters from these collectors. `Latest` now mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputing Performance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no `top_hits`), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests cover `LatestChangeCollector` (buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data. Fixes #90643 * Fix build error
…lds are non-monotonic (elastic#142856) Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in `[lastCheckpoint, nextCheckpoint)`, making some documents invisible and causing `top_hits` to select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuring `top_hits` picks the latest document. Changes are within `LatestChangeCollector` with no impact on API or schema, ensuring correct behavior after upgrade. The new behavior aligns `latest` with the pattern used by `pivot` transforms. Pivot employs `CompositeBucketsChangeCollector`, which runs two phases via `TransformIndexer`: in **IDENTIFY_CHANGES**, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors like `TermsFieldCollector` and `DateHistogramFieldCollector`. In **APPLY_RESULTS**, it builds the pivot query, narrowing it with filters from these collectors. `Latest` now mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputing Performance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no `top_hits`), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests cover `LatestChangeCollector` (buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data. Fixes elastic#90643
…cations * upstream/main: (35 commits) Create ARM bulk sqrI8 implementation (elastic#142461) Rework get-snapshots predicates (elastic#143161) Refactor downsampling fetchers and producers (elastic#140357) ESQL: Unmute test and add extra logging to generative test validation (elastic#143168) Fix metadata fields being nullified/loaded by unmapped_fields setting (elastic#143155) Determine remote cluster version (elastic#142494) Populate failure message for aborted clones (elastic#143206) Allow kibana_system role to read and manage logs streams (elastic#143053) Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:eval.DocsLength} elastic#143224 Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:eval.DocsByteLength} elastic#143223 Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:docs.DocsBitLength} elastic#143222 Fix FloatVectorScorerSupplier bulkScore bug (elastic#143211) ESQL: Add data node execution for external sources (elastic#143209) [ESQL] Cleanup commands docs (elastic#143058) [ML]Fix latest transforms disregarding updates when sort and sync fields are non-monotonic (elastic#142856) Mute org.elasticsearch.index.mapper.IpFieldMapperTests testSyntheticSourceInObject elastic#143212 Tests: Fix StoreDirectoryMetricsIT (elastic#143084) ESQL: Add distribution strategy for external sources (elastic#143194) CSV IT spec (elastic#142585) Fix VectorScorerOSQBenchmark.score to read corrections properly (elastic#143137) ...
…ync fields are non-monotonic (#142856) (#143214) * [ML]Fix latest transforms disregarding updates when sort and sync fields are non-monotonic (#142856) Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in `[lastCheckpoint, nextCheckpoint)`, making some documents invisible and causing `top_hits` to select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuring `top_hits` picks the latest document. Changes are within `LatestChangeCollector` with no impact on API or schema, ensuring correct behavior after upgrade. The new behavior aligns `latest` with the pattern used by `pivot` transforms. Pivot employs `CompositeBucketsChangeCollector`, which runs two phases via `TransformIndexer`: in **IDENTIFY_CHANGES**, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors like `TermsFieldCollector` and `DateHistogramFieldCollector`. In **APPLY_RESULTS**, it builds the pivot query, narrowing it with filters from these collectors. `Latest` now mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputing Performance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no `top_hits`), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests cover `LatestChangeCollector` (buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data. Fixes #90643 * Fix build error
…lds are non-monotonic (elastic#142856) Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in `[lastCheckpoint, nextCheckpoint)`, making some documents invisible and causing `top_hits` to select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuring `top_hits` picks the latest document. Changes are within `LatestChangeCollector` with no impact on API or schema, ensuring correct behavior after upgrade. The new behavior aligns `latest` with the pattern used by `pivot` transforms. Pivot employs `CompositeBucketsChangeCollector`, which runs two phases via `TransformIndexer`: in **IDENTIFY_CHANGES**, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors like `TermsFieldCollector` and `DateHistogramFieldCollector`. In **APPLY_RESULTS**, it builds the pivot query, narrowing it with filters from these collectors. `Latest` now mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputing Performance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no `top_hits`), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests cover `LatestChangeCollector` (buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data. Fixes elastic#90643
Continuous latest transforms could overwrite newer documents if sort and sync fields didn't increase together. Checkpoint N only queried documents in
[lastCheckpoint, nextCheckpoint), making some documents invisible and causingtop_hitsto select the wrong document. The fix introduces two-phase change detection: phase 1 finds updated keys via a composite aggregation; phase 2 runs a full query with a filter for those keys, ensuringtop_hitspicks the latest document. Changes are withinLatestChangeCollectorwith no impact on API or schema, ensuring correct behavior after upgrade.The new behavior aligns
latestwith the pattern used bypivottransforms. Pivot employsCompositeBucketsChangeCollector, which runs two phases viaTransformIndexer: in IDENTIFY_CHANGES, it performs a composite aggregation over the checkpoint window with sync range, recording changed buckets using collectors likeTermsFieldCollectorandDateHistogramFieldCollector. In APPLY_RESULTS, it builds the pivot query, narrowing it with filters from these collectors.Latestnow mirrors this at the unique-key level: phase 1 is a composite over unique key fields, and phase 2 filters by collected key values to run over full history. The key difference is that pivot’s “changed buckets” are the group-by dimensions, while latest’s are the unique key values for recomputingPerformance impact is limited: one extra search per checkpoint in phase 1 (composite aggregation only, no
top_hits), and phase 2 processes only changed unique keys, not the whole dataset. No Painless scripts, per-document GET/UpdateRequest, or new destination fields. Unit tests coverLatestChangeCollector(buildChangesQuery, processSearchResponse, buildFilterQuery, clear, single and multi-field unique key, null buckets); a Java REST test reproduces the non-monotonic scenario (two docs, same key, different sort/sync order) and asserts the destination keeps the doc with higher sort value after checkpoint 2; YAML REST tests assert latest preview and batch behavior with non-monotonic data.Fixes #90643