Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;

import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.ozone.OzoneConfigKeys;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Configuration values for Ozone Client.
*/
@ConfigGroup(prefix = "ozone.client")
public class OzoneClientConfig {

private static final Logger LOG =
LoggerFactory.getLogger(OzoneClientConfig.class);

@Config(key = "stream.buffer.flush.size",
defaultValue = "16MB",
type = ConfigType.SIZE,
description = "Size which determines at what buffer position a partial "
+ "flush will be initiated during write. It should be a multiple of"
+ " ozone.client.stream.buffer.size",
tags = ConfigTag.CLIENT)
private long streamBufferFlushSize = 16 * 1024 * 1024;

@Config(key = "stream.buffer.size",
defaultValue = "4MB",
type = ConfigType.SIZE,
description = "The size of chunks the client will send to the server",
tags = ConfigTag.CLIENT)
private int streamBufferSize = 4 * 1024 * 1024;

@Config(key = "stream.buffer.flush.delay",
defaultValue = "true",
description = "Default true, when call flush() and determine whether "
+ "the data in the current buffer is greater than ozone.client"
+ ".stream.buffer.size, if greater than then send buffer to the "
+ "datanode. You can turn this off by setting this configuration "
+ "to false.", tags = ConfigTag.CLIENT)
private boolean streamBufferFlushDelay = true;

@Config(key = "stream.buffer.max.size",
defaultValue = "32MB",
type = ConfigType.SIZE,
description = "Size which determines at what buffer position write call"
+ " be blocked till acknowledgement of the first partial flush "
+ "happens by all servers.",
tags = ConfigTag.CLIENT)
private long streamBufferMaxSize = 32 * 1024 * 1024;

@Config(key = "max.retries",
defaultValue = "5",
description = "Maximum number of retries by Ozone Client on "
+ "encountering exception while writing a key",
tags = ConfigTag.CLIENT)
private int maxRetryCount = 5;

@Config(key = "retry.interval",
defaultValue = "0",
description =
"Indicates the time duration a client will wait before retrying a "
+ "write key request on encountering an exception. By default "
+ "there is no wait",
tags = ConfigTag.CLIENT)
private int retryInterval = 0;

@Config(key = "checksum.type",
defaultValue = "CRC32",
description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
+ "determines which algorithm would be used to compute checksum for "
+ "chunk data. Default checksum type is CRC32.",
tags = ConfigTag.CLIENT)
private String checksumType = ChecksumType.CRC32.name();

@Config(key = "bytes.per.checksum",
defaultValue = "1MB",
type = ConfigType.SIZE,
description = "Checksum will be computed for every bytes per checksum "
+ "number of bytes and stored sequentially. The minimum value for "
+ "this config is 256KB.",
tags = ConfigTag.CLIENT)
private int bytesPerChecksum = 1024 * 1024;

@Config(key = "verify.checksum",
defaultValue = "true",
description = "Ozone client to verify checksum of the checksum "
+ "blocksize data.",
tags = ConfigTag.CLIENT)
private boolean checksumVerify = true;

public OzoneClientConfig() {
}

private void validate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently validate() is unused. I think it should have @PostConstruct annotation.

Preconditions.checkState(streamBufferSize > 0);
Preconditions.checkState(streamBufferFlushSize > 0);
Preconditions.checkState(streamBufferMaxSize > 0);

Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0,
"expected max. buffer size (%s) to be a multiple of flush size (%s)",
streamBufferMaxSize, streamBufferFlushSize);
Preconditions.checkState(streamBufferFlushSize % streamBufferSize == 0,
"expected flush size (%s) to be a multiple of buffer size (%s)",
streamBufferFlushSize, streamBufferSize);

if (bytesPerChecksum <
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
LOG.warn("The checksum size ({}) is not allowed to be less than the " +
"minimum size ({}), resetting to the minimum size.",
bytesPerChecksum,
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
bytesPerChecksum =
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
}

}

public long getStreamBufferFlushSize() {
return streamBufferFlushSize;
}

public void setStreamBufferFlushSize(long streamBufferFlushSize) {
this.streamBufferFlushSize = streamBufferFlushSize;
}

public int getStreamBufferSize() {
return streamBufferSize;
}

public void setStreamBufferSize(int streamBufferSize) {
this.streamBufferSize = streamBufferSize;
}

public boolean isStreamBufferFlushDelay() {
return streamBufferFlushDelay;
}

public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
this.streamBufferFlushDelay = streamBufferFlushDelay;
}

public long getStreamBufferMaxSize() {
return streamBufferMaxSize;
}

public void setStreamBufferMaxSize(long streamBufferMaxSize) {
this.streamBufferMaxSize = streamBufferMaxSize;
}

public int getMaxRetryCount() {
return maxRetryCount;
}

public void setMaxRetryCount(int maxRetryCount) {
this.maxRetryCount = maxRetryCount;
}

public int getRetryInterval() {
return retryInterval;
}

public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
}

public ChecksumType getChecksumType() {
return ChecksumType.valueOf(checksumType);
}

public void setChecksumType(ChecksumType checksumType) {
this.checksumType = checksumType.name();
}

public int getBytesPerChecksum() {
return bytesPerChecksum;
}

public void setBytesPerChecksum(int bytesPerChecksum) {
this.bytesPerChecksum = bytesPerChecksum;
}

public boolean isChecksumVerify() {
return checksumVerify;
}

public void setChecksumVerify(boolean checksumVerify) {
this.checksumVerify = checksumVerify;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
Expand Down Expand Up @@ -85,13 +85,10 @@ public class BlockOutputStream extends OutputStream {
private final BlockData.Builder containerBlockData;
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private final int bytesPerChecksum;
private OzoneClientConfig config;

private int chunkIndex;
private final AtomicLong chunkOffset = new AtomicLong();
private final int streamBufferSize;
private final long streamBufferFlushSize;
private final boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final BufferPool bufferPool;
// The IOException will be set by response handling thread in case there is an
// exception received in the response. If the exception is set, the next
Expand Down Expand Up @@ -133,46 +130,39 @@ public class BlockOutputStream extends OutputStream {
* Creates a new BlockOutputStream.
*
* @param blockID block ID
* @param xceiverClientFactory client manager that controls client
* @param xceiverClientManager client manager that controls client
* @param pipeline pipeline where block will be written
* @param bufferPool pool of buffers
* @param streamBufferFlushSize flush size
* @param streamBufferMaxSize max size of the currentBuffer
* @param checksumType checksum type
* @param bytesPerChecksum Bytes per checksum
* @param token a token for this block (may be null)
*/
@SuppressWarnings("parameternumber")
public BlockOutputStream(BlockID blockID,
XceiverClientFactory xceiverClientFactory, Pipeline pipeline,
int streamBufferSize, long streamBufferFlushSize,
boolean streamBufferFlushDelay, long streamBufferMaxSize,
BufferPool bufferPool, ChecksumType checksumType,
int bytesPerChecksum, Token<? extends TokenIdentifier> token)
throws IOException {
public BlockOutputStream(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
this.blockID = new AtomicReference<>(blockID);
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
this.containerBlockData =
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addMetadata(keyValue);
this.xceiverClientFactory = xceiverClientFactory;
this.xceiverClient = xceiverClientFactory.acquireClient(pipeline);
this.streamBufferSize = streamBufferSize;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
this.streamBufferFlushDelay = streamBufferFlushDelay;
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.bufferPool = bufferPool;
this.bytesPerChecksum = bytesPerChecksum;
this.token = token;

//number of buffers used before doing a flush
refreshCurrentBuffer(bufferPool);
flushPeriod = (int) (streamBufferFlushSize / streamBufferSize);
flushPeriod = (int) (config.getStreamBufferFlushSize() / config
.getStreamBufferSize());

Preconditions
.checkArgument(
(long) flushPeriod * streamBufferSize == streamBufferFlushSize);
(long) flushPeriod * config.getStreamBufferSize() == config
.getStreamBufferFlushSize());

// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
Expand All @@ -182,7 +172,8 @@ public BlockOutputStream(BlockID blockID,
writtenDataLength = 0;
failedServers = new ArrayList<>(0);
ioException = new AtomicReference<>(null);
checksum = new Checksum(checksumType, bytesPerChecksum);
checksum = new Checksum(config.getChecksumType(),
config.getBytesPerChecksum());
}

private void refreshCurrentBuffer(BufferPool pool) {
Expand Down Expand Up @@ -290,7 +281,7 @@ private void doFlushOrWatchIfNeeded() throws IOException {

private void allocateNewBufferIfNeeded() {
if (currentBufferRemaining == 0) {
currentBuffer = bufferPool.allocateBuffer(bytesPerChecksum);
currentBuffer = bufferPool.allocateBuffer(config.getBytesPerChecksum());
currentBufferRemaining = currentBuffer.remaining();
}
}
Expand All @@ -300,7 +291,7 @@ private void updateFlushLength() {
}

private boolean isBufferPoolFull() {
return bufferPool.computeBufferData() == streamBufferMaxSize;
return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
}

/**
Expand All @@ -318,7 +309,7 @@ public void writeOnRetry(long len) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Retrying write length {} for blockID {}", len, blockID);
}
Preconditions.checkArgument(len <= streamBufferMaxSize);
Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
int count = 0;
while (len > 0) {
ChunkBuffer buffer = bufferPool.getBuffer(count);
Expand All @@ -334,13 +325,13 @@ public void writeOnRetry(long len) throws IOException {
// the buffer. We should just validate
// if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
// call for handling full buffer/flush buffer condition.
if (writtenDataLength % streamBufferFlushSize == 0) {
if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
// reset the position to zero as now we will be reading the
// next buffer in the list
updateFlushLength();
executePutBlock(false, false);
}
if (writtenDataLength == streamBufferMaxSize) {
if (writtenDataLength == config.getStreamBufferMaxSize()) {
handleFullBuffer();
}
}
Expand Down Expand Up @@ -486,8 +477,9 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
public void flush() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0
&& (!streamBufferFlushDelay ||
writtenDataLength - totalDataFlushedLength >= streamBufferSize)) {
&& (!config.isStreamBufferFlushDelay() ||
writtenDataLength - totalDataFlushedLength
>= config.getStreamBufferSize())) {
try {
handleFlush(false);
} catch (ExecutionException e) {
Expand Down
Loading