From 5537044eed7d4a13e23a40240006a1135577bea5 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Thu, 19 Jan 2023 17:27:47 +0100 Subject: [PATCH 1/2] HDDS-7807. TarContainerPacker closes streams multiple times --- .../keyvalue/TestTarContainerPacker.java | 42 ++++++++++------- .../org/apache/ozone/test/SpyInputStream.java | 47 +++++++++++++++++++ .../apache/ozone/test/SpyOutputStream.java | 47 +++++++++++++++++++ 3 files changed, 118 insertions(+), 18 deletions(-) create mode 100644 hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/SpyInputStream.java create mode 100644 hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/SpyOutputStream.java diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java index 9e16fb88bc7e..e5cd6381929c 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java @@ -47,6 +47,8 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.ozone.container.replication.CopyContainerCompression; import org.apache.ozone.test.LambdaTestUtils; +import org.apache.ozone.test.SpyInputStream; +import org.apache.ozone.test.SpyOutputStream; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -55,6 +57,8 @@ import org.junit.runners.Parameterized; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.nio.file.Files.newInputStream; +import static java.nio.file.Files.newOutputStream; /** * Test the tar/untar for a given container. @@ -185,9 +189,9 @@ public void pack() throws IOException, CompressorException { Path targetFile = TEMP_DIR.resolve("container.tar.gz"); //WHEN: pack it - try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) { - packer.pack(sourceContainer, output); - } + SpyOutputStream outputForPack = + new SpyOutputStream(newOutputStream(targetFile)); + packer.pack(sourceContainer, outputForPack); //THEN: check the result TarArchiveInputStream tarStream = null; @@ -209,13 +213,16 @@ public void pack() throws IOException, CompressorException { tarStream.close(); } } + outputForPack.assertClosedExactlyOnce(); //read the container descriptor only - try (FileInputStream input = new FileInputStream(targetFile.toFile())) { - String containerYaml = new String(packer.unpackContainerDescriptor(input), - UTF_8); - Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, containerYaml); - } + SpyInputStream inputForUnpackDescriptor = + new SpyInputStream(newInputStream(targetFile)); + String containerYaml = new String( + packer.unpackContainerDescriptor(inputForUnpackDescriptor), + UTF_8); + Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, containerYaml); + inputForUnpackDescriptor.assertClosedExactlyOnce(); KeyValueContainerData destinationContainerData = createContainer(DEST_CONTAINER_ROOT, false); @@ -223,17 +230,14 @@ public void pack() throws IOException, CompressorException { KeyValueContainer destinationContainer = new KeyValueContainer(destinationContainerData, conf); - String descriptor; - //unpackContainerData - try (FileInputStream input = new FileInputStream(targetFile.toFile())) { - descriptor = - new String(packer.unpackContainerData(destinationContainer, input, - TEMP_DIR, - DEST_CONTAINER_ROOT.resolve(String.valueOf( - destinationContainer.getContainerData().getContainerID()))), - UTF_8); - } + SpyInputStream inputForUnpackData = + new SpyInputStream(newInputStream(targetFile)); + String descriptor = new String( + packer.unpackContainerData(destinationContainer, inputForUnpackData, + TEMP_DIR, DEST_CONTAINER_ROOT.resolve(String.valueOf( + destinationContainer.getContainerData().getContainerID()))), + UTF_8); assertExampleMetadataDbIsGood( TarContainerPacker.getDbPath(destinationContainerData), @@ -246,6 +250,7 @@ public void pack() throws IOException, CompressorException { + "unpackContainerData Call", destinationContainer.getContainerFile().exists()); Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor); + inputForUnpackData.assertClosedExactlyOnce(); } @Test @@ -408,4 +413,5 @@ private void assertExampleFileIsGood(Path parentPath, String filename, Assert.assertEquals(content, strings.get(0)); } } + } diff --git a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/SpyInputStream.java b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/SpyInputStream.java new file mode 100644 index 000000000000..bed5f1e982a4 --- /dev/null +++ b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/SpyInputStream.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ozone.test; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Filter input stream that allows assertions on behavior. + */ +public class SpyInputStream extends FilterInputStream { + + private final AtomicInteger closed = new AtomicInteger(); + + public SpyInputStream(InputStream in) { + super(in); + } + + @Override + public void close() throws IOException { + closed.incrementAndGet(); + super.close(); + } + + public void assertClosedExactlyOnce() { + assertEquals(1, closed.get()); + } +} diff --git a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/SpyOutputStream.java b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/SpyOutputStream.java new file mode 100644 index 000000000000..28abbc60a2fe --- /dev/null +++ b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/SpyOutputStream.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ozone.test; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Filter output stream that allows assertions on behavior. + */ +public class SpyOutputStream extends FilterOutputStream { + + private final AtomicInteger closed = new AtomicInteger(); + + public SpyOutputStream(OutputStream out) { + super(out); + } + + @Override + public void close() throws IOException { + closed.incrementAndGet(); + super.close(); + } + + public void assertClosedExactlyOnce() { + assertEquals(1, closed.get()); + } +} From d8bbcdc67f5974ae3dca4f6c3bdc78d9081b21d8 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Thu, 19 Jan 2023 18:08:31 +0100 Subject: [PATCH 2/2] Fix --- .../ozone/container/keyvalue/TarContainerPacker.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java index 3d8c445a84fe..9ad9ad758dd6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java @@ -166,9 +166,7 @@ public void pack(Container container, KeyValueContainerData containerData = container.getContainerData(); - try (OutputStream compressed = compress(output); - ArchiveOutputStream archiveOutput = tar(compressed)) { - + try (ArchiveOutputStream archiveOutput = tar(compress(output))) { includePath(getDbPath(containerData), DB_DIR_NAME, archiveOutput); @@ -187,8 +185,7 @@ public void pack(Container container, @Override public byte[] unpackContainerDescriptor(InputStream input) throws IOException { - try (InputStream decompressed = decompress(input); - ArchiveInputStream archiveInput = untar(decompressed)) { + try (ArchiveInputStream archiveInput = untar(decompress(input))) { ArchiveEntry entry = archiveInput.getNextEntry(); while (entry != null) { @@ -313,8 +310,7 @@ OutputStream compress(OutputStream output) private byte[] innerUnpack(InputStream input, Path dbRoot, Path chunksRoot) throws IOException { byte[] descriptorFileContent = null; - try (InputStream decompressed = decompress(input); - ArchiveInputStream archiveInput = untar(decompressed)) { + try (ArchiveInputStream archiveInput = untar(decompress(input))) { ArchiveEntry entry = archiveInput.getNextEntry(); while (entry != null) { String name = entry.getName();