Merged
BlockOutputStream.java
@@ -60,6 +60,8 @@
 import static org.apache.hadoop.hdds.DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
+import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
+
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,7 +87,6 @@ public class BlockOutputStream extends OutputStream {
       LoggerFactory.getLogger(BlockOutputStream.class);
   public static final String EXCEPTION_MSG =
       "Unexpected Storage Container Exception: ";
-  public static final String INCREMENTAL_CHUNK_LIST = "incremental";
   public static final KeyValue INCREMENTAL_CHUNK_LIST_KV =
       KeyValue.newBuilder().setKey(INCREMENTAL_CHUNK_LIST).build();
   public static final String FULL_CHUNK = "full";
OzoneConsts.java
@@ -115,6 +115,8 @@ public final class OzoneConsts {
   public static final String CHUNK_OVERWRITE = "OverWriteRequested";
 
   public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
+  // for client and DataNode to label a block that contains an incremental chunk list.
+  public static final String INCREMENTAL_CHUNK_LIST = "incremental";
   public static final long KB = 1024L;
   public static final long MB = KB * 1024L;
   public static final long GB = MB * 1024L;
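For illustration only (not part of this change): a minimal sketch of how a writer can tag a block with the shared constant. It assumes Ozone's BlockData helper class; addMetadata is the same call used in newPutBlockRequestBuilder below.

import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;

import java.io.IOException;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;

final class IncrementalLabelSketch {
  // Tag a block so the DataNode treats its chunk list as a delta to be
  // merged with already-persisted chunks, rather than a full replacement.
  static BlockData labelIncremental(BlockID blockID) throws IOException {
    BlockData blockData = new BlockData(blockID);
    blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
    return blockData;
  }
}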
ContainerTestHelper.java
@@ -26,6 +26,7 @@
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import jakarta.annotation.Nonnull;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.utils.UniqueId;
@@ -50,6 +52,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -380,12 +383,24 @@ public static ContainerCommandRequestProto getUpdateContainerRequest(
   public static ContainerCommandRequestProto getPutBlockRequest(
       Pipeline pipeline, ContainerProtos.WriteChunkRequestProto writeRequest)
       throws IOException {
-    return newPutBlockRequestBuilder(pipeline, writeRequest).build();
+    return getPutBlockRequest(pipeline, writeRequest, false);
   }
 
+  public static ContainerCommandRequestProto getPutBlockRequest(
+      Pipeline pipeline, ContainerProtos.WriteChunkRequestProto writeRequest, boolean incremental)
+      throws IOException {
+    return newPutBlockRequestBuilder(pipeline, writeRequest, incremental).build();
+  }
+
   public static Builder newPutBlockRequestBuilder(Pipeline pipeline,
       ContainerProtos.WriteChunkRequestProtoOrBuilder writeRequest)
       throws IOException {
+    return newPutBlockRequestBuilder(pipeline, writeRequest, false);
+  }
+
+  public static Builder newPutBlockRequestBuilder(Pipeline pipeline,
+      ContainerProtos.WriteChunkRequestProtoOrBuilder writeRequest, boolean incremental)
+      throws IOException {
     LOG.trace("putBlock: {} to pipeline={}",
         writeRequest.getBlockID(), pipeline);
 
@@ -398,6 +413,9 @@ public static Builder newPutBlockRequestBuilder(Pipeline pipeline,
     newList.add(writeRequest.getChunkData());
     blockData.setChunks(newList);
     blockData.setBlockCommitSequenceId(0);
+    if (incremental) {
+      blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
+    }
     putRequest.setBlockData(blockData.getProtoBufMessage());
 
     Builder request =
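As a usage illustration (not part of the diff), a test could exercise the new incremental path roughly as sketched below. It assumes getWriteChunkRequest is the existing helper in the same class and that an XceiverClientSpi for the pipeline is available.

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.container.ContainerTestHelper;

final class IncrementalPutBlockSketch {
  // Write one chunk, then commit it with a PutBlock whose BlockData
  // carries the INCREMENTAL_CHUNK_LIST marker (incremental = true).
  static void writeIncrementally(XceiverClientSpi client, Pipeline pipeline,
      BlockID blockID) throws Exception {
    ContainerProtos.ContainerCommandRequestProto writeChunk =
        ContainerTestHelper.getWriteChunkRequest(pipeline, blockID, 1024);
    client.sendCommand(writeChunk);
    ContainerProtos.ContainerCommandRequestProto putBlock =
        ContainerTestHelper.getPutBlockRequest(
            pipeline, writeChunk.getWriteChunk(), true);
    client.sendCommand(putBlock);
  }
}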
@@ -517,6 +535,25 @@ public static ContainerCommandRequestProto getDeleteContainer(
         .build();
   }
 
+  @Nonnull
+  public static ContainerProtos.ContainerCommandRequestProto getFinalizeBlockRequest(
+      long localID, ContainerInfo container, String uuidString) {
+    final ContainerProtos.ContainerCommandRequestProto.Builder builder =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.FinalizeBlock)
+            .setContainerID(container.getContainerID())
+            .setDatanodeUuid(uuidString);
+
+    final ContainerProtos.DatanodeBlockID blockId =
+        ContainerProtos.DatanodeBlockID.newBuilder()
+            .setContainerID(container.getContainerID()).setLocalID(localID)
+            .setBlockCommitSequenceId(0).build();
+
+    builder.setFinalizeBlock(ContainerProtos.FinalizeBlockRequestProto
+        .newBuilder().setBlockID(blockId).build());
+    return builder.build();
+  }
+
   public static BlockID getTestBlockID(long containerID) {
     return getTestBlockID(containerID, null);
   }
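For context, a sketch of how a test might drive the new helper end to end; the client variable and the SUCCESS assertion are assumptions for illustration, not part of this change.

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.ozone.container.ContainerTestHelper;

final class FinalizeBlockSketch {
  // Send FinalizeBlock for one block of a container and verify that the
  // datanode acknowledges the request.
  static void finalizeBlock(XceiverClientSpi client, ContainerInfo container,
      long localID, String datanodeUuid) throws Exception {
    ContainerProtos.ContainerCommandRequestProto request =
        ContainerTestHelper.getFinalizeBlockRequest(localID, container, datanodeUuid);
    ContainerProtos.ContainerCommandResponseProto response =
        client.sendCommand(request);
    assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
  }
}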
BlockManagerImpl.java
@@ -42,6 +42,7 @@
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
+import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,6 @@ public class BlockManagerImpl implements BlockManager {
   private ConfigurationSource config;
 
   private static final String DB_NULL_ERR_MSG = "DB cannot be null here";
-  public static final String INCREMENTAL_CHUNK_LIST = "incremental";
   public static final String FULL_CHUNK = "full";
 
   // Default Read Buffer capacity when Checksum is not present
DatanodeStoreSchemaTwoImpl.java
@@ -32,8 +32,8 @@
 import java.util.List;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
+import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
 import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
-import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST;
 
 /**
  * Constructs a datanode store in accordance with schema version 2, which uses

@@ -45,11 +45,11 @@
 import java.util.List;
 import java.util.UUID;
 
+import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
 import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
-import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;