Skip to content

Commit

Permalink
Changes for remote translog backed by blob store
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar committed Jul 6, 2022
1 parent daef729 commit 5444aac
Show file tree
Hide file tree
Showing 21 changed files with 478 additions and 200 deletions.
12 changes: 6 additions & 6 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -91,10 +92,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.*;

/**
* IndexModule represents the central extension point for index level custom implementations like:
Expand Down Expand Up @@ -475,7 +473,8 @@ public IndexService newIndexService(
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
RemoteDirectoryFactory remoteDirectoryFactory
RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -530,7 +529,8 @@ public IndexService newIndexService(
allowExpensiveQueries,
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory
recoveryStateFactory,
repositoriesServiceSupplier
);
success = true;
return indexService;
Expand Down
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
Expand All @@ -96,6 +98,7 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -172,6 +175,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final Supplier<RepositoriesService> repositoriesServiceSupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -203,7 +207,8 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -275,6 +280,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -553,6 +559,8 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && this.indexSettings.isRemoteStoreEnabled() ? new InternalTranslogFactory() :
new RemoteBlobStoreInternalTranslogFactory(repositoriesServiceSupplier, clusterService.state().metadata().clusterUUID()),
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
remoteStore
);
Expand Down
20 changes: 18 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.index.engine;

import com.sun.jna.StringArray;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.MergePolicy;
Expand All @@ -51,8 +52,10 @@
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -148,6 +151,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
Property.Dynamic
);

private final TranslogFactory translogFactory;

private final TranslogConfig translogConfig;

public EngineConfig(
Expand Down Expand Up @@ -253,7 +258,8 @@ public EngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
false
false,
new InternalTranslogFactory()
);
}

Expand Down Expand Up @@ -284,7 +290,8 @@ public EngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) {
throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
Expand Down Expand Up @@ -328,6 +335,7 @@ public EngineConfig(
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
this.isReadOnlyReplica = isReadOnlyReplica;
this.translogFactory = translogFactory;
}

/**
Expand Down Expand Up @@ -532,6 +540,14 @@ public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}

/**
* Returns the underlying translog factory
* @return the translog factory
*/
public TranslogFactory getTranslogFactory() {
return translogFactory;
}

/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -147,7 +148,8 @@ public EngineConfig newEngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -178,7 +180,8 @@ public EngineConfig newEngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
isReadOnlyReplica
isReadOnlyReplica,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

package org.opensearch.index.engine;

import org.opensearch.index.translog.TranslogFactory;

/**
* Simple Engine Factory
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.InternalTranslogManager;
import org.opensearch.index.translog.*;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;
Expand Down Expand Up @@ -289,7 +284,8 @@ public void onFailure(String reason, Exception ex) {
() -> getLocalCheckpointTracker(),
translogUUID,
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen
this::ensureOpen,
config().getTranslogFactory()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public void onAfterTranslogSync() {
}
}
},
this
this,
config().getTranslogFactory()
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Expand Down
10 changes: 2 additions & 8 deletions server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.NoOpTranslogManager;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.*;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -195,7 +189,7 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (
Translog translog = new Translog(
Translog translog = engineConfig.getTranslogFactory().newTranslog(
translogConfig,
translogUuid,
translogDeletionPolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (
Translog translog = new Translog(
Translog translog = config.getTranslogFactory().newTranslog(
translogConfig,
translogUuid,
translogDeletionPolicy,
Expand Down
11 changes: 6 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,7 @@
import org.opensearch.index.store.Store.MetadataSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogRecoveryRunner;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.*;
import org.opensearch.index.warmer.ShardIndexWarmerService;
import org.opensearch.index.warmer.WarmerStats;
import org.opensearch.indices.IndexingMemoryController;
Expand Down Expand Up @@ -308,6 +305,7 @@ Runnable getGlobalCheckpointSyncer() {
private final ReferenceManager.RefreshListener checkpointRefreshListener;

private final Store remoteStore;
private final TranslogFactory translogFactory;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -330,6 +328,7 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
final TranslogFactory translogFactory,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
) throws IOException {
Expand Down Expand Up @@ -420,6 +419,7 @@ public boolean shouldCache(Query query) {
this.checkpointRefreshListener = null;
}
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3242,7 +3242,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
indexSettings.isSegRepEnabled() && shardRouting.primary() == false
indexSettings.isSegRepEnabled() && shardRouting.primary() == false,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

public class InternalTranslogFactory implements TranslogFactory {

@Override
public Translog newTranslog(TranslogConfig translogConfig,
String translogUUID,
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer) throws IOException {

return new LocalTranslog(
translogConfig,
translogUUID,
translogDeletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public InternalTranslogManager(
Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier,
String translogUUID,
TranslogEventListener translogEventListener,
LifecycleAware engineLifeCycleAware
LifecycleAware engineLifeCycleAware,
TranslogFactory translogFactory
) throws IOException {
this.shardId = shardId;
this.readLock = readLock;
Expand All @@ -67,7 +68,7 @@ public InternalTranslogManager(
if (tracker != null) {
tracker.markSeqNoAsPersisted(seqNo);
}
}, translogUUID);
}, translogUUID, translogFactory);
assert translog.getGeneration() != null;
this.translog = translog;
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
Expand Down Expand Up @@ -333,10 +334,11 @@ protected Translog openTranslog(
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongConsumer persistedSequenceNumberConsumer,
String translogUUID
String translogUUID,
TranslogFactory translogFactory
) throws IOException {

return new Translog(
return translogFactory.newTranslog(
translogConfig,
translogUUID,
translogDeletionPolicy,
Expand Down
Loading

0 comments on commit 5444aac

Please sign in to comment.