Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add StoreFactory plugin interface for custom Store implementations([#19091](https://github.com/opensearch-project/OpenSearch/pull/19091))
- Use S3CrtClient for higher throughput while uploading files to S3 ([#18800](https://github.com/opensearch-project/OpenSearch/pull/18800))
- Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18351](https://github.com/opensearch-project/OpenSearch/issues/18351))
- Add overload constructor for Translog to accept Channel Factory as a parameter ([#18918](https://github.com/opensearch-project/OpenSearch/pull/18918))

### Changed
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public Translog newTranslog(
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
TranslogOperationHelper.DEFAULT
TranslogOperationHelper.DEFAULT,
null
);
}

Expand All @@ -64,7 +65,8 @@ public Translog newTranslog(
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
translogOperationHelper
translogOperationHelper,
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public LocalTranslog(
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer,
final TranslogOperationHelper translogOperationHelper
final TranslogOperationHelper translogOperationHelper,
final ChannelFactory channelFactory
) throws IOException {
super(
config,
Expand All @@ -59,7 +60,8 @@ public LocalTranslog(
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
translogOperationHelper
translogOperationHelper,
channelFactory
);
try {
final Checkpoint checkpoint = readCheckpoint(location);
Expand Down Expand Up @@ -113,6 +115,30 @@ public LocalTranslog(
}
}

/**
* Secondary constructor that does not accept ChannelFactory parameter.
*/
public LocalTranslog(
final TranslogConfig config,
final String translogUUID,
TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer,
final TranslogOperationHelper translogOperationHelper
) throws IOException {
this(
config,
translogUUID,
deletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
translogOperationHelper,
null
);
}

/**
* Ensures that the given location has be synced / written to the underlying storage.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public Translog newTranslog(
startedPrimarySupplier,
remoteTranslogTransferTracker,
remoteStoreSettings,
translogOperationHelper
translogOperationHelper,
null
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public RemoteFsTimestampAwareTranslog(
startedPrimarySupplier,
remoteTranslogTransferTracker,
remoteStoreSettings,
translogOperationHelper
translogOperationHelper,
null
);
logger = Loggers.getLogger(getClass(), shardId);
this.metadataFilePinnedTimestampMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public RemoteFsTranslog(
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings,
TranslogOperationHelper translogOperationHelper
TranslogOperationHelper translogOperationHelper,
ChannelFactory channelFactory
) throws IOException {
super(
config,
Expand All @@ -117,7 +118,8 @@ public RemoteFsTranslog(
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
translogOperationHelper
translogOperationHelper,
channelFactory
);
logger = Loggers.getLogger(getClass(), shardId);
this.startedPrimarySupplier = startedPrimarySupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public abstract class Translog extends AbstractIndexShardComponent implements In
protected final TranslogDeletionPolicy deletionPolicy;
protected final LongConsumer persistedSequenceNumberConsumer;
protected final TranslogOperationHelper translogOperationHelper;
protected final ChannelFactory channelFactory;

/**
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
Expand Down Expand Up @@ -182,7 +183,8 @@ public Translog(
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer,
final TranslogOperationHelper translogOperationHelper
final TranslogOperationHelper translogOperationHelper,
final ChannelFactory channelFactory
) throws IOException {
super(config.getShardId(), config.getIndexSettings());
this.config = config;
Expand All @@ -198,6 +200,31 @@ public Translog(
this.location = config.getTranslogPath();
Files.createDirectories(this.location);
this.translogOperationHelper = translogOperationHelper;
this.channelFactory = channelFactory != null ? channelFactory : FileChannel::open;
}

/**
* Constructor that does not accept channelFactory parameter but accepts translogOperationHelper
*/
public Translog(
final TranslogConfig config,
final String translogUUID,
TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer,
final TranslogOperationHelper translogOperationHelper
) throws IOException {
this(
config,
translogUUID,
deletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
translogOperationHelper,
null
);
}

/**
Expand All @@ -218,7 +245,8 @@ public Translog(
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
TranslogOperationHelper.DEFAULT
TranslogOperationHelper.DEFAULT,
FileChannel::open
);
assert config.getIndexSettings().isDerivedSourceEnabled() == false; // For derived source supported index, it is incorrect to use
// this constructor
Expand Down Expand Up @@ -324,7 +352,7 @@ protected void copyCheckpointTo(Path targetPath) throws IOException {
}

TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
FileChannel channel = getChannelFactory().open(path, StandardOpenOption.READ);
try {
assert Translog.parseIdFromFileName(path) == checkpoint.generation : "expected generation: "
+ Translog.parseIdFromFileName(path)
Expand Down Expand Up @@ -1931,7 +1959,7 @@ protected void ensureOpen() {
}

ChannelFactory getChannelFactory() {
return FileChannel::open;
return this.channelFactory;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ public long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter
() -> translogGlobalCheckpoint,
() -> primaryTerm,
seqNo -> {},
TranslogOperationHelper.DEFAULT
TranslogOperationHelper.DEFAULT,
null
);
Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4182,7 +4182,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
() -> SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm::get,
seqNo -> {},
TranslogOperationHelper.DEFAULT
TranslogOperationHelper.DEFAULT,
null
);
translog.add(new Translog.Index("SomeBogusId", 0, primaryTerm.get(), "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
Expand Down
Loading
Loading