diff --git a/docs/changelog/142856.yaml b/docs/changelog/142856.yaml new file mode 100644 index 0000000000000..22809efbb4f96 --- /dev/null +++ b/docs/changelog/142856.yaml @@ -0,0 +1,7 @@ +area: Transform +issues: + - 90643 +pr: 142856 +summary: "[ML]Fix latest transforms disregarding updates when sort and sync fields\ + \ are non-monotonic" +type: bug diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_latest.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_latest.yml new file mode 100644 index 0000000000000..01ef0e5e05c2b --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_latest.yml @@ -0,0 +1,121 @@ +setup: + - do: + indices.create: + index: latest-test-data + body: + mappings: + properties: + order_id: + type: keyword + region: + type: keyword + status: + type: keyword + updated_at: + type: date + event_ingested: + type: date + + - do: + bulk: + index: latest-test-data + refresh: true + body: + - '{"index": {}}' + - '{"order_id": "order-1", "region": "US", "status": "accepted", "updated_at": "2024-01-01T00:00:05Z", "event_ingested": "2024-01-01T00:00:10Z"}' + - '{"index": {}}' + - '{"order_id": "order-1", "region": "US", "status": "pending", "updated_at": "2024-01-01T00:00:03Z", "event_ingested": "2024-01-01T00:00:20Z"}' + - '{"index": {}}' + - '{"order_id": "order-2", "region": "EU", "status": "shipped", "updated_at": "2024-01-01T00:00:08Z", "event_ingested": "2024-01-01T00:00:15Z"}' + - '{"index": {}}' + - '{"order_id": "order-2", "region": "EU", "status": "processing", "updated_at": "2024-01-01T00:00:01Z", "event_ingested": "2024-01-01T00:00:25Z"}' + - '{"index": {}}' + - '{"order_id": "order-3", "region": "US", "status": "delivered", "updated_at": "2024-01-01T00:00:02Z", "event_ingested": "2024-01-01T00:00:05Z"}' + +--- +teardown: + - do: + indices.delete: + index: latest-test-data + ignore: 404 + +--- +"Test latest preview picks correct document when sort and sync fields are non-monotonic": + - do: + transform.preview_transform: + body: > + { + "source": { "index": "latest-test-data" }, + "latest": { + "unique_key": [ "order_id" ], + "sort": "updated_at" + } + } + + - length: { preview: 3 } + + # Composite agg returns results sorted by unique_key lexicographically. + # order-1 should pick status=accepted (updated_at T+5s) over status=pending (updated_at T+3s) + # even though pending has a later event_ingested. + - match: { preview.0.order_id: "order-1" } + - match: { preview.0.status: "accepted" } + - match: { preview.1.order_id: "order-2" } + - match: { preview.1.status: "shipped" } + - match: { preview.2.order_id: "order-3" } + - match: { preview.2.status: "delivered" } + +--- +"Test latest preview with multi-field unique key": + - do: + transform.preview_transform: + body: > + { + "source": { "index": "latest-test-data" }, + "latest": { + "unique_key": [ "order_id", "region" ], + "sort": "updated_at" + } + } + + - length: { preview: 3 } + + # With multi-field unique key [order_id, region], each combination should pick highest updated_at. + # Results sorted lexicographically by [order_id, region]. + - match: { preview.0.order_id: "order-1" } + - match: { preview.0.region: "US" } + - match: { preview.0.status: "accepted" } + - match: { preview.1.order_id: "order-2" } + - match: { preview.1.region: "EU" } + - match: { preview.1.status: "shipped" } + - match: { preview.2.order_id: "order-3" } + - match: { preview.2.region: "US" } + - match: { preview.2.status: "delivered" } + +--- +"Test latest preview includes document IDs": + - do: + transform.preview_transform: + body: > + { + "source": { "index": "latest-test-data" }, + "latest": { + "unique_key": [ "order_id" ], + "sort": "updated_at" + } + } + + - length: { preview: 3 } + + # Verify all expected fields are present with correct values. + - match: { preview.0.order_id: "order-1" } + - match: { preview.0.region: "US" } + - match: { preview.0.status: "accepted" } + - is_true: preview.0.updated_at + - match: { preview.1.order_id: "order-2" } + - match: { preview.1.region: "EU" } + - match: { preview.1.status: "shipped" } + - is_true: preview.1.updated_at + - match: { preview.2.order_id: "order-3" } + - match: { preview.2.region: "US" } + - match: { preview.2.status: "delivered" } + - is_true: preview.2.updated_at diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java index ed78655e22820..74fb7e6ac481c 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformLatestRestIT.java @@ -253,6 +253,92 @@ private void testContinuousLatestWithFrom(String transformId, String indexName, deleteIndex(indexName); } + /** + * Verifies that a continuous latest transform correctly handles non-monotonic alignment between + * the sort field and the sync time field (gh#90643). + * + * Scenario: Two documents for the same unique key arrive in separate checkpoints. The first + * document has a higher sort value (updated_at) but a lower sync value (event_ingested). The + * second document has a lower sort value but a higher sync value. Without the fix, the second + * checkpoint overwrites the destination with the stale document because the narrow time-window + * filter hides the first document from top_hits. + */ + public void testContinuousLatestWithNonMonotonicSortField() throws Exception { + String sourceIndex = "non_monotonic_source"; + String transformId = "non_monotonic_latest"; + String transformIndex = transformId + "-dest"; + setupDataAccessRole(DATA_ACCESS_ROLE, sourceIndex, transformIndex); + + Request createIndex = new Request("PUT", sourceIndex); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "order_id": { "type": "keyword" }, + "status": { "type": "keyword" }, + "updated_at": { "type": "date" }, + "event_ingested": { "type": "date" } + } + } + }"""); + client().performRequest(createIndex); + + long now = System.currentTimeMillis(); + + // Doc A: higher sort value, ingested well in the past so it's visible in checkpoint 1 + doBulk(org.elasticsearch.core.Strings.format(""" + {"index":{"_index":"%s"}} + {"order_id":"order-1","status":"accepted","updated_at":%d,"event_ingested":%d} + """, sourceIndex, now + 5000, now - 10000), true); + + Request createTransformRequest = createRequestWithAuth( + "PUT", + getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS + ); + createTransformRequest.setJsonEntity(org.elasticsearch.core.Strings.format(""" + { + "source": { "index": "%s" }, + "dest": { "index": "%s" }, + "frequency": "1s", + "sync": { + "time": { + "field": "event_ingested", + "delay": "1s" + } + }, + "latest": { + "unique_key": [ "order_id" ], + "sort": "updated_at" + } + }""", sourceIndex, transformIndex)); + Map createResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + startAndWaitForContinuousTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS); + + Map searchResult = getAsMap(transformIndex + "/_search?q=order_id:order-1"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + assertThat(((List) XContentMapValues.extractValue("hits.hits._source.status", searchResult)).get(0), is(equalTo("accepted"))); + + // Doc B: lower sort value but ingested after checkpoint 1, using real current time + long ingestedNow = System.currentTimeMillis(); + doBulk(org.elasticsearch.core.Strings.format(""" + {"index":{"_index":"%s"}} + {"order_id":"order-1","status":"pending","updated_at":%d,"event_ingested":%d} + """, sourceIndex, now + 3000, ingestedNow), true); + + waitForTransformCheckpoint(transformId, 2); + refreshIndex(transformIndex); + + searchResult = getAsMap(transformIndex + "/_search?q=order_id:order-1"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + assertThat(((List) XContentMapValues.extractValue("hits.hits._source.status", searchResult)).get(0), is(equalTo("accepted"))); + + stopTransform(transformId, false); + deleteIndex(sourceIndex); + } + private void assertSourceIndexContents(String indexName, int expectedNumDocs, String expectedMinTimestamp, String expectedMaxTimestamp) throws IOException { Request searchRequest = new Request("GET", indexName + "/_search"); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java index 2b244fef515d6..ad77047cbe909 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java @@ -70,7 +70,7 @@ public int getInitialPageSize() { @Override public ChangeCollector buildChangeCollector(String synchronizationField) { - return new LatestChangeCollector(synchronizationField); + return new LatestChangeCollector(synchronizationField, config.getUniqueKey()); } private static Map convertBucketToDocument( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollector.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollector.java index f4369a18c0b6a..24554cc65aeae 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollector.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollector.java @@ -8,70 +8,168 @@ package org.elasticsearch.xpack.transform.transforms.latest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.ExistsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.transform.transforms.Function; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; + +import static java.util.stream.Collectors.toList; /** * {@link Function.ChangeCollector} implementation for the latest function. + * Uses a two-phase approach to correctly handle the case where the sort field and sync.time.field + * don't increase monotonically together (gh#90643). */ class LatestChangeCollector implements Function.ChangeCollector { + static final String COMPOSITE_AGGREGATION_NAME = "_transform_latest_change_collector"; + private final String synchronizationField; + private final List uniqueKey; + private final CompositeAggregationBuilder compositeAggregation; + private final Map> changedKeyValues; + private final Set fieldsWithNullValues; - LatestChangeCollector(String synchronizationField) { + LatestChangeCollector(String synchronizationField, List uniqueKey) { this.synchronizationField = Objects.requireNonNull(synchronizationField); + this.uniqueKey = Objects.requireNonNull(uniqueKey); + this.compositeAggregation = createCompositeAggregation(uniqueKey); + this.changedKeyValues = new HashMap<>(); + for (String field : uniqueKey) { + changedKeyValues.put(field, new HashSet<>()); + } + this.fieldsWithNullValues = new HashSet<>(); + } + + private static CompositeAggregationBuilder createCompositeAggregation(List uniqueKey) { + List> sources = uniqueKey.stream() + .map(field -> new TermsValuesSourceBuilder(field).field(field).missingBucket(true)) + .collect(toList()); + return AggregationBuilders.composite(COMPOSITE_AGGREGATION_NAME, sources); } + /** + * Phase 1 (IDENTIFY_CHANGES): Build a composite aggregation over the checkpoint time window + * to discover which unique_key values have new source documents. + */ @Override public SearchSourceBuilder buildChangesQuery(SearchSourceBuilder searchSourceBuilder, Map position, int pageSize) { - // This method is never called because "queryForChanges()" method returns "false". - throw new UnsupportedOperationException(); + compositeAggregation.size(pageSize); + compositeAggregation.aggregateAfter(position); + return searchSourceBuilder.size(0).aggregation(compositeAggregation); } + /** + * Phase 1 (IDENTIFY_CHANGES): Collect the unique_key values from the composite agg response; + * these are the keys that need to be re-evaluated in phase 2. + */ @Override public Map processSearchResponse(SearchResponse searchResponse) { - // This method is never called because "queryForChanges()" method returns "false". - throw new UnsupportedOperationException(); + clearCollectedKeys(); + + InternalAggregations aggregations = searchResponse.getAggregations(); + if (aggregations == null) { + return null; + } + + CompositeAggregation compositeAgg = aggregations.get(COMPOSITE_AGGREGATION_NAME); + if (compositeAgg == null || compositeAgg.getBuckets().isEmpty()) { + return null; + } + + for (CompositeAggregation.Bucket bucket : compositeAgg.getBuckets()) { + for (String field : uniqueKey) { + Object value = bucket.getKey().get(field); + if (value != null) { + changedKeyValues.get(field).add(value.toString()); + } else { + fieldsWithNullValues.add(field); + } + } + } + + return compositeAgg.afterKey(); } + /** + * Phase 2 (APPLY_RESULTS): Build a filter so the main query searches only the collected + * unique keys. The indexer applies sync_field < nextCheckpoint separately, so the main + * query sees ALL historical data for those keys and top_hits correctly picks the document + * with the highest sort field value. + */ @Override public QueryBuilder buildFilterQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint) { - // We are only interested in documents that were created in the timeline of the current checkpoint. - // Older documents cannot influence the transform results as we require the sort field values to change monotonically over time. - return QueryBuilders.rangeQuery(synchronizationField) - .gte(lastCheckpoint.getTimeUpperBound()) - .lt(nextCheckpoint.getTimeUpperBound()) - .format("epoch_millis"); + if (uniqueKey.size() == 1) { + String field = uniqueKey.get(0); + return buildFieldFilter(field, changedKeyValues.get(field), fieldsWithNullValues.contains(field)); + } + + BoolQueryBuilder filterQuery = new BoolQueryBuilder(); + for (String field : uniqueKey) { + QueryBuilder fieldFilter = buildFieldFilter(field, changedKeyValues.get(field), fieldsWithNullValues.contains(field)); + if (fieldFilter != null) { + filterQuery.filter(fieldFilter); + } + } + + return filterQuery; + } + + private static QueryBuilder buildFieldFilter(String field, Set values, boolean includeNull) { + if (includeNull) { + QueryBuilder missingBucketQuery = new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(field)); + if (values.isEmpty()) { + return missingBucketQuery; + } + return new BoolQueryBuilder().should(new TermsQueryBuilder(field, values)).should(missingBucketQuery); + } + + if (values.isEmpty()) { + return null; + } + return new TermsQueryBuilder(field, values); } @Override public Collection getIndicesToQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint) { - // we can shortcut here, only the changed indices are of interest // gh#77329 optimization turned off return TransformCheckpoint.getChangedIndices(TransformCheckpoint.EMPTY, nextCheckpoint); } @Override public void clear() { - // This object is stateless so there is no internal state to clear + clearCollectedKeys(); } @Override public boolean isOptimized() { - // Change collection is optimized as we never process a document more than once. return true; } @Override public boolean queryForChanges() { - // Calculating latest does not require elaborate change detection mechanism that is used in pivot. - return false; + return true; + } + + private void clearCollectedKeys() { + changedKeyValues.values().forEach(Set::clear); + fieldsWithNullValues.clear(); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollectorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollectorTests.java index 69b1d5a96412e..02cbf6cfb1a93 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollectorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollectorTests.java @@ -7,42 +7,232 @@ package org.elasticsearch.xpack.transform.transforms.latest; -import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchResponseUtils; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.InternalComposite; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class LatestChangeCollectorTests extends ESTestCase { - public void testBuildFilterQuery() { - LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp"); + private static final TransformCheckpoint CHECKPOINT_OLD = new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 0L); + private static final TransformCheckpoint CHECKPOINT_NEW = new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 123456789L); - assertThat( - changeCollector.buildFilterQuery( - new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 0L), - new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 123456789L) - ), - is(equalTo(QueryBuilders.rangeQuery("timestamp").gte(0L).lt(123456789L).format("epoch_millis"))) + public void testQueryForChanges() { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + assertTrue(changeCollector.queryForChanges()); + } + + public void testIsOptimized() { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + assertTrue(changeCollector.isOptimized()); + } + + public void testBuildChangesQuerySingleField() throws IOException { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + SearchSourceBuilder sourceBuilder = changeCollector.buildChangesQuery(new SearchSourceBuilder(), null, 500); + + assertThat(sourceBuilder.size(), is(equalTo(0))); + CompositeAggregationBuilder compositeAgg = getCompositeAggregationBuilder(sourceBuilder); + assertThat(compositeAgg.getName(), is(equalTo(LatestChangeCollector.COMPOSITE_AGGREGATION_NAME))); + assertThat(compositeAgg.size(), is(equalTo(500))); + } + + public void testBuildChangesQueryMultipleFields() throws IOException { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId", "region")); + SearchSourceBuilder sourceBuilder = changeCollector.buildChangesQuery(new SearchSourceBuilder(), null, 1000); + + CompositeAggregationBuilder compositeAgg = getCompositeAggregationBuilder(sourceBuilder); + assertThat(compositeAgg.getName(), is(equalTo(LatestChangeCollector.COMPOSITE_AGGREGATION_NAME))); + assertThat(compositeAgg.size(), is(equalTo(1000))); + } + + public void testBuildChangesQueryWithAfterKey() throws IOException { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + Map afterKey = Map.of("orderId", "id99"); + SearchSourceBuilder sourceBuilder = changeCollector.buildChangesQuery(new SearchSourceBuilder(), afterKey, 500); + + CompositeAggregationBuilder compositeAgg = getCompositeAggregationBuilder(sourceBuilder); + assertThat(compositeAgg.size(), is(equalTo(500))); + String searchSource = sourceBuilder.toString(); + assertTrue("after key should be set in the composite aggregation", searchSource.contains("id99")); + } + + public void testProcessSearchResponseCollectsKeys() throws IOException { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + // Set page size to match bucket count so we get afterKey (not last page) + changeCollector.buildChangesQuery(new SearchSourceBuilder(), null, 3); + + SearchResponse response = createSearchResponse( + List.of(Map.of("orderId", "id1"), Map.of("orderId", "id2"), Map.of("orderId", "id3")), + Map.of("orderId", "id3") ); + try { + Map afterKey = changeCollector.processSearchResponse(response); + assertThat(afterKey, is(notNullValue())); - assertThat( - changeCollector.buildFilterQuery( - new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 123456789L), - new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 234567890L) - ), - is(equalTo(QueryBuilders.rangeQuery("timestamp").gte(123456789L).lt(234567890L).format("epoch_millis"))) + QueryBuilder filterQuery = changeCollector.buildFilterQuery(CHECKPOINT_OLD, CHECKPOINT_NEW); + assertThat(filterQuery, instanceOf(TermsQueryBuilder.class)); + assertThat(((TermsQueryBuilder) filterQuery).values(), containsInAnyOrder("id1", "id2", "id3")); + } finally { + response.decRef(); + } + } + + public void testProcessSearchResponseReturnsNullForNoAggregations() { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + + SearchResponse response = SearchResponseUtils.response(SearchHits.EMPTY_WITH_TOTAL_HITS).build(); + try { + Map afterKey = changeCollector.processSearchResponse(response); + assertThat(afterKey, is(nullValue())); + } finally { + response.decRef(); + } + } + + public void testProcessSearchResponseReturnsNullForEmptyBuckets() { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + + SearchResponse response = createSearchResponse(List.of(), null); + try { + Map afterKey = changeCollector.processSearchResponse(response); + assertThat(afterKey, is(nullValue())); + } finally { + response.decRef(); + } + } + + public void testBuildFilterQuerySingleField() throws IOException { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + + SearchResponse response = createSearchResponse( + List.of(Map.of("orderId", "order-A"), Map.of("orderId", "order-B")), + Map.of("orderId", "order-B") ); + try { + changeCollector.processSearchResponse(response); + + QueryBuilder filterQuery = changeCollector.buildFilterQuery(CHECKPOINT_OLD, CHECKPOINT_NEW); + assertThat(filterQuery, instanceOf(TermsQueryBuilder.class)); + TermsQueryBuilder termsQuery = (TermsQueryBuilder) filterQuery; + assertThat(termsQuery.fieldName(), is(equalTo("orderId"))); + assertThat(termsQuery.values(), containsInAnyOrder("order-A", "order-B")); + } finally { + response.decRef(); + } + } + + public void testBuildFilterQueryMultipleFields() throws IOException { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId", "region")); + + SearchResponse response = createSearchResponse( + List.of(Map.of("orderId", "order-A", "region", "US"), Map.of("orderId", "order-B", "region", "EU")), + Map.of("orderId", "order-B", "region", "EU") + ); + try { + changeCollector.processSearchResponse(response); + + QueryBuilder filterQuery = changeCollector.buildFilterQuery(CHECKPOINT_OLD, CHECKPOINT_NEW); + assertThat(filterQuery, instanceOf(BoolQueryBuilder.class)); + BoolQueryBuilder boolQuery = (BoolQueryBuilder) filterQuery; + assertThat(boolQuery.filter().size(), is(equalTo(2))); + + TermsQueryBuilder orderFilter = (TermsQueryBuilder) boolQuery.filter().get(0); + assertThat(orderFilter.fieldName(), is(equalTo("orderId"))); + assertThat(orderFilter.values(), containsInAnyOrder("order-A", "order-B")); + + TermsQueryBuilder regionFilter = (TermsQueryBuilder) boolQuery.filter().get(1); + assertThat(regionFilter.fieldName(), is(equalTo("region"))); + assertThat(regionFilter.values(), containsInAnyOrder("US", "EU")); + } finally { + response.decRef(); + } + } + + public void testBuildFilterQueryWithNullBucket() throws IOException { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + + Map bucketWithNull = new java.util.HashMap<>(); + bucketWithNull.put("orderId", null); + + SearchResponse response = createSearchResponse(List.of(Map.of("orderId", "order-A"), bucketWithNull), Map.of("orderId", "order-A")); + try { + changeCollector.processSearchResponse(response); + + QueryBuilder filterQuery = changeCollector.buildFilterQuery(CHECKPOINT_OLD, CHECKPOINT_NEW); + assertThat(filterQuery, instanceOf(BoolQueryBuilder.class)); + BoolQueryBuilder boolQuery = (BoolQueryBuilder) filterQuery; + assertThat(boolQuery.should().size(), is(equalTo(2))); + } finally { + response.decRef(); + } + } + + public void testClearResetsState() throws IOException { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + + SearchResponse response = createSearchResponse(List.of(Map.of("orderId", "order-A")), Map.of("orderId", "order-A")); + try { + changeCollector.processSearchResponse(response); + assertThat(changeCollector.buildFilterQuery(CHECKPOINT_OLD, CHECKPOINT_NEW), is(notNullValue())); + + changeCollector.clear(); + assertThat(changeCollector.buildFilterQuery(CHECKPOINT_OLD, CHECKPOINT_NEW), is(nullValue())); + } finally { + response.decRef(); + } + } + + public void testProcessSearchResponseClearsPreviousPageKeys() throws IOException { + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); + + SearchResponse firstPage = createSearchResponse(List.of(Map.of("orderId", "page1-id")), Map.of("orderId", "page1-id")); + try { + changeCollector.processSearchResponse(firstPage); + } finally { + firstPage.decRef(); + } + + SearchResponse secondPage = createSearchResponse(List.of(Map.of("orderId", "page2-id")), Map.of("orderId", "page2-id")); + try { + changeCollector.processSearchResponse(secondPage); + + QueryBuilder filterQuery = changeCollector.buildFilterQuery(CHECKPOINT_OLD, CHECKPOINT_NEW); + assertThat(filterQuery, instanceOf(TermsQueryBuilder.class)); + TermsQueryBuilder termsQuery = (TermsQueryBuilder) filterQuery; + assertThat(termsQuery.values(), containsInAnyOrder("page2-id")); + } finally { + secondPage.decRef(); + } } @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/77329") public void testGetIndicesToQuery() { - LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp"); + LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp", List.of("orderId")); long[] indexSequenceIds1 = { 25L, 25L, 25L }; long[] indexSequenceIds2 = { 324L, 2425L, 2225L }; @@ -52,7 +242,6 @@ public void testGetIndicesToQuery() { long[] indexSequenceIds3_1 = { 246L, 255L, 2485L }; long[] indexSequenceIds4_1 = { 2105L, 2545L, 2525L }; - // no changes assertThat( changeCollector.getIndicesToQuery( new TransformCheckpoint( @@ -91,7 +280,6 @@ public void testGetIndicesToQuery() { equalTo(Collections.emptySet()) ); - // 3 and 4 changed, 1 and 2 not assertThat( changeCollector.getIndicesToQuery( new TransformCheckpoint( @@ -129,119 +317,25 @@ public void testGetIndicesToQuery() { ), equalTo(Set.of("index-3", "index-4")) ); + } - // only 3 changed (no order) - assertThat( - changeCollector.getIndicesToQuery( - new TransformCheckpoint( - "t_id", - 123513L, - 42L, - Map.of( - "index-1", - indexSequenceIds1, - "index-2", - indexSequenceIds2, - "index-3", - indexSequenceIds3, - "index-4", - indexSequenceIds4 - ), - 123543L - ), - new TransformCheckpoint( - "t_id", - 123456759L, - 43L, - Map.of( - "index-1", - indexSequenceIds1, - "index-2", - indexSequenceIds2, - "index-3", - indexSequenceIds3_1, - "index-4", - indexSequenceIds4 - ), - 123456789L - ) - ), - equalTo(Collections.singleton("index-3")) - ); + private static SearchResponse createSearchResponse(List> bucketKeys, Map afterKey) { + InternalComposite composite = mock(InternalComposite.class); + when(composite.getName()).thenReturn(LatestChangeCollector.COMPOSITE_AGGREGATION_NAME); + when(composite.afterKey()).thenReturn(afterKey); - // all have changed - assertThat( - changeCollector.getIndicesToQuery( - new TransformCheckpoint("t_id", 123513L, 42L, Map.of("index-3", indexSequenceIds3, "index-4", indexSequenceIds4), 123543L), - new TransformCheckpoint( - "t_id", - 123456759L, - 43L, - Map.of("index-3", indexSequenceIds3_1, "index-4", indexSequenceIds4_1), - 123456789L - ) - ), - equalTo(Set.of("index-3", "index-4")) - ); + List compositeBuckets = new ArrayList<>(); + for (Map key : bucketKeys) { + InternalComposite.InternalBucket bucket = mock(InternalComposite.InternalBucket.class); + when(bucket.getKey()).thenReturn(key); + compositeBuckets.add(bucket); + } + when(composite.getBuckets()).thenReturn(compositeBuckets); - // a new index appeared - assertThat( - changeCollector.getIndicesToQuery( - new TransformCheckpoint( - "t_id", - 123513L, - 42L, - Map.of("index-2", indexSequenceIds2, "index-3", indexSequenceIds3, "index-4", indexSequenceIds4), - 123543L - ), - new TransformCheckpoint( - "t_id", - 123456759L, - 43L, - Map.of( - "index-1", - indexSequenceIds1, - "index-2", - indexSequenceIds2, - "index-3", - indexSequenceIds3_1, - "index-4", - indexSequenceIds4_1 - ), - 123456789L - ) - ), - equalTo(Set.of("index-1", "index-3", "index-4")) - ); + return SearchResponseUtils.response(SearchHits.EMPTY_WITH_TOTAL_HITS).aggregations(InternalAggregations.from(composite)).build(); + } - // index disappeared - assertThat( - changeCollector.getIndicesToQuery( - new TransformCheckpoint( - "t_id", - 123513L, - 42L, - Map.of( - "index-1", - indexSequenceIds1, - "index-2", - indexSequenceIds2, - "index-3", - indexSequenceIds3, - "index-4", - indexSequenceIds4 - ), - 123543L - ), - new TransformCheckpoint( - "t_id", - 123456759L, - 43L, - Map.of("index-2", indexSequenceIds2, "index-3", indexSequenceIds3_1, "index-4", indexSequenceIds4_1), - 123456789L - ) - ), - equalTo(Set.of("index-3", "index-4")) - ); + private static CompositeAggregationBuilder getCompositeAggregationBuilder(SearchSourceBuilder builder) { + return (CompositeAggregationBuilder) builder.aggregations().getAggregatorFactories().iterator().next(); } }