@@ -33,13 +33,17 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.BufferedReader;
@@ -59,6 +63,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -67,13 +72,16 @@
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.Archiver;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -94,6 +102,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
@@ -169,7 +180,10 @@ public void write(int b) throws IOException {

omDbCheckpointServletMock = mock(OMDBCheckpointServletInodeBasedXfer.class);

BootstrapStateHandler.Lock lock = new OMDBCheckpointServlet.Lock(om);
BootstrapStateHandler.Lock lock = null;
if (om != null) {
lock = new OMDBCheckpointServlet.Lock(om);
}
doCallRealMethod().when(omDbCheckpointServletMock).init();
assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getDbStore());

@@ -195,6 +209,8 @@ public void write(int b) throws IOException {

doCallRealMethod().when(omDbCheckpointServletMock)
.writeDbDataToStream(any(), any(), any(), any(), any());
doCallRealMethod().when(omDbCheckpointServletMock)
.writeDBToArchive(any(), any(), any(), any(), any(), any(), anyBoolean());

when(omDbCheckpointServletMock.getBootstrapStateLock())
.thenReturn(lock);
@@ -308,6 +324,61 @@ public void testSnapshotDBConsistency() throws Exception {
assertNotNull(value);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception {
setupMocks();
Path dbDir = folder.resolve("db_data");
Files.createDirectories(dbDir);
// Create dummy files: one SST, one non-SST
Path sstFile = dbDir.resolve("test.sst");
Files.write(sstFile, "sst content".getBytes(StandardCharsets.UTF_8)); // Write some content to make it non-empty

Path nonSstFile = dbDir.resolve("test.log");
Files.write(nonSstFile, "log content".getBytes(StandardCharsets.UTF_8));
Set<String> sstFilesToExclude = new HashSet<>();
AtomicLong maxTotalSstSize = new AtomicLong(1000000); // Sufficient size
Map<String, String> hardLinkFileMap = new java.util.HashMap<>();
Path tmpDir = folder.resolve("tmp");
Files.createDirectories(tmpDir);
TarArchiveOutputStream mockArchiveOutputStream = mock(TarArchiveOutputStream.class);
List<String> fileNames = new ArrayList<>();
try (MockedStatic<Archiver> archiverMock = mockStatic(Archiver.class)) {
archiverMock.when(() -> Archiver.linkAndIncludeFile(any(), any(), any(), any())).thenAnswer(invocation -> {
// Get the actual mockArchiveOutputStream passed from writeDBToArchive
TarArchiveOutputStream aos = invocation.getArgument(2);
File sourceFile = invocation.getArgument(0);
String fileId = invocation.getArgument(1);
fileNames.add(sourceFile.getName());
aos.putArchiveEntry(new TarArchiveEntry(sourceFile, fileId));
aos.write(new byte[100], 0, 100); // Simulate writing
aos.closeArchiveEntry();
return 100L;
});
boolean success = omDbCheckpointServletMock.writeDBToArchive(
sstFilesToExclude, dbDir, maxTotalSstSize, mockArchiveOutputStream,
tmpDir, hardLinkFileMap, expectOnlySstFiles);
assertTrue(success);
verify(mockArchiveOutputStream, times(fileNames.size())).putArchiveEntry(any());
verify(mockArchiveOutputStream, times(fileNames.size())).closeArchiveEntry();
verify(mockArchiveOutputStream, times(fileNames.size())).write(any(byte[].class), anyInt(),
anyInt()); // verify write was called once per archived file

boolean containsNonSstFile = false;
for (String fileName : fileNames) {
if (expectOnlySstFiles) {
assertTrue(fileName.endsWith(".sst"), "Expected only SST files, but found: " + fileName);
} else if (!fileName.endsWith(".sst")) {
containsNonSstFile = true;
}
}

if (!expectOnlySstFiles) {
assertTrue(containsNonSstFile, "Expected at least one non-SST file in the archive");
}
}
}

private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
List<Path> files = filesInTarball.filter(p -> p.toString().contains(".log"))
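Note: the per-file decision this parameterized test exercises is just a suffix check on regular files. A minimal standalone sketch of that decision (hypothetical helper, not part of the patch, assuming ROCKSDB_SST_SUFFIX is ".sst"):

import java.nio.file.Files;
import java.nio.file.Path;

final class SstOnlyFilterSketch {
  private static final String SST_SUFFIX = ".sst"; // assumed value of ROCKSDB_SST_SUFFIX

  // Returns true if the file should be written to the archive on this pass.
  static boolean shouldArchive(Path file, boolean onlySstFile) {
    if (Files.isDirectory(file)) {
      return false; // directories are skipped; only regular files are archived
    }
    return !onlySstFile || file.toString().endsWith(SST_SUFFIX);
  }
}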
@@ -1158,7 +1158,7 @@ public void pause() throws IOException {
// max size config. That way next time through, we get multiple
// tarballs.
if (count == 1) {
long sstSize = getSizeOfFiles(tarball);
long sstSize = getSizeOfSstFiles(tarball);
om.getConfiguration().setLong(
OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, sstSize / 2);
// Now empty the tarball to restart the download
@@ -1172,13 +1172,16 @@
}

// Get size of SST files in the tarball.
private long getSizeOfFiles(File tarball) throws IOException {
private long getSizeOfSstFiles(File tarball) throws IOException {
FileUtil.unTar(tarball, tempDir.toFile());
List<Path> sstPaths = Files.walk(tempDir).
collect(Collectors.toList());
OmSnapshotUtils.createHardLinks(tempDir, true);
List<Path> sstPaths = Files.list(tempDir).collect(Collectors.toList());
long totalFileSize = 0;
for (Path sstPath : sstPaths) {
totalFileSize += Files.size(sstPath);
File file = sstPath.toFile();
if (file.isFile() && file.getName().endsWith(".sst")) {
totalFileSize += Files.size(sstPath);
}
}
return totalFileSize;
}
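The size calculation above can also be written as a single stream pipeline; a minimal sketch under the same assumptions (flat directory listing after hard-link reconstruction, only regular *.sst files counted):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class SstSizeSketch {
  static long sizeOfSstFiles(Path dir) throws IOException {
    try (Stream<Path> files = Files.list(dir)) {
      return files
          .filter(Files::isRegularFile)
          .filter(p -> p.getFileName().toString().endsWith(".sst"))
          .mapToLong(p -> {
            try {
              return Files.size(p);
            } catch (IOException e) {
              throw new UncheckedIOException(e); // rethrow as unchecked inside the stream
            }
          })
          .sum();
    }
  }
}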
@@ -24,6 +24,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
@@ -32,6 +33,7 @@
import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX;
import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_SUFFIX;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@@ -223,18 +225,18 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
break;
}
shouldContinue = writeDBToArchive(sstFilesToExclude, snapshotDbPath,
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true);
}


if (shouldContinue) {
shouldContinue = writeDBToArchive(sstFilesToExclude, getSstBackupDir(),
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true);
}

if (shouldContinue) {
shouldContinue = writeDBToArchive(sstFilesToExclude, getCompactionLogDir(),
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true);
}
}

@@ -246,14 +248,14 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
maxTotalSstSize.set(Long.MAX_VALUE);
Path checkpointDir = checkpoint.getCheckpointLocation();
writeDBToArchive(sstFilesToExclude, checkpointDir,
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
if (includeSnapshotData) {
Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir,
hardLinkFileMap, getCompactionLogDir());
hardLinkFileMap, getCompactionLogDir(), false);
writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir,
hardLinkFileMap, getSstBackupDir());
hardLinkFileMap, getSstBackupDir(), false);
// This is done to ensure all data to be copied correctly is flushed in the snapshot DB
transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize,
archiveOutputStream, hardLinkFileMap);
@@ -293,18 +295,20 @@ private void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Se
try {
// invalidate closes the snapshot DB
om.getOmSnapshotManager().invalidateCacheEntry(UUID.fromString(snapshotId));
writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir,
hardLinkFileMap, false);
} finally {
omMetadataManager.getLock().releaseReadLock(SNAPSHOT_DB_LOCK, snapshotId);
}
}
}

private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
@VisibleForTesting
boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
Path tmpdir, Map<String, String> hardLinkFileMap) throws IOException {
Path tmpdir, Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize,
archiveOutputStream, tmpdir, hardLinkFileMap, null);
archiveOutputStream, tmpdir, hardLinkFileMap, null, onlySstFile);
}

private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
@@ -394,12 +398,21 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
* @param maxTotalSstSize Maximum total size of SST files to include
* @param archiveOutputStream Archive output stream
* @param tmpDir Temporary directory for processing
* @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication
* @param destDir Destination directory for the archived files. If null,
* the archived files are not moved to this directory.
* @param onlySstFile If true, only SST files are processed. If false, all files are processed.
* <p>
* This parameter is typically set to {@code true} for initial iterations to
* prioritize SST file transfer, and then set to {@code false} only for the
* final iteration to ensure all remaining file types are transferred.
* @return true if processing should continue, false if size limit reached
* @throws IOException if an I/O error occurs
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
Map<String, String> hardLinkFileMap, Path destDir) throws IOException {
Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile) throws IOException {
if (!Files.exists(dbDir)) {
LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
return true;
@@ -411,6 +424,9 @@
Iterable<Path> iterable = files::iterator;
for (Path dbFile : iterable) {
if (!Files.isDirectory(dbFile)) {
if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
continue;
}
String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
String path = dbFile.toFile().getAbsolutePath();
if (destDir != null) {
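For context, a simplified sketch of how a caller is expected to drive the onlySstFile flag, following the flow in writeDbDataToStream above; the stand-in interface and class below are illustrative only, not the servlet's actual code:

import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class TwoPhaseTransferSketch {

  /** Stand-in for writeDBToArchive: returns false once the size budget is exhausted. */
  interface ArchiveStep {
    boolean write(Path dir, AtomicLong budget, boolean onlySstFile);
  }

  /**
   * SST-bearing directories are archived first with onlySstFile=true so the size
   * budget is spent on SST files; the checkpoint directory is archived last with
   * onlySstFile=false and an unlimited budget so every remaining file type goes out.
   */
  static void transfer(List<Path> sstBearingDirs, Path checkpointDir,
      AtomicLong maxTotalSstSize, ArchiveStep step) {
    boolean shouldContinue = true;
    for (Path dir : sstBearingDirs) {
      if (!shouldContinue) {
        return; // budget exhausted: ship this tarball, remaining data goes in the next one
      }
      shouldContinue = step.write(dir, maxTotalSstSize, true);
    }
    maxTotalSstSize.set(Long.MAX_VALUE); // final pass is not size limited
    step.write(checkpointDir, maxTotalSstSize, false);
  }
}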