Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/142856.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
area: Transform
issues:
- 90643
pr: 142856
summary: "[ML] Fix latest transforms disregarding updates when sort and sync fields\
\ are non-monotonic"
type: bug
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Shared fixture for every test in this file: creates the source index with
# explicit keyword/date mappings and bulk-loads five order events.
setup:
- do:
indices.create:
index: latest-test-data
body:
mappings:
properties:
order_id:
type: keyword
region:
type: keyword
status:
type: keyword
updated_at:
type: date
event_ingested:
type: date

# order-1 and order-2 each get two events whose updated_at and event_ingested
# orderings disagree: the event with the later updated_at has the earlier
# event_ingested. order-3 has a single event.
- do:
bulk:
index: latest-test-data
refresh: true
body:
- '{"index": {}}'
- '{"order_id": "order-1", "region": "US", "status": "accepted", "updated_at": "2024-01-01T00:00:05Z", "event_ingested": "2024-01-01T00:00:10Z"}'
- '{"index": {}}'
- '{"order_id": "order-1", "region": "US", "status": "pending", "updated_at": "2024-01-01T00:00:03Z", "event_ingested": "2024-01-01T00:00:20Z"}'
- '{"index": {}}'
- '{"order_id": "order-2", "region": "EU", "status": "shipped", "updated_at": "2024-01-01T00:00:08Z", "event_ingested": "2024-01-01T00:00:15Z"}'
- '{"index": {}}'
- '{"order_id": "order-2", "region": "EU", "status": "processing", "updated_at": "2024-01-01T00:00:01Z", "event_ingested": "2024-01-01T00:00:25Z"}'
- '{"index": {}}'
- '{"order_id": "order-3", "region": "US", "status": "delivered", "updated_at": "2024-01-01T00:00:02Z", "event_ingested": "2024-01-01T00:00:05Z"}'

---
# Deletes the fixture index after each test; a 404 response is ignored.
teardown:
- do:
indices.delete:
index: latest-test-data
ignore: 404

---
# Previews a latest transform keyed on order_id and sorted by updated_at, then
# checks that each key resolves to the event with the highest updated_at.
"Test latest preview picks correct document when sort and sync fields are non-monotonic":
- do:
transform.preview_transform:
body: >
{
"source": { "index": "latest-test-data" },
"latest": {
"unique_key": [ "order_id" ],
"sort": "updated_at"
}
}

# One preview row per distinct order_id in the fixture.
- length: { preview: 3 }

# Composite agg returns results sorted by unique_key lexicographically.
# order-1 should pick status=accepted (updated_at T+5s) over status=pending (updated_at T+3s)
# even though pending has a later event_ingested.
# order-2 likewise should pick status=shipped (T+8s) over status=processing (T+1s).
- match: { preview.0.order_id: "order-1" }
- match: { preview.0.status: "accepted" }
- match: { preview.1.order_id: "order-2" }
- match: { preview.1.status: "shipped" }
- match: { preview.2.order_id: "order-3" }
- match: { preview.2.status: "delivered" }

---
# Same preview as the single-key test, but the unique key is the pair
# [order_id, region].
"Test latest preview with multi-field unique key":
- do:
transform.preview_transform:
body: >
{
"source": { "index": "latest-test-data" },
"latest": {
"unique_key": [ "order_id", "region" ],
"sort": "updated_at"
}
}

# In the fixture every order_id appears with exactly one region, so the pair
# key still yields three rows.
- length: { preview: 3 }

# With multi-field unique key [order_id, region], each combination should pick highest updated_at.
# Results sorted lexicographically by [order_id, region].
- match: { preview.0.order_id: "order-1" }
- match: { preview.0.region: "US" }
- match: { preview.0.status: "accepted" }
- match: { preview.1.order_id: "order-2" }
- match: { preview.1.region: "EU" }
- match: { preview.1.status: "shipped" }
- match: { preview.2.order_id: "order-3" }
- match: { preview.2.region: "US" }
- match: { preview.2.status: "delivered" }

---
# Previews the single-key latest transform and checks the full set of source
# fields on every row.
# NOTE(review): the test name mentions document IDs, but none of the assertions
# below inspects an ID field; they cover order_id, region, status and the
# presence of updated_at only. Consider renaming or adding the ID check.
"Test latest preview includes document IDs":
- do:
transform.preview_transform:
body: >
{
"source": { "index": "latest-test-data" },
"latest": {
"unique_key": [ "order_id" ],
"sort": "updated_at"
}
}

- length: { preview: 3 }

# Verify all expected fields are present with correct values.
# updated_at is only checked for presence (is_true), not for an exact value.
- match: { preview.0.order_id: "order-1" }
- match: { preview.0.region: "US" }
- match: { preview.0.status: "accepted" }
- is_true: preview.0.updated_at
- match: { preview.1.order_id: "order-2" }
- match: { preview.1.region: "EU" }
- match: { preview.1.status: "shipped" }
- is_true: preview.1.updated_at
- match: { preview.2.order_id: "order-3" }
- match: { preview.2.region: "US" }
- match: { preview.2.status: "delivered" }
- is_true: preview.2.updated_at
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,92 @@ private void testContinuousLatestWithFrom(String transformId, String indexName,
deleteIndex(indexName);
}

/**
 * Verifies that a continuous latest transform correctly handles non-monotonic alignment between
 * the sort field and the sync time field (gh#90643).
 *
 * Scenario: Two documents for the same unique key ({@code order_id}) arrive in separate
 * checkpoints. The first document (Doc A, status "accepted") has a higher sort value
 * ({@code updated_at}) but a lower sync value ({@code event_ingested}). The second document
 * (Doc B, status "pending") has a lower sort value but a higher sync value. Without the fix, the
 * second checkpoint overwrites the destination with the stale document because the narrow
 * time-window filter hides the first document from top_hits.
 *
 * The test asserts that the destination holds exactly one document for the key, with status
 * "accepted", both after the first checkpoint and after checkpoint 2.
 *
 * @throws Exception if any REST call or wait helper used by the test fails
 */
public void testContinuousLatestWithNonMonotonicSortField() throws Exception {
String sourceIndex = "non_monotonic_source";
String transformId = "non_monotonic_latest";
String transformIndex = transformId + "-dest";
// Role setup covers both the source and the destination index.
setupDataAccessRole(DATA_ACCESS_ROLE, sourceIndex, transformIndex);

// Create the source index with explicit keyword/date mappings.
Request createIndex = new Request("PUT", sourceIndex);
createIndex.setJsonEntity("""
{
"mappings": {
"properties": {
"order_id": { "type": "keyword" },
"status": { "type": "keyword" },
"updated_at": { "type": "date" },
"event_ingested": { "type": "date" }
}
}
}""");
client().performRequest(createIndex);

// Reference instant: the updated_at values of both documents are offsets from it.
long now = System.currentTimeMillis();

// Doc A: higher sort value, ingested well in the past so it's visible in checkpoint 1
// (updated_at = now + 5s, event_ingested = now - 10s).
doBulk(org.elasticsearch.core.Strings.format("""
{"index":{"_index":"%s"}}
{"order_id":"order-1","status":"accepted","updated_at":%d,"event_ingested":%d}
""", sourceIndex, now + 5000, now - 10000), true);

// Continuous latest transform: syncs on event_ingested (1s delay), runs at a 1s frequency,
// keys on order_id and sorts by updated_at.
Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
createTransformRequest.setJsonEntity(org.elasticsearch.core.Strings.format("""
{
"source": { "index": "%s" },
"dest": { "index": "%s" },
"frequency": "1s",
"sync": {
"time": {
"field": "event_ingested",
"delay": "1s"
}
},
"latest": {
"unique_key": [ "order_id" ],
"sort": "updated_at"
}
}""", sourceIndex, transformIndex));
Map<String, Object> createResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createResponse.get("acknowledged"), equalTo(Boolean.TRUE));

startAndWaitForContinuousTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);

// Only Doc A exists so far, so the destination must hold a single "accepted" document.
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=order_id:order-1");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.status", searchResult)).get(0), is(equalTo("accepted")));

// Doc B: lower sort value but ingested after checkpoint 1, using real current time
// (updated_at = now + 3s, event_ingested = the wall-clock time read just below).
long ingestedNow = System.currentTimeMillis();
doBulk(org.elasticsearch.core.Strings.format("""
{"index":{"_index":"%s"}}
{"order_id":"order-1","status":"pending","updated_at":%d,"event_ingested":%d}
""", sourceIndex, now + 3000, ingestedNow), true);

// Wait for the checkpoint that picks up Doc B, then refresh the destination before searching.
waitForTransformCheckpoint(transformId, 2);
refreshIndex(transformIndex);

// Doc A still has the higher updated_at, so it must remain the single latest document;
// a "pending" result here would mean the stale Doc B overwrote it.
searchResult = getAsMap(transformIndex + "/_search?q=order_id:order-1");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.status", searchResult)).get(0), is(equalTo("accepted")));

// Cleanup: stop the transform and drop the source index.
// NOTE(review): the destination index is not deleted here — presumably handled by shared test teardown; confirm.
stopTransform(transformId, false);
deleteIndex(sourceIndex);
}

private void assertSourceIndexContents(String indexName, int expectedNumDocs, String expectedMinTimestamp, String expectedMaxTimestamp)
throws IOException {
Request searchRequest = new Request("GET", indexName + "/_search");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public int getInitialPageSize() {

/**
 * Creates the change collector used by the latest function.
 *
 * @param synchronizationField name of the sync field handed to the collector
 * @return a {@link LatestChangeCollector} constructed from the sync field and the unique key
 *         read from this function's config
 */
@Override
public ChangeCollector buildChangeCollector(String synchronizationField) {
    // Pass the configured unique key along with the sync field.
    return new LatestChangeCollector(synchronizationField, config.getUniqueKey());
}

private static Map<String, Object> convertBucketToDocument(
Expand Down
Loading
Loading