Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.client.internal.transport.NoNodeAvailableException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.AbstractXContentTestCase;
Expand Down Expand Up @@ -108,24 +105,6 @@ private static Object parseFailure(XContentParser parser) throws IOException {
private boolean includeCreated;
private boolean testExceptions = randomBoolean();

public void testRountTrip() throws IOException {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is greater serialisation test coverage from BulkByScrollResponseserializationTests than this test previously provided

BulkByScrollResponse response = new BulkByScrollResponse(
timeValueMillis(randomNonNegativeLong()),
BulkByScrollTaskStatusTests.randomStatus(),
randomIndexingFailures(),
randomSearchFailures(),
randomBoolean()
);
BulkByScrollResponse tripped;
try (BytesStreamOutput out = new BytesStreamOutput()) {
response.writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
tripped = new BulkByScrollResponse(in);
}
}
assertResponseEquals(response, tripped);
}

private List<Failure> randomIndexingFailures() {
return usually()
? emptyList()
Expand All @@ -152,31 +131,6 @@ private List<PaginatedHitSource.SearchFailure> randomSearchFailures() {
return singletonList(new PaginatedHitSource.SearchFailure(exception, index, shardId, nodeId));
}

private void assertResponseEquals(BulkByScrollResponse expected, BulkByScrollResponse actual) {
assertEquals(expected.getTook(), actual.getTook());
BulkByScrollTaskStatusTests.assertTaskStatusEquals(TransportVersion.current(), expected.getStatus(), actual.getStatus());
assertEquals(expected.getBulkFailures().size(), actual.getBulkFailures().size());
for (int i = 0; i < expected.getBulkFailures().size(); i++) {
Failure expectedFailure = expected.getBulkFailures().get(i);
Failure actualFailure = actual.getBulkFailures().get(i);
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
assertEquals(expectedFailure.getId(), actualFailure.getId());
assertEquals(expectedFailure.getMessage(), actualFailure.getMessage());
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());
}
assertEquals(expected.getSearchFailures().size(), actual.getSearchFailures().size());
for (int i = 0; i < expected.getSearchFailures().size(); i++) {
PaginatedHitSource.SearchFailure expectedFailure = expected.getSearchFailures().get(i);
PaginatedHitSource.SearchFailure actualFailure = actual.getSearchFailures().get(i);
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
assertEquals(expectedFailure.getShardId(), actualFailure.getShardId());
assertEquals(expectedFailure.getNodeId(), actualFailure.getNodeId());
assertEquals(expectedFailure.getReason().getClass(), actualFailure.getReason().getClass());
assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage());
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());
}
}

public static void assertEqualBulkResponse(
BulkByScrollResponse expected,
BulkByScrollResponse actual,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.reindex;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class BulkByScrollResponseWireSerializingTests extends AbstractWireSerializingTestCase<
BulkByScrollResponseWireSerializingTests.BulkByScrollResponseWrapper> {
@Override
protected Writeable.Reader<BulkByScrollResponseWrapper> instanceReader() {
return BulkByScrollResponseWrapper::new;
}

@Override
protected BulkByScrollResponseWrapper createTestInstance() {
return new BulkByScrollResponseWrapper(
new BulkByScrollResponse(
randomTimeValue(),
BulkByScrollTaskStatusTests.randomStatus(),
randomBulkFailures(),
randomSearchFailures(),
randomBoolean()
)
);
}

@Override
protected BulkByScrollResponseWrapper mutateInstance(BulkByScrollResponseWrapper instance) {
BulkByScrollResponse r = instance.response();
return new BulkByScrollResponseWrapper(switch (between(0, 4)) {
case 0 -> new BulkByScrollResponse(
randomValueOtherThan(r.getTook(), ESTestCase::randomTimeValue),
r.getStatus(),
r.getBulkFailures(),
r.getSearchFailures(),
r.isTimedOut()
);
case 1 -> new BulkByScrollResponse(
r.getTook(),
mutateRandomStatus(r.getStatus()),
r.getBulkFailures(),
r.getSearchFailures(),
r.isTimedOut()
);
case 2 -> new BulkByScrollResponse(
r.getTook(),
r.getStatus(),
mutateBulkFailures(r.getBulkFailures()),
r.getSearchFailures(),
r.isTimedOut()
);
case 3 -> new BulkByScrollResponse(
r.getTook(),
r.getStatus(),
r.getBulkFailures(),
mutateSearchFailures(r.getSearchFailures()),
r.isTimedOut()
);
case 4 -> new BulkByScrollResponse(
r.getTook(),
r.getStatus(),
r.getBulkFailures(),
r.getSearchFailures(),
r.isTimedOut() == false
);
default -> throw new AssertionError();
});
}

private BulkByScrollTask.Status mutateRandomStatus(BulkByScrollTask.Status currentStatus) {
while (true) {
BulkByScrollTask.Status candidate = BulkByScrollTaskStatusTests.randomStatus();
try {
BulkByScrollTaskStatusTests.assertTaskStatusEquals(currentStatus, candidate);
// Equal → try again
} catch (AssertionError e) {
// Not equal → success
return candidate;
}
}
}

private List<Failure> mutateBulkFailures(List<Failure> currentFailures) {
List<Failure> newFailures = new ArrayList<>(currentFailures);
newFailures.add(randomFailure());
return newFailures;
}

private List<Failure> randomBulkFailures() {
return randomList(0, 5, BulkByScrollResponseWireSerializingTests::randomFailure);
}

static Failure randomFailure() {
String index = randomAlphaOfLengthBetween(3, 10);
String id = randomBoolean() ? randomAlphaOfLengthBetween(3, 10) : null;
Exception cause = randomException();
return randomFailure(
index,
id,
cause,
randomBoolean() ? randomNonNegativeLong() : SequenceNumbers.UNASSIGNED_SEQ_NO,
randomBoolean() ? randomNonNegativeLong() : SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
randomBoolean(),
randomFrom(IndexDocFailureStoreStatus.values())
);
}

static Failure randomFailure(
String index,
String id,
Exception cause,
long seqNo,
long term,
boolean aborted,
IndexDocFailureStoreStatus failureStoreStatus
) {
Failure failure;
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || term != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
failure = new Failure(index, id, cause, seqNo, term);
} else if (aborted) {
failure = new Failure(index, id, cause, true);
} else if (failureStoreStatus != IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN) {
failure = new Failure(index, id, cause, failureStoreStatus);
} else {
failure = new Failure(index, id, cause);
}
failure.setFailureStoreStatus(failureStoreStatus);
return failure;
}

static Exception randomException() {
return randomFrom(
new IllegalArgumentException(randomAlphaOfLengthBetween(5, 20)),
new IllegalStateException(randomAlphaOfLengthBetween(5, 20)),
new ElasticsearchException(randomAlphaOfLengthBetween(5, 20))
);
}

private List<PaginatedHitSource.SearchFailure> randomSearchFailures() {
return randomList(0, 5, this::randomSearchFailure);
}

private List<PaginatedHitSource.SearchFailure> mutateSearchFailures(List<PaginatedHitSource.SearchFailure> searchFailures) {
List<PaginatedHitSource.SearchFailure> newFailures = new ArrayList<>(searchFailures);
newFailures.add(randomSearchFailure());
return newFailures;
}

private PaginatedHitSource.SearchFailure randomSearchFailure() {
Throwable reason = randomException();
String index = randomBoolean() ? randomAlphaOfLengthBetween(1, 10) : null;
Integer shardId = randomBoolean() ? randomIntBetween(0, 100) : null;
String nodeId = randomBoolean() ? randomAlphaOfLengthBetween(1, 10) : null;
return new PaginatedHitSource.SearchFailure(reason, index, shardId, nodeId);
}

/**
* {@code BulkByScrollResponse} does not implement {@code equals}/{@code hashCode},
* and its {@link BulkByScrollTask.Status} contains slice-level and implementation
* details that are not stable for direct equality checks.
* <p>
* Equality is defined in terms of wire-relevant state only: top-level fields,
* aggregated task status counters (via
* {@link BulkByScrollTaskStatusTests#assertTaskStatusEquals}), and the stable
* attributes of bulk and search failures. Care must be taken for exceptions, since
* two messages with the same cause and message would be different instances after
* serialization / deserialization, and fail the default equality check. For this
* reason, we define custom equality below.
*/
static class BulkByScrollResponseWrapper implements Writeable {
private final BulkByScrollResponse response;

BulkByScrollResponseWrapper(BulkByScrollResponse response) {
this.response = response;
}

BulkByScrollResponseWrapper(StreamInput in) throws IOException {
this.response = new BulkByScrollResponse(in);
}

BulkByScrollResponse response() {
return response;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
response.writeTo(out);
}

@Override
public String toString() {
return response.toString();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BulkByScrollResponseWrapper that = (BulkByScrollResponseWrapper) o;
return responsesEqual(response, that.response);
}

@Override
public int hashCode() {
return Objects.hash(
response.getTook(),
response.isTimedOut(),
response.getCreated(),
response.getTotal(),
response.getDeleted(),
response.getUpdated(),
response.getBatches(),
response.getVersionConflicts(),
response.getNoops(),
response.getBulkRetries(),
response.getSearchRetries(),
response.getReasonCancelled(),
response.getBulkFailures()
.stream()
.map(f -> Objects.hash(f.getIndex(), f.getId(), f.getStatus(), f.getCause().getClass()))
.toList(),
response.getSearchFailures()
.stream()
.map(f -> Objects.hash(f.getIndex(), f.getShardId(), f.getNodeId(), f.getReason().getClass(), f.getStatus()))
.toList()
);
}

}

private static boolean responsesEqual(BulkByScrollResponse a, BulkByScrollResponse b) {
if (a.getTook().equals(b.getTook()) == false) return false;

try {
BulkByScrollTaskStatusTests.assertTaskStatusEquals(a.getStatus(), b.getStatus());
// Equal → skip to next check
} catch (AssertionError e) {
// Assertion error → not equal
return false;
}

if (a.getBulkFailures().size() != b.getBulkFailures().size()) return false;
for (int i = 0; i < a.getBulkFailures().size(); i++) {
Failure fa = a.getBulkFailures().get(i);
Failure fb = b.getBulkFailures().get(i);
if (Objects.equals(fa.getIndex(), fb.getIndex()) == false) return false;
if (Objects.equals(fa.getId(), fb.getId()) == false) return false;
if (Objects.equals(fa.getStatus(), fb.getStatus()) == false) return false;
if (Objects.equals(fa.getCause().getClass(), fb.getCause().getClass()) == false) return false;
}

if (a.getSearchFailures().size() != b.getSearchFailures().size()) return false;
for (int i = 0; i < a.getSearchFailures().size(); i++) {
PaginatedHitSource.SearchFailure fa = a.getSearchFailures().get(i);
PaginatedHitSource.SearchFailure fb = b.getSearchFailures().get(i);
if (Objects.equals(fa.getIndex(), fb.getIndex()) == false) return false;
if (Objects.equals(fa.getShardId(), fb.getShardId()) == false) return false;
if (Objects.equals(fa.getNodeId(), fb.getNodeId()) == false) return false;
if (Objects.equals(fa.getReason().getClass(), fb.getReason().getClass()) == false) return false;
if (Objects.equals(fa.getStatus(), fb.getStatus()) == false) return false;
}

return a.isTimedOut() == b.isTimedOut();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -52,14 +51,13 @@ public void testBulkByTaskStatus() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
status.writeTo(out);
BulkByScrollTask.Status tripped = new BulkByScrollTask.Status(out.bytes().streamInput());
assertTaskStatusEquals(out.getTransportVersion(), status, tripped);
assertTaskStatusEquals(status, tripped);
}

/**
* Assert that two task statuses are equal after serialization.
* @param version the version at which expected was serialized
*/
public static void assertTaskStatusEquals(TransportVersion version, BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {
public static void assertTaskStatusEquals(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {
assertEquals(expected.getTotal(), actual.getTotal());
assertEquals(expected.getUpdated(), actual.getUpdated());
assertEquals(expected.getCreated(), actual.getCreated());
Expand All @@ -80,7 +78,7 @@ public static void assertTaskStatusEquals(TransportVersion version, BulkByScroll
assertNull(actual.getSliceStatuses().get(i));
} else if (sliceStatus.getException() == null) {
assertNull(actual.getSliceStatuses().get(i).getException());
assertTaskStatusEquals(version, sliceStatus.getStatus(), actual.getSliceStatuses().get(i).getStatus());
assertTaskStatusEquals(sliceStatus.getStatus(), actual.getSliceStatuses().get(i).getStatus());
} else {
assertNull(actual.getSliceStatuses().get(i).getStatus());
// Just check the message because we're not testing exception serialization in general here.
Expand Down