11 changes: 11 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -866,6 +866,17 @@
hsync performance.
</description>
</property>
<property>
<name>ozone.chunk.list.incremental</name>
<value>false</value>
<tag>OZONE, CLIENT, DATANODE, PERFORMANCE</tag>
<description>
By default, a writer client sends the full chunk list of a block in each
PutBlock request. Setting this configuration to true makes the client send
only an incremental chunk list, which reduces metadata overhead and
improves hsync performance.
</description>
</property>
<property>
<name>ozone.scm.container.layout</name>
<value>FILE_PER_BLOCK</value>
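For orientation, here is a minimal sketch (not part of this PR) of how a client could opt in to the new setting. OzoneConfiguration extends Hadoop's Configuration, so setBoolean/getBoolean are inherited; the class name below is made up for illustration.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public class EnableIncrementalChunkList {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Opt in to incremental chunk lists on PutBlock (the default is false).
    conf.setBoolean("ozone.chunk.list.incremental", true);
    System.out.println(conf.getBoolean("ozone.chunk.list.incremental", false));
  }
}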
@@ -364,6 +364,7 @@ public void shutdown() {

private BlockData getBlockByID(DBHandle db, BlockID blockID,
KeyValueContainerData containerData) throws IOException {
-  return db.getStore().getBlockByID(blockID, containerData);
+  String blockKey = containerData.getBlockKey(blockID.getLocalID());
+  return db.getStore().getBlockByID(blockID, blockKey);
}
}
@@ -77,6 +77,7 @@ public ConfigurationSource getConfig() {
public DBColumnFamilyDefinition<String, Long> getFinalizeBlocksColumnFamily() {
return null;
}

public abstract DBColumnFamilyDefinition<String, BlockData>
getLastChunkInfoColumnFamily();
}
@@ -130,8 +130,7 @@ default void compactionIfNeeded() throws Exception {
}

default BlockData getBlockByID(BlockID blockID,
-    KeyValueContainerData containerData) throws IOException {
-  String blockKey = containerData.getBlockKey(blockID.getLocalID());
+    String blockKey) throws IOException {

// check block data table
BlockData blockData = getBlockDataTable().get(blockKey);
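The interface change above moves key construction to the caller. Below is a self-contained sketch of the two-table read pattern this PR builds toward (the schema-three override further down falls back to the last chunk info table), with HashMaps standing in for the RocksDB tables; all names here are illustrative.

import java.util.HashMap;
import java.util.Map;

public class TwoTableLookupSketch {
  private final Map<String, String> blockDataTable = new HashMap<>();
  private final Map<String, String> lastChunkInfoTable = new HashMap<>();

  // Mirrors the lookup order: main table first, side table as fallback.
  String getBlockByKey(String blockKey) {
    String block = blockDataTable.get(blockKey);
    if (block == null) {
      // The block was hsync'd but its partial chunk was never finalized.
      block = lastChunkInfoTable.get(blockKey);
    }
    if (block == null) {
      throw new IllegalStateException("NO_SUCH_BLOCK BlockID : " + blockKey);
    }
    return block;
  }

  public static void main(String[] args) {
    TwoTableLookupSketch store = new TwoTableLookupSketch();
    store.lastChunkInfoTable.put("block#7", "partial chunk only");
    System.out.println(store.getBlockByKey("block#7"));
  }
}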
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.ozone.container.metadata;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
@@ -30,16 +33,24 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

import com.google.common.base.Preconditions;
import org.bouncycastle.util.Strings;
import org.rocksdb.LiveFileMetaData;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST;
import static org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition.getContainerKeyPrefix;

@@ -224,4 +235,174 @@ public void compactionIfNeeded() throws Exception {
}
}
}

@Override
public BlockData getBlockByID(BlockID blockID,
String blockKey) throws IOException {

BlockData lastChunk = null;
// check block data table
BlockData blockData = getBlockDataTable().get(blockKey);
if (blockData == null || isPartialChunkList(blockData)) {
// check last chunk table
lastChunk = getLastChunkInfoTable().get(blockKey);
}

if (blockData == null) {
if (lastChunk == null) {
throw new StorageContainerException(
NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("blockData=(null), lastChunk={}", lastChunk.getChunks());
}
return lastChunk;
}
} else {
if (lastChunk != null) {
reconcilePartialChunks(lastChunk, blockData);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("blockData={}, lastChunk=(null)", blockData.getChunks());
}
}
}

return blockData;
}

private void reconcilePartialChunks(
BlockData lastChunk, BlockData blockData) {
LOG.debug("blockData={}, lastChunk={}",
blockData.getChunks(), lastChunk.getChunks());
Preconditions.checkState(lastChunk.getChunks().size() == 1);
ContainerProtos.ChunkInfo lastChunkInBlockData =
blockData.getChunks().get(blockData.getChunks().size() - 1);
Preconditions.checkState(
lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen()
== lastChunk.getChunks().get(0).getOffset(),
"chunk offset does not match");

// append last partial chunk to the block data
List<ContainerProtos.ChunkInfo> chunkInfos =
new ArrayList<>(blockData.getChunks());
chunkInfos.add(lastChunk.getChunks().get(0));
blockData.setChunks(chunkInfos);

blockData.setBlockCommitSequenceId(
lastChunk.getBlockCommitSequenceId());
}
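
The Preconditions check above encodes the core invariant: the partial chunk must begin exactly where the full chunks end. Here is a self-contained sketch with a record standing in for ContainerProtos.ChunkInfo (requires Java 16+; all names illustrative).

import java.util.ArrayList;
import java.util.List;

public class ReconcileSketch {
  record Chunk(long offset, long len) { }

  // Append the single partial chunk only if it starts at the end offset
  // of the last full chunk, mirroring the checkState above.
  static List<Chunk> reconcile(List<Chunk> fullChunks, Chunk partial) {
    Chunk last = fullChunks.get(fullChunks.size() - 1);
    if (last.offset() + last.len() != partial.offset()) {
      throw new IllegalStateException("chunk offset does not match");
    }
    List<Chunk> merged = new ArrayList<>(fullChunks);
    merged.add(partial);
    return merged;
  }

  public static void main(String[] args) {
    List<Chunk> full = List.of(new Chunk(0, 4096), new Chunk(4096, 4096));
    // The partial chunk starts at 8192, right where the full chunks end.
    System.out.println(reconcile(full, new Chunk(8192, 1024)));
  }
}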

private static boolean isPartialChunkList(BlockData data) {
return data.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST);
}

private static boolean isFullChunk(ContainerProtos.ChunkInfo chunkInfo) {
for (ContainerProtos.KeyValue kv: chunkInfo.getMetadataList()) {
if (kv.getKey().equals(FULL_CHUNK)) {
return true;
}
}
return false;
}

// Append when at end of block (eob), when there are no chunks yet,
// or when the last chunk is full.
private static boolean shouldAppendLastChunk(boolean endOfBlock,
BlockData data) {
if (endOfBlock || data.getChunks().isEmpty()) {
[Review thread]
Contributor: In what case will data.getChunks() be empty?
Contributor Author: An hsync issued right after creating a file can cause the client to send an empty chunk list.
Contributor: I see.
return true;
}
return isFullChunk(data.getChunks().get(data.getChunks().size() - 1));
}
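
Condensed, the decision above is a pure function of three booleans; a small sketch (the FULL_CHUNK metadata lookup behind lastChunkFull is elided):

// Append when at end of block, when no chunks were sent yet, or when
// the last chunk sent this round is already full.
static boolean shouldAppend(boolean endOfBlock, boolean chunksEmpty,
    boolean lastChunkFull) {
  return endOfBlock || chunksEmpty || lastChunkFull;
}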

public void putBlockByID(BatchOperation batch, boolean incremental,
long localID, BlockData data, KeyValueContainerData containerData,
boolean endOfBlock) throws IOException {
if (!incremental && !isPartialChunkList(data)) {
// Case (1) old client: override chunk list.
getBlockDataTable().putWithBatch(
batch, containerData.getBlockKey(localID), data);
} else if (shouldAppendLastChunk(endOfBlock, data)) {
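// Case (2): the incoming chunk list is complete; fold it into the
// block data table (see moveLastChunkToBlockData below).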
moveLastChunkToBlockData(batch, localID, data, containerData);
} else {
// Case (3): incremental chunk list, not end of block,
// and the last chunk is partial.
putBlockWithPartialChunks(batch, localID, data, containerData);
}
}

private void moveLastChunkToBlockData(BatchOperation batch, long localID,
BlockData data, KeyValueContainerData containerData) throws IOException {
// At end of block, or when the last chunk is full, 'data' holds only
// complete chunks: append them to the block data table's chunk list
// and remove the entry from the last chunk info table.
BlockData blockData = getBlockDataTable().get(
containerData.getBlockKey(localID));
if (blockData == null) {
// Case 2.1 if the block did not have full chunks before,
// the block's chunk is what received from client this time.
blockData = data;
} else {
// case 2.2 the block already has some full chunks
List<ContainerProtos.ChunkInfo> chunkInfoList = blockData.getChunks();
blockData.setChunks(new ArrayList<>(chunkInfoList));
for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
blockData.addChunk(chunk);
}
blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
}
// delete the entry from last chunk info table
getLastChunkInfoTable().deleteWithBatch(
batch, containerData.getBlockKey(localID));
// update block data table
getBlockDataTable().putWithBatch(batch,
containerData.getBlockKey(localID), blockData);
}

private void putBlockWithPartialChunks(BatchOperation batch, long localID,
BlockData data, KeyValueContainerData containerData) throws IOException {
if (data.getChunks().size() == 1) {
// Case (3.1) replace/update the last chunk info table
getLastChunkInfoTable().putWithBatch(
batch, containerData.getBlockKey(localID), data);
} else {
int lastChunkIndex = data.getChunks().size() - 1;
// received more than one chunk this time
List<ContainerProtos.ChunkInfo> lastChunkInfo =
Collections.singletonList(
data.getChunks().get(lastChunkIndex));
BlockData blockData = getBlockDataTable().get(
containerData.getBlockKey(localID));
if (blockData == null) {
// Case 3.2: if the block does not exist in the block data table
List<ContainerProtos.ChunkInfo> chunkInfos =
new ArrayList<>(data.getChunks());
chunkInfos.remove(lastChunkIndex);
data.setChunks(chunkInfos);
blockData = data;
LOG.debug("block {} does not have full chunks yet. Adding the " +
"chunks to it {}", localID, blockData);
} else {
// Case 3.3: if the block exists in the block data table,
// append all chunks except the last one (presumably partial)
List<ContainerProtos.ChunkInfo> chunkInfos =
new ArrayList<>(blockData.getChunks());

LOG.debug("blockData.getChunks()={}", chunkInfos);
LOG.debug("data.getChunks()={}", data.getChunks());

for (int i = 0; i < lastChunkIndex; i++) {
chunkInfos.add(data.getChunks().get(i));
}
blockData.setChunks(chunkInfos);
blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
}
getBlockDataTable().putWithBatch(batch,
containerData.getBlockKey(localID), blockData);
// update the last partial chunk
data.setChunks(lastChunkInfo);
getLastChunkInfoTable().putWithBatch(
batch, containerData.getBlockKey(localID), data);
[Review thread]
Contributor: Can we just define and store the last chunk, instead of a BlockData containing the last chunk?
Contributor Author: I think there could still be cases where the client's buffer holds more than one chunk's worth of data that gets flushed out at a time.

}
}
}
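
To tie the write and read paths together, here is a self-contained simulation (strings stand in for chunk data and BlockData, and the key format is illustrative) of a partial flush followed by a finalizing flush:

import java.util.HashMap;
import java.util.Map;

public class IncrementalPutBlockSketch {
  static final Map<String, String> blockDataTable = new HashMap<>();
  static final Map<String, String> lastChunkInfoTable = new HashMap<>();

  // Partial last chunk: stash it in the side table (case 3.1 above).
  static void putPartial(String key, String partialChunk) {
    lastChunkInfoTable.put(key, partialChunk);
  }

  // End of block or full last chunk: append to the block data table and
  // drop the side-table entry (case 2 above).
  static void putFinal(String key, String chunk) {
    blockDataTable.merge(key, chunk, String::concat);
    lastChunkInfoTable.remove(key);
  }

  // Read path: full chunks plus any still-partial last chunk.
  static String read(String key) {
    return blockDataTable.getOrDefault(key, "")
        + lastChunkInfoTable.getOrDefault(key, "");
  }

  public static void main(String[] args) {
    String key = "container1/block7";            // illustrative key format
    putPartial(key, "[chunk0 partial]");         // hsync with partial chunk
    System.out.println(read(key));               // served from side table
    putFinal(key, "[chunk0 full][chunk1 full]"); // close / end of block
    System.out.println(read(key));               // served from block table
  }
}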