hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -632,7 +632,7 @@ public static void validatePath(Path path, Path ancestor) {
         "Ancestor should not be null");
     Preconditions.checkArgument(
         path.normalize().startsWith(ancestor.normalize()),
-        "Path should be a descendant of %s", ancestor);
+        "Path %s should be a descendant of %s", path, ancestor);
   }
 
   public static File createDir(String dirPath) {
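
The message change above mainly helps when a tar-slip entry is rejected during container import: the exception now names the offending path, not just the expected ancestor. A standalone sketch of the same check, with made-up paths (illustration only, not code from this PR):

import java.nio.file.Path;
import java.nio.file.Paths;

public class ValidatePathDemo {
  public static void main(String[] args) {
    Path ancestor = Paths.get("/data/containers/42");
    Path path = ancestor.resolve("../../etc/passwd"); // tar-slip style entry

    if (!path.normalize().startsWith(ancestor.normalize())) {
      // With the new template the rejected path itself is reported:
      // "Path /data/containers/42/../../etc/passwd should be a descendant of /data/containers/42"
      throw new IllegalArgumentException(String.format(
          "Path %s should be a descendant of %s", path, ancestor));
    }
  }
}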
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -17,13 +17,15 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
+import static org.apache.hadoop.hdds.utils.Archiver.extractEntry;
+import static org.apache.hadoop.hdds.utils.Archiver.includeFile;
+import static org.apache.hadoop.hdds.utils.Archiver.includePath;
+import static org.apache.hadoop.hdds.utils.Archiver.readEntry;
+import static org.apache.hadoop.hdds.utils.Archiver.tar;
+import static org.apache.hadoop.hdds.utils.Archiver.untar;
 import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -32,16 +34,11 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
-import java.util.stream.Stream;
-import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.ArchiveInputStream;
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -108,37 +105,6 @@ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
     return descriptorFileContent;
   }
 
-  private void extractEntry(ArchiveEntry entry, InputStream input, long size,
-      Path ancestor, Path path) throws IOException {
-    HddsUtils.validatePath(path, ancestor);
-
-    if (entry.isDirectory()) {
-      Files.createDirectories(path);
-    } else {
-      Path parent = path.getParent();
-      if (parent != null) {
-        Files.createDirectories(parent);
-      }
-
-      try (OutputStream fileOutput = Files.newOutputStream(path);
-          OutputStream output = new BufferedOutputStream(fileOutput)) {
-        int bufferSize = 1024;
-        byte[] buffer = new byte[bufferSize + 1];
-        long remaining = size;
-        while (remaining > 0) {
-          int len = (int) Math.min(remaining, bufferSize);
-          int read = input.read(buffer, 0, len);
-          if (read >= 0) {
-            remaining -= read;
-            output.write(buffer, 0, read);
-          } else {
-            remaining = 0;
-          }
-        }
-      }
-    }
-  }
-
   /**
    * Given a containerData include all the required container data/metadata
    * in a tar file.
@@ -218,65 +184,10 @@ public static Path getChunkPath(Path baseDir,
     return KeyValueContainerLocationUtil.getChunksLocationPath(baseDir.toString()).toPath();
   }
 
-  private byte[] readEntry(InputStream input, final long size)
-      throws IOException {
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    int bufferSize = 1024;
-    byte[] buffer = new byte[bufferSize + 1];
-    long remaining = size;
-    while (remaining > 0) {
-      int len = (int) Math.min(remaining, bufferSize);
-      int read = input.read(buffer, 0, len);
-      remaining -= read;
-      output.write(buffer, 0, read);
-    }
-    return output.toByteArray();
-  }
-
-  private void includePath(Path dir, String subdir,
-      ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
-
-    // Add a directory entry before adding files, in case the directory is
-    // empty.
-    TarArchiveEntry entry = archiveOutput.createArchiveEntry(dir.toFile(), subdir);
-    archiveOutput.putArchiveEntry(entry);
-    archiveOutput.closeArchiveEntry();
-
-    // Add files in the directory.
-    try (Stream<Path> dirEntries = Files.list(dir)) {
-      for (Path path : dirEntries.collect(toList())) {
-        String entryName = subdir + "/" + path.getFileName();
-        includeFile(path.toFile(), entryName, archiveOutput);
-      }
-    }
-  }
-
-  static void includeFile(File file, String entryName,
-      ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
-    TarArchiveEntry entry = archiveOutput.createArchiveEntry(file, entryName);
-    archiveOutput.putArchiveEntry(entry);
-    try (InputStream input = Files.newInputStream(file.toPath())) {
-      IOUtils.copy(input, archiveOutput);
-    }
-    archiveOutput.closeArchiveEntry();
-  }
-
-  private static ArchiveInputStream<TarArchiveEntry> untar(InputStream input) {
-    return new TarArchiveInputStream(input);
-  }
-
-  private static ArchiveOutputStream<TarArchiveEntry> tar(OutputStream output) {
-    TarArchiveOutputStream os = new TarArchiveOutputStream(output);
-    os.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
-    return os;
-  }
-
   @VisibleForTesting
   InputStream decompress(InputStream input) throws IOException {
     return compression.wrap(input);
   }
 
   @VisibleForTesting
   OutputStream compress(OutputStream output) throws IOException {
     return compression.wrap(output);
   }
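
With the private helpers moved out, TarContainerPacker keeps only the container-specific logic and calls the statically imported Archiver methods. The surviving method bodies are collapsed in this diff, so the following is a hedged sketch of how an unpack loop inside the class might look after the move; the method, `compressedInput`, and `destination` are assumed names, not identifiers from the PR:

// Sketch only: unpack shape after the refactor; not the PR's exact body.
private void unpackSketch(InputStream compressedInput, Path destination)
    throws IOException {
  try (ArchiveInputStream<TarArchiveEntry> archiveInput =
      untar(decompress(compressedInput))) {
    TarArchiveEntry entry;
    while ((entry = archiveInput.getNextEntry()) != null) {
      Path target = Paths.get(destination.toString(), entry.getName());
      // extractEntry validates target against destination (tar-slip guard).
      extractEntry(entry, archiveInput, entry.getSize(), destination, target);
    }
  }
}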
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -48,6 +48,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Archiver;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
 import org.apache.ozone.test.SpyInputStream;
@@ -391,12 +392,10 @@ private File writeSingleFile(Path parentPath, String fileName,
   private File packContainerWithSingleFile(File file, String entryName)
       throws Exception {
     File targetFile = TEMP_DIR.resolve("container.tar").toFile();
-    try (OutputStream output = newOutputStream(targetFile.toPath());
-        OutputStream compressed = packer.compress(output);
-        TarArchiveOutputStream archive =
-            new TarArchiveOutputStream(compressed)) {
+    Path path = targetFile.toPath();
+    try (TarArchiveOutputStream archive = new TarArchiveOutputStream(packer.compress(newOutputStream(path)))) {
       archive.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
-      TarContainerPacker.includeFile(file, entryName, archive);
+      Archiver.includeFile(file, entryName, archive);
     }
     return targetFile;
   }
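
A note on the collapsed try-with-resources above: closing the outermost TarArchiveOutputStream also closes the compression and file streams it wraps, so a single resource variable suffices. The same chaining pattern in isolation (file name and compressor are arbitrary choices for illustration, not from this PR):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

public class ChainedStreamsDemo {
  public static void main(String[] args) throws IOException {
    // Closing `archive` also closes the gzip and file streams beneath it.
    try (TarArchiveOutputStream archive = new TarArchiveOutputStream(
        new GzipCompressorOutputStream(Files.newOutputStream(Paths.get("out.tar.gz"))))) {
      archive.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
      archive.finish();
    }
  }
}

One trade-off: if a wrapping constructor throws, the already opened inner stream is not closed by the try block, whereas the old three-variable form released each resource individually.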
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java (new file)
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.HddsUtils;
+
+/** Create and extract archives. */
+public final class Archiver {
+
+  private Archiver() {
+    // no instances (for now)
+  }
+
+  /** Create tarball including contents of {@code from}. */
+  public static void create(File tarFile, Path from) throws IOException {
+    try (ArchiveOutputStream<TarArchiveEntry> out = tar(Files.newOutputStream(tarFile.toPath()))) {
+      includePath(from, "", out);
+    }
+  }
+
+  /** Extract {@code tarFile} to {@code dir}. */
+  public static void extract(File tarFile, Path dir) throws IOException {
+    Files.createDirectories(dir);
+    String parent = dir.toString();
+    try (ArchiveInputStream<TarArchiveEntry> in = untar(Files.newInputStream(tarFile.toPath()))) {
+      TarArchiveEntry entry;
+      while ((entry = in.getNextEntry()) != null) {
+        Path path = Paths.get(parent, entry.getName());
+        extractEntry(entry, in, entry.getSize(), dir, path);
+      }
+    }
+  }
+
+  public static byte[] readEntry(InputStream input, final long size)
+      throws IOException {
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    int bufferSize = 1024;
+    byte[] buffer = new byte[bufferSize + 1];
+    long remaining = size;
+    while (remaining > 0) {
+      int len = (int) Math.min(remaining, bufferSize);
+      int read = input.read(buffer, 0, len);
+      remaining -= read;
+      output.write(buffer, 0, read);
+    }
+    return output.toByteArray();
+  }
+
+  public static void includePath(Path dir, String subdir,
+      ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
+
+    // Add a directory entry before adding files, in case the directory is
+    // empty.
+    TarArchiveEntry entry = archiveOutput.createArchiveEntry(dir.toFile(), subdir);
+    archiveOutput.putArchiveEntry(entry);
+    archiveOutput.closeArchiveEntry();
+
+    // Add files in the directory.
+    try (Stream<Path> dirEntries = Files.list(dir)) {
+      for (Path path : dirEntries.collect(toList())) {
+        File file = path.toFile();
+        String entryName = subdir + "/" + path.getFileName();
+        if (file.isDirectory()) {
+          includePath(path, entryName, archiveOutput);
+        } else {
+          includeFile(file, entryName, archiveOutput);
+        }
+      }
+    }
+  }
+
+  public static long includeFile(File file, String entryName,
+      ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
+    final long bytes;
+    TarArchiveEntry entry = archiveOutput.createArchiveEntry(file, entryName);
+    archiveOutput.putArchiveEntry(entry);
+    try (InputStream input = Files.newInputStream(file.toPath())) {
+      bytes = IOUtils.copyLarge(input, archiveOutput);
+    }
+    archiveOutput.closeArchiveEntry();
+    return bytes;
+  }
+
+  public static void extractEntry(ArchiveEntry entry, InputStream input, long size,
+      Path ancestor, Path path) throws IOException {
+    HddsUtils.validatePath(path, ancestor);
+
+    if (entry.isDirectory()) {
+      Files.createDirectories(path);
+    } else {
+      Path parent = path.getParent();
+      if (parent != null) {
+        Files.createDirectories(parent);
+      }
+
+      try (OutputStream fileOutput = Files.newOutputStream(path);
+          OutputStream output = new BufferedOutputStream(fileOutput)) {
+        int bufferSize = 1024;
+        byte[] buffer = new byte[bufferSize + 1];
+        long remaining = size;
+        while (remaining > 0) {
+          int len = (int) Math.min(remaining, bufferSize);
+          int read = input.read(buffer, 0, len);
+          if (read >= 0) {
+            remaining -= read;
+            output.write(buffer, 0, read);
+          } else {
+            remaining = 0;
+          }
+        }
+      }
+    }
+  }
+
+  public static ArchiveInputStream<TarArchiveEntry> untar(InputStream input) {
+    return new TarArchiveInputStream(input);
+  }
+
+  public static ArchiveOutputStream<TarArchiveEntry> tar(OutputStream output) {
+    TarArchiveOutputStream os = new TarArchiveOutputStream(output);
+    os.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
+    os.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
+    return os;
+  }
+
+}
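
Taken together, the new utility gives a two-call round trip. A minimal usage sketch of the public API shown above (temp directories and file names are made up for illustration):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.hadoop.hdds.utils.Archiver;

public class ArchiverRoundTrip {
  public static void main(String[] args) throws IOException {
    Path source = Files.createTempDirectory("archiver-src");
    Files.write(source.resolve("data.txt"), "hello".getBytes());

    // Pack the directory contents into a tarball...
    File tarFile = File.createTempFile("archiver-demo", ".tar");
    Archiver.create(tarFile, source);

    // ...and extract them elsewhere; each entry passes the tar-slip check
    // in HddsUtils.validatePath before it is written.
    Path restored = Files.createTempDirectory("archiver-dst");
    Archiver.extract(tarFile, restored);
    System.out.println(Files.exists(restored.resolve("data.txt"))); // true
  }
}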