Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
outputInodes = 0;
parent.commitSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
out = parent.getSectionOutputStream();
}
}
parent.commitSectionAndSubSection(summary,
Expand Down Expand Up @@ -817,6 +818,7 @@ void serializeINodeSection(OutputStream out) throws IOException {
if (i % parent.getInodesPerSubSection() == 0) {
parent.commitSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE_SUB);
out = parent.getSectionOutputStream();
}
}
parent.commitSectionAndSubSection(summary,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,18 +584,6 @@ private void loadErasureCodingSection(InputStream in)

private static boolean enableParallelSaveAndLoad(Configuration conf) {
boolean loadInParallel = enableParallelLoad;
boolean compressionEnabled = conf.getBoolean(
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);

if (loadInParallel) {
if (compressionEnabled) {
LOG.warn("Parallel Image loading and saving is not supported when {}" +
" is set to true. Parallel will be disabled.",
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY);
loadInParallel = false;
}
}
return loadInParallel;
}

Expand Down Expand Up @@ -653,7 +641,11 @@ public int getInodesPerSubSection() {
return inodesPerSubSection;
}

/**
public OutputStream getSectionOutputStream() {
return sectionOutputStream;
}

/**
* Commit the length and offset of a fsimage section to the summary index,
* including the sub section, which will be committed before the section is
* committed.
Expand All @@ -664,14 +656,22 @@ public int getInodesPerSubSection() {
*/
public void commitSectionAndSubSection(FileSummary.Builder summary,
SectionName name, SectionName subSectionName) throws IOException {
commitSubSection(summary, subSectionName);
commitSection(summary, name);
commitSubSection(summary, subSectionName, true);
commitSection(summary, name, true);
}

public void commitSection(FileSummary.Builder summary, SectionName name)
throws IOException {
throws IOException {
commitSection(summary, name, false);
}

public void commitSection(FileSummary.Builder summary, SectionName name,
boolean afterSubSectionCommit) throws IOException {
long oldOffset = currentOffset;
flushSectionOutputStream();
boolean subSectionCommitted = afterSubSectionCommit && writeSubSections;
if (!subSectionCommitted) {
flushSectionOutputStream();
}

if (codec != null) {
sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
Expand All @@ -685,14 +685,20 @@ public void commitSection(FileSummary.Builder summary, SectionName name)
subSectionOffset = currentOffset;
}

public void commitSubSection(FileSummary.Builder summary, SectionName name)
throws IOException {
this.commitSubSection(summary, name, false);
}

/**
* Commit the length and offset of a fsimage sub-section to the summary
* index.
* @param summary The image summary object
* @param name The name of the sub-section to commit
* @param isLast True if sub-section is the last sub-section of each section
* @throws IOException
*/
public void commitSubSection(FileSummary.Builder summary, SectionName name)
public void commitSubSection(FileSummary.Builder summary, SectionName name, boolean isLast)
throws IOException {
if (!writeSubSections) {
return;
Expand All @@ -701,7 +707,15 @@ public void commitSubSection(FileSummary.Builder summary, SectionName name)
LOG.debug("Saving a subsection for {}", name.toString());
// The output stream must be flushed before the length is obtained
// as the flush can move the length forward.
sectionOutputStream.flush();
flushSectionOutputStream();

if (codec == null || isLast) {
// To avoid empty sub-section, Do not create CompressionOutputStream
// if sub-section is last sub-section of each section
sectionOutputStream = underlyingOutputStream;
} else {
sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
}
long length = fileChannel.position() - subSectionOffset;
if (length == 0) {
LOG.warn("The requested section for {} is empty. It will not be " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ public void serializeSnapshotDiffSection(OutputStream out)
context.checkCancelled();
}
if (i % parent.getInodesPerSubSection() == 0) {
parent.commitSubSection(headers,
FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
parent.commitSubSection(headers, FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
out = parent.getSectionOutputStream();
}
}
parent.commitSectionAndSubSection(headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ public void testParallelSaveAndLoad() throws IOException {
}

@Test
public void testNoParallelSectionsWithCompressionEnabled()
public void testParallelSaveAndLoadWithCompression()
throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
Expand All @@ -1137,16 +1137,21 @@ public void testNoParallelSectionsWithCompressionEnabled()
getLatestImageSummary(cluster);
ArrayList<Section> sections = Lists.newArrayList(
summary.getSectionsList());
Section inodeSection =
getSubSectionsOfName(sections, SectionName.INODE).get(0);
Section dirSection = getSubSectionsOfName(sections,
SectionName.INODE_DIR).get(0);

ArrayList<Section> inodeSubSections =
getSubSectionsOfName(sections, SectionName.INODE_SUB);
ArrayList<Section> dirSubSections =
getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB);
// Compression and parallel can be enabled at the same time.
assertEquals(4, inodeSubSections.size());
assertEquals(4, dirSubSections.size());

// As compression is enabled, there should be no sub-sections in the
// image header
assertEquals(0, inodeSubSections.size());
assertEquals(0, dirSubSections.size());
ensureSubSectionsAlignWithParent(inodeSubSections, inodeSection);
ensureSubSectionsAlignWithParent(dirSubSections, dirSection);
} finally {
if (cluster != null) {
cluster.shutdown();
Expand Down Expand Up @@ -1229,4 +1234,4 @@ public void testUpdateBlocksMapAndNameCacheAsync() throws IOException {
SnapshotTestHelper.compareDumpedTreeInFile(
preRestartTree, postRestartTree, true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
Expand Down Expand Up @@ -77,15 +78,18 @@ public class TestFSImageWithSnapshot {
MiniDFSCluster cluster;
FSNamesystem fsn;
DistributedFileSystem hdfs;

public void createCluster() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
}

@Before
public void setUp() throws Exception {
conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
.build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
createCluster();
}

@After
Expand Down Expand Up @@ -512,6 +516,32 @@ public void testSaveLoadImageAfterSnapshotDeletion()
hdfs = cluster.getFileSystem();
}

/**
* Test parallel compressed fsimage can be loaded serially.
*/
@Test
public void testLoadParallelCompressedImageSerial() throws Exception {
int s = 0;
cluster.shutdown();

cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
hdfs.mkdirs(dir);
SnapshotTestHelper.createSnapshot(hdfs, dir, "s");

Path sub1 = new Path(dir, "sub1");
Path sub1file1 = new Path(sub1, "sub1file1");
Path sub1file2 = new Path(sub1, "sub1file2");
DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, (short) 1, seed);
DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, (short) 1, seed);

conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, false);
checkImage(s);
}

void rename(Path src, Path dst) throws Exception {
printTree("Before rename " + src + " -> " + dst);
hdfs.rename(src, dst);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

import org.slf4j.event.Level;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.test.GenericTestUtils;

/**
* This test extends TestFSImageWithSnapshot to test
* enable both fsimage load parallel and fsimage compress.
*/
public class TestFSImageWithSnapshotParallelAndCompress extends TestFSImageWithSnapshot {
{
SnapshotTestHelper.disableLogs();
GenericTestUtils.setLogLevel(INode.LOG, Level.TRACE);
}

@Override
public void createCluster() throws IOException {

// turn on both parallelization and compression
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, GzipCodec.class.getCanonicalName());
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, true);
conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, 2);

conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
}
}