Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.security.token.Token;

import com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.JavaUtils;

/**
* A BlockOutputStreamEntry manages the data writes into the DataNodes.
Expand All @@ -60,33 +61,28 @@ public class BlockOutputStreamEntry extends OutputStream {
private long currentPosition;
private final Token<OzoneBlockTokenIdentifier> token;

private BufferPool bufferPool;
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;

@SuppressWarnings({"parameternumber", "squid:S00107"})
BlockOutputStreamEntry(
BlockID blockID, String key,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
long length,
BufferPool bufferPool,
Token<OzoneBlockTokenIdentifier> token,
OzoneClientConfig config,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) {
this.config = config;
private final BufferPool bufferPool;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;

BlockOutputStreamEntry(Builder b) {
this.config = b.config;
this.outputStream = null;
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.pipeline = pipeline;
this.token = token;
this.length = length;
this.blockID = b.blockID;
this.key = b.key;
this.xceiverClientManager = b.xceiverClientManager;
this.pipeline = b.pipeline;
this.token = b.token;
this.length = b.length;
this.currentPosition = 0;
this.bufferPool = bufferPool;
this.clientMetrics = clientMetrics;
this.streamBufferArgs = streamBufferArgs;
this.bufferPool = b.bufferPool;
this.clientMetrics = b.clientMetrics;
this.streamBufferArgs = b.streamBufferArgs;
}

@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + key + " " + blockID;
}

/**
Expand Down Expand Up @@ -362,6 +358,14 @@ public static class Builder {
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;

public Pipeline getPipeline() {
return pipeline;
}

public long getLength() {
return length;
}

public Builder setBlockID(BlockID bID) {
this.blockID = bID;
return this;
Expand Down Expand Up @@ -412,13 +416,7 @@ public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
}

public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID,
key,
xceiverClientManager,
pipeline,
length,
bufferPool,
token, config, clientMetrics, streamBufferArgs);
return new BlockOutputStreamEntry(this);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.ListIterator;
import java.util.Map;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
Expand Down Expand Up @@ -62,7 +61,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
/**
* List of stream entries that are used to write a block of data.
*/
private final List<BlockOutputStreamEntry> streamEntries;
private final List<BlockOutputStreamEntry> streamEntries = new ArrayList<>();
private final OzoneClientConfig config;
/**
* The actual stream entry we are writing into. Note that a stream entry is
Expand All @@ -73,7 +72,6 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
private final OzoneManagerProtocol omClient;
private final OmKeyArgs keyArgs;
private final XceiverClientFactory xceiverClientFactory;
private final String requestID;
/**
* A {@link BufferPool} shared between all
* {@link org.apache.hadoop.hdds.scm.storage.BlockOutputStream}s managed by
Expand All @@ -86,65 +84,38 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;

@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockOutputStreamEntryPool(
OzoneClientConfig config,
OzoneManagerProtocol omClient,
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber,
boolean isMultipart, OmKeyInfo info,
boolean unsafeByteBufferConversion,
XceiverClientFactory xceiverClientFactory, long openID,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) {
this.config = config;
this.xceiverClientFactory = xceiverClientFactory;
streamEntries = new ArrayList<>();
public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
this.config = b.getClientConfig();
this.xceiverClientFactory = b.getXceiverManager();
currentStreamIndex = 0;
this.omClient = omClient;
this.omClient = b.getOmClient();
final OmKeyInfo info = b.getOpenHandler().getKeyInfo();
this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
.setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
.setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
.setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
.setMultipartUploadPartNumber(partNumber).build();
this.requestID = requestId;
this.openID = openID;
.setReplicationConfig(b.getReplicationConfig())
.setDataSize(info.getDataSize())
.setIsMultipartKey(b.isMultipartKey())
.setMultipartUploadID(b.getMultipartUploadID())
.setMultipartUploadPartNumber(b.getMultipartNumber())
.build();
this.openID = b.getOpenHandler().getId();
this.excludeList = createExcludeList();

this.streamBufferArgs = b.getStreamBufferArgs();
this.bufferPool =
new BufferPool(streamBufferArgs.getStreamBufferSize(),
(int) (streamBufferArgs.getStreamBufferMaxSize() / streamBufferArgs
.getStreamBufferSize()),
ByteStringConversion
.createByteBufferConversion(unsafeByteBufferConversion));
this.clientMetrics = clientMetrics;
this.streamBufferArgs = streamBufferArgs;
.createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled()));
this.clientMetrics = b.getClientMetrics();
}

ExcludeList createExcludeList() {
return new ExcludeList(getConfig().getExcludeNodesExpiryTime(),
Clock.system(ZoneOffset.UTC));
}

BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics,
OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) {
streamEntries = new ArrayList<>();
omClient = null;
keyArgs = null;
xceiverClientFactory = null;
config = clientConfig;
streamBufferArgs.setStreamBufferFlushDelay(false);
requestID = null;
int chunkSize = 0;
bufferPool = new BufferPool(chunkSize, 1);

currentStreamIndex = 0;
openID = -1;
excludeList = createExcludeList();
this.clientMetrics = clientMetrics;
this.streamBufferArgs = null;
}

/**
* When a key is opened, it is possible that there are some blocks already
* allocated to it for this open session. In this case, to make use of these
Expand All @@ -156,10 +127,8 @@ ExcludeList createExcludeList() {
*
* @param version the set of blocks that are pre-allocated.
* @param openVersion the version corresponding to the pre-allocation.
* @throws IOException
*/
public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
long openVersion) throws IOException {
public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) {
// server may return any number of blocks, (0 to any)
// only the blocks allocated in this open session (block createVersion
// equals to open session version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,10 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -75,19 +68,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
private int currentStreamIdx = 0;
private long successfulBlkGrpAckedLen;

@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
OzoneClientConfig config, ContainerClientMetrics clientMetrics,
StreamBufferArgs streamBufferArgs) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
token, config, clientMetrics, streamBufferArgs);
assertInstanceOf(
pipeline.getReplicationConfig(), ECReplicationConfig.class);
this.replicationConfig =
(ECReplicationConfig) pipeline.getReplicationConfig();
this.length = replicationConfig.getData() * length;
ECBlockOutputStreamEntry(Builder b) {
super(b);
this.replicationConfig = assertInstanceOf(b.getPipeline().getReplicationConfig(), ECReplicationConfig.class);
this.length = replicationConfig.getData() * b.getLength();
}

@Override
Expand Down Expand Up @@ -433,82 +417,9 @@ public ByteString calculateChecksum() throws IOException {
/**
* Builder class for ChunkGroupOutputStreamEntry.
* */
public static class Builder {
private BlockID blockID;
private String key;
private XceiverClientFactory xceiverClientManager;
private Pipeline pipeline;
private long length;
private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;

public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
this.blockID = bID;
return this;
}

public ECBlockOutputStreamEntry.Builder setKey(String keys) {
this.key = keys;
return this;
}

public ECBlockOutputStreamEntry.Builder setXceiverClientManager(
XceiverClientFactory
xClientManager) {
this.xceiverClientManager = xClientManager;
return this;
}

public ECBlockOutputStreamEntry.Builder setPipeline(Pipeline ppln) {
this.pipeline = ppln;
return this;
}

public ECBlockOutputStreamEntry.Builder setLength(long len) {
this.length = len;
return this;
}

public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) {
this.bufferPool = pool;
return this;
}

public ECBlockOutputStreamEntry.Builder setConfig(
OzoneClientConfig clientConfig) {
this.config = clientConfig;
return this;
}

public ECBlockOutputStreamEntry.Builder setToken(
Token<OzoneBlockTokenIdentifier> bToken) {
this.token = bToken;
return this;
}

public ECBlockOutputStreamEntry.Builder setClientMetrics(
ContainerClientMetrics containerClientMetrics) {
this.clientMetrics = containerClientMetrics;
return this;
}

public ECBlockOutputStreamEntry.Builder setStreamBufferArgs(
StreamBufferArgs args) {
this.streamBufferArgs = args;
return this;
}

public static class Builder extends BlockOutputStreamEntry.Builder {
public ECBlockOutputStreamEntry build() {
return new ECBlockOutputStreamEntry(blockID,
key,
xceiverClientManager,
pipeline,
length,
bufferPool,
token, config, clientMetrics, streamBufferArgs);
return new ECBlockOutputStreamEntry(this);
}
}
}
Loading