Merged

26 commits
c8812d1
Changes to get remote upload and replication to work for lucene indices
raghuvanshraj Dec 30, 2025
c2e9231
Replication changes
raghuvanshraj Jan 9, 2026
32a4171
Fixes for remote store integration (#20325)
mgodwan Dec 29, 2025
3b000d0
Updated and Refactored the code to have only required changes for rec…
Jan 12, 2026
b951269
Removed extra logging lines
Jan 12, 2026
b3e46a5
Resolving PR comments
Jan 12, 2026
c5eea06
Updated the VSRManagerTests to use ParquetFileMetadata as flushResult
Jan 12, 2026
7925ce3
Fixing RemoteSegmentStoreDirectoryTests and RemoteSegmentStoreDirecto…
Jan 13, 2026
7a5c4b0
Fix for merged files not getting uploaded/deleted
raghuvanshraj Jan 14, 2026
799279f
Added support for SSE KMS testing via run.gradle
raghuvanshraj Jan 13, 2026
5a56859
[RemoteStore] Add support for repository with server side encryption …
pranikum Oct 28, 2025
3e24e1c
Adding support for SSE KMS in RemoteSegmentStoreDirectoryFactory
raghuvanshraj Jan 14, 2026
db2b6c5
Fixing lucene remote recovery and files should not get written in rep…
Jan 15, 2026
b444e6c
Removing extra logging statements
Jan 15, 2026
c67d004
Revert "Fixing lucene remote recovery and files should not get writte…
raghuvanshraj Jan 15, 2026
9483cbb
Using NIOFSDirectory in GenericStoreDirectory
raghuvanshraj Jan 15, 2026
93911e1
Adding NRTReplicationCompositeEngine for checkpoint tracking during r…
raghuvanshraj Jan 15, 2026
d638436
Using index input for checksum calculation instead of input stream
raghuvanshraj Jan 16, 2026
caef264
Fixing lucene remote recovery and files should not get written in rep…
Jan 15, 2026
d7d5220
Fixed RemoteIndexRecoveryIT::testRerouteRecovery
Jan 16, 2026
760f2c3
Removed the isReadonly check from compositeEngine as we have the NRTE…
Jan 16, 2026
9926737
Removing override for getHistoryUUID in NRTReplicationCompositeEngine
raghuvanshraj Jan 19, 2026
817c024
Updated the IndexFileDeleterTests to use absolute file path
Jan 19, 2026
76fc740
Refactoring syncSegmentsFromGivenRemoteSegmentStore and syncSegmentsF…
Jan 19, 2026
dd5463e
Made deleteUnrefrencedFiles api more generic
Jan 19, 2026
7341e0f
Modifying UUID to be added before file name extension for catalog sup…
raghuvanshraj Jan 20, 2026
15 changes: 0 additions & 15 deletions .idea/runConfigurations/Debug_OpenSearch.xml

This file was deleted.

15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -55,6 +55,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add streaming cardinality aggregator ([#19484](https://github.com/opensearch-project/OpenSearch/pull/19484))
- Disable request cache for streaming aggregation queries ([#19520](https://github.com/opensearch-project/OpenSearch/pull/19520))

- Add support for a ForkJoinPool type ([#19008](https://github.com/opensearch-project/OpenSearch/pull/19008))
- Add separate shard limit validation for local and remote indices ([#19532](https://github.com/opensearch-project/OpenSearch/pull/19532))
- Use Lucene `pack` method for `half_float` and `unsigned_long` when using `ApproximatePointRangeQuery`.
- Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233))
- Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
- Add support for repository with server-side encryption enabled, and client-side encryption as well based on a flag ([#19630](https://github.com/opensearch-project/OpenSearch/pull/19630))
- Add pluggable gRPC interceptors with explicit ordering ([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))
- Add BindableServices extension point to transport-grpc-spi ([#19304](https://github.com/opensearch-project/OpenSearch/pull/19304))
- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))
- Add build-tooling to run in FIPS environment ([#18921](https://github.com/opensearch-project/OpenSearch/pull/18921))
- Add SMILE/CBOR/YAML document format support to Bulk GRPC endpoint ([#19744](https://github.com/opensearch-project/OpenSearch/pull/19744))

### Changed
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))
28 changes: 28 additions & 0 deletions gradle/run.gradle
@@ -40,6 +40,34 @@ testClusters {
testDistribution = 'archive'
if (numZones > 1) numberOfZones = numZones
if (numNodes > 1) numberOfNodes = numNodes
// S3 repository configuration
if (findProperty("enableS3")) {
plugin(':plugins:repository-s3')
if (findProperty("s3Endpoint")) {
setting 's3.client.default.endpoint', findProperty("s3Endpoint")
}
setting 's3.client.default.region', findProperty("s3Region") ?: 'us-east-1'
keystore 's3.client.default.access_key', findProperty("s3AccessKey") ?: System.getenv("AWS_ACCESS_KEY_ID") ?: 'test'
keystore 's3.client.default.secret_key', findProperty("s3SecretKey") ?: System.getenv("AWS_SECRET_ACCESS_KEY") ?: 'test'


// Remote store configuration
setting 'node.attr.remote_store.segment.repository', 'my-s3-repo'
setting 'node.attr.remote_store.translog.repository', 'my-s3-repo'
setting 'node.attr.remote_store.state.repository', 'my-s3-repo'
setting 'cluster.remote_store.state.enabled', 'true'
setting 'node.attr.remote_store.repository.my-s3-repo.type', 's3'
setting 'node.attr.remote_store.repository.my-s3-repo.settings.bucket', 'local-opensearch-bucket'
setting 'node.attr.remote_store.repository.my-s3-repo.settings.base_path', 'raghraaj-local-1230'

// SSE-KMS configuration
if (findProperty("enableSseKms")) {
setting 'node.attr.remote_store.repository.my-s3-repo.settings.server_side_encryption_type', 'aws:kms'
setting 'node.attr.remote_store.repository.my-s3-repo.settings.server_side_encryption_kms_key_id', 'arn:aws:kms:us-east-1:389347062219:key/006ef490-5452-4f4d-8da3-a0cb7344ab59'
setting 'node.attr.remote_store.repository.my-s3-repo.settings.server_side_encryption_bucket_key_enabled', findProperty("sseBucketKeyEnabled") ?: 'true'
setting 'node.attr.remote_store.repository.my-s3-repo.settings.server_side_encryption_encryption_context', '{"identifier":"mustang"}'
}
}
if (findProperty("installedPlugins")) {
installedPlugins = Eval.me(installedPlugins)

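Every setting in this run.gradle block is gated on a Gradle property read via findProperty, so the S3 remote store and SSE-KMS wiring can be toggled from the command line. A minimal invocation sketch, assuming the standard ./gradlew run task that gradle/run.gradle configures; the endpoint and credentials below are placeholders, not values from this PR:

./gradlew run -PenableS3=true -Ps3Endpoint=http://127.0.0.1:9000 -Ps3AccessKey=test -Ps3SecretKey=test -PenableSseKms=true -PsseBucketKeyEnabled=true

Omitting -PenableSseKms leaves the repository on plain S3, and omitting -PenableS3 skips the whole block.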
Original file line number Diff line number Diff line change
@@ -314,80 +314,80 @@ public void testFormatAwareMetadataReplication() throws Exception {
/**
* Tests that replica can recover from remote store with Parquet files.
*/
// public void testReplicaRecoveryWithParquetFiles() throws Exception {
// internalCluster().startClusterManagerOnlyNode();
// internalCluster().startDataOnlyNodes(2);
// createReplicationIndex(INDEX_NAME, 1);
//
// // Index documents
// for (int i = 0; i < 20; i++) {
// client().prepareIndex(INDEX_NAME)
// .setId(String.valueOf(i))
// .setSource("id", String.valueOf(i), "field", "recovery" + i, "value", (long) i)
// .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
// .get();
// }
//
// String primaryNode = getPrimaryNodeName(INDEX_NAME);
// String replicaNode = getReplicaNodeName(INDEX_NAME);
//
// // Wait for initial replication
// assertBusy(() -> {
// IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
// IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
// assertEquals(
// primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
// replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
// );
// }, 30, TimeUnit.SECONDS);
//
// // Stop replica node to simulate failure
// internalCluster().restartNode(replicaNode, new InternalTestCluster.RestartCallback() {
// @Override
// public Settings onNodeStopped(String nodeName) throws Exception {
// // Index more documents on primary while replica is down
// try {
// for (int i = 20; i < 40; i++) {
// client().prepareIndex(INDEX_NAME)
// .setId(String.valueOf(i))
// .setSource("id", String.valueOf(i), "field", "after_failure" + i, "value", (long) i)
// .get();
// }
// client().admin().indices().prepareRefresh(INDEX_NAME).get();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// return super.onNodeStopped(nodeName);
// }
// });
//
// ensureGreen(INDEX_NAME);
//
// // Verify replica recovered with Parquet files
// assertBusy(() -> {
// IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
// IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
//
// // Verify checkpoints match after recovery
// assertEquals(
// "Replica should catch up after recovery",
// primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
// replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
// );
//
// // Verify replica has Parquet files
// RemoteSegmentStoreDirectory replicaRemoteDir = replicaShard.getRemoteDirectory();
// Map<String, UploadedSegmentMetadata> replicaSegments =
// replicaRemoteDir.getSegmentsUploadedToRemoteStore();
//
// Set<String> formats = replicaSegments.keySet().stream()
// .map(file -> new FileMetadata(file).dataFormat())
// .collect(Collectors.toSet());
//
// assertTrue("Recovered replica should have Parquet files", formats.contains("parquet"));
//
// }, 60, TimeUnit.SECONDS);
// }
public void testReplicaRecoveryWithParquetFiles() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);
createReplicationIndex(INDEX_NAME, 1);

// Index documents
for (int i = 0; i < 20; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("id", String.valueOf(i), "field", "recovery" + i, "value", (long) i)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}

String primaryNode = getPrimaryNodeName(INDEX_NAME);
String replicaNode = getReplicaNodeName(INDEX_NAME);

// Wait for initial replication
assertBusy(() -> {
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
assertEquals(
primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
}, 30, TimeUnit.SECONDS);

// Stop replica node to simulate failure
internalCluster().restartNode(replicaNode, new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
// Index more documents on primary while replica is down
try {
for (int i = 20; i < 40; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("id", String.valueOf(i), "field", "after_failure" + i, "value", (long) i)
.get();
}
client().admin().indices().prepareRefresh(INDEX_NAME).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
return super.onNodeStopped(nodeName);
}
});

ensureGreen(INDEX_NAME);

// Verify replica recovered with Parquet files
assertBusy(() -> {
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);

// Verify checkpoints match after recovery
assertEquals(
"Replica should catch up after recovery",
primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
);

// Verify replica has Parquet files
RemoteSegmentStoreDirectory replicaRemoteDir = replicaShard.getRemoteDirectory();
Map<String, UploadedSegmentMetadata> replicaSegments =
replicaRemoteDir.getSegmentsUploadedToRemoteStore();

Set<String> formats = replicaSegments.keySet().stream()
.map(file -> new FileMetadata(file).dataFormat())
.collect(Collectors.toSet());

assertTrue("Recovered replica should have Parquet files", formats.contains("parquet"));

}, 60, TimeUnit.SECONDS);
}

/**
* Tests that ReplicationCheckpoint contains format-aware metadata.
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
package com.parquet.parquetdataformat.vsr;

import com.parquet.parquetdataformat.bridge.ArrowExport;
import com.parquet.parquetdataformat.bridge.ParquetFileMetadata;
import com.parquet.parquetdataformat.bridge.RustBridge;
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
@@ -90,8 +91,8 @@ public void testVSRManagerInitializationAndActiveVSR() throws Exception {

// Flush before close (transitions VSR to FROZEN)
FlushIn flushIn = Mockito.mock(FlushIn.class);
String flushResult = vsrManager.flush(flushIn);
assertEquals("Flush should return filename", testFileName, flushResult);
ParquetFileMetadata flushResult = vsrManager.flush(flushIn);
assertNotNull("Flush should return metadata", flushResult);
assertEquals("VSR should be FROZEN after flush", VSRState.FROZEN, vsrManager.getActiveManagedVSR().getState());

// Now close should succeed
@@ -125,8 +126,8 @@ public void testDocumentAdditionThroughVSRManager() throws Exception {
// Follow proper VSRManager lifecycle: Write → Flush → Close
// Flush before close (transitions VSR to FROZEN)
FlushIn flushIn = Mockito.mock(FlushIn.class);
String flushResult = vsrManager.flush(flushIn);
assertEquals("Flush should return filename", testFileName, flushResult);
ParquetFileMetadata flushResult = vsrManager.flush(flushIn);
assertNotNull("Flush should return metadata", flushResult);
assertEquals("VSR should be FROZEN after flush", VSRState.FROZEN, vsrManager.getActiveManagedVSR().getState());

// Now close should succeed
@@ -142,9 +143,9 @@ public void testFlushThroughVSRManager() throws Exception {

// Flush through VSRManager (create mock FlushIn)
FlushIn flushIn = Mockito.mock(FlushIn.class);
String result = vsrManager.flush(flushIn);
ParquetFileMetadata result = vsrManager.flush(flushIn);

assertEquals("Flush should return filename", testFileName, result);
assertNotNull("Flush should return metadata", result);

// VSR should be FROZEN after flush
assertEquals("VSR should be FROZEN after flush",
@@ -166,9 +167,9 @@ public void testVSRManagerStateTransitionWorkflow() throws Exception {

// 3. Flush - should transition VSR to FROZEN
FlushIn flushIn = Mockito.mock(FlushIn.class);
String flushResult = vsrManager.flush(flushIn);
ParquetFileMetadata flushResult = vsrManager.flush(flushIn);

assertEquals("Flush should return filename", testFileName, flushResult);
assertNotNull("Flush should return metadata", flushResult);
assertEquals("VSR should be FROZEN after flush", VSRState.FROZEN, vsrManager.getActiveManagedVSR().getState());
assertTrue("VSR should be immutable when frozen", vsrManager.getActiveManagedVSR().isImmutable());

6 changes: 6 additions & 0 deletions plugins/engine-datafusion/build.gradle
@@ -176,6 +176,12 @@ test {
systemProperty 'java.library.path', file('src/main/resources/native').absolutePath
}

internalClusterTest {
// Add same JVM arguments for integration tests
jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"]
systemProperty 'java.library.path', file('src/main/resources/native').absolutePath
}

yamlRestTest {
systemProperty 'tests.security.manager', 'false'
// Disable yamlRestTest since this plugin doesn't have REST API endpoints
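The internalClusterTest block added above mirrors the existing test block: Arrow's memory module needs java.base/java.nio opened to allocate direct buffers on recent JDKs, and the plugin's native libraries are resolved through java.library.path, so the forked integration-test JVMs need both as well. A hedged way to exercise it, with the task path assumed from this plugin's location in the repository:

./gradlew :plugins:engine-datafusion:internalClusterTest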
Original file line number Diff line number Diff line change
@@ -24,6 +24,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.index.engine.CatalogSnapshotAwareRefreshListener;
import org.opensearch.index.engine.EngineReaderManager;
import org.opensearch.index.engine.FileDeletionListener;