Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.ClientScrollablePaginatedHitSource;
import org.elasticsearch.index.reindex.PaginatedHitSource;
import org.elasticsearch.index.reindex.PaginatedHitSource.SearchFailure;
import org.elasticsearch.index.reindex.PaginatedSearchFailure;
import org.elasticsearch.index.reindex.ResumeInfo;
import org.elasticsearch.index.reindex.ResumeInfo.WorkerResumeInfo;
import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState;
Expand Down Expand Up @@ -318,7 +316,7 @@ protected PaginatedHitSource buildScrollableResultSource(BackoffPolicy backoffPo
protected BulkByScrollResponse buildResponse(
TimeValue took,
List<BulkItemResponse.Failure> indexingFailures,
List<SearchFailure> searchFailures,
List<PaginatedSearchFailure> searchFailures,
boolean timedOut
) {
return new BulkByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
Expand Down Expand Up @@ -611,7 +609,7 @@ private void recordFailure(Failure failure, List<Failure> failures) {
* Start terminating a request that finished non-catastrophically by refreshing the modified indices and then proceeding to
* {@link #finishHim(Exception, List, List, boolean)}.
*/
void refreshAndFinish(List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
void refreshAndFinish(List<Failure> indexingFailures, List<PaginatedSearchFailure> searchFailures, boolean timedOut) {
if (task.isCancelled() || false == mainRequest.isRefresh() || destinationIndices.isEmpty()) {
finishHim(null, indexingFailures, searchFailures, timedOut);
return;
Expand Down Expand Up @@ -649,7 +647,12 @@ protected void finishHim(Exception failure) {
* @param searchFailures any search failures accumulated during the request
* @param timedOut have any of the sub-requests timed out?
*/
protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
protected void finishHim(
Exception failure,
List<Failure> indexingFailures,
List<PaginatedSearchFailure> searchFailures,
boolean timedOut
) {
logger.debug("[{}]: finishing without any catastrophic failures", task.getId());
paginatedHitSource.close(threadPool.getThreadContext().preserveContext(() -> {
if (failure == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.PaginatedHitSource;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.PaginatedHitSource.SearchFailure;
import org.elasticsearch.index.reindex.PaginatedSearchFailure;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -55,7 +55,7 @@ private static RestStatus getStatus(BulkByScrollResponse response) {
status = failure.getStatus();
}
}
for (SearchFailure failure : response.getSearchFailures()) {
for (PaginatedSearchFailure failure : response.getSearchFailures()) {
RestStatus failureStatus = failure.getStatus();
if (failureStatus.getStatus() > status.getStatus()) {
status = failureStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.reindex;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
Expand All @@ -27,6 +27,8 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.reindex.PaginatedSearchFailure;
import org.elasticsearch.index.reindex.RejectAwareActionListener;
import org.elasticsearch.index.reindex.ResumeInfo.ScrollWorkerResumeInfo;
import org.elasticsearch.index.reindex.ResumeInfo.WorkerResumeInfo;
import org.elasticsearch.search.SearchHit;
Expand Down Expand Up @@ -143,14 +145,14 @@ protected void cleanup(Runnable onCompletion) {
}

private static Response wrapSearchResponse(SearchResponse response) {
List<SearchFailure> failures;
List<PaginatedSearchFailure> failures;
if (response.getShardFailures() == null) {
failures = emptyList();
} else {
failures = new ArrayList<>(response.getShardFailures().length);
for (ShardSearchFailure failure : response.getShardFailures()) {
String nodeId = failure.shard() == null ? null : failure.shard().getNodeId();
failures.add(new SearchFailure(failure.getCause(), failure.index(), failure.shardId(), nodeId));
failures.add(new PaginatedSearchFailure(failure.getCause(), failure.index(), failure.shardId(), nodeId));
}
}
List<Hit> hits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,29 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.reindex;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.reindex.PaginatedSearchFailure;
import org.elasticsearch.index.reindex.RejectAwareActionListener;
import org.elasticsearch.index.reindex.ResumeInfo.WorkerResumeInfo;
import org.elasticsearch.index.reindex.RetryListener;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static java.util.Objects.requireNonNull;

/**
* A source of paginated search results. Pumps data out into the passed onResponse consumer. If a scrollable search is used, then the
* same data may come out several times in case of failures during searching (though not yet). Once the onResponse consumer is done,
Expand Down Expand Up @@ -195,12 +185,18 @@ public interface AsyncResponse {
*/
public static class Response {
private final boolean timedOut;
private final List<SearchFailure> failures;
private final List<PaginatedSearchFailure> failures;
private final long totalHits;
private final List<? extends Hit> hits;
private final String scrollId;

public Response(boolean timedOut, List<SearchFailure> failures, long totalHits, List<? extends Hit> hits, String scrollId) {
public Response(
boolean timedOut,
List<PaginatedSearchFailure> failures,
long totalHits,
List<? extends Hit> hits,
String scrollId
) {
this.timedOut = timedOut;
this.failures = failures;
this.totalHits = totalHits;
Expand All @@ -218,7 +214,7 @@ public boolean isTimedOut() {
/**
* Where there any search failures?
*/
public final List<SearchFailure> getFailures() {
public final List<PaginatedSearchFailure> getFailures() {
return failures;
}

Expand Down Expand Up @@ -374,117 +370,4 @@ public void setPrimaryTerm(long primaryTerm) {
this.primaryTerm = primaryTerm;
}
}

/**
 * Describes a failure that occurred while searching. Serves the same purpose as
 * {@link ShardSearchFailure} but is also usable when reindexing from a remote cluster,
 * where shard-level details may be unavailable.
 */
public static class SearchFailure implements Writeable, ToXContentObject {
    public static final String INDEX_FIELD = "index";
    public static final String SHARD_FIELD = "shard";
    public static final String NODE_FIELD = "node";
    public static final String REASON_FIELD = "reason";
    public static final String STATUS_FIELD = BulkItemResponse.Failure.STATUS_FIELD;

    // The cause of the failure; never null.
    private final Throwable reason;
    // REST status reported for this failure, either supplied explicitly or derived from the cause.
    private final RestStatus status;
    @Nullable
    private final String index;
    @Nullable
    private final Integer shardId;
    @Nullable
    private final String nodeId;

    /**
     * Build a failure with an explicit status.
     *
     * @param reason the cause of the failure, required
     * @param index the index being searched, if known
     * @param shardId the shard that failed, if known
     * @param nodeId the node the failing shard was on, if known
     * @param status the status to report for this failure
     */
    public SearchFailure(
        Throwable reason,
        @Nullable String index,
        @Nullable Integer shardId,
        @Nullable String nodeId,
        RestStatus status
    ) {
        this.reason = requireNonNull(reason, "reason cannot be null");
        this.status = status;
        this.index = index;
        this.shardId = shardId;
        this.nodeId = nodeId;
    }

    /**
     * Build a failure whose status is derived from its cause.
     */
    public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId) {
        this(reason, index, shardId, nodeId, ExceptionsHelper.status(reason));
    }

    /**
     * Build a search failure that doesn't have shard information available.
     */
    public SearchFailure(Throwable reason) {
        this(reason, null, null, null);
    }

    /**
     * Read from a stream.
     */
    public SearchFailure(StreamInput in) throws IOException {
        this.reason = in.readException();
        this.index = in.readOptionalString();
        this.shardId = in.readOptionalVInt();
        this.nodeId = in.readOptionalString();
        // Status is not serialized; it is always recomputed from the cause.
        this.status = ExceptionsHelper.status(reason);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // The write order must mirror the read order in the StreamInput constructor above.
        out.writeException(reason);
        out.writeOptionalString(index);
        out.writeOptionalVInt(shardId);
        out.writeOptionalString(nodeId);
    }

    public Throwable getReason() {
        return reason;
    }

    public RestStatus getStatus() {
        return this.status;
    }

    public String getIndex() {
        return index;
    }

    public Integer getShardId() {
        return shardId;
    }

    @Nullable
    public String getNodeId() {
        return nodeId;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        // Optional location fields are emitted only when known.
        if (index != null) {
            builder.field(INDEX_FIELD, index);
        }
        if (shardId != null) {
            builder.field(SHARD_FIELD, shardId);
        }
        if (nodeId != null) {
            builder.field(NODE_FIELD, nodeId);
        }
        builder.field(STATUS_FIELD, status.getStatus());
        // The reason is rendered as a nested exception object.
        builder.field(REASON_FIELD);
        builder.startObject();
        ElasticsearchException.generateThrowableXContent(builder, params, reason);
        builder.endObject();
        builder.endObject();
        return builder;
    }

    @Override
    public String toString() {
        return Strings.toString(this);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.PaginatedHitSource;
import org.elasticsearch.index.reindex.PaginatedSearchFailure;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.RejectAwareActionListener;
Expand Down Expand Up @@ -435,7 +435,7 @@ public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures())
.stream()
.flatMap(List::stream)
.map(PaginatedHitSource.SearchFailure::getReason)
.map(PaginatedSearchFailure::getReason)
.findFirst();
var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures())
.stream()
Expand Down Expand Up @@ -748,7 +748,7 @@ protected PaginatedHitSource buildScrollableResultSource(BackoffPolicy backoffPo
protected void finishHim(
Exception failure,
List<BulkItemResponse.Failure> indexingFailures,
List<PaginatedHitSource.SearchFailure> searchFailures,
List<PaginatedSearchFailure> searchFailures,
boolean timedOut
) {
super.finishHim(failure, indexingFailures, searchFailures, timedOut);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.PaginatedHitSource;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.reindex.PaginatedHitSource.BasicHit;
import org.elasticsearch.index.reindex.PaginatedHitSource.Hit;
import org.elasticsearch.index.reindex.PaginatedHitSource.Response;
import org.elasticsearch.index.reindex.PaginatedHitSource.SearchFailure;
import org.elasticsearch.index.reindex.PaginatedSearchFailure;
import org.elasticsearch.reindex.PaginatedHitSource.BasicHit;
import org.elasticsearch.reindex.PaginatedHitSource.Hit;
import org.elasticsearch.reindex.PaginatedHitSource.Response;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
Expand Down Expand Up @@ -112,7 +112,7 @@ class Fields {
/**
* Parser for {@code failed} shards in the {@code _shards} elements.
*/
public static final ConstructingObjectParser<SearchFailure, Void> SEARCH_FAILURE_PARSER = new ConstructingObjectParser<>(
public static final ConstructingObjectParser<PaginatedSearchFailure, Void> SEARCH_FAILURE_PARSER = new ConstructingObjectParser<>(
"failure",
true,
a -> {
Expand All @@ -128,7 +128,7 @@ class Fields {
} else {
reasonThrowable = (Throwable) reason;
}
return new SearchFailure(reasonThrowable, index, shardId, nodeId);
return new PaginatedSearchFailure(reasonThrowable, index, shardId, nodeId);
}
);
static {
Expand Down Expand Up @@ -169,13 +169,13 @@ class Fields {
int i = 0;
Throwable catastrophicFailure = (Throwable) a[i++];
if (catastrophicFailure != null) {
return new Response(false, singletonList(new SearchFailure(catastrophicFailure)), 0, emptyList(), null);
return new Response(false, singletonList(new PaginatedSearchFailure(catastrophicFailure)), 0, emptyList(), null);
}
boolean timedOut = (boolean) a[i++];
String scroll = (String) a[i++];
Object[] hitsElement = (Object[]) a[i++];
@SuppressWarnings("unchecked")
List<SearchFailure> failures = (List<SearchFailure>) a[i++];
List<PaginatedSearchFailure> failures = (List<PaginatedSearchFailure>) a[i++];

long totalHits = 0;
List<Hit> hits = emptyList();
Expand Down
Loading
Loading