@@ -83,7 +83,7 @@ public RocksDBCheckpoint createCheckpoint(String parentDir) {
     Instant end = Instant.now();
 
     long duration = Duration.between(start, end).toMillis();
-    LOG.debug("Created checkpoint at " + checkpointPath.toString() + " in "
+    LOG.info("Created checkpoint at " + checkpointPath.toString() + " in "
         + duration + " milliseconds");
 
     return new RocksDBCheckpoint(
@@ -76,7 +76,8 @@ public long checkpointCreationTimeTaken() {

   @Override
   public void cleanupCheckpoint() throws IOException {
-    LOG.debug("Cleaning up checkpoint at " + checkpointLocation.toString());
+    LOG.info("Cleaning up RocksDB checkpoint at " +
+        checkpointLocation.toString());
     FileUtils.deleteDirectory(checkpointLocation.toFile());
   }
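For orientation, the two hunks above only change log levels around checkpoint creation and cleanup; the checkpoint itself is produced by RocksDB's own Checkpoint API. The sketch below is illustrative and not part of this patch — the paths and class name are made up — and assumes the surrounding code uses org.rocksdb.Checkpoint from the stock RocksDB Java binding, which is what the timed block wraps.

import org.rocksdb.Checkpoint;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class CheckpointSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(options, "/tmp/om.db")) {
      // Checkpoint.createCheckpoint is the operation the timed block measures.
      try (Checkpoint cp = Checkpoint.create(db)) {
        cp.createCheckpoint("/tmp/om.db-checkpoint");
      }
    }
  }
}

On the same filesystem, createCheckpoint hard-links the immutable SST files rather than copying them, which keeps the measured duration small.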
@@ -18,14 +18,13 @@
 package org.apache.hadoop.ozone;
 
 import com.google.common.base.Joiner;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.MessageDigest;
@@ -34,16 +33,22 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
-import java.util.zip.GZIPOutputStream;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Strings;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -346,61 +351,51 @@ public static Collection<String> emptyAsSingletonNull(Collection<String>
   }
 
   /**
-   * Given a source directory, create a tar.gz file from it.
-   *
-   * @param sourcePath the path to the directory to be archived.
-   * @return tar.gz file
+   * Write OM DB Checkpoint to an output stream as a compressed file (tgz).
+   * @param checkpoint checkpoint file
+   * @param destination destination output stream.
    * @throws IOException
    */
-  public static File createTarFile(Path sourcePath) throws IOException {
-    TarArchiveOutputStream tarOs = null;
-    try {
-      String sourceDir = sourcePath.toString();
-      String fileName = sourceDir.concat(".tar.gz");
-      FileOutputStream fileOutputStream = new FileOutputStream(fileName);
-      GZIPOutputStream gzipOutputStream =
-          new GZIPOutputStream(new BufferedOutputStream(fileOutputStream));
-      tarOs = new TarArchiveOutputStream(gzipOutputStream);
-      File folder = new File(sourceDir);
-      File[] filesInDir = folder.listFiles();
-      if (filesInDir != null) {
-        for (File file : filesInDir) {
-          addFilesToArchive(file.getName(), file, tarOs);
+  public static void writeOmDBCheckpointToStream(DBCheckpoint checkpoint,
+      OutputStream destination)
+      throws IOException {
+
+    try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
+        .createCompressorOutputStream(CompressorStreamFactory.GZIP,
+            destination)) {
+
+      try (ArchiveOutputStream archiveOutputStream =
+          new TarArchiveOutputStream(gzippedOut)) {
+
+        Path checkpointPath = checkpoint.getCheckpointLocation();
+        for (Path path : Files.list(checkpointPath)
+            .collect(Collectors.toList())) {
+          if (path != null) {
+            Path fileName = path.getFileName();
+            if (fileName != null) {
+              includeFile(path.toFile(), fileName.toString(),
+                  archiveOutputStream);
+            }
+          }
         }
       }
-      return new File(fileName);
-    } finally {
-      try {
-        org.apache.hadoop.io.IOUtils.closeStream(tarOs);
-      } catch (Exception e) {
-        LOG.error("Exception encountered when closing " +
-            "TAR file output stream: " + e);
-      }
+    } catch (CompressorException e) {
+      throw new IOException(
+          "Can't compress the checkpoint: " +
+              checkpoint.getCheckpointLocation(), e);
     }
   }
 
-  private static void addFilesToArchive(String source, File file,
-                                        TarArchiveOutputStream
-                                            tarFileOutputStream)
+  private static void includeFile(File file, String entryName,
+      ArchiveOutputStream archiveOutputStream)
       throws IOException {
-    tarFileOutputStream.putArchiveEntry(new TarArchiveEntry(file, source));
-    if (file.isFile()) {
-      FileInputStream fileInputStream = new FileInputStream(file);
-      BufferedInputStream bufferedInputStream =
-          new BufferedInputStream(fileInputStream);
-      IOUtils.copy(bufferedInputStream, tarFileOutputStream);
-      tarFileOutputStream.closeArchiveEntry();
-      fileInputStream.close();
-    } else if (file.isDirectory()) {
-      tarFileOutputStream.closeArchiveEntry();
-      File[] filesInDir = file.listFiles();
-      if (filesInDir != null) {
-        for (File cFile : filesInDir) {
-          addFilesToArchive(cFile.getAbsolutePath(), cFile,
-              tarFileOutputStream);
-        }
-      }
-    }
-  }
+    ArchiveEntry archiveEntry =
+        archiveOutputStream.createArchiveEntry(file, entryName);
+    archiveOutputStream.putArchiveEntry(archiveEntry);
+    try (FileInputStream fis = new FileInputStream(file)) {
+      IOUtils.copy(fis, archiveOutputStream);
+    }
+    archiveOutputStream.closeArchiveEntry();
+  }
 
   /**
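The heart of the change above is that writeOmDBCheckpointToStream tars and gzips the checkpoint directly into the destination stream, instead of materializing a .tar.gz file on disk the way the removed createTarFile did. Below is a minimal, self-contained sketch of that streaming pattern, separate from the patch; the directory and output paths and the class name are hypothetical.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.utils.IOUtils;

public final class TgzStreamSketch {

  /** Tar + gzip every regular file in sourceDir straight into destination. */
  public static void tarGzDirectory(Path sourceDir, OutputStream destination)
      throws IOException, CompressorException {
    try (OutputStream gz = new CompressorStreamFactory()
             .createCompressorOutputStream(CompressorStreamFactory.GZIP,
                 destination);
         ArchiveOutputStream tar = new TarArchiveOutputStream(gz);
         Stream<Path> files = Files.list(sourceDir)) {
      for (Path p : (Iterable<Path>) files::iterator) {
        File f = p.toFile();
        if (!f.isFile()) {
          continue; // checkpoint directories are flat; skip anything else
        }
        ArchiveEntry entry = tar.createArchiveEntry(f, f.getName());
        tar.putArchiveEntry(entry);
        try (FileInputStream in = new FileInputStream(f)) {
          IOUtils.copy(in, tar); // file bytes flow straight into the archive
        }
        tar.closeArchiveEntry();
      }
      tar.finish(); // write the tar trailer before the streams close
    }
  }

  public static void main(String[] args) throws Exception {
    // Hypothetical paths, for demonstration only.
    try (OutputStream out = new FileOutputStream("/tmp/checkpoint.tgz")) {
      tarGzDirectory(Paths.get("/tmp/checkpoint"), out);
    }
  }
}

Because the tar and gzip encoders write straight through to the destination, the checkpoint can be handed to a consumer (for example, an HTTP response stream) without ever doubling its footprint on disk.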
@@ -19,31 +19,39 @@
 package org.apache.hadoop.ozone;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
  * Unit tests for {@link OmUtils}.
  */
 public class TestOmUtils {
 
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
   @Rule
   public Timeout timeout = new Timeout(60_000);
 
@@ -96,22 +104,13 @@ public void testNoOmDbDirConfigured() {
   }
 
   @Test
-  public void testCreateTarFile() throws Exception {
+  public void testWriteCheckpointToOutputStream() throws Exception {
 
-    File tempSnapshotDir = null;
     FileInputStream fis = null;
     FileOutputStream fos = null;
-    File tarFile = null;
 
     try {
-      String testDirName = System.getProperty("java.io.tmpdir");
-      if (!testDirName.endsWith("/")) {
-        testDirName += "/";
-      }
-      testDirName += "TestCreateTarFile_Dir" + System.currentTimeMillis();
-      tempSnapshotDir = new File(testDirName);
-      tempSnapshotDir.mkdirs();
-
+      String testDirName = folder.newFolder().getAbsolutePath();
       File file = new File(testDirName + "/temp1.txt");
       FileWriter writer = new FileWriter(file);
       writer.write("Test data 1");
@@ -122,14 +121,60 @@ public void testCreateTarFile() throws Exception {
       writer.write("Test data 2");
       writer.close();
 
-      tarFile = OmUtils.createTarFile(Paths.get(testDirName));
-      Assert.assertNotNull(tarFile);
-
+      File outputFile =
+          new File(Paths.get(testDirName, "output_file.tgz").toString());
+      TestDBCheckpoint dbCheckpoint = new TestDBCheckpoint(
+          Paths.get(testDirName));
+      OmUtils.writeOmDBCheckpointToStream(dbCheckpoint,
+          new FileOutputStream(outputFile));
+      assertNotNull(outputFile);
     } finally {
       IOUtils.closeStream(fis);
       IOUtils.closeStream(fos);
-      FileUtils.deleteDirectory(tempSnapshotDir);
-      FileUtils.deleteQuietly(tarFile);
     }
   }
+
 }
+
+class TestDBCheckpoint implements DBCheckpoint {
+
+  private Path checkpointFile;
+
+  TestDBCheckpoint(Path checkpointFile) {
+    this.checkpointFile = checkpointFile;
+  }
+
+  @Override
+  public Path getCheckpointLocation() {
+    return checkpointFile;
+  }
+
+  @Override
+  public long getCheckpointTimestamp() {
+    return 0;
+  }
+
+  @Override
+  public long getLatestSequenceNumber() {
+    return 0;
+  }
+
+  @Override
+  public long checkpointCreationTimeTaken() {
+    return 0;
+  }
+
+  @Override
+  public void cleanupCheckpoint() throws IOException {
+    FileUtils.deleteDirectory(checkpointFile.toFile());
+  }
+
+  @Override
+  public void setRatisSnapshotIndex(long omRatisSnapshotIndex) {
+  }
+
+  @Override
+  public long getRatisSnapshotIndex() {
+    return 0;
+  }
+}
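The assertion in the new test is fairly light: assertNotNull(outputFile) cannot fail once the File object has been constructed, so the test mostly proves that writeOmDBCheckpointToStream completes without throwing. If one wanted a stronger check, a helper along these lines could read the entry names back out of the produced archive (illustrative only, not part of the patch; the class and method names are invented):

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

public final class TgzEntryLister {

  /** Return the entry names found in a .tgz archive. */
  public static List<String> entryNames(String tgzPath) throws IOException {
    List<String> names = new ArrayList<>();
    try (TarArchiveInputStream tar = new TarArchiveInputStream(
        new GzipCompressorInputStream(new FileInputStream(tgzPath)))) {
      TarArchiveEntry entry;
      while ((entry = tar.getNextTarEntry()) != null) {
        names.add(entry.getName()); // e.g. "temp1.txt", "temp2.txt"
      }
    }
    return names;
  }
}

A test could then assert that the returned list contains temp1.txt and temp2.txt.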
@@ -154,8 +154,6 @@ public void write(int b) throws IOException {

     Assert.assertTrue(
         omMetrics.getLastCheckpointCreationTimeTaken() == 0);
-    Assert.assertTrue(
-        omMetrics.getLastCheckpointTarOperationTimeTaken() == 0);
     Assert.assertTrue(
         omMetrics.getLastCheckpointStreamingTimeTaken() == 0);

@@ -164,8 +162,6 @@
     Assert.assertTrue(tempFile.length() > 0);
     Assert.assertTrue(
         omMetrics.getLastCheckpointCreationTimeTaken() > 0);
-    Assert.assertTrue(
-        omMetrics.getLastCheckpointTarOperationTimeTaken() > 0);
     Assert.assertTrue(
         omMetrics.getLastCheckpointStreamingTimeTaken() > 0);
   } finally {