Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RemoteSegmentStoreDirectory instead of RemoteDirectory #4240

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))

### Changed
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -487,7 +486,7 @@ public IndexService newIndexService(
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
RemoteDirectoryFactory remoteDirectoryFactory
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Assertions;
Expand Down Expand Up @@ -3228,8 +3226,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
internalRefreshListener.add(new RemoteStoreRefreshListener(store.directory(), remoteDirectory));
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
}
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,54 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*
* @opensearch.internal
*/
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
// Visible for testing
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
static final int LAST_N_METADATA_FILES_TO_KEEP = 10;

private final IndexShard indexShard;
private final Directory storeDirectory;
private final Directory remoteDirectory;
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398)
private final Set<String> filesUploadedToRemoteStore;
private final RemoteSegmentStoreDirectory remoteDirectory;
private final Map<String, String> localSegmentChecksumMap;
private long primaryTerm;
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);

public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException {
this.storeDirectory = storeDirectory;
this.remoteDirectory = remoteDirectory;
// ToDo: Handle failures in reading list of files (GitHub #3397)
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
public RemoteStoreRefreshListener(IndexShard indexShard) {
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
.getDelegate()).getDelegate();
this.primaryTerm = indexShard.getOperationPrimaryTerm();
localSegmentChecksumMap = new HashMap<>();
Comment on lines -35 to +61
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would have preferred individual dependencies as supplier rather than a full IndexShard dependency, to protect against misuse

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I thought about that but there are different IndexShard methods (getOperationPrimaryTerm, getSegmentInfosSnapshot) that are getting called in the afterRefresh() flow.

Created #4316 to track this.

}

@Override
Expand All @@ -46,42 +68,112 @@ public void beforeRefresh() throws IOException {

/**
* Upload new segment files created as part of the last refresh to the remote segment store.
* The method also deletes segment files from remote store which are not part of local filesystem.
* This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded.
* @param didRefresh true if the refresh opened a new reference
* @throws IOException in case of I/O error in reading list of local files
*/
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
Set<String> localFiles = Set.of(storeDirectory.listAll());
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> {
try {
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
filesUploadedToRemoteStore.add(file);
} catch (NoSuchFileException e) {
logger.info(
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file),
e
);
} catch (IOException e) {
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
}
});
public void afterRefresh(boolean didRefresh) {
synchronized (this) {
try {
if (indexShard.shardRouting.primary()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if index shard was demoted to replica from primary? Shouldn't RemoteStoreRefreshListener reflect that? Otherewise we are risking to lose track of shard state when it is flipping between primary <-> replica

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only primary should be uploading the segments. If replica is demoted, this check makes sure that we stop uploading segments from replica. Ideally, NRTReplicationEngine should take of this. This if condition is added as a safety check.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in this case the usage of isPrimary property is not useful since it never reflects the actual state of the shard (if demoted from primary). I think we could drop it than

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the shard is demoted and promoted again, we need to make sure that we call RemoteSegmentStoreDirectory.init() to get the latest list of files from remote store (because it could be changed by another primary in the meantime).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will try to go step by step by step:

  • indexShard.shardRouting.primary() == true: if isPrimary was false, we set it to true and proceed with init()
  • the shard flips from primary to replica, indexShard.shardRouting.primary() == false, nothing happens to isPrimary, it is still, true (which is wrong)
  • the shard flips from replica to primary, indexShard.shardRouting.primary() == true and isPrimary is also true - the initialization call to init() won't happen anymore because the flip to replica was not reflected

Does it make sense?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is definitely a bug. Thanks for catching it. I will fix it and will add tests around it as you suggested in another comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reta I changed the implementation and added tests. Please take a look again.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, thanks!

if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
}
try {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
if (!remoteDirectory.containsFile(
lastCommittedLocalSegmentFileName,
getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)
)) {
deleteStaleCommits();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would clean up stall refresh ? Can we do this in the background?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up does not happen at each refresh. It would only be called when refresh is called during flush (as new segments_N file gets created). We can definitely preform stale commit deletion in the background. I have created a tracking issue here: #4315

}
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Collection<String> refreshedLocalFiles = segmentInfos.files(true);

List<String> segmentInfosFiles = refreshedLocalFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.collect(Collectors.toList());
Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
.max(Comparator.comparingLong(IndexFileNames::parseGeneration));

Set<String> remoteFilesToBeDeleted = new HashSet<>();
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142)
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> {
try {
remoteDirectory.deleteFile(file);
remoteFilesToBeDeleted.add(file);
} catch (IOException e) {
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e);
if (latestSegmentInfos.isPresent()) {
refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true));
segmentInfosFiles.stream()
.filter(file -> !file.equals(latestSegmentInfos.get()))
.forEach(refreshedLocalFiles::remove);

boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
if (uploadStatus) {
remoteDirectory.uploadMetadata(
refreshedLocalFiles,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
segmentInfos.getGeneration()
);
localSegmentChecksumMap.keySet()
.stream()
.filter(file -> !refreshedLocalFiles.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
}
}
} catch (EngineException e) {
Copy link
Collaborator

@reta reta Aug 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we swallow all exceptions, what happens if shard was promoted to primary but segments were not refreshed due to some failures? Should we go back to replica mode? Or should we schedule the retry later on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the current implementation, we retry segment upload in the next refresh.

If the upload fails, we don't want to demote primary as the failure could be due to issues with remote segment store itself.

logger.warn("Exception while reading SegmentInfosSnapshot", e);
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.
logger.warn("Exception while uploading new segments to the remote segment store", e);
}
}
});
} catch (Throwable t) {
logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t);
}
}
}

// Visible for testing
boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
AtomicBoolean uploadSuccess = new AtomicBoolean(true);
localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> {
try {
return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file));
} catch (IOException e) {
logger.info(
"Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file",
file
);
return true;
}
}).forEach(file -> {
try {
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
} catch (IOException e) {
uploadSuccess.set(false);
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
}
});
return uploadSuccess.get();
Comment on lines +138 to +159
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the upload is happening serially, can we either bulk upload or do concurrent async uploads

}

private String getChecksumOfLocalFile(String file) throws IOException {
if (!localSegmentChecksumMap.containsKey(file)) {
try (IndexInput indexInput = storeDirectory.openInput(file, IOContext.DEFAULT)) {
String checksum = Long.toString(CodecUtil.retrieveChecksum(indexInput));
localSegmentChecksumMap.put(file, checksum);
}
}
return localSegmentChecksumMap.get(file);
}

remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove);
private void deleteStaleCommits() {
try {
remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP);
} catch (IOException e) {
logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,12 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
}
indexShard.preRecovery();
indexShard.prepareForIndexRecovery();
final Directory remoteDirectory = remoteStore.directory();
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
final Store store = indexShard.store();
final Directory storeDirectory = store.directory();
store.incRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,37 @@ public class RemoteIndexInput extends IndexInput {

private final InputStream inputStream;
private final long size;
private long filePointer;

public RemoteIndexInput(String name, InputStream inputStream, long size) {
super(name);
this.inputStream = inputStream;
this.size = size;
this.filePointer = 0;
}

@Override
public byte readByte() throws IOException {
byte[] buffer = new byte[1];
inputStream.read(buffer);
int numberOfBytesRead = inputStream.read(buffer);
if (numberOfBytesRead != -1) {
filePointer += numberOfBytesRead;
}
return buffer[0];
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
int bytesRead = inputStream.read(b, offset, len);
while (bytesRead > 0 && bytesRead < len) {
len -= bytesRead;
offset += bytesRead;
bytesRead = inputStream.read(b, offset, len);
if (bytesRead == len) {
filePointer += bytesRead;
} else {
while (bytesRead > 0 && bytesRead < len) {
filePointer += bytesRead;
len -= bytesRead;
offset += bytesRead;
bytesRead = inputStream.read(b, offset, len);
}
}
}

Expand All @@ -61,22 +71,25 @@ public long length() {
return size;
}

@Override
public void seek(long pos) throws IOException {
inputStream.skip(pos);
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getFilePointer() {
public void seek(long pos) throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why inputStream.skip(pos); was replaced with UnsupportedOperationException ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation was incorrect. This was pointed by @andrross earlier. seek() should provide functionality to go to the exact point provided by the pos argument. That means pos could be 0 and it will take you to the beginning of the inputStream. This requires special handling of the state which is not required at least for the V1 of remote store.

throw new UnsupportedOperationException();
}

/**
* Returns the current position in this file in terms of number of bytes read so far.
*/
@Override
public long getFilePointer() {
return filePointer;
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
Expand Down
Loading