diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java index 05e5a6366bec0..2369e18531e96 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java @@ -76,6 +76,52 @@ protected boolean supportsUnknownFields() { return true; } + /** + * Verifies that a {@link StatusOrException} constructed with a {@link BulkByScrollTask.Status} exposes it via + * {@link StatusOrException#getStatus()} and returns null from {@link StatusOrException#getException()}. + */ + public void testStatusOrExceptionWithStatus() { + BulkByScrollTask.Status status = BulkByScrollTaskStatusTests.randomStatusWithoutException(); + StatusOrException statusOrException = new StatusOrException(status); + assertNotNull(statusOrException.getStatus()); + assertSame(status, statusOrException.getStatus()); + assertNull(statusOrException.getException()); + } + + /** + * Verifies that a {@link StatusOrException} constructed with an {@link Exception} exposes it via + * {@link StatusOrException#getException()} and returns null from {@link StatusOrException#getStatus()}. + */ + public void testStatusOrExceptionWithException() { + String message = randomAlphaOfLengthBetween(5, 20); + Exception exception = new ElasticsearchException(message); + StatusOrException statusOrException = new StatusOrException(exception); + assertNull(statusOrException.getStatus()); + assertNotNull(statusOrException.getException()); + assertSame(exception, statusOrException.getException()); + assertThat(statusOrException.getException().getMessage(), containsString(message)); + } + + /** + * Verifies that {@link StatusOrException#equals(Object)} returns false for null and for an object of a different class. + */ + public void testEqualsReturnsFalseForNullAndWrongType() { + StatusOrException statusOrException = createTestInstanceWithoutExceptions(); + assertFalse(statusOrException.equals(null)); + assertFalse(statusOrException.equals(BulkByScrollTaskStatusTests.randomStatus())); + } + + /** + * Verifies that two {@link StatusOrException} instances with the same status are equal and have the same hashCode. + */ + public void testEqualsAndHashCodeWhenHoldingSameStatus() { + BulkByScrollTask.Status status = BulkByScrollTaskStatusTests.randomStatusWithoutException(); + StatusOrException first = new StatusOrException(status); + StatusOrException second = new StatusOrException(status); + assertEquals(first, second); + assertEquals(first.hashCode(), second.hashCode()); + } + /** * Test parsing {@link StatusOrException} with inner failures as they don't support asserting on xcontent equivalence, given that * exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()} diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionWireSerializingTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionWireSerializingTests.java new file mode 100644 index 0000000000000..b846814f0b33a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionWireSerializingTests.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.Objects; + +/** + * Wire serialization tests for {@link BulkByScrollTask.StatusOrException}. + * Uses a wrapper type so that equality after round-trip is semantic (e.g. exception + * messages instead of instance identity), matching + * {@link BulkByScrollTaskStatusOrExceptionTests#assertEqualStatusOrException}. + */ +public class BulkByScrollTaskStatusOrExceptionWireSerializingTests extends AbstractWireSerializingTestCase< + BulkByScrollTaskStatusOrExceptionWireSerializingTests.StatusOrExceptionWrapper> { + + @Override + protected Writeable.Reader instanceReader() { + return StatusOrExceptionWrapper::new; + } + + @Override + protected StatusOrExceptionWrapper createTestInstance() { + BulkByScrollTask.StatusOrException statusOrException = BulkByScrollTaskStatusOrExceptionTests.createTestInstanceWithExceptions(); + return new StatusOrExceptionWrapper(statusOrException); + } + + @Override + protected StatusOrExceptionWrapper mutateInstance(StatusOrExceptionWrapper instance) throws IOException { + BulkByScrollTask.StatusOrException statusOrException = instance.statusOrException; + int field = between(0, 1); + if (field == 0) { + if (statusOrException.getStatus() != null) { + return new StatusOrExceptionWrapper( + new BulkByScrollTask.StatusOrException( + BulkByScrollTaskStatusWireSerializingTests.mutateStatus(statusOrException.getStatus()) + ) + ); + } else { + return new StatusOrExceptionWrapper(new BulkByScrollTask.StatusOrException(BulkByScrollTaskStatusTests.randomStatus())); + } + } else { + if (statusOrException.getException() != null) { + Exception currentException = statusOrException.getException(); + Exception differentException = randomValueOtherThan( + currentException, + () -> new ElasticsearchException(randomAlphaOfLengthBetween(5, 15)) + ); + return new StatusOrExceptionWrapper(new BulkByScrollTask.StatusOrException(differentException)); + } else { + return new StatusOrExceptionWrapper( + new BulkByScrollTask.StatusOrException(new ElasticsearchException(randomAlphaOfLengthBetween(5, 15))) + ); + } + } + } + + @Override + protected void assertEqualInstances(StatusOrExceptionWrapper expectedInstance, StatusOrExceptionWrapper newInstance) { + assertNotSame(expectedInstance, newInstance); + BulkByScrollTaskStatusOrExceptionTests.assertEqualStatusOrException( + expectedInstance.statusOrException, + newInstance.statusOrException, + true, + true + ); + } + + /** + * Wrapper around {@link BulkByScrollTask.StatusOrException} that implements semantic equality and hashCode + * so that round-trip serialization passes (e.g. exceptions are compared by message, not reference). + */ + static class StatusOrExceptionWrapper implements Writeable { + private final BulkByScrollTask.StatusOrException statusOrException; + + StatusOrExceptionWrapper(BulkByScrollTask.StatusOrException statusOrException) { + this.statusOrException = statusOrException; + } + + StatusOrExceptionWrapper(StreamInput in) throws IOException { + this.statusOrException = new BulkByScrollTask.StatusOrException(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + statusOrException.writeTo(out); + } + + @Override + public boolean equals(Object other) { + if (this == other) return true; + if (other == null || getClass() != other.getClass()) return false; + StatusOrExceptionWrapper that = (StatusOrExceptionWrapper) other; + return statusOrExceptionEquals(statusOrException, that.statusOrException); + } + + @Override + public int hashCode() { + return statusOrExceptionHashCode(statusOrException); + } + + private static boolean statusOrExceptionEquals( + BulkByScrollTask.StatusOrException first, + BulkByScrollTask.StatusOrException second + ) { + if (first == second) return true; + if (first == null || second == null) return false; + if (first.getStatus() != null && second.getStatus() != null) { + return BulkByScrollTaskStatusWireSerializingTests.StatusWrapper.statusEquals(first.getStatus(), second.getStatus()); + } + if (first.getException() != null && second.getException() != null) { + return Objects.equals(first.getException().getMessage(), second.getException().getMessage()); + } + return false; + } + + private static int statusOrExceptionHashCode(BulkByScrollTask.StatusOrException statusOrException) { + if (statusOrException == null) return 0; + if (statusOrException.getStatus() != null) { + return BulkByScrollTaskStatusWireSerializingTests.StatusWrapper.statusHashCode(statusOrException.getStatus()); + } + if (statusOrException.getException() != null) { + return Objects.hashCode(statusOrException.getException().getMessage()); + } + return 0; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java index e6c02ff33f38d..8be52594060f1 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java @@ -118,7 +118,7 @@ public static BulkByScrollTask.Status randomStatusWithoutException() { return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null); } - private static BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) { + public static BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) { // These all should be believably small because we sum them if we have multiple workers int total = between(0, 10000000); int updated = between(0, total); @@ -369,4 +369,66 @@ protected ToXContent.Params getToXContentParams() { } return new ToXContent.MapParams(params); } + + /** + * Verifies that {@link BulkByScrollTask.Status#getWriteableName()} returns the expected name used for wire serialization. + */ + public void testGetWriteableName() { + BulkByScrollTask.Status status = randomStatusWithoutException(); + assertEquals(BulkByScrollTask.Status.NAME, status.getWriteableName()); + } + + /** + * Verifies that {@link BulkByScrollTask.Status#equalsWithoutSliceStatus(Object, boolean, boolean)} treats two statuses + * that differ only in {@code updated} as equal when {@code includeUpdated} is false. + */ + public void testEqualsWithoutSliceStatusRespectsIncludeUpdated() { + BulkByScrollTask.Status status = randomWorkingStatus(null); + long otherUpdated = randomValueOtherThan(status.getUpdated(), () -> (long) between(0, 10000)); + BulkByScrollTask.Status sameExceptUpdated = new BulkByScrollTask.Status( + status.getSliceId(), + status.getTotal(), + otherUpdated, + status.getCreated(), + status.getDeleted(), + status.getBatches(), + status.getVersionConflicts(), + status.getNoops(), + status.getBulkRetries(), + status.getSearchRetries(), + status.getThrottled(), + status.getRequestsPerSecond(), + status.getReasonCancelled(), + status.getThrottledUntil() + ); + assertTrue(status.equalsWithoutSliceStatus(sameExceptUpdated, false, true)); + assertFalse(status.equalsWithoutSliceStatus(sameExceptUpdated, true, true)); + } + + /** + * Verifies that {@link BulkByScrollTask.Status#equalsWithoutSliceStatus(Object, boolean, boolean)} treats two statuses + * that differ only in {@code created} as equal when {@code includeCreated} is false. + */ + public void testEqualsWithoutSliceStatusRespectsIncludeCreated() { + BulkByScrollTask.Status status = randomWorkingStatus(null); + long otherCreated = randomValueOtherThan(status.getCreated(), () -> (long) between(0, 10000)); + BulkByScrollTask.Status sameExceptCreated = new BulkByScrollTask.Status( + status.getSliceId(), + status.getTotal(), + status.getUpdated(), + otherCreated, + status.getDeleted(), + status.getBatches(), + status.getVersionConflicts(), + status.getNoops(), + status.getBulkRetries(), + status.getSearchRetries(), + status.getThrottled(), + status.getRequestsPerSecond(), + status.getReasonCancelled(), + status.getThrottledUntil() + ); + assertTrue(status.equalsWithoutSliceStatus(sameExceptCreated, true, false)); + assertFalse(status.equalsWithoutSliceStatus(sameExceptCreated, true, true)); + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusWireSerializingTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusWireSerializingTests.java new file mode 100644 index 0000000000000..62e5ded094f30 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusWireSerializingTests.java @@ -0,0 +1,244 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static java.lang.Math.abs; + +/** + * Wire serialization tests for {@link BulkByScrollTask.Status}. + * Uses a wrapper type so that equality after round-trip is semantic (e.g. exception messages + * instead of instance identity), matching {@link BulkByScrollTaskStatusTests#assertTaskStatusEquals}. + */ +public class BulkByScrollTaskStatusWireSerializingTests extends AbstractWireSerializingTestCase< + BulkByScrollTaskStatusWireSerializingTests.StatusWrapper> { + + @Override + protected Writeable.Reader instanceReader() { + return StatusWrapper::new; + } + + @Override + protected StatusWrapper createTestInstance() { + return new StatusWrapper(BulkByScrollTaskStatusTests.randomStatus()); + } + + @Override + protected StatusWrapper mutateInstance(StatusWrapper instance) throws IOException { + return new StatusWrapper(mutateStatus(instance.status)); + } + + /** + * Returns a copy of the given status with exactly one field changed. Used by + * {@link BulkByScrollTaskStatusOrExceptionWireSerializingTests} when mutating the status field. + */ + public static BulkByScrollTask.Status mutateStatus(BulkByScrollTask.Status status) { + if (status.getSliceStatuses().isEmpty()) { + return mutateSingleFieldStatus(status); + } else { + return mutateMergedStatus(status); + } + } + + private static BulkByScrollTask.Status mutateSingleFieldStatus(BulkByScrollTask.Status status) { + int field = between(0, 13); + Integer sliceId = status.getSliceId(); + long total = status.getTotal(); + long updated = status.getUpdated(); + long created = status.getCreated(); + long deleted = status.getDeleted(); + int batches = status.getBatches(); + long versionConflicts = status.getVersionConflicts(); + long noops = status.getNoops(); + long bulkRetries = status.getBulkRetries(); + long searchRetries = status.getSearchRetries(); + TimeValue throttled = status.getThrottled(); + float requestsPerSecond = status.getRequestsPerSecond(); + String reasonCancelled = status.getReasonCancelled(); + TimeValue throttledUntil = status.getThrottledUntil(); + + switch (field) { + case 0 -> sliceId = randomValueOtherThan(sliceId, () -> randomBoolean() ? null : between(1, 100)); + case 1 -> total = randomValueOtherThan(total, () -> randomLongBetween(0, 100)); + case 2 -> updated = randomValueOtherThan(updated, () -> randomLongBetween(0, 100)); + case 3 -> created = randomValueOtherThan(created, () -> randomLongBetween(0, 100)); + case 4 -> deleted = randomValueOtherThan(deleted, () -> randomLongBetween(0, 100)); + case 5 -> batches = randomValueOtherThan(batches, () -> between(0, 100)); + case 6 -> versionConflicts = randomValueOtherThan(versionConflicts, () -> randomLongBetween(0, 100)); + case 7 -> noops = randomValueOtherThan(noops, () -> randomLongBetween(0, 100)); + case 8 -> bulkRetries = randomValueOtherThan(bulkRetries, () -> randomLongBetween(0, 100)); + case 9 -> searchRetries = randomValueOtherThan(searchRetries, () -> randomLongBetween(0, 100)); + case 10 -> throttled = randomValueOtherThan(throttled, BulkByScrollTaskStatusWireSerializingTests::randomTimeValue); + case 11 -> requestsPerSecond = randomValueOtherThan(status.getRequestsPerSecond(), () -> abs(Randomness.get().nextFloat())); + case 12 -> reasonCancelled = randomValueOtherThan(reasonCancelled, () -> randomBoolean() ? null : randomAlphaOfLength(10)); + default -> throttledUntil = randomValueOtherThan(throttledUntil, BulkByScrollTaskStatusWireSerializingTests::randomTimeValue); + } + + return new BulkByScrollTask.Status( + sliceId, + total, + updated, + created, + deleted, + batches, + versionConflicts, + noops, + bulkRetries, + searchRetries, + throttled, + requestsPerSecond, + reasonCancelled, + throttledUntil + ); + } + + private static BulkByScrollTask.Status mutateMergedStatus(BulkByScrollTask.Status status) { + int field = between(0, 1); + if (field == 0) { + String reasonCancelled = randomValueOtherThan( + status.getReasonCancelled(), + () -> randomBoolean() ? null : randomAlphaOfLength(10) + ); + return new BulkByScrollTask.Status(status.getSliceStatuses(), reasonCancelled); + } else { + List newSlices = new ArrayList<>(status.getSliceStatuses()); + newSlices.add(randomSliceStatusOrException()); + return new BulkByScrollTask.Status(newSlices, status.getReasonCancelled()); + } + } + + private static BulkByScrollTask.StatusOrException randomSliceStatusOrException() { + return switch (between(0, 2)) { + case 0 -> null; + case 1 -> new BulkByScrollTask.StatusOrException(new ElasticsearchException(randomAlphaOfLength(5))); + default -> new BulkByScrollTask.StatusOrException(BulkByScrollTaskStatusTests.randomWorkingStatus(between(0, 100))); + }; + } + + @Override + protected void assertEqualInstances(StatusWrapper expectedInstance, StatusWrapper newInstance) { + assertNotSame(expectedInstance, newInstance); + BulkByScrollTaskStatusTests.assertTaskStatusEquals(expectedInstance.status, newInstance.status); + } + + /** + * Wrapper around {@link BulkByScrollTask.Status} that implements semantic equality and hashCode + * so that round-trip serialization passes (e.g. exceptions in slice statuses are compared by + * message, not reference). + */ + static class StatusWrapper implements Writeable { + private final BulkByScrollTask.Status status; + + StatusWrapper(BulkByScrollTask.Status status) { + this.status = status; + } + + StatusWrapper(StreamInput in) throws IOException { + this.status = new BulkByScrollTask.Status(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + status.writeTo(out); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StatusWrapper that = (StatusWrapper) o; + return statusEquals(status, that.status); + } + + @Override + public int hashCode() { + return statusHashCode(status); + } + + static boolean statusEquals(BulkByScrollTask.Status a, BulkByScrollTask.Status b) { + if (a == b) return true; + if (a == null || b == null) return false; + if (a.getTotal() != b.getTotal()) return false; + if (a.getUpdated() != b.getUpdated()) return false; + if (a.getCreated() != b.getCreated()) return false; + if (a.getDeleted() != b.getDeleted()) return false; + if (a.getBatches() != b.getBatches()) return false; + if (a.getVersionConflicts() != b.getVersionConflicts()) return false; + if (a.getNoops() != b.getNoops()) return false; + if (a.getBulkRetries() != b.getBulkRetries()) return false; + if (a.getSearchRetries() != b.getSearchRetries()) return false; + if (Float.compare(a.getRequestsPerSecond(), b.getRequestsPerSecond()) != 0) return false; + if (Objects.equals(a.getSliceId(), b.getSliceId()) == false) return false; + if (Objects.equals(a.getThrottled(), b.getThrottled()) == false) return false; + if (Objects.equals(a.getReasonCancelled(), b.getReasonCancelled()) == false) return false; + if (Objects.equals(a.getThrottledUntil(), b.getThrottledUntil()) == false) return false; + List sa = a.getSliceStatuses(); + List sb = b.getSliceStatuses(); + if (sa.size() != sb.size()) return false; + for (int i = 0; i < sa.size(); i++) { + if (statusOrExceptionEquals(sa.get(i), sb.get(i)) == false) return false; + } + return true; + } + + private static boolean statusOrExceptionEquals(BulkByScrollTask.StatusOrException a, BulkByScrollTask.StatusOrException b) { + if (a == b) return true; + if (a == null || b == null) return false; + if (a.getStatus() != null && b.getStatus() != null) return statusEquals(a.getStatus(), b.getStatus()); + if (a.getException() != null && b.getException() != null) { + return Objects.equals(a.getException().getMessage(), b.getException().getMessage()); + } + return false; + } + + static int statusHashCode(BulkByScrollTask.Status s) { + if (s == null) return 0; + int h = Objects.hash( + s.getSliceId(), + s.getTotal(), + s.getUpdated(), + s.getCreated(), + s.getDeleted(), + s.getBatches(), + s.getVersionConflicts(), + s.getNoops(), + s.getBulkRetries(), + s.getSearchRetries(), + s.getThrottled(), + s.getRequestsPerSecond(), + s.getReasonCancelled(), + s.getThrottledUntil() + ); + for (BulkByScrollTask.StatusOrException soe : s.getSliceStatuses()) { + h = 31 * h + statusOrExceptionHashCode(soe); + } + return h; + } + + private static int statusOrExceptionHashCode(BulkByScrollTask.StatusOrException statusOrException) { + if (statusOrException == null) return 0; + if (statusOrException.getStatus() != null) return statusHashCode(statusOrException.getStatus()); + if (statusOrException.getException() != null) return Objects.hashCode(statusOrException.getException().getMessage()); + return 0; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java index 576f7edc06d16..a8f9bea5bb875 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java @@ -11,6 +11,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -18,6 +20,9 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; import static java.lang.Math.min; import static org.elasticsearch.core.TimeValue.timeValueMillis; @@ -25,6 +30,22 @@ import static org.hamcrest.Matchers.containsString; public class BulkByScrollTaskTests extends ESTestCase { + + /** + * Creates a minimal {@link BulkByScrollTask} with random id, type, action, description and optional relocation eligibility. + * The task is neither a leader nor a worker until {@link BulkByScrollTask#setWorkerCount(int)} or + * {@link BulkByScrollTask#setWorker(float, Integer)} is called. + */ + private static BulkByScrollTask createTask(boolean eligibleForRelocationOnShutdown) { + long taskId = randomLong(); + String type = randomAlphaOfLengthBetween(1, 10); + String action = randomAlphaOfLengthBetween(1, 10); + String description = randomAlphaOfLengthBetween(0, 20); + TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomAlphaOfLength(5), randomLong()); + Map headers = randomBoolean() ? Collections.emptyMap() : Map.of("header", randomAlphaOfLength(5)); + return new BulkByScrollTask(taskId, type, action, description, parentTaskId, headers, eligibleForRelocationOnShutdown); + } + public void testStatusHatesNegatives() { checkStatusNegatives(-1, 0, 0, 0, 0, 0, 0, 0, 0, 0, "sliceId"); checkStatusNegatives(null, -1, 0, 0, 0, 0, 0, 0, 0, 0, "total"); @@ -218,4 +239,241 @@ public void testMergeStatuses() { assertEquals(reasonCancelled, merged.getReasonCancelled()); } + /** + * Verifies that {@link BulkByScrollTask#getStatus()} returns an empty status (merged from empty slice list) + * when the task is neither a leader nor a worker. + */ + public void testGetStatusReturnsEmptyStatusWhenNeitherLeaderNorWorker() { + BulkByScrollTask task = createTask(randomBoolean()); + assertFalse(task.isLeader()); + assertFalse(task.isWorker()); + BulkByScrollTask.Status status = task.getStatus(); + assertEquals(0, status.getTotal()); + assertEquals(0, status.getUpdated()); + assertEquals(0, status.getCreated()); + assertEquals(0, status.getDeleted()); + assertEquals(0, status.getBatches()); + assertTrue(status.getSliceStatuses().isEmpty()); + } + + /** + * Verifies that {@link BulkByScrollTask#isLeader()} returns false for a freshly created task and true after + * {@link BulkByScrollTask#setWorkerCount(int)} is called. + */ + public void testIsLeader() { + BulkByScrollTask task = createTask(randomBoolean()); + assertFalse(task.isLeader()); + int slices = between(2, 20); + task.setWorkerCount(slices); + assertTrue(task.isLeader()); + } + + /** + * Verifies that {@link BulkByScrollTask#isWorker()} returns false for a freshly created task and true after + * {@link BulkByScrollTask#setWorker(float, Integer)} is called. + */ + public void testIsWorker() { + BulkByScrollTask task = createTask(randomBoolean()); + assertFalse(task.isWorker()); + float requestsPerSecond = randomFloatBetween(0.1f, 1000f); + Integer sliceId = randomBoolean() ? null : between(0, 10); + task.setWorker(requestsPerSecond, sliceId); + assertTrue(task.isWorker()); + } + + /** + * Verifies that {@link BulkByScrollTask#setWorkerCount(int)} throws when the task is already a leader. + */ + public void testSetWorkerCountThrowsWhenAlreadyLeader() { + BulkByScrollTask task = createTask(randomBoolean()); + task.setWorkerCount(between(2, 10)); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> task.setWorkerCount(between(2, 10))); + assertThat(exception.getMessage(), containsString("already a leader")); + } + + /** + * Verifies that {@link BulkByScrollTask#setWorkerCount(int)} throws when the task is already a worker. + */ + public void testSetWorkerCountThrowsWhenAlreadyWorker() { + BulkByScrollTask task = createTask(randomBoolean()); + task.setWorker(randomFloatBetween(0.1f, 100f), null); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> task.setWorkerCount(between(2, 10))); + assertThat(exception.getMessage(), containsString("already a worker")); + } + + /** + * Verifies that {@link BulkByScrollTask#setWorker(float, Integer)} throws when the task is already a worker. + */ + public void testSetWorkerThrowsWhenAlreadyWorker() { + BulkByScrollTask task = createTask(randomBoolean()); + task.setWorker(randomFloatBetween(0.1f, 100f), null); + IllegalStateException exception = expectThrows( + IllegalStateException.class, + () -> task.setWorker(randomFloatBetween(0.1f, 100f), between(0, 5)) + ); + assertThat(exception.getMessage(), containsString("already a worker")); + } + + /** + * Verifies that {@link BulkByScrollTask#setWorker(float, Integer)} throws when the task is already a leader. + */ + public void testSetWorkerThrowsWhenAlreadyLeader() { + BulkByScrollTask task = createTask(randomBoolean()); + task.setWorkerCount(between(2, 10)); + IllegalStateException exception = expectThrows( + IllegalStateException.class, + () -> task.setWorker(randomFloatBetween(0.1f, 100f), between(0, 5)) + ); + assertThat(exception.getMessage(), containsString("already a leader")); + } + + /** + * Verifies that {@link BulkByScrollTask#getLeaderState()} returns the leader state after + * {@link BulkByScrollTask#setWorkerCount(int)} and throws when the task is not a leader. + */ + public void testGetLeaderState() { + BulkByScrollTask task = createTask(randomBoolean()); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> task.getLeaderState()); + assertThat(exception.getMessage(), containsString("not set to be a leader")); + + int slices = between(2, 20); + task.setWorkerCount(slices); + LeaderBulkByScrollTaskState leaderState = task.getLeaderState(); + assertNotNull(leaderState); + assertEquals(slices, leaderState.getSlices()); + } + + /** + * Verifies that {@link BulkByScrollTask#getWorkerState()} returns the worker state after + * {@link BulkByScrollTask#setWorker(float, Integer)} and throws when the task is not a worker. + */ + public void testGetWorkerState() { + BulkByScrollTask task = createTask(randomBoolean()); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> task.getWorkerState()); + assertThat(exception.getMessage(), containsString("not set to be a worker")); + + float requestsPerSecond = randomFloatBetween(0.1f, 100f); + Integer sliceId = randomBoolean() ? null : between(0, 10); + task.setWorker(requestsPerSecond, sliceId); + WorkerBulkByScrollTaskState workerState = task.getWorkerState(); + assertNotNull(workerState); + } + + /** + * Verifies that {@link BulkByScrollTask#onCancelled()} does not throw when the task is a worker + * (it delegates to the worker state's handleCancel). + */ + public void testOnCancelledWhenWorkerDoesNotThrow() { + BulkByScrollTask task = createTask(randomBoolean()); + task.setWorker(randomFloatBetween(0.1f, 100f), null); + task.onCancelled(); + } + + /** + * Verifies that {@link BulkByScrollTask#onCancelled()} does not throw when the task is neither leader nor worker. + */ + public void testOnCancelledWhenNeitherLeaderNorWorkerDoesNotThrow() { + BulkByScrollTask task = createTask(randomBoolean()); + task.onCancelled(); + } + + /** + * Verifies that {@link BulkByScrollTask#isEligibleForRelocationOnShutdown()} returns the value passed to the constructor. + */ + public void testIsEligibleForRelocationOnShutdown() { + boolean eligible = randomBoolean(); + BulkByScrollTask task = createTask(eligible); + assertEquals(eligible, task.isEligibleForRelocationOnShutdown()); + } + + /** + * Verifies that {@link BulkByScrollTask#requestRelocation()} sets the relocation-requested flag when the task + * is eligible for relocation, and that {@link BulkByScrollTask#isRelocationRequested()} reflects it. + */ + public void testRequestRelocationWhenEligible() { + BulkByScrollTask task = createTask(true); + assertFalse(task.isRelocationRequested()); + task.requestRelocation(); + assertTrue(task.isRelocationRequested()); + } + + /** + * Verifies that {@link BulkByScrollTask#requestRelocation()} throws when the task is not eligible for relocation. + */ + public void testRequestRelocationThrowsWhenNotEligible() { + BulkByScrollTask task = createTask(false); + IllegalStateException exception = expectThrows(IllegalStateException.class, task::requestRelocation); + assertThat(exception.getMessage(), containsString("eligibleForRelocationOnShutdown is false")); + } + + /** + * Verifies that {@link BulkByScrollTask#taskInfoGivenSubtaskInfo(String, List)} builds a combined + * {@link TaskInfo} from the given slice task infos when the task is a leader. + */ + public void testTaskInfoGivenSubtaskInfo() { + int slices = between(2, 8); + BulkByScrollTask task = createTask(randomBoolean()); + task.setWorkerCount(slices); + + String localNodeId = randomAlphaOfLength(5); + List sliceInfoList = Arrays.asList(new TaskInfo[slices]); + for (int sliceIndex = 0; sliceIndex < slices; sliceIndex++) { + BulkByScrollTask.Status sliceStatus = new BulkByScrollTask.Status( + sliceIndex, + between(0, 100), + between(0, 50), + between(0, 50), + between(0, 50), + between(0, 10), + between(0, 10), + between(0, 10), + between(0, 10), + between(0, 10), + timeValueMillis(0), + randomFloatBetween(0.1f, 100f), + randomBoolean() ? null : randomAlphaOfLength(5), + timeValueMillis(0) + ); + TaskId sliceTaskId = new TaskId(localNodeId, randomLong()); + TaskInfo sliceTaskInfo = new TaskInfo( + sliceTaskId, + task.getType(), + localNodeId, + task.getAction(), + task.getDescription(), + sliceStatus, + randomLong(), + randomLong(), + true, + false, + TaskId.EMPTY_TASK_ID, + Collections.emptyMap() + ); + sliceInfoList.set(sliceIndex, sliceTaskInfo); + } + + TaskInfo combinedTaskInfo = task.taskInfoGivenSubtaskInfo(localNodeId, sliceInfoList); + assertNotNull(combinedTaskInfo); + assertEquals(localNodeId, combinedTaskInfo.node()); + assertNotNull(combinedTaskInfo.status()); + assertTrue(combinedTaskInfo.status() instanceof BulkByScrollTask.Status); + } + + /** + * Verifies that {@link BulkByScrollTask#taskInfoGivenSubtaskInfo(String, List)} throws when the task is not a leader. + */ + public void testTaskInfoGivenSubtaskInfoThrowsWhenNotLeader() { + BulkByScrollTask task = createTask(randomBoolean()); + String localNodeId = randomAlphaOfLength(5); + List sliceInfoList = Collections.emptyList(); + IllegalStateException exception = expectThrows( + IllegalStateException.class, + () -> task.taskInfoGivenSubtaskInfo(localNodeId, sliceInfoList) + ); + assertThat(exception.getMessage(), containsString("not set to be a leader")); + } + + private static float randomFloatBetween(float min, float max) { + return min + (max - min) * random().nextFloat(); + } }