diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 402e1be4cd0f..1bcb64200b22 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -122,6 +122,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerDataProto.State.RECOVERING; import static org.apache.hadoop.ozone.ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST; +import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST; import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult; import org.apache.hadoop.util.Time; @@ -547,9 +548,13 @@ ContainerCommandResponseProto handlePutBlock( boolean endOfBlock = false; if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) { - // in EC, we will be doing empty put block. - // So, let's flush only when there are any chunks - if (!request.getPutBlock().getBlockData().getChunksList().isEmpty()) { + // There are two cases where client sends empty put block with eof. + // (1) An EC empty file. In this case, the block/chunk file does not exist, + // so no need to flush/close the file. + // (2) Ratis output stream in incremental chunk list mode may send empty put block + // to close the block, in which case we need to flush/close the file. + if (!request.getPutBlock().getBlockData().getChunksList().isEmpty() || + blockData.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST)) { chunkManager.finishWriteChunks(kvContainer, blockData); } endOfBlock = true; @@ -903,6 +908,9 @@ ContainerCommandResponseProto handleWriteChunk( // of order. 
blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex()); boolean eob = writeChunk.getBlock().getEof(); + if (eob) { + chunkManager.finishWriteChunks(kvContainer, blockData); + } blockManager.putBlock(kvContainer, blockData, eob); blockDataProto = blockData.getProtoBufMessage(); final long numBytes = blockDataProto.getSerializedSize(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java index 0c373cb0dbf0..d9b95f199ddd 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java @@ -34,8 +34,13 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.BufferedReader; import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; import java.nio.Buffer; import java.nio.ByteBuffer; import java.util.UUID; @@ -53,6 +58,8 @@ * Helpers for ChunkManager implementation tests. */ public abstract class AbstractTestChunkManager { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractTestChunkManager.class); private HddsVolume hddsVolume; private KeyValueContainerData keyValueContainerData; @@ -128,6 +135,55 @@ protected void checkChunkFileCount(int expected) { assertEquals(expected, files.length); } + /** + * Helper method to check that a file is not in use, i.e. no process holds it open (determined via fuser). 
+ */ + public static boolean isFileNotInUse(String filePath) { + try { + Process process = new ProcessBuilder("fuser", filePath).start(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), UTF_8))) { + String output = reader.readLine(); // If fuser returns no output, the file is not in use + if (output == null) { + return true; + } + LOG.debug("File is in use: {}", filePath); + return false; + } finally { + process.destroy(); + } + } catch (IOException e) { + LOG.warn("Failed to check if file is in use: {}", filePath, e); + return false; // On failure, assume the file is in use + } + } + + protected boolean checkChunkFilesClosed() { + return checkChunkFilesClosed(keyValueContainerData.getChunksPath()); + } + + /** + * Check that all files under the chunk path are closed. + */ + public static boolean checkChunkFilesClosed(String path) { + // The container is created in setup, so these paths should exist. + assertNotNull(path); + + File dir = new File(path); + assertTrue(dir.exists()); + + File[] files = dir.listFiles(); + assertNotNull(files); + for (File file : files) { + assertTrue(file.exists()); + assertTrue(file.isFile()); + // Check that the file is closed. 
+ if (!isFileNotInUse(file.getAbsolutePath())) { + return false; + } + } + return true; + } + protected void checkWriteIOStats(long length, long opCount) { VolumeIOStats volumeIOStats = hddsVolume.getVolumeIOStats(); assertEquals(length, volumeIOStats.getWriteBytes()); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java index 47d24874749e..d4a12f577e98 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -39,7 +40,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.when; /** * Common test cases for ChunkManager implementation tests. 
@@ -222,4 +225,26 @@ public void testWriteAndReadChunkMultipleTimes() throws Exception { checkReadIOStats(len * count, count); } + @Test + public void testFinishWrite() throws Exception { + // GIVEN + ChunkManager chunkManager = createTestSubject(); + checkChunkFileCount(0); + checkWriteIOStats(0, 0); + + chunkManager.writeChunk(getKeyValueContainer(), getBlockID(), + getChunkInfo(), getData(), + WRITE_STAGE); + + BlockData blockData = Mockito.mock(BlockData.class); + when(blockData.getBlockID()).thenReturn(getBlockID()); + + chunkManager.finishWriteChunks(getKeyValueContainer(), blockData); + assertTrue(checkChunkFilesClosed()); + + // THEN + checkChunkFileCount(1); + checkWriteIOStats(getChunkInfo().getLen(), 1); + } + } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index 49b515d53c57..72978f818109 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -70,6 +70,7 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.ozone.ClientConfigForTesting; +import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; @@ -83,7 +84,9 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.TestHelper; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; +import org.apache.hadoop.ozone.container.keyvalue.impl.AbstractTestChunkManager; import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl; import 
org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -93,6 +96,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService; import org.apache.hadoop.security.UserGroupInformation; @@ -320,6 +324,8 @@ private void waitForEmptyDeletedTable() } @Test + // Making this the second test to be run to avoid lingering block files from previous tests + @Order(2) public void testEmptyHsync() throws Exception { // Check that deletedTable should not have keys with the same block as in // keyTable's when a key is hsync()'ed then close()'d. @@ -358,10 +364,16 @@ public void testKeyHSyncThenClose() throws Exception { String data = "random data"; final Path file = new Path(dir, "file-hsync-then-close"); try (FileSystem fs = FileSystem.get(CONF)) { + String chunkPath; try (FSDataOutputStream outputStream = fs.create(file, true)) { outputStream.write(data.getBytes(UTF_8), 0, data.length()); outputStream.hsync(); + // locate the container chunk path on the first DataNode. + chunkPath = getChunkPathOnDataNode(outputStream); + assertFalse(AbstractTestChunkManager.checkChunkFilesClosed(chunkPath)); } + // After close, the chunk file should be closed. 
+ assertTrue(AbstractTestChunkManager.checkChunkFilesClosed(chunkPath)); } OzoneManager ozoneManager = cluster.getOzoneManager(); @@ -387,6 +399,22 @@ public void testKeyHSyncThenClose() throws Exception { } } + private static String getChunkPathOnDataNode(FSDataOutputStream outputStream) + throws IOException { + String chunkPath; + KeyOutputStream groupOutputStream = + ((OzoneFSOutputStream) outputStream.getWrappedStream()).getWrappedOutputStream().getKeyOutputStream(); + List locationInfoList = + groupOutputStream.getLocationInfoList(); + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo, cluster); + chunkPath = dn.getDatanodeStateMachine() + .getContainer().getContainerSet() + .getContainer(omKeyLocationInfo.getContainerID()). + getContainerData().getChunksPath(); + return chunkPath; + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testO3fsHSync(boolean incrementalChunkList) throws Exception {