Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
22f977a
HDDS-5366. [Ozone-Streaming] Implement stream method to ContainerSta…
captainzmc Jun 23, 2021
0aabbc0
HDDS-5452. Add link method to ContainerStateMachine for Ratis streami…
kaijchen Jul 18, 2021
82af9d7
HDDS-5481. Fix stream() and link() method in ContainerStateMachine. (…
kaijchen Jul 22, 2021
decaa57
HDDS-5480. [Ozone-Streaming] Client and server should support stream …
captainzmc Jul 28, 2021
0634daf
HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStr…
kaijchen Aug 12, 2021
c49edfe
HDDS-5599. [Ozone-Streaming]drop BufferPool and ChunkBuffer to avoid…
captainzmc Aug 25, 2021
50e96dd
HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBuffer…
kaijchen Sep 8, 2021
7f503b9
HDDS-5742. Avoid unnecessary Bytebuffer conversions (#2673)
captainzmc Sep 23, 2021
aba2b3b
HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mo…
captainzmc Sep 30, 2021
0ee0c63
HDDS-5849. [Ozone-Streaming]Write exceptions occur after checksum is …
captainzmc Oct 12, 2021
eede797
HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701)
sadanand48 Oct 21, 2021
786d09a
HDDS-5895. [Ozone-Streaming] Make raft.server.data-stream.client.pool…
captainzmc Oct 26, 2021
253ee38
HDDS-5763. Provide an Executor for each LocalStream in ContainerState…
szetszwo Nov 1, 2021
d86e5fe
HDDS-5987. [Ozone-Streaming] Add XceiverClientRatis stream config (#2…
guohao-rosicky Nov 15, 2021
06dbec5
HDDS-5961. [Ozone-Streaming] update the usage space of Containers in …
guohao-rosicky Nov 17, 2021
b6f5244
HDDS-5879. [Ozone-Streaming] OzoneBucket add the createMultipartStrea…
guohao-rosicky Nov 19, 2021
9491324
HDDS-5743. [Ozone-Streaming] Add option to write files via streaming …
sadanand48 Nov 19, 2021
d75ea44
HDDS-5851. [Ozone-Streaming] Define a PutBlock/maxBuffer fixed bounda…
sadanand48 Dec 1, 2021
4e21bdf
HDDS-6039. Define a minimum packet size during streaming writes. (#2883)
sadanand48 Dec 21, 2021
6ab6d0d
HDDS-6130. [Ozone-Streaming] When releaseBuffers will get “Couldn 't…
captainzmc Dec 23, 2021
fbeda1c
HDDS-6139. [Ozone-Streaming] Fix incorrect computation of totalAckDat…
sadanand48 Jan 13, 2022
1792363
HDDS-6178. [Ozone-Streaming] Fix NPE in HDDS-6139. (#2984)
sadanand48 Jan 14, 2022
e8257f9
HDDS-6281. Update ratis version to 2.3.0-94db58b-SNAPSHOT version (#3…
guohao-rosicky Feb 10, 2022
4f9265e
HDDS-6138.[Ozone-Streaming] Define a limit on the size of the retry b…
sadanand48 Feb 10, 2022
5dfdbcb
HDDS-6298. Add XceiverServerRatis stream config (#3070)
guohao-rosicky Feb 11, 2022
9134b73
HDDS-5487. [Ozone-Streaming] BlockDataStreamOutput support FlushDelay…
captainzmc Feb 14, 2022
45d8f51
HDDS-6282. Fix BlockDataStreamOutput#doFlushIfNeeded NPE (#3060)
guohao-rosicky Feb 15, 2022
4dcc108
HDDS-6229. [Ozone-Streaming] Data Channel abstraction on datanode (#3…
guohao-rosicky Feb 15, 2022
bb39d10
HDDS-6355. [Ozone-Streaming] Fix CheckStyle problem (#3119)
guohao-rosicky Feb 21, 2022
b6f1921
HDDS-6388. [Ozone-Streaming] Streaming write support both pipeline mo…
captainzmc Mar 2, 2022
eaef7ea
HDDS-6461. Update Ratis version to 2.3.0-da5d868-SNAPSHOT. (#3205)
szetszwo Mar 17, 2022
76096cf
HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.
guohao-rosicky Mar 17, 2022
69edff5
fix bug
guohao-rosicky Mar 18, 2022
005ec01
trigger new CI
guohao-rosicky Mar 18, 2022
1bba3a8
trigger new CI
guohao-rosicky Mar 21, 2022
97ddf53
trigger new CI
guohao-rosicky Mar 21, 2022
37accc0
code review
guohao-rosicky Mar 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,37 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private int streamBufferSize = 4 * 1024 * 1024;

@Config(key = "datastream.buffer.flush.size",
defaultValue = "16MB",
type = ConfigType.SIZE,
description = "The boundary at which putBlock is executed",
tags = ConfigTag.CLIENT)
private long dataStreamBufferFlushSize = 16 * 1024 * 1024;

@Config(key = "datastream.min.packet.size",
defaultValue = "1MB",
type = ConfigType.SIZE,
description = "The maximum size of the ByteBuffer "
+ "(used via ratis streaming)",
tags = ConfigTag.CLIENT)
private int dataStreamMinPacketSize = 1024 * 1024;

@Config(key = "datastream.window.size",
defaultValue = "64MB",
type = ConfigType.SIZE,
description = "Maximum size of BufferList(used for retry) size per " +
"BlockDataStreamOutput instance",
tags = ConfigTag.CLIENT)
private long streamWindowSize = 64 * 1024 * 1024;

@Config(key = "datastream.pipeline.mode",
defaultValue = "true",
description = "Streaming write support both pipeline mode(datanode1->" +
"datanode2->datanode3) and star mode(datanode1->datanode2, " +
"datanode1->datanode3). By default we use pipeline mode.",
tags = ConfigTag.CLIENT)
private boolean datastreamPipelineMode = true;

@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
Expand Down Expand Up @@ -210,6 +241,22 @@ public void setStreamBufferMaxSize(long streamBufferMaxSize) {
this.streamBufferMaxSize = streamBufferMaxSize;
}

public int getDataStreamMinPacketSize() {
return dataStreamMinPacketSize;
}

public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) {
this.dataStreamMinPacketSize = dataStreamMinPacketSize;
}

public long getStreamWindowSize() {
return streamWindowSize;
}

public void setStreamWindowSize(long streamWindowSize) {
this.streamWindowSize = streamWindowSize;
}

public int getMaxRetryCount() {
return maxRetryCount;
}
Expand Down Expand Up @@ -254,6 +301,14 @@ public int getBufferIncrement() {
return bufferIncrement;
}

public long getDataStreamBufferFlushSize() {
return dataStreamBufferFlushSize;
}

public void setDataStreamBufferFlushSize(long dataStreamBufferFlushSize) {
this.dataStreamBufferFlushSize = dataStreamBufferFlushSize;
}

public ChecksumCombineMode getChecksumCombineMode() {
try {
return ChecksumCombineMode.valueOf(checksumCombineMode);
Expand All @@ -265,4 +320,12 @@ public ChecksumCombineMode getChecksumCombineMode() {
ChecksumCombineMode.COMPOSITE_CRC.name());
}
}

public boolean isDatastreamPipelineMode() {
return datastreamPipelineMode;
}

public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
Expand Down Expand Up @@ -121,7 +122,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
this.ozoneConfiguration = configuration;
}

private void updateCommitInfosMap(
public void updateCommitInfosMap(
Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
// if the commitInfo map is empty, just update the commit indexes for each
// of the servers
Expand Down Expand Up @@ -359,4 +360,8 @@ public XceiverClientReply sendCommandAsync(
throw new UnsupportedOperationException(
"Operation Not supported for ratis client");
}

public DataStreamApi getDataStreamApi() {
return this.getClient().getDataStreamApi();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.hdds.scm.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneId;
Expand All @@ -28,10 +30,12 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.function.ConsumerWithIOException;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.io.retry.RetryPolicies;
Expand Down Expand Up @@ -271,6 +275,22 @@ public static Throwable checkForException(Exception e) {
return t;
}

/**
* Checks if the provided exception signifies retry failure in ratis client.
* In case of retry failure, ratis client throws RaftRetryFailureException
* and all succeeding operations are failed with AlreadyClosedException.
*/
public static boolean checkForRetryFailure(Throwable t) {
return t instanceof RaftRetryFailureException
|| t instanceof AlreadyClosedException;
}

// Every container specific exception from datatnode will be seen as
// StorageContainerException
public static boolean checkIfContainerToExclude(Throwable t) {
return t instanceof StorageContainerException;
}

public static RetryPolicy createRetryPolicy(int maxRetryCount,
long retryInterval) {
// retry with fixed sleep between retries
Expand Down Expand Up @@ -298,6 +318,54 @@ RetryPolicy> getRetryPolicyByException(int maxRetryCount,
return policyMap;
}

public static void streamRetryHandle(
IOException exception,
Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap,
AtomicInteger retryCount,
ConsumerWithIOException<IOException> setExceptionAndThrow)
throws IOException {
RetryPolicy retryPolicy = retryPolicyMap
.get(HddsClientUtils.checkForException(exception).getClass());
if (retryPolicy == null) {
retryPolicy = retryPolicyMap.get(Exception.class);
}
RetryPolicy.RetryAction action = null;
try {
action = retryPolicy.shouldRetry(exception, retryCount.get(), 0, true);
} catch (Exception e) {
setExceptionAndThrow.accept(new IOException(e));
}
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
String msg = "";
if (action.reason != null) {
msg = "Retry request failed. " + action.reason;
//LOG.error(msg, exception);
}
setExceptionAndThrow.accept(new IOException(msg, exception));
}

// Throw the exception if the thread is interrupted
if (Thread.currentThread().isInterrupted()) {
//LOG.warn("Interrupted while trying for retry");
setExceptionAndThrow.accept(exception);
}
Preconditions.checkArgument(
action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
if (action.delayMillis > 0) {
try {
Thread.sleep(action.delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
IOException ioe = (IOException) new InterruptedIOException(
"Interrupted: action=" + action + ", retry policy=" + retryPolicy)
.initCause(e);
setExceptionAndThrow.accept(ioe);
}
}
retryCount.incrementAndGet();
}


public static List<Class<? extends Exception>> getExceptionList() {
return EXCEPTION_LIST;
}
Expand Down
Loading