Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes problems with large snapshots #411

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ratis-proto/src/main/proto/Grpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ service RaftServerProtocolService {
returns(stream ratis.common.AppendEntriesReplyProto) {}

rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
returns(ratis.common.InstallSnapshotReplyProto) {}
returns(stream ratis.common.InstallSnapshotReplyProto) {}
}

service AdminProtocolService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ private synchronized void waitForCommit() throws InterruptedException {
}

private void reload() throws IOException {
Preconditions.assertTrue(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED);
LifeCycle.State current = stateMachine.getLifeCycleState();

Preconditions.assertTrue(current == LifeCycle.State.NEW || current == LifeCycle.State.PAUSED);

stateMachine.reinitialize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Optional;

Expand Down Expand Up @@ -65,7 +66,7 @@ public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws IOE
public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException {
final long remaining = info.getFileSize() - offset;
final int chunkLength = remaining < chunkMaxSize ? (int) remaining : chunkMaxSize;
final ByteString data = ByteString.readFrom(in, chunkLength);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteString.readFrom(in, chunkLength) will read the whole file not just chunkLength

final ByteString data = readChunk(in, chunkLength);

final FileChunkProto proto = FileChunkProto.newBuilder()
.setFilename(relativePath.toString())
Expand All @@ -80,6 +81,32 @@ public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException {
return proto;
}

/**
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code copied from ByteString class

* Blocks until a chunk of the given size can be made from the stream, or EOF is reached. Calls
* read() repeatedly in case the given stream implementation doesn't completely fill the given
* buffer in one read() call.
*
* @return A chunk of the desired size, or else a chunk as large as was available when end of
* stream was reached. Returns null if the given stream had no more data in it.
*/
private static ByteString readChunk(InputStream in, final int chunkSize) throws IOException {
final byte[] buf = new byte[chunkSize];
int bytesRead = 0;
while (bytesRead < chunkSize) {
final int count = in.read(buf, bytesRead, chunkSize - bytesRead);
if (count == -1) {
break;
}
bytesRead += count;
}

if (bytesRead == 0) {
return null;
}

// Always make a copy since InputStream could steal a reference to buf.
return ByteString.copyFrom(buf, 0, bytesRead);
}
@Override
public void close() throws IOException {
in.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.UUID;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.io.CorruptedFileException;
Expand Down Expand Up @@ -63,22 +62,25 @@ public void installSnapshot(StateMachine stateMachine,
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
final RaftStorageDirectory dir = storage.getStorageDir();

// create a unique temporary directory
final File tmpDir = new File(dir.getTmpDir(), UUID.randomUUID().toString());
Copy link
Author

@alpapad alpapad Jan 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when receiving a snapshot which does not fit in a single chunk, then multiple directories are created, each having a chunk of the snapshot

// create a unique temporary directory based on the request id
final File tmpDir = new File(dir.getTmpDir(), snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();

LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will log the whole chunk.. Lots of bytes..

LOG.info("Installing snapshot:{}, to tmp dir:{}", snapshotChunkRequest.getRequestId(), tmpDir);

// TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
// and are not lost when whole request cycle is done. Check requestId and requestIndex here

for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
LOG.info("Installing chunk :{} with offset{}, to tmp dir:{} for file {}",
chunk.getChunkIndex(), chunk.getOffset(), tmpDir, chunk.getFilename());
SnapshotInfo pi = stateMachine.getLatestSnapshot();
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
throw new IOException("There exists snapshot file "
+ pi.getFiles() + " in " + selfId
+ " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
+ " with endIndex (" + pi.getTermIndex().getIndex()
+ ") >= lastIncludedIndex (" + lastIncludedIndex + ")");
}

String fileName = chunk.getFilename(); // this is relative to the root dir
Expand Down Expand Up @@ -132,10 +134,10 @@ public void installSnapshot(StateMachine stateMachine,
}

if (snapshotChunkRequest.getDone()) {
LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}",
LOG.info("Install snapshot is done, moving files from dir:{} to:{}",
tmpDir, dir.getStateMachineDir());
dir.getStateMachineDir().delete();
tmpDir.renameTo(dir.getStateMachineDir());
FileUtils.moveDirectory(tmpDir.toPath(), dir.getStateMachineDir().toPath());
FileUtils.deleteFully(tmpDir);
}
}
}