Conversation

@droberts195 commented Sep 2, 2021

When every index that a transform is configured to search has
remained completely unchanged between checkpoints the transform
should not do a search at all.

Following #75839 there was a problem where the scenario of all
indices being unchanged between checkpoints could cause an empty
list of indices to be searched, which Elasticsearch treats as
meaning all indices. This change should prevent that happening
in future.

Fixes #77137
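
(For context, a minimal standalone sketch of the convention the description refers to; the demo class is invented, but SearchRequest is the real org.elasticsearch.action.search.SearchRequest.)

import org.elasticsearch.action.search.SearchRequest;

public class EmptyIndicesDemo {
    public static void main(String[] args) {
        SearchRequest request = new SearchRequest(); // no indices specified
        // An empty indices array means "search all indices" to Elasticsearch,
        // not "search nothing", so a request like this must be intercepted
        // rather than executed.
        System.out.println(request.indices().length); // prints 0
    }
}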

@elasticmachine (Collaborator)

Pinging @elastic/ml-core (Team:ML)

@droberts195 (Author)

Labelled >non-issue because it's fixing a problem in an unreleased version.

This proved to be very hard to test within the existing framework for testing continuous transforms using an external cluster. That framework repeatedly stops and starts the continuous transforms. Doing this appears to make the transform go through a different code path that never passes through the search-avoidance code. Any advice on how to add such a test without building a completely new continuous transform test framework would be gratefully received. Alternatively, maybe the QA tests can be used to guard against any regressions.

SearchRequest searchRequest = namedSearchRequest.v2();
// We want to treat a request to search 0 indices as a request to do nothing, not a request to search all indices
if (searchRequest.indices().length == 0) {
logger.debug("[{}] Search optimized to noop; request [{}]", getJobId(), searchRequest);
Member:

Suggested change:
- logger.debug("[{}] Search optimized to noop; request [{}]", getJobId(), searchRequest);
+ logger.debug(() -> new ParameterizedMessage("[{}] Search optimized to noop; request [{}]", getJobId(), searchRequest));

Serializing the search request to a string could be non-trivial.

Member:

Also, it would be good to log which named search was optimized: namedSearchRequest.v1().

Author:

The debug() method is calling https://github.com/apache/logging-log4j2/blob/85b564a49f420fbcb9ac8abd3a20a252303d8198/log4j-api/src/main/java/org/apache/logging/log4j/spi/AbstractLogger.java#L449, which takes Object arguments. These only have their toString() methods called if logging at that level is enabled. As far as I can see, the ParameterizedMessage approach is only necessary when you want to log both a parameterized message and an exception with a stack trace.
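
To illustrate the point (a standalone sketch, not code from this PR; the class and method names are invented): with plain parameterized logging the argument expression is still evaluated eagerly, but its toString() runs only after the level check passes, so an expensive rendering is skipped at higher log levels. The supplier form is chiefly for pairing a parameterized message with a Throwable.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

public class LazyToStringDemo {
    private static final Logger logger = LogManager.getLogger(LazyToStringDemo.class);

    static Object expensiveToRender() {
        return new Object() {
            @Override
            public String toString() {
                // Only runs if DEBUG is enabled; this is where an expensive
                // SearchRequest rendering would otherwise happen.
                return "expensive string";
            }
        };
    }

    public static void main(String[] args) {
        // expensiveToRender() is invoked eagerly, but its toString() is
        // deferred until after the DEBUG level check.
        logger.debug("request [{}]", expensiveToRender());

        // The ParameterizedMessage supplier form is mainly needed to combine
        // a parameterized message with an exception and its stack trace.
        logger.debug(() -> new ParameterizedMessage("request [{}]", expensiveToRender()),
            new IllegalStateException("example"));
    }
}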

Member:

AH! Cool! Well, forget about the message supplier stuff, but having the search name would be nice.

listener.onResponse(null);
return;
}
logger.trace("searchRequest: [{}]", searchRequest);
Member:

Suggested change:
- logger.trace("searchRequest: [{}]", searchRequest);
+ logger.trace(() -> new ParameterizedMessage("searchRequest: [{}]", searchRequest));


sourceBuilder.query(queryBuilder);
- logger.debug(() -> new ParameterizedMessage("[{}] Querying for data: {}", getJobId(), sourceBuilder));
+ logger.debug("[{}] Querying {} for data: {}", getJobId(), request.indices(), sourceBuilder);
Member:

I always thought that the logger would serialize the arguments to strings unless we provided a message supplier.

Consequently, if we weren't on debug level, we'd be serializing the sourceBuilder unnecessarily.

sourceBuilder.query(filteredQuery);

logger.debug("[{}] Querying for changes: {}", getJobId(), sourceBuilder);
logger.debug("[{}] Querying {} for changes: {}", getJobId(), request.indices(), sourceBuilder);
Member:

ParameterizedMessage?

@droberts195 (Author)

@elasticmachine update branch

@droberts195 (Author) commented Sep 3, 2021

> This proved to be very hard to test within the existing framework for testing continuous transforms using an external cluster. That framework repeatedly stops and starts the continuous transforms. Doing this appears to make the transform go through a different code path that never passes through the search-avoidance code.

I did a manual test instead that proves the changes of this PR work. However, there might also be some other bug that the automated tests aren't revealing because they repeatedly stop and restart the "continuous" transforms.

This is what I did:

  1. Run Elasticsearch locally using ./gradlew run -Drun.license_type=trial -Dtests.heap.size=4g
  2. Run Kibana locally using yarn kbn bootstrap followed by yarn start
  3. Log into Kibana
  4. Add the sample eCommerce data: Home -> Try sample data -> Sample eCommerce orders
  5. Go to transform management: Stack Management -> Transforms
  6. Create a new transform using the kibana_sample_data_ecommerce index pattern
  7. Create a Latest transform using customer_id as the unique field, order_date as the sort field, latest_orders as the transform name and latest_orders as the destination index name
  8. Select continuous mode with order_date as the date field, 10s delay and 10s frequency
  9. Click "Create and start"
  10. Watch the ES log

The transform worked for its initial checkpoint. The dodgy thing was that the second checkpoint that validated this bug fix didn't take place for over 8 minutes:

[2021-09-03T11:11:08,411][INFO ][o.e.x.t.t.TransformTask  ] [runTask-0] [latest_orders] updating state for transform to [{"task_state":"started","indexer_state":"stopped","checkpoint":0,"progress":{"docs_indexed":0,"docs_processed":0},"should_stop_at_checkpoint":false}].
[2021-09-03T11:11:08,498][INFO ][o.e.x.t.t.TransformPersistentTasksExecutor] [runTask-0] [latest_orders] successfully completed and scheduled task in node operation
[2021-09-03T11:11:08,627][INFO ][o.e.c.m.MetadataMappingService] [runTask-0] [latest_orders/UWahPSkzRa6PjOfmmSr1eA] update_mapping [_doc]
[2021-09-03T11:11:59,095][INFO ][o.e.c.m.MetadataCreateIndexService] [runTask-0] [.async-search] creating index, cause [auto(bulk api)], templates [], shards [1]/[0]
[2021-09-03T11:12:00,968][INFO ][o.e.c.m.MetadataMappingService] [runTask-0] [.kibana_8.0.0_001/rUYOAIvfRYOb3Agd2wrqYA] update_mapping [_doc]
[2021-09-03T11:12:04,960][INFO ][o.e.c.m.MetadataMappingService] [runTask-0] [.kibana_8.0.0_001/rUYOAIvfRYOb3Agd2wrqYA] update_mapping [_doc]
[2021-09-03T11:20:48,737][DEBUG][o.e.x.t.t.ClientTransformIndexer] [runTask-0] [latest_orders] Search request [apply_results] optimized to noop; searchRequest [SearchRequest{searchType=QUERY_THEN_FETCH, indices=[], indicesOptions=IndicesOptions[ignore_unavailable=true, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=false, ignore_aliases=false, ignore_throttled=false], routing='null', preference='null', requestCache=null, scroll=null, maxConcurrentShardRequests=0, batchedReduceSize=512, preFilterShardSize=null, allowPartialSearchResults=false, localClusterAlias=null, getOrCreateAbsoluteStartMillis=-1, ccsMinimizeRoundtrips=true, source={"size":0,"query":{"bool":{"filter":[{"match_all":{"boost":1.0}},{"range":{"order_date":{"from":null,"to":1630664438671,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}},{"range":{"order_date":{"from":1630663858500,"to":1630664438671,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}}],"boost":1.0}},"aggregations":{"_transform":{"composite":{"size":5000,"sources":[{"customer_id":{"terms":{"field":"customer_id","missing_bucket":true,"order":"asc"}}}]},"aggregations":{"_top_hits":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"sort":[{"order_date":{"order":"desc"}}]}}}}}}}]

This seems strange to me given that both frequency and delay were 10s. I don't think this is a bug introduced by this PR, but it might be something to investigate separately.

Then the next two times we avoided searching all indices were 16 minutes later and 6 minutes after that:

[2021-09-03T11:36:49,007][DEBUG][o.e.x.t.t.ClientTransformIndexer] [runTask-0] [latest_orders] Search request [apply_results] optimized to noop; searchRequest [SearchRequest{searchType=QUERY_THEN_FETCH, indices=[], indicesOptions=IndicesOptions[ignore_unavailable=true, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=false, ignore_aliases=false, ignore_throttled=false], routing='null', preference='null', requestCache=null, scroll=null, maxConcurrentShardRequests=0, batchedReduceSize=512, preFilterShardSize=null, allowPartialSearchResults=false, localClusterAlias=null, getOrCreateAbsoluteStartMillis=-1, ccsMinimizeRoundtrips=true, source={"size":0,"query":{"bool":{"filter":[{"match_all":{"boost":1.0}},{"range":{"order_date":{"from":null,"to":1630665398947,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}},{"range":{"order_date":{"from":1630664438671,"to":1630665398947,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}}],"boost":1.0}},"aggregations":{"_transform":{"composite":{"size":5000,"sources":[{"customer_id":{"terms":{"field":"customer_id","missing_bucket":true,"order":"asc"}}}]},"aggregations":{"_top_hits":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"sort":[{"order_date":{"order":"desc"}}]}}}}}}}]
[2021-09-03T11:42:29,143][DEBUG][o.e.x.t.t.ClientTransformIndexer] [runTask-0] [latest_orders] Search request [apply_results] optimized to noop; searchRequest [SearchRequest{searchType=QUERY_THEN_FETCH, indices=[], indicesOptions=IndicesOptions[ignore_unavailable=true, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=false, ignore_aliases=false, ignore_throttled=false], routing='null', preference='null', requestCache=null, scroll=null, maxConcurrentShardRequests=0, batchedReduceSize=512, preFilterShardSize=null, allowPartialSearchResults=false, localClusterAlias=null, getOrCreateAbsoluteStartMillis=-1, ccsMinimizeRoundtrips=true, source={"size":0,"query":{"bool":{"filter":[{"match_all":{"boost":1.0}},{"range":{"order_date":{"from":null,"to":1630665739041,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}},{"range":{"order_date":{"from":1630665398947,"to":1630665739041,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}}],"boost":1.0}},"aggregations":{"_transform":{"composite":{"size":5000,"sources":[{"customer_id":{"terms":{"field":"customer_id","missing_bucket":true,"order":"asc"}}}]},"aggregations":{"_top_hits":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"sort":[{"order_date":{"order":"desc"}}]}}}}}}}]

@droberts195 merged commit 4bf6a77 into elastic:master Sep 3, 2021
@droberts195 deleted the handle_all_transform_indices_optimized_out branch September 3, 2021 11:42
droberts195 pushed a commit to droberts195/elasticsearch that referenced this pull request Sep 3, 2021: …ints (elastic#77204)
@elasticsearchmachine (Collaborator)

💚 Backport successful

Branch: 7.x

elasticsearchmachine pushed a commit that referenced this pull request Sep 3, 2021: …ints (#77204) (#77245)
@hendrikmuhs commented Sep 6, 2021

> The transform worked for its initial checkpoint. The dodgy thing was that the second checkpoint that validated this bug fix didn't take place for over 8 minutes.

I think this is by design, because the transform should stop early if there are no changes. I wonder why #77137 got past this pre-check.

I will try your repro steps, but with trace logging enabled; if my suspicion is correct, the log should tell us. See https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java#L374:

logger.trace("[{}] source has not changed, finish indexer early.", getJobId());

The reason it found something after 8 and 16 minutes is probably future timestamps in the sample data. However, I am surprised that you saw:

[2021-09-03T11:36:49,007][DEBUG][o.e.x.t.t.ClientTransformIndexer] [runTask-0] [latest_orders] Search request [apply_results] optimized to noop; searchRequest [SearchRequest{searchType=QUERY_THEN_FETCH, indices=[], indicesOptions=IndicesOptions[ignore_unavailable=true, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=false, ignore_aliases=false, ignore_throttled=false], routing='null', preference='null', requestCache=null, scroll=null, maxConcurrentShardRequests=0, batchedReduceSize=512, preFilterShardSize=null, allowPartialSearchResults=false, localClusterAlias=null, getOrCreateAbsoluteStartMillis=-1, ccsMinimizeRoundtrips=true, source={"size":0,"query":{"bool":{"filter":[{"match_all":{"boost":1.0}},{"range":{"order_date":{"from":null,"to":1630665398947,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}},{"range":{"order_date":{"from":1630664438671,"to":1630665398947,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}}],"boost":1.0}},"aggregations":{"_transform":{"composite":{"size":5000,"sources":[{"customer_id":{"terms":{"field":"customer_id","missing_bucket":true,"order":"asc"}}}]},"aggregations":{"_top_hits":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"sort":[{"order_date":{"order":"desc"}}]}}}}}}}]

Something is fishy.

@hendrikmuhs

After enabling trace logging:

[2021-09-07T08:55:39,600][DEBUG][o.e.x.t.t.TransformTask  ] [hendrik-p1] [data_frame/transforms/schedule_latest_orders] transform indexer schedule has triggered, state: [STARTED].
[2021-09-07T08:55:39,602][TRACE][o.e.x.t.t.TransformIndexer] [hendrik-p1] [latest_orders] change detected [false].
[2021-09-07T08:55:39,603][TRACE][o.e.x.t.t.TransformIndexer] [hendrik-p1] [latest_orders] source has not changed, finish indexer early.
[2021-09-07T08:56:39,600][DEBUG][o.e.x.t.t.TransformTask  ] [hendrik-p1] [data_frame/transforms/schedule_latest_orders] transform indexer schedule has triggered, state: [STARTED].
[2021-09-07T08:56:39,601][TRACE][o.e.x.t.t.TransformIndexer] [hendrik-p1] [latest_orders] change detected [false].
[2021-09-07T08:56:39,601][TRACE][o.e.x.t.t.TransformIndexer] [hendrik-p1] [latest_orders] source has not changed, finish indexer early.
[2021-09-07T08:57:39,600][DEBUG][o.e.x.t.t.TransformTask  ] [hendrik-p1] [data_frame/transforms/schedule_latest_orders] transform indexer schedule has triggered, state: [STARTED].
[2021-09-07T08:57:39,602][TRACE][o.e.x.t.t.TransformIndexer] [hendrik-p1] [latest_orders] change detected [false].
[2021-09-07T08:57:39,602][TRACE][o.e.x.t.t.TransformIndexer] [hendrik-p1] [latest_orders] source has not changed, finish indexer early.

This is expected.

I think the real problem sits in #75839, and the failures reported in #77310 are probably connected to the change in #75839.

// We want to treat a request to search 0 indices as a request to do nothing, not a request to search all indices
if (searchRequest.indices().length == 0) {
logger.debug("[{}] Search request [{}] optimized to noop; searchRequest [{}]", getJobId(), name, searchRequest);
listener.onResponse(null);


The introduction of a magic null value is dangerous: what if the client returns null? It seems better to return an empty result set, so that null still causes the transform to fail as before.
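
A small hypothetical sketch of the concern (all names invented, not the transform code): once null is a legitimate "noop" response, a null produced by a client bug becomes indistinguishable from an intentional skip, whereas an explicit empty result would let null keep meaning "failure".

interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

final class NullSentinelDemo {
    static void search(String[] indices, Listener<String> listener) {
        if (indices.length == 0) {
            listener.onResponse(null); // sentinel: "nothing to do"
            return;
        }
        // If this ever returns null because of a bug, callers now treat the
        // failure as a harmless noop instead of failing the transform.
        listener.onResponse(fakeSearch(indices));
    }

    static String fakeSearch(String[] indices) {
        return "hits"; // placeholder for a real search response
    }

    public static void main(String[] args) {
        search(new String[0], new Listener<String>() {
            public void onResponse(String r) { System.out.println("response: " + r); } // prints "response: null"
            public void onFailure(Exception e) { System.out.println("failed: " + e); }
        });
    }
}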

@hendrikmuhs

I think the bug is a disconnect between sequence ids and timestamps.

Let's take checkpoint A with:

timestamp_epoch_seconds: 1000
seq_id's: index1: 20_s1, 21_s2, 22_s3

and checkpoint B with:

timestamp_epoch_seconds: 1010
seq_id's: index1: 20_s1, 21_s2, 22_s3

The change in #75839 calculates the diff between A and B purely on sequence ids. Because they are equal, changes(A, B) = {}.

However, we must take the timestamp and delay into account: when we query A we do not query for all data in A, but for all data in A with timestamp < timestamp_epoch_seconds - delay.

A could therefore contain documents that are part of the checkpoint with respect to the seq ids, but that have not been queried yet because of the delay.
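
A sketch of that scenario (invented names, not the actual checkpoint code), assuming a 10s delay: the seq-id comparison reports "unchanged" even though a document timestamped inside checkpoint A's delay window was never queried at A and should be picked up in checkpoint B's window.

import java.util.List;
import java.util.Map;

// Minimal model of a checkpoint: wall-clock time plus captured seq ids.
record Checkpoint(long timestampEpochSeconds, Map<String, List<Long>> seqIdsPerIndex) {}

final class ChangeDetectionDemo {
    // The flawed check: identical seq ids are taken to mean "no changes".
    static boolean unchanged(Checkpoint a, Checkpoint b) {
        return a.seqIdsPerIndex().equals(b.seqIdsPerIndex());
    }

    public static void main(String[] args) {
        long delaySeconds = 10;
        Checkpoint a = new Checkpoint(1000, Map.of("index1", List.of(20L, 21L, 22L)));
        Checkpoint b = new Checkpoint(1010, Map.of("index1", List.of(20L, 21L, 22L)));

        // Checkpoint A only queried documents with timestamp < 1000 - 10 = 990.
        // A document indexed at t=995 is already covered by A's seq ids, so
        // the seq-id diff between A and B is empty, yet the document has never
        // been queried; B's window [990, 1000) is where it should appear.
        System.out.println(unchanged(a, b)); // true, so checkpoint B is wrongly skipped
    }
}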

Successfully merging this pull request may close these issues:

[Transform] If there are no changes to source indices between checks then all indices are searched
