Skip to content

Commit 4f97c1d

Browse files
author
Rajat Gupta
committed
Backport to 3.1 for channelFactory param in translog
Signed-off-by: Rajat Gupta <[email protected]>
1 parent fa58fd4 commit 4f97c1d

File tree

6 files changed

+111
-68
lines changed

6 files changed

+111
-68
lines changed

server/src/main/java/org/opensearch/index/translog/LocalTranslog.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,18 @@ public LocalTranslog(
4949
TranslogDeletionPolicy deletionPolicy,
5050
final LongSupplier globalCheckpointSupplier,
5151
final LongSupplier primaryTermSupplier,
52-
final LongConsumer persistedSequenceNumberConsumer
52+
final LongConsumer persistedSequenceNumberConsumer,
53+
final ChannelFactory channelFactory
5354
) throws IOException {
54-
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
55+
super(
56+
config,
57+
translogUUID,
58+
deletionPolicy,
59+
globalCheckpointSupplier,
60+
primaryTermSupplier,
61+
persistedSequenceNumberConsumer,
62+
channelFactory
63+
);
5564
try {
5665
final Checkpoint checkpoint = readCheckpoint(location);
5766
final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1));
@@ -104,6 +113,20 @@ public LocalTranslog(
104113
}
105114
}
106115

116+
/**
117+
* Secondary constructor that does not accept ChannelFactory parameter.
118+
*/
119+
public LocalTranslog(
120+
final TranslogConfig config,
121+
final String translogUUID,
122+
TranslogDeletionPolicy deletionPolicy,
123+
final LongSupplier globalCheckpointSupplier,
124+
final LongSupplier primaryTermSupplier,
125+
final LongConsumer persistedSequenceNumberConsumer
126+
) throws IOException {
127+
this(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer, null);
128+
}
129+
107130
/**
108131
* Ensures that the given location has be synced / written to the underlying storage.
109132
*

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,18 @@ public RemoteFsTranslog(
107107
ThreadPool threadPool,
108108
BooleanSupplier startedPrimarySupplier,
109109
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
110-
RemoteStoreSettings remoteStoreSettings
110+
RemoteStoreSettings remoteStoreSettings,
111+
ChannelFactory channelFactory
111112
) throws IOException {
112-
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
113+
super(
114+
config,
115+
translogUUID,
116+
deletionPolicy,
117+
globalCheckpointSupplier,
118+
primaryTermSupplier,
119+
persistedSequenceNumberConsumer,
120+
channelFactory
121+
);
113122
logger = Loggers.getLogger(getClass(), shardId);
114123
this.startedPrimarySupplier = startedPrimarySupplier;
115124
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
@@ -167,6 +176,35 @@ public RemoteFsTranslog(
167176
}
168177
}
169178

179+
public RemoteFsTranslog(
180+
TranslogConfig config,
181+
String translogUUID,
182+
TranslogDeletionPolicy deletionPolicy,
183+
LongSupplier globalCheckpointSupplier,
184+
LongSupplier primaryTermSupplier,
185+
LongConsumer persistedSequenceNumberConsumer,
186+
BlobStoreRepository blobStoreRepository,
187+
ThreadPool threadPool,
188+
BooleanSupplier startedPrimarySupplier,
189+
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
190+
RemoteStoreSettings remoteStoreSettings
191+
) throws IOException {
192+
this(
193+
config,
194+
translogUUID,
195+
deletionPolicy,
196+
globalCheckpointSupplier,
197+
primaryTermSupplier,
198+
persistedSequenceNumberConsumer,
199+
blobStoreRepository,
200+
threadPool,
201+
startedPrimarySupplier,
202+
remoteTranslogTransferTracker,
203+
remoteStoreSettings,
204+
null
205+
);
206+
}
207+
170208
// visible for testing
171209
RemoteTranslogTransferTracker getRemoteTranslogTracker() {
172210
return remoteTranslogTransferTracker;

server/src/main/java/org/opensearch/index/translog/Translog.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public abstract class Translog extends AbstractIndexShardComponent implements In
153153
protected final String translogUUID;
154154
protected final TranslogDeletionPolicy deletionPolicy;
155155
protected final LongConsumer persistedSequenceNumberConsumer;
156+
protected final ChannelFactory channelFactory;
156157

157158
/**
158159
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
@@ -172,14 +173,16 @@ public abstract class Translog extends AbstractIndexShardComponent implements In
172173
* and reject operation whose term is higher than the primary term stored in the translog header.
173174
* @param persistedSequenceNumberConsumer a callback that's called whenever an operation with a given sequence number is successfully
174175
* persisted.
176+
* @param channelFactory a custom channel factory that could be used instead of default OPEN File Channel.
175177
*/
176178
public Translog(
177179
final TranslogConfig config,
178180
final String translogUUID,
179181
TranslogDeletionPolicy deletionPolicy,
180182
final LongSupplier globalCheckpointSupplier,
181183
final LongSupplier primaryTermSupplier,
182-
final LongConsumer persistedSequenceNumberConsumer
184+
final LongConsumer persistedSequenceNumberConsumer,
185+
final ChannelFactory channelFactory
183186
) throws IOException {
184187
super(config.getShardId(), config.getIndexSettings());
185188
this.config = config;
@@ -194,6 +197,21 @@ public Translog(
194197
writeLock = new ReleasableLock(rwl.writeLock());
195198
this.location = config.getTranslogPath();
196199
Files.createDirectories(this.location);
200+
this.channelFactory = channelFactory != null ? channelFactory : FileChannel::open;
201+
}
202+
203+
/**
204+
* Constructor that does not accept channelFactory parameter
205+
*/
206+
public Translog(
207+
final TranslogConfig config,
208+
final String translogUUID,
209+
TranslogDeletionPolicy deletionPolicy,
210+
final LongSupplier globalCheckpointSupplier,
211+
final LongSupplier primaryTermSupplier,
212+
final LongConsumer persistedSequenceNumberConsumer
213+
) throws IOException {
214+
this(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer, null);
197215
}
198216

199217
/** recover all translog files found on disk */
@@ -296,7 +314,7 @@ protected void copyCheckpointTo(Path targetPath) throws IOException {
296314
}
297315

298316
TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
299-
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
317+
FileChannel channel = getChannelFactory().open(path, StandardOpenOption.READ);
300318
try {
301319
assert Translog.parseIdFromFileName(path) == checkpoint.generation : "expected generation: "
302320
+ Translog.parseIdFromFileName(path)
@@ -1901,7 +1919,7 @@ protected void ensureOpen() {
19011919
}
19021920

19031921
ChannelFactory getChannelFactory() {
1904-
return FileChannel::open;
1922+
return this.channelFactory;
19051923
}
19061924

19071925
/**

server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,13 +1504,9 @@ public int write(ByteBuffer src) throws IOException {
15041504
new DefaultTranslogDeletionPolicy(-1, -1, 0),
15051505
() -> SequenceNumbers.NO_OPS_PERFORMED,
15061506
primaryTerm::get,
1507-
persistedSeqNos::add
1508-
) {
1509-
@Override
1510-
ChannelFactory getChannelFactory() {
1511-
return channelFactory;
1512-
}
1513-
}
1507+
persistedSeqNos::add,
1508+
channelFactory
1509+
)
15141510
) {
15151511
TranslogWriter writer = translog.getCurrent();
15161512
int initialWriteCalls = writeCalls.get();
@@ -1609,13 +1605,9 @@ public void force(boolean metaData) throws IOException {
16091605
new DefaultTranslogDeletionPolicy(-1, -1, 0),
16101606
() -> SequenceNumbers.NO_OPS_PERFORMED,
16111607
primaryTerm::get,
1612-
persistedSeqNos::add
1613-
) {
1614-
@Override
1615-
ChannelFactory getChannelFactory() {
1616-
return channelFactory;
1617-
}
1618-
}
1608+
persistedSeqNos::add,
1609+
channelFactory
1610+
)
16191611
) {
16201612
TranslogWriter writer = translog.getCurrent();
16211613
byte[] bytes = new byte[256];
@@ -1706,13 +1698,9 @@ public void force(boolean metaData) throws IOException {
17061698
new DefaultTranslogDeletionPolicy(-1, -1, 0),
17071699
() -> SequenceNumbers.NO_OPS_PERFORMED,
17081700
primaryTerm::get,
1709-
persistedSeqNos::add
1710-
) {
1711-
@Override
1712-
ChannelFactory getChannelFactory() {
1713-
return channelFactory;
1714-
}
1715-
}
1701+
persistedSeqNos::add,
1702+
channelFactory
1703+
)
17161704
) {
17171705
TranslogWriter writer = translog.getCurrent();
17181706

@@ -2983,13 +2971,9 @@ private Translog getFailableTranslog(
29832971
deletionPolicy,
29842972
() -> SequenceNumbers.NO_OPS_PERFORMED,
29852973
primaryTerm::get,
2986-
seqNo -> {}
2974+
seqNo -> {},
2975+
channelFactory
29872976
) {
2988-
@Override
2989-
ChannelFactory getChannelFactory() {
2990-
return channelFactory;
2991-
}
2992-
29932977
@Override
29942978
void deleteReaderFiles(TranslogReader reader) {
29952979
if (fail.fail()) {
@@ -4068,13 +4052,9 @@ public void force(boolean metaData) throws IOException {
40684052
createTranslogDeletionPolicy(),
40694053
() -> SequenceNumbers.NO_OPS_PERFORMED,
40704054
primaryTerm::get,
4071-
seqNo -> {}
4055+
seqNo -> {},
4056+
channelFactory
40724057
) {
4073-
@Override
4074-
ChannelFactory getChannelFactory() {
4075-
return channelFactory;
4076-
}
4077-
40784058
@Override
40794059
void syncBeforeRollGeneration() {
40804060
// make it a noop like the old versions

server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -619,13 +619,9 @@ public void testExtraGenToKeep() throws Exception {
619619
threadPool,
620620
() -> Boolean.TRUE,
621621
new RemoteTranslogTransferTracker(shardId, 10),
622-
DefaultRemoteStoreSettings.INSTANCE
623-
) {
624-
@Override
625-
ChannelFactory getChannelFactory() {
626-
return channelFactory;
627-
}
628-
}
622+
DefaultRemoteStoreSettings.INSTANCE,
623+
channelFactory
624+
)
629625
) {
630626
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
631627
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 1 }));

server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -473,13 +473,9 @@ public void testExtraGenToKeep() throws Exception {
473473
threadPool,
474474
() -> Boolean.TRUE,
475475
new RemoteTranslogTransferTracker(shardId, 10),
476-
DefaultRemoteStoreSettings.INSTANCE
477-
) {
478-
@Override
479-
ChannelFactory getChannelFactory() {
480-
return channelFactory;
481-
}
482-
}
476+
DefaultRemoteStoreSettings.INSTANCE,
477+
channelFactory
478+
)
483479
) {
484480
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
485481

@@ -1522,13 +1518,9 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
15221518
threadPool,
15231519
() -> Boolean.TRUE,
15241520
new RemoteTranslogTransferTracker(shardId, 10),
1525-
DefaultRemoteStoreSettings.INSTANCE
1526-
) {
1527-
@Override
1528-
ChannelFactory getChannelFactory() {
1529-
return channelFactory;
1530-
}
1531-
}
1521+
DefaultRemoteStoreSettings.INSTANCE,
1522+
channelFactory
1523+
)
15321524
) {
15331525
TranslogWriter writer = translog.getCurrent();
15341526
int initialWriteCalls = writeCalls.get();
@@ -1632,13 +1624,9 @@ public void force(boolean metaData) throws IOException {
16321624
threadPool,
16331625
() -> Boolean.TRUE,
16341626
new RemoteTranslogTransferTracker(shardId, 10),
1635-
DefaultRemoteStoreSettings.INSTANCE
1636-
) {
1637-
@Override
1638-
ChannelFactory getChannelFactory() {
1639-
return channelFactory;
1640-
}
1641-
}
1627+
DefaultRemoteStoreSettings.INSTANCE,
1628+
channelFactory
1629+
)
16421630
) {
16431631
TranslogWriter writer = translog.getCurrent();
16441632
byte[] bytes = new byte[256];

0 commit comments

Comments
 (0)