Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change default value of remote_data_ratio, which is used in Searchable Snapshots and Writeable Warm from 0 to 5 and min allowed value to 1 ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
- Making multi rate limiters in repository dynamic ([#18069](https://github.com/opensearch-project/OpenSearch/pull/18069))

### Fixed
- Fix unnecessary refreshes on update preparation failures ([#15261](https://github.com/opensearch-project/OpenSearch/issues/15261))

### Dependencies
- Bump `stefanzweifel/git-auto-commit-action` from 5 to 6 ([#18524](https://github.com/opensearch-project/OpenSearch/pull/18524))
- Bump Apache Lucene to 10.2.2 ([#18573](https://github.com/opensearch-project/OpenSearch/pull/18573))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.action.support.replication.ReplicationTask;
Expand Down Expand Up @@ -520,12 +521,32 @@ public boolean isForceExecution() {
}

private void finishRequest() {
// If no actual writes occurred (locationToSync is null), we should not trigger refresh
// even if the request has RefreshPolicy.IMMEDIATE
final Translog.Location locationToSync = context.getLocationToSync();
final BulkShardRequest bulkShardRequest = context.getBulkShardRequest();

// Create a modified request with NONE refresh policy if no writes occurred
final BulkShardRequest requestForResult;
if (locationToSync == null && bulkShardRequest.getRefreshPolicy() != WriteRequest.RefreshPolicy.NONE) {
// No actual writes occurred, so we should not refresh
requestForResult = new BulkShardRequest(
bulkShardRequest.shardId(),
WriteRequest.RefreshPolicy.NONE,
bulkShardRequest.items()
);
requestForResult.index(bulkShardRequest.index());
requestForResult.setParentTask(bulkShardRequest.getParentTask());
} else {
requestForResult = bulkShardRequest;
}

ActionListener.completeWith(
listener,
() -> new WritePrimaryResult<>(
context.getBulkShardRequest(),
requestForResult,
context.buildShardResponse(),
context.getLocationToSync(),
locationToSync,
null,
context.getPrimary(),
logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

Expand All @@ -119,6 +120,7 @@
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -822,6 +824,270 @@ public void testFailureDuringUpdateProcessing() throws Exception {
assertThat(failure.getStatus(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
}

/**
 * Sends a bulk shard request with {@code RefreshPolicy.IMMEDIATE} whose single item is an
 * update for which the mocked {@code UpdateHelper.prepare} throws a version conflict.
 * Asserts that the primary result carries no translog location and that running the
 * post-replication actions does not invoke {@code refresh} on the mocked shard.
 */
public void testFailedUpdatePreparationDoesNotTriggerRefresh() throws Exception {
    final IndexSettings settings = new IndexSettings(indexMetadata(), Settings.EMPTY);

    // Mocked primary shard; the flag flips if refresh() is ever invoked on it.
    final AtomicBoolean refreshed = new AtomicBoolean(false);
    final IndexShard primary = mock(IndexShard.class);
    when(primary.indexSettings()).thenReturn(settings);
    when(primary.shardId()).thenReturn(shardId);
    doAnswer(ignored -> {
        refreshed.set(true);
        return null;
    }).when(primary).refresh(any());

    // Every attempt to prepare the update ends in a version conflict.
    final UpdateHelper failingHelper = mock(UpdateHelper.class);
    when(failingHelper.prepare(any(), eq(primary), any())).thenThrow(
        new VersionConflictEngineException(shardId, "id", "version conflict during update preparation")
    );

    // One update item, submitted with an IMMEDIATE refresh policy.
    final DocWriteRequest<UpdateRequest> update = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
    final BulkItemRequest item = new BulkItemRequest(0, update);
    final BulkShardRequest request = new BulkShardRequest(shardId, RefreshPolicy.IMMEDIATE, new BulkItemRequest[] { item });

    randomlySetIgnoredPrimaryResponse(item);

    // Drive the request through performOnPrimary and inspect the result in the listener.
    final CountDownLatch done = new CountDownLatch(1);
    TransportShardBulkAction.performOnPrimary(
        request,
        primary,
        failingHelper,
        threadPool::absoluteTimeInMillis,
        new NoopMappingUpdatePerformer(),
        waitListener -> waitListener.onResponse(null),
        new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
            final WritePrimaryResult<BulkShardRequest, BulkShardResponse> outcome = (WritePrimaryResult<
                BulkShardRequest,
                BulkShardResponse>) result;

            // Nothing was written, so there is no translog location to sync.
            assertNull(outcome.location);

            // Post-replication actions are where a refresh would be issued.
            outcome.runPostReplicationActions(
                ActionListener.wrap(unused -> {}, e -> fail("Post replication actions should not fail: " + e.getMessage()))
            );

            // The IMMEDIATE policy must not have produced a refresh.
            assertFalse(refreshed.get());
        }), done),
        threadPool,
        Names.WRITE
    );

    assertTrue(done.await(5, TimeUnit.SECONDS));
}

/**
 * Sends a three-item bulk shard request with {@code RefreshPolicy.IMMEDIATE}: two index
 * operations that the mocked shard answers with successful results, and one update for
 * which the mocked {@code UpdateHelper.prepare} throws a version conflict. Asserts that the
 * primary result has a translog location and that running the post-replication actions
 * invokes {@code refresh} on the mocked shard.
 */
public void testBulkRequestWithMixedSuccessAndFailureRefresh() throws Exception {
    final IndexSettings settings = new IndexSettings(indexMetadata(), Settings.EMPTY);

    // index (succeeds), update (fails), index (succeeds)
    final BulkItemRequest[] mixedItems = new BulkItemRequest[] {
        new BulkItemRequest(0, new IndexRequest("index").id("success1").source(Requests.INDEX_CONTENT_TYPE, "field", "value")),
        new BulkItemRequest(1, new UpdateRequest("index", "fail1").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")),
        new BulkItemRequest(2, new IndexRequest("index").id("success2").source(Requests.INDEX_CONTENT_TYPE, "field", "value")) };
    final BulkShardRequest request = new BulkShardRequest(shardId, RefreshPolicy.IMMEDIATE, mixedItems);

    final IndexShard primary = mock(IndexShard.class);
    when(primary.indexSettings()).thenReturn(settings);
    when(primary.shardId()).thenReturn(shardId);

    // The two index operations get consecutive successful results, each with its own location.
    final Engine.IndexResult firstIndexed = new FakeIndexResult(1, 1, 10, true, new Translog.Location(42, 42, 42));
    final Engine.IndexResult secondIndexed = new FakeIndexResult(1, 1, 12, true, new Translog.Location(43, 43, 43));
    when(primary.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenReturn(
        firstIndexed,
        secondIndexed
    );

    // Preparing the update always ends in a version conflict.
    final UpdateHelper failingHelper = mock(UpdateHelper.class);
    when(failingHelper.prepare(any(), eq(primary), any())).thenThrow(
        new VersionConflictEngineException(shardId, "fail1", "version conflict")
    );

    // The flag flips if refresh() is ever invoked on the mocked shard.
    final AtomicBoolean refreshed = new AtomicBoolean(false);
    doAnswer(ignored -> {
        refreshed.set(true);
        return null;
    }).when(primary).refresh(any());

    final CountDownLatch done = new CountDownLatch(1);
    TransportShardBulkAction.performOnPrimary(
        request,
        primary,
        failingHelper,
        threadPool::absoluteTimeInMillis,
        new NoopMappingUpdatePerformer(),
        waitListener -> waitListener.onResponse(null),
        new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
            final WritePrimaryResult<BulkShardRequest, BulkShardResponse> outcome = (WritePrimaryResult<
                BulkShardRequest,
                BulkShardResponse>) result;

            // At least one write went through, so a location must be present.
            assertNotNull(outcome.location);

            // Post-replication actions are where the refresh is issued.
            outcome.runPostReplicationActions(
                ActionListener.wrap(unused -> {}, e -> fail("Post replication actions should not fail: " + e.getMessage()))
            );

            // Successful writes plus IMMEDIATE policy must have produced a refresh.
            assertTrue(refreshed.get());
        }), done),
        threadPool,
        Names.WRITE
    );

    assertTrue(done.await(5, TimeUnit.SECONDS));
}

/**
 * Sends a bulk shard request with {@code RefreshPolicy.IMMEDIATE} made of three updates, for
 * all of which the mocked {@code UpdateHelper.prepare} throws a version conflict. Asserts
 * that the primary result carries no translog location and that running the
 * post-replication actions does not invoke {@code refresh} on the mocked shard.
 */
public void testBulkRequestWithAllFailedUpdatesNoRefresh() throws Exception {
    final IndexSettings settings = new IndexSettings(indexMetadata(), Settings.EMPTY);

    // Three update items with ids id0, id1, id2.
    final BulkItemRequest[] updates = new BulkItemRequest[3];
    for (int slot = 0; slot < updates.length; slot++) {
        final UpdateRequest update = new UpdateRequest("index", "id" + slot).doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
        updates[slot] = new BulkItemRequest(slot, update);
    }
    final BulkShardRequest request = new BulkShardRequest(shardId, RefreshPolicy.IMMEDIATE, updates);

    // Mocked primary shard; the flag flips if refresh() is ever invoked on it.
    final AtomicBoolean refreshed = new AtomicBoolean(false);
    final IndexShard primary = mock(IndexShard.class);
    when(primary.indexSettings()).thenReturn(settings);
    when(primary.shardId()).thenReturn(shardId);
    doAnswer(ignored -> {
        refreshed.set(true);
        return null;
    }).when(primary).refresh(any());

    // Every attempt to prepare an update ends in a version conflict.
    final UpdateHelper failingHelper = mock(UpdateHelper.class);
    when(failingHelper.prepare(any(), eq(primary), any())).thenThrow(
        new VersionConflictEngineException(shardId, "id", "version conflict")
    );

    final CountDownLatch done = new CountDownLatch(1);
    TransportShardBulkAction.performOnPrimary(
        request,
        primary,
        failingHelper,
        threadPool::absoluteTimeInMillis,
        new NoopMappingUpdatePerformer(),
        waitListener -> waitListener.onResponse(null),
        new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
            final WritePrimaryResult<BulkShardRequest, BulkShardResponse> outcome = (WritePrimaryResult<
                BulkShardRequest,
                BulkShardResponse>) result;

            // Every item failed, so there is no translog location.
            assertNull(outcome.location);

            // Post-replication actions are where a refresh would be issued.
            outcome.runPostReplicationActions(
                ActionListener.wrap(unused -> {}, e -> fail("Post replication actions should not fail: " + e.getMessage()))
            );

            // No writes happened, so no refresh may have been triggered.
            assertFalse(refreshed.get());
        }), done),
        threadPool,
        Names.WRITE
    );

    assertTrue(done.await(5, TimeUnit.SECONDS));
}

/**
 * Sends a bulk shard request with {@code RefreshPolicy.IMMEDIATE} made of two index
 * operations that the mocked shard answers with successful results. Asserts that the
 * primary result has a translog location and that running the post-replication actions
 * invokes {@code refresh} on the mocked shard.
 */
public void testSuccessfulBulkOperationStillTriggersRefresh() throws Exception {
    final IndexSettings settings = new IndexSettings(indexMetadata(), Settings.EMPTY);

    // Two plain index operations.
    final BulkItemRequest[] indexItems = new BulkItemRequest[] {
        new BulkItemRequest(0, new IndexRequest("index").id("id1").source(Requests.INDEX_CONTENT_TYPE, "field", "value1")),
        new BulkItemRequest(1, new IndexRequest("index").id("id2").source(Requests.INDEX_CONTENT_TYPE, "field", "value2")) };
    final BulkShardRequest request = new BulkShardRequest(shardId, RefreshPolicy.IMMEDIATE, indexItems);

    final IndexShard primary = mock(IndexShard.class);
    when(primary.indexSettings()).thenReturn(settings);
    when(primary.shardId()).thenReturn(shardId);

    // Each index call gets a fresh successful result; the counter starts at 42 and supplies
    // both the third FakeIndexResult argument and the three Translog.Location values.
    final AtomicInteger nextPosition = new AtomicInteger(42);
    when(primary.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer(
        ignored -> {
            final int position = nextPosition.getAndIncrement();
            final Translog.Location written = new Translog.Location(position, position, position);
            return new FakeIndexResult(1, 1, position, true, written);
        }
    );

    // The flag flips if refresh() is ever invoked on the mocked shard.
    final AtomicBoolean refreshed = new AtomicBoolean(false);
    doAnswer(ignored -> {
        refreshed.set(true);
        return null;
    }).when(primary).refresh(any());

    // No UpdateHelper is passed: the request contains no update items.
    final CountDownLatch done = new CountDownLatch(1);
    TransportShardBulkAction.performOnPrimary(
        request,
        primary,
        null,
        threadPool::absoluteTimeInMillis,
        new NoopMappingUpdatePerformer(),
        waitListener -> waitListener.onResponse(null),
        new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
            final WritePrimaryResult<BulkShardRequest, BulkShardResponse> outcome = (WritePrimaryResult<
                BulkShardRequest,
                BulkShardResponse>) result;

            // Successful writes must leave a translog location behind.
            assertNotNull(outcome.location);

            // Post-replication actions are where the refresh is issued.
            outcome.runPostReplicationActions(
                ActionListener.wrap(unused -> {}, e -> fail("Post replication actions should not fail: " + e.getMessage()))
            );

            // IMMEDIATE policy with real writes must have produced a refresh.
            assertTrue(refreshed.get());
        }), done),
        threadPool,
        Names.WRITE
    );

    assertTrue(done.await(5, TimeUnit.SECONDS));
}

public void testTranslogPositionToSync() throws Exception {
IndexShard shard = newStartedShard(true);

Expand Down
Loading