Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f73ee65
HDDS-5366. [Ozone-Streaming] Implement stream method to ContainerSta…
captainzmc Jun 23, 2021
117771b
HDDS-5452. Add link method to ContainerStateMachine for Ratis streami…
kaijchen Jul 18, 2021
0eee890
HDDS-5481. Fix stream() and link() method in ContainerStateMachine. (…
kaijchen Jul 22, 2021
4c93359
HDDS-5480. [Ozone-Streaming] Client and server should support stream …
captainzmc Jul 28, 2021
84407f0
HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStr…
kaijchen Aug 12, 2021
948e345
HDDS-5599. [Ozone-Streaming]drop BufferPool and ChunkBuffer to avoid…
captainzmc Aug 25, 2021
f7e5990
HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBuffer…
kaijchen Sep 8, 2021
796ef0e
HDDS-5742. Avoid unnecessary Bytebuffer conversions (#2673)
captainzmc Sep 23, 2021
55dab06
HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mo…
captainzmc Sep 30, 2021
6ef1b39
HDDS-5849. [Ozone-Streaming]Write exceptions occur after checksum is …
captainzmc Oct 12, 2021
801b882
HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701)
sadanand48 Oct 21, 2021
1cb2e21
HDDS-5895. [Ozone-Streaming] Make raft.server.data-stream.client.pool…
captainzmc Oct 26, 2021
12cd5d1
HDDS-5763. Provide an Executor for each LocalStream in ContainerState…
szetszwo Nov 1, 2021
e5f3bb2
HDDS-5987. [Ozone-Streaming] Add XceiverClientRatis stream config (#2…
guohao-rosicky Nov 15, 2021
e350e01
HDDS-5961. [Ozone-Streaming] update the usage space of Containers in …
guohao-rosicky Nov 17, 2021
085bbf1
HDDS-5879. [Ozone-Streaming] OzoneBucket add the createMultipartStrea…
guohao-rosicky Nov 19, 2021
657915b
HDDS-5743. [Ozone-Streaming] Add option to write files via streaming …
sadanand48 Nov 19, 2021
5804f22
HDDS-5851. [Ozone-Streaming] Define a PutBlock/maxBuffer fixed bounda…
sadanand48 Dec 1, 2021
d2a51a0
HDDS-6039. Define a minimum packet size during streaming writes. (#2883)
sadanand48 Dec 21, 2021
a794067
HDDS-6130. [Ozone-Streaming] When releaseBuffers will get “Couldn 't…
captainzmc Dec 23, 2021
7da72fd
HDDS-6139. [Ozone-Streaming] Fix incorrect computation of totalAckDat…
sadanand48 Jan 13, 2022
fec03b7
HDDS-6178. [Ozone-Streaming] Fix NPE in HDDS-6139. (#2984)
sadanand48 Jan 14, 2022
918cec5
HDDS-6281. Update ratis version to 2.3.0-94db58b-SNAPSHOT version (#3…
guohao-rosicky Feb 10, 2022
32598ca
HDDS-6138.[Ozone-Streaming] Define a limit on the size of the retry b…
sadanand48 Feb 10, 2022
630ea3a
HDDS-6298. Add XceiverServerRatis stream config (#3070)
guohao-rosicky Feb 11, 2022
7caa82c
HDDS-5487. [Ozone-Streaming] BlockDataStreamOutput support FlushDelay…
captainzmc Feb 14, 2022
1ae8d86
HDDS-6282. Fix BlockDataStreamOutput#doFlushIfNeeded NPE (#3060)
guohao-rosicky Feb 15, 2022
d82f075
HDDS-6229. [Ozone-Streaming] Data Channel abstraction on datanode (#3…
guohao-rosicky Feb 15, 2022
1fab6bb
HDDS-6355. [Ozone-Streaming] Fix CheckStyle problem (#3119)
guohao-rosicky Feb 21, 2022
fde3681
HDDS-6388. [Ozone-Streaming] Streaming write support both pipeline mo…
captainzmc Mar 2, 2022
d35335d
HDDS-6461. Update Ratis version to 2.3.0-da5d868-SNAPSHOT. (#3205)
szetszwo Mar 17, 2022
ba69a4a
HDDS-5798. [Ozone-Streaming] Setup TlsConf parameters. (#3207)
szetszwo Mar 27, 2022
6c0f95f
HDDS-6137. [Ozone-Streaming] Refactor KeyDataStreamOutput. (#3195)
guohao-rosicky Mar 28, 2022
96d5eb0
HDDS-6500. [Ozone-Streaming] Buffer the PutBlockRequest at the end of…
szetszwo Mar 28, 2022
1063516
HDDS-5666. Add option to createKey via streaming api in Freon (#2574)
sadanand48 Apr 8, 2022
19b8f37
HDDS-6592. [Ozone-Streaming] Fix ContainerStateMachine#applyTransacti…
guohao-rosicky Apr 18, 2022
cabc517
HDDS-6842. [Ozone-Streaming] Reduce the number of watch requests in S…
szetszwo Jun 10, 2022
a4199f5
HDDS-6867. [Ozone-Streaming] PutKeyHandler should not use streaming …
captainzmc Jun 15, 2022
a6f5c40
HDDS-5869. [Ozone-Streaming] Added support for stream on S3Gateway wr…
guohao-rosicky Jun 27, 2022
98d6c07
trigger new CI
guohao-rosicky Jun 27, 2022
0ff6a76
trigger new CI
guohao-rosicky Jun 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,37 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private int streamBufferSize = 4 * 1024 * 1024;

@Config(key = "datastream.buffer.flush.size",
defaultValue = "16MB",
type = ConfigType.SIZE,
description = "The boundary at which putBlock is executed",
tags = ConfigTag.CLIENT)
private long dataStreamBufferFlushSize = 16 * 1024 * 1024;

@Config(key = "datastream.min.packet.size",
defaultValue = "1MB",
type = ConfigType.SIZE,
description = "The maximum size of the ByteBuffer "
+ "(used via ratis streaming)",
tags = ConfigTag.CLIENT)
private int dataStreamMinPacketSize = 1024 * 1024;

@Config(key = "datastream.window.size",
defaultValue = "64MB",
type = ConfigType.SIZE,
description = "Maximum size of BufferList(used for retry) size per " +
"BlockDataStreamOutput instance",
tags = ConfigTag.CLIENT)
private long streamWindowSize = 64 * 1024 * 1024;

@Config(key = "datastream.pipeline.mode",
defaultValue = "true",
description = "Streaming write support both pipeline mode(datanode1->" +
"datanode2->datanode3) and star mode(datanode1->datanode2, " +
"datanode1->datanode3). By default we use pipeline mode.",
tags = ConfigTag.CLIENT)
private boolean datastreamPipelineMode = true;

@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
Expand Down Expand Up @@ -236,6 +267,22 @@ public void setStreamBufferMaxSize(long streamBufferMaxSize) {
this.streamBufferMaxSize = streamBufferMaxSize;
}

public int getDataStreamMinPacketSize() {
return dataStreamMinPacketSize;
}

public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) {
this.dataStreamMinPacketSize = dataStreamMinPacketSize;
}

public long getStreamWindowSize() {
return streamWindowSize;
}

public void setStreamWindowSize(long streamWindowSize) {
this.streamWindowSize = streamWindowSize;
}

public int getMaxRetryCount() {
return maxRetryCount;
}
Expand Down Expand Up @@ -288,6 +335,14 @@ public int getBufferIncrement() {
return bufferIncrement;
}

public long getDataStreamBufferFlushSize() {
return dataStreamBufferFlushSize;
}

public void setDataStreamBufferFlushSize(long dataStreamBufferFlushSize) {
this.dataStreamBufferFlushSize = dataStreamBufferFlushSize;
}

public ChecksumCombineMode getChecksumCombineMode() {
try {
return ChecksumCombineMode.valueOf(checksumCombineMode);
Expand All @@ -307,4 +362,12 @@ public void setEcReconstructStripeReadPoolLimit(int poolLimit) {
public int getEcReconstructStripeReadPoolLimit() {
return ecReconstructStripeReadPoolLimit;
}

public boolean isDatastreamPipelineMode() {
return datastreamPipelineMode;
}

public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
Expand Down Expand Up @@ -135,7 +136,7 @@ private long updateCommitInfosMap(RaftClientReply reply) {
.orElse(0L);
}

private long updateCommitInfosMap(
public long updateCommitInfosMap(
Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
// if the commitInfo map is empty, just update the commit indexes for each
// of the servers
Expand Down Expand Up @@ -382,4 +383,8 @@ public XceiverClientReply sendCommandAsync(
throw new UnsupportedOperationException(
"Operation Not supported for ratis client");
}

public DataStreamApi getDataStreamApi() {
return this.getClient().getDataStreamApi();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import java.util.Objects;

/**
* This class is used for error handling methods.
*/
public abstract class AbstractDataStreamOutput
implements ByteBufferStreamOutput {

private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
private int retryCount;
private boolean isException;

protected AbstractDataStreamOutput(
Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap) {
this.retryPolicyMap = retryPolicyMap;
this.isException = false;
this.retryCount = 0;
}

@VisibleForTesting
public int getRetryCount() {
return retryCount;
}

protected void resetRetryCount() {
retryCount = 0;
}

protected boolean isException() {
return isException;
}

/**
* Checks if the provided exception signifies retry failure in ratis client.
* In case of retry failure, ratis client throws RaftRetryFailureException
* and all succeeding operations are failed with AlreadyClosedException.
*/
protected boolean checkForRetryFailure(Throwable t) {
return t instanceof RaftRetryFailureException
|| t instanceof AlreadyClosedException;
}

// Every container specific exception from datatnode will be seen as
// StorageContainerException
protected boolean checkIfContainerToExclude(Throwable t) {
return t instanceof StorageContainerException;
}

protected void setExceptionAndThrow(IOException ioe) throws IOException {
isException = true;
throw ioe;
}

protected void handleRetry(IOException exception) throws IOException {
RetryPolicy retryPolicy = retryPolicyMap
.get(HddsClientUtils.checkForException(exception).getClass());
if (retryPolicy == null) {
retryPolicy = retryPolicyMap.get(Exception.class);
}
handleRetry(exception, retryPolicy);
}

protected void handleRetry(IOException exception, RetryPolicy retryPolicy)
throws IOException {
RetryPolicy.RetryAction action = null;
try {
action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
} catch (Exception e) {
setExceptionAndThrow(new IOException(e));
}
if (action != null &&
action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
String msg = "";
if (action.reason != null) {
msg = "Retry request failed. " + action.reason;
}
setExceptionAndThrow(new IOException(msg, exception));
}

// Throw the exception if the thread is interrupted
if (Thread.currentThread().isInterrupted()) {
setExceptionAndThrow(exception);
}
Objects.requireNonNull(action);
Preconditions.checkArgument(
action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
if (action.delayMillis > 0) {
try {
Thread.sleep(action.delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
IOException ioe = (IOException) new InterruptedIOException(
"Interrupted: action=" + action + ", retry policy=" + retryPolicy)
.initCause(e);
setExceptionAndThrow(ioe);
}
}
retryCount++;
}
}
Loading