Merged
@@ -248,21 +248,21 @@ public enum ChecksumCombineMode {
private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";

@Config(key = "incremental.chunk.list",
defaultValue = "false",
defaultValue = "true",
type = ConfigType.BOOLEAN,
description = "Client PutBlock request can choose incremental chunk " +
"list rather than full chunk list to optimize performance. " +
"Critical to HBase.",
"Critical to HBase. EC does not support this feature.",
tags = ConfigTag.CLIENT)
private boolean incrementalChunkList = true;

@Config(key = "stream.putblock.piggybacking",
defaultValue = "false",
defaultValue = "true",
type = ConfigType.BOOLEAN,
description = "Allow PutBlock to be piggybacked in WriteChunk " +
"requests if the chunk is small.",
tags = ConfigTag.CLIENT)
private boolean enablePutblockPiggybacking = false;
private boolean enablePutblockPiggybacking = true;

@PostConstruct
public void validate() {
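Both defaults flip from false to true here, so clients pick up incremental chunk lists and PutBlock piggybacking automatically. A minimal sketch of opting back out on the client side, assuming the usual "ozone.client" prefix of this config group, the package location of OzoneClientConfig, and the OzoneConfiguration#getObject API (the wrapper class name is hypothetical):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

public class DisableHBaseWriteOptimizations {
  public static void main(String[] args) {
    // Hypothetical usage sketch; the full key names assume the "ozone.client" prefix
    // in front of the @Config keys shown above.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean("ozone.client.incremental.chunk.list", false);
    conf.setBoolean("ozone.client.stream.putblock.piggybacking", false);
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    System.out.println("incremental chunk list: " + clientConfig.getIncrementalChunkList());
    System.out.println("putblock piggybacking: " + clientConfig.getEnablePutblockPiggybacking());
  }
}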
@@ -149,6 +149,7 @@ public class BlockOutputStream extends OutputStream {
private Pipeline pipeline;
private final ContainerClientMetrics clientMetrics;
private boolean allowPutBlockPiggybacking;
private boolean supportIncrementalChunkList;

private CompletableFuture<Void> lastFlushFuture;
private CompletableFuture<Void> allPendingFlushFutures = CompletableFuture.completedFuture(null);
@@ -189,8 +190,13 @@ public BlockOutputStream(
}
this.containerBlockData = BlockData.newBuilder().setBlockID(
blkIDBuilder.build()).addMetadata(keyValue);
this.pipeline = pipeline;
// tell DataNode I will send incremental chunk list
if (config.getIncrementalChunkList()) {
// EC does not support incremental chunk list.
this.supportIncrementalChunkList = config.getIncrementalChunkList() &&
this instanceof RatisBlockOutputStream && allDataNodesSupportPiggybacking();
LOG.debug("incrementalChunkList is {}", supportIncrementalChunkList);
if (supportIncrementalChunkList) {
this.containerBlockData.addMetadata(INCREMENTAL_CHUNK_LIST_KV);
this.lastChunkBuffer = DIRECT_BUFFER_POOL.getBuffer(config.getStreamBufferSize());
this.lastChunkOffset = 0;
@@ -223,16 +229,17 @@ public BlockOutputStream(
checksum = new Checksum(config.getChecksumType(),
config.getBytesPerChecksum());
this.clientMetrics = clientMetrics;
this.pipeline = pipeline;
this.streamBufferArgs = streamBufferArgs;
this.allowPutBlockPiggybacking = config.getEnablePutblockPiggybacking() &&
allDataNodesSupportPiggybacking();
LOG.debug("PutBlock piggybacking is {}", allowPutBlockPiggybacking);
}

private boolean allDataNodesSupportPiggybacking() {
// return true only if all DataNodes in the pipeline are on a version
// that supports PutBlock piggybacking.
for (DatanodeDetails dn : pipeline.getNodes()) {
LOG.debug("dn = {}, version = {}", dn, dn.getCurrentVersion());
if (dn.getCurrentVersion() <
COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue()) {
return false;
@@ -532,7 +539,7 @@ CompletableFuture<PutBlockResult> executePutBlock(boolean close,
BlockData blockData = containerBlockData.build();
LOG.debug("sending PutBlock {}", blockData);

if (config.getIncrementalChunkList()) {
if (supportIncrementalChunkList) {
// remove any chunks in the containerBlockData list.
// since they are sent.
containerBlockData.clearChunks();
@@ -866,7 +873,7 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
try {
BlockData blockData = null;

if (config.getIncrementalChunkList()) {
if (supportIncrementalChunkList) {
updateBlockDataForWriteChunk(chunk);
} else {
containerBlockData.addChunks(chunkInfo);
@@ -880,7 +887,7 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
blockData = containerBlockData.build();
LOG.debug("piggyback chunk list {}", blockData);

if (config.getIncrementalChunkList()) {
if (supportIncrementalChunkList) {
// remove any chunks in the containerBlockData list.
// since they are sent.
containerBlockData.clearChunks();
@@ -391,6 +391,9 @@ public static DatanodeDetails.Builder newBuilder(
}
if (datanodeDetailsProto.hasCurrentVersion()) {
builder.setCurrentVersion(datanodeDetailsProto.getCurrentVersion());
} else {
// fallback to version 1 if not present
Review comment (Contributor): I would suggest removing this fallback branch, since it is not certain what the DN's version is.

Reply (Contributor Author, @jojochuang, Aug 20, 2024): The thing is, if it is not set, it is automatically set to the client's current DataNode version, which is 2. So the client wouldn't be able to tell the correct DataNode version.

builder.setCurrentVersion(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue());
}
return builder;
}
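With this fallback, a DataNode that never reported currentVersion (Ozone 1.4.0 and earlier) is treated as version 1 (SEPARATE_RATIS_PORTS_AVAILABLE), which is below COMBINED_PUTBLOCK_WRITECHUNK_RPC, so piggybacking and incremental chunk lists stay disabled for any pipeline containing such a node. A hedged sketch mirroring the check in BlockOutputStream above (the helper class is illustrative, not part of this PR):

import java.util.List;
import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;

final class PiggybackingSupportCheck {
  // Mirrors allDataNodesSupportPiggybacking(): one old node disables the optimization for the whole pipeline.
  static boolean allDataNodesSupport(List<DatanodeDetails> nodes) {
    for (DatanodeDetails dn : nodes) {
      if (dn.getCurrentVersion() < DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue()) {
        return false;
      }
    }
    return true;
  }
}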
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.protocol;

import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.junit.jupiter.api.Test;
@@ -48,6 +49,24 @@ void protoIncludesNewPortsOnlyForV1() {
assertPorts(protoV1, ALL_PORTS);
}

@Test
public void testNewBuilderCurrentVersion() {
// test that if the current version is not set (Ozone 1.4.0 and earlier),
// it falls back to SEPARATE_RATIS_PORTS_AVAILABLE
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
HddsProtos.DatanodeDetailsProto.Builder protoBuilder =
dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
protoBuilder.clearCurrentVersion();
DatanodeDetails dn2 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
assertEquals(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue(), dn2.getCurrentVersion());

// test that if the current version is set, it is used
protoBuilder =
dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
DatanodeDetails dn3 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
assertEquals(DatanodeVersion.CURRENT.toProtoValue(), dn3.getCurrentVersion());
}

public static void assertPorts(HddsProtos.DatanodeDetailsProto dn,
Set<Port.Name> expectedPorts) throws IllegalArgumentException {
assertEquals(expectedPorts.size(), dn.getPortsCount());
@@ -35,6 +35,7 @@
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreWithIncrementalChunkList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -501,8 +502,6 @@ static PendingDelete countPendingDeletesSchemaV2(

Table<Long, DeletedBlocksTransaction> delTxTable =
schemaTwoStore.getDeleteTransactionTable();
final Table<String, BlockData> blockDataTable
= schemaTwoStore.getBlockDataTable();

try (TableIterator<Long, ? extends Table.KeyValue<Long,
DeletedBlocksTransaction>> iterator = delTxTable.iterator()) {
@@ -515,7 +514,7 @@
// counted towards bytes used and total block count above.
pendingDeleteBlockCountTotal += localIDs.size();
pendingDeleteBytes += computePendingDeleteBytes(
localIDs, containerData, blockDataTable);
localIDs, containerData, schemaTwoStore);
}
}

@@ -525,12 +524,12 @@

static long computePendingDeleteBytes(List<Long> localIDs,
KeyValueContainerData containerData,
Table<String, BlockData> blockDataTable) {
DatanodeStoreWithIncrementalChunkList store) {
long pendingDeleteBytes = 0;
for (long id : localIDs) {
try {
final String blockKey = containerData.getBlockKey(id);
final BlockData blockData = blockDataTable.get(blockKey);
final BlockData blockData = store.getBlockByID(null, blockKey);
if (blockData != null) {
pendingDeleteBytes += blockData.getSize();
}
@@ -544,23 +543,21 @@ static long computePendingDeleteBytes(List<Long> localIDs,
}

static PendingDelete countPendingDeletesSchemaV3(
DatanodeStoreSchemaThreeImpl schemaThreeStore,
DatanodeStoreSchemaThreeImpl store,
KeyValueContainerData containerData) throws IOException {
long pendingDeleteBlockCountTotal = 0;
long pendingDeleteBytes = 0;
final Table<String, BlockData> blockDataTable
= schemaThreeStore.getBlockDataTable();
try (
TableIterator<String, ? extends Table.KeyValue<String,
DeletedBlocksTransaction>>
iter = schemaThreeStore.getDeleteTransactionTable()
iter = store.getDeleteTransactionTable()
.iterator(containerData.containerPrefix())) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTx = iter.next().getValue();
final List<Long> localIDs = delTx.getLocalIDList();
pendingDeleteBlockCountTotal += localIDs.size();
pendingDeleteBytes += computePendingDeleteBytes(
localIDs, containerData, blockDataTable);
localIDs, containerData, store);
}
}
return new PendingDelete(pendingDeleteBlockCountTotal,
@@ -343,7 +343,7 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
.getSequentialRangeKVs(startKey, count,
cData.containerPrefix(), cData.getUnprefixedKeyFilter());
for (Table.KeyValue<String, BlockData> entry : range) {
result.add(entry.getValue());
result.add(db.getStore().getCompleteBlockData(entry.getValue(), null, entry.getKey()));
}
return result;
}
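Because a row in the block data table may now hold only a partial chunk list, each listed entry is completed via getCompleteBlockData, which merges in the matching row from the last chunk info table. A minimal sketch of the same read path for a single block, using the methods introduced in this PR (the helper class and its arguments are illustrative):

import java.io.IOException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreWithIncrementalChunkList;

final class CompleteBlockReadSketch {
  static BlockData readCompleteBlock(DatanodeStoreWithIncrementalChunkList store,
      KeyValueContainerData containerData, long localID) throws IOException {
    String blockKey = containerData.getBlockKey(localID);
    // The block data table row may be a partial chunk list when incremental chunk lists are enabled...
    BlockData partial = store.getBlockDataTable().get(blockKey);
    // ...so it is completed against the last chunk info table, as listBlock() above now does per entry.
    return store.getCompleteBlockData(partial, null, blockKey);
  }
}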
@@ -327,6 +327,8 @@ private ContainerBackgroundTaskResult deleteViaTransactionStore(
try {
Table<String, BlockData> blockDataTable =
meta.getStore().getBlockDataTable();
Table<String, BlockData> lastChunkInfoTable =
meta.getStore().getLastChunkInfoTable();
DeleteTransactionStore<?> txnStore =
(DeleteTransactionStore<?>) meta.getStore();
Table<?, DeletedBlocksTransaction> deleteTxns =
@@ -376,8 +378,11 @@ private ContainerBackgroundTaskResult deleteViaTransactionStore(
for (DeletedBlocksTransaction delTx : deletedBlocksTxs) {
deleter.apply(deleteTxns, batch, delTx.getTxID());
for (Long blk : delTx.getLocalIDList()) {
// delete from both blockDataTable and lastChunkInfoTable.
blockDataTable.deleteWithBatch(batch,
containerData.getBlockKey(blk));
lastChunkInfoTable.deleteWithBatch(batch,
containerData.getBlockKey(blk));
}
}
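With incremental chunk lists, a block's metadata can span two rows under the same block key, so the delete path removes both in the same batch to avoid leaving an orphaned last-chunk row behind. A hedged sketch of that invariant as a standalone helper (hypothetical class, mirroring the loop above):

import java.io.IOException;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

final class BlockDeleteSketch {
  // Deletes the block-data row and, if present, the last-chunk row for one local block ID.
  static void deleteBlockRows(Table<String, BlockData> blockDataTable,
      Table<String, BlockData> lastChunkInfoTable, BatchOperation batch,
      KeyValueContainerData containerData, long localID) throws IOException {
    String key = containerData.getBlockKey(localID);
    blockDataTable.deleteWithBatch(batch, key);
    lastChunkInfoTable.deleteWithBatch(batch, key);
  }
}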

@@ -135,6 +135,11 @@ default BlockData getBlockByID(BlockID blockID,
// check block data table
BlockData blockData = getBlockDataTable().get(blockKey);

return getCompleteBlockData(blockData, blockID, blockKey);
}

default BlockData getCompleteBlockData(BlockData blockData,
BlockID blockID, String blockKey) throws IOException {
if (blockData == null) {
throw new StorageContainerException(
NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK);
@@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
@@ -31,6 +32,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.rocksdb.LiveFileMetaData;

import java.io.File;
@@ -106,6 +108,9 @@ public void removeKVContainerData(long containerID) throws IOException {
try (BatchOperation batch = getBatchHandler().initBatchOperation()) {
getMetadataTable().deleteBatchWithPrefix(batch, prefix);
getBlockDataTable().deleteBatchWithPrefix(batch, prefix);
if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
getLastChunkInfoTable().deleteBatchWithPrefix(batch, prefix);
}
getDeleteTransactionTable().deleteBatchWithPrefix(batch, prefix);
getBatchHandler().commitBatchOperation(batch);
}
@@ -118,6 +123,10 @@ public void dumpKVContainerData(long containerID, File dumpDir)
getTableDumpFile(getMetadataTable(), dumpDir), prefix);
getBlockDataTable().dumpToFileWithPrefix(
getTableDumpFile(getBlockDataTable(), dumpDir), prefix);
if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
getLastChunkInfoTable().dumpToFileWithPrefix(
getTableDumpFile(getLastChunkInfoTable(), dumpDir), prefix);
}
getDeleteTransactionTable().dumpToFileWithPrefix(
getTableDumpFile(getDeleteTransactionTable(), dumpDir),
prefix);
@@ -129,6 +138,10 @@ public void loadKVContainerData(File dumpDir)
getTableDumpFile(getMetadataTable(), dumpDir));
getBlockDataTable().loadFromFile(
getTableDumpFile(getBlockDataTable(), dumpDir));
if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
getLastChunkInfoTable().loadFromFile(
getTableDumpFile(getLastChunkInfoTable(), dumpDir));
}
getDeleteTransactionTable().loadFromFile(
getTableDumpFile(getDeleteTransactionTable(), dumpDir));
}
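All three bulk operations here (remove, dump, load) guard the new last chunk info table behind the HBASE_SUPPORT layout-feature check, so the table is only touched once the upgrade has been finalized. A hedged sketch of factoring that repeated guard into one spot (hypothetical refactor, not part of this PR; method and variable names are illustrative):

// Hypothetical helper inside the same store class.
private boolean lastChunkInfoTableInUse() {
  return VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT);
}

// Call sites would then read, for example:
if (lastChunkInfoTableInUse()) {
  getLastChunkInfoTable().deleteBatchWithPrefix(batch, prefix);
}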
@@ -56,11 +56,9 @@ public DatanodeStoreWithIncrementalChunkList(ConfigurationSource config,


@Override
public BlockData getBlockByID(BlockID blockID,
String blockKey) throws IOException {
public BlockData getCompleteBlockData(BlockData blockData,
BlockID blockID, String blockKey) throws IOException {
BlockData lastChunk = null;
// check block data table
BlockData blockData = getBlockDataTable().get(blockKey);
if (blockData == null || isPartialChunkList(blockData)) {
// check last chunk table
lastChunk = getLastChunkInfoTable().get(blockKey);
@@ -190,18 +188,29 @@ private void moveLastChunkToBlockData(BatchOperation batch, long localID,

private void putBlockWithPartialChunks(BatchOperation batch, long localID,
BlockData data, KeyValueContainerData containerData) throws IOException {
String blockKey = containerData.getBlockKey(localID);
BlockData blockData = getBlockDataTable().get(blockKey);
if (data.getChunks().size() == 1) {
// Case (3.1) replace/update the last chunk info table
getLastChunkInfoTable().putWithBatch(
batch, containerData.getBlockKey(localID), data);
batch, blockKey, data);
// If the block does not exist in the block data table because it is the first chunk
// and the chunk is not full, then add an empty block data to populate the block table.
// This is required because some of the test code and utilities expect the block to be
// present in the block data table, they don't check the last chunk info table.
if (blockData == null) {
// populate blockDataTable with empty chunk list
blockData = new BlockData(data.getBlockID());
blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
getBlockDataTable().putWithBatch(batch, blockKey, blockData);
}
} else {
int lastChunkIndex = data.getChunks().size() - 1;
// received more than one chunk this time
List<ContainerProtos.ChunkInfo> lastChunkInfo =
Collections.singletonList(
data.getChunks().get(lastChunkIndex));
BlockData blockData = getBlockDataTable().get(
containerData.getBlockKey(localID));
if (blockData == null) {
// Case 3.2: if the block does not exist in the block data table
List<ContainerProtos.ChunkInfo> chunkInfos =