Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d4bf89a
HDDS-5366. [Ozone-Streaming] Implement stream method to ContainerSta…
captainzmc Jun 23, 2021
92a8081
HDDS-5452. Add link method to ContainerStateMachine for Ratis streami…
kaijchen Jul 18, 2021
28d3035
HDDS-5481. Fix stream() and link() method in ContainerStateMachine. (…
kaijchen Jul 22, 2021
c69955e
HDDS-5480. [Ozone-Streaming] Client and server should support stream …
captainzmc Jul 28, 2021
e6be97e
HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStr…
kaijchen Aug 12, 2021
db89077
HDDS-5599. [Ozone-Streaming]drop BufferPool and ChunkBuffer to avoid…
captainzmc Aug 25, 2021
8853c8a
HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBuffer…
kaijchen Sep 8, 2021
72bc750
HDDS-5742. Avoid unnecessary Bytebuffer conversions (#2673)
captainzmc Sep 23, 2021
8124a1c
HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mo…
captainzmc Sep 30, 2021
a6968d5
HDDS-5849. [Ozone-Streaming]Write exceptions occur after checksum is …
captainzmc Oct 12, 2021
3958708
HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701)
sadanand48 Oct 21, 2021
483eae8
HDDS-5895. [Ozone-Streaming] Make raft.server.data-stream.client.pool…
captainzmc Oct 26, 2021
e4a0f00
HDDS-5763. Provide an Executor for each LocalStream in ContainerState…
szetszwo Nov 1, 2021
029055b
HDDS-5987. [Ozone-Streaming] Add XceiverClientRatis stream config (#2…
guohao-rosicky Nov 15, 2021
7290ff9
HDDS-5961. [Ozone-Streaming] update the usage space of Containers in …
guohao-rosicky Nov 17, 2021
2054fe4
HDDS-5879. [Ozone-Streaming] OzoneBucket add the createMultipartStrea…
guohao-rosicky Nov 19, 2021
d81ca2a
HDDS-5743. [Ozone-Streaming] Add option to write files via streaming …
sadanand48 Nov 19, 2021
9d5ccb3
HDDS-5851. [Ozone-Streaming] Define a PutBlock/maxBuffer fixed bounda…
sadanand48 Dec 1, 2021
28fdecd
HDDS-6039. Define a minimum packet size during streaming writes. (#2883)
sadanand48 Dec 21, 2021
58da69f
HDDS-6130. [Ozone-Streaming] When releaseBuffers will get “Couldn 't…
captainzmc Dec 23, 2021
156cd46
HDDS-6139. [Ozone-Streaming] Fix incorrect computation of totalAckDat…
sadanand48 Jan 13, 2022
ddf9f84
HDDS-6178. [Ozone-Streaming] Fix NPE in HDDS-6139. (#2984)
sadanand48 Jan 14, 2022
dd62684
HDDS-6281. Update ratis version to 2.3.0-94db58b-SNAPSHOT version (#3…
guohao-rosicky Feb 10, 2022
340f959
HDDS-6138.[Ozone-Streaming] Define a limit on the size of the retry b…
sadanand48 Feb 10, 2022
20f1682
HDDS-6298. Add XceiverServerRatis stream config (#3070)
guohao-rosicky Feb 11, 2022
9c6904d
HDDS-5487. [Ozone-Streaming] BlockDataStreamOutput support FlushDelay…
captainzmc Feb 14, 2022
fce0dfa
HDDS-6282. Fix BlockDataStreamOutput#doFlushIfNeeded NPE (#3060)
guohao-rosicky Feb 15, 2022
4ce474c
HDDS-6229. [Ozone-Streaming] Data Channel abstraction on datanode (#3…
guohao-rosicky Feb 15, 2022
328c389
HDDS-6355. [Ozone-Streaming] Fix CheckStyle problem (#3119)
guohao-rosicky Feb 21, 2022
1f097e1
HDDS-6388. [Ozone-Streaming] Streaming write support both pipeline mo…
captainzmc Mar 2, 2022
b42829c
HDDS-5798. [Ozone-Streaming] Setup TlsConf parameters. (#3207)
szetszwo Mar 27, 2022
06bc968
HDDS-6137. [Ozone-Streaming] Refactor KeyDataStreamOutput. (#3195)
guohao-rosicky Mar 28, 2022
d0407c1
HDDS-6500. [Ozone-Streaming] Buffer the PutBlockRequest at the end of…
szetszwo Mar 28, 2022
d1b7a92
HDDS-5666. Add option to createKey via streaming api in Freon (#2574)
sadanand48 Apr 8, 2022
a7eb40e
HDDS-6592. [Ozone-Streaming] Fix ContainerStateMachine#applyTransacti…
guohao-rosicky Apr 18, 2022
317053d
HDDS-6842. [Ozone-Streaming] Reduce the number of watch requests in S…
szetszwo Jun 10, 2022
b3a6cfc
HDDS-6867. [Ozone-Streaming] PutKeyHandler should not use streaming …
captainzmc Jun 15, 2022
6319ce8
HDDS-6955. [Ozone-streaming] Add explicit stream flag in ozone shell …
kaijchen Jul 12, 2022
53af908
HDDS-7431. [Ozone-Streaming] Disable data steam by default. (#3900)
szetszwo Oct 28, 2022
cfdf9cd
HDDS-7438. [Ozone-Streaming] Add a createStreamKey method to OzoneBuc…
szetszwo Oct 31, 2022
7fc85ee
HDDS-7425. Add documentation for the new Streaming Pipeline feature. …
szetszwo Nov 9, 2022
c17be70
HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. (…
szetszwo Nov 11, 2022
0400358
HDDS-7426. Add a new acceptance test for Streaming Pipeline.
szetszwo Nov 22, 2022
eb775c2
Enable Streaming in ozonesecure.
szetszwo Nov 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,37 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private int streamBufferSize = 4 * 1024 * 1024;

@Config(key = "datastream.buffer.flush.size",
defaultValue = "16MB",
type = ConfigType.SIZE,
description = "The boundary at which putBlock is executed",
tags = ConfigTag.CLIENT)
private long dataStreamBufferFlushSize = 16 * 1024 * 1024;

@Config(key = "datastream.min.packet.size",
defaultValue = "1MB",
type = ConfigType.SIZE,
description = "The maximum size of the ByteBuffer "
+ "(used via ratis streaming)",
tags = ConfigTag.CLIENT)
private int dataStreamMinPacketSize = 1024 * 1024;

@Config(key = "datastream.window.size",
defaultValue = "64MB",
type = ConfigType.SIZE,
description = "Maximum size of BufferList(used for retry) size per " +
"BlockDataStreamOutput instance",
tags = ConfigTag.CLIENT)
private long streamWindowSize = 64 * 1024 * 1024;

@Config(key = "datastream.pipeline.mode",
defaultValue = "true",
description = "Streaming write support both pipeline mode(datanode1->" +
"datanode2->datanode3) and star mode(datanode1->datanode2, " +
"datanode1->datanode3). By default we use pipeline mode.",
tags = ConfigTag.CLIENT)
private boolean datastreamPipelineMode = true;

@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
Expand Down Expand Up @@ -244,6 +275,22 @@ public void setStreamBufferMaxSize(long streamBufferMaxSize) {
this.streamBufferMaxSize = streamBufferMaxSize;
}

public int getDataStreamMinPacketSize() {
return dataStreamMinPacketSize;
}

public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) {
this.dataStreamMinPacketSize = dataStreamMinPacketSize;
}

public long getStreamWindowSize() {
return streamWindowSize;
}

public void setStreamWindowSize(long streamWindowSize) {
this.streamWindowSize = streamWindowSize;
}

public int getMaxRetryCount() {
return maxRetryCount;
}
Expand Down Expand Up @@ -296,6 +343,14 @@ public int getBufferIncrement() {
return bufferIncrement;
}

public long getDataStreamBufferFlushSize() {
return dataStreamBufferFlushSize;
}

public void setDataStreamBufferFlushSize(long dataStreamBufferFlushSize) {
this.dataStreamBufferFlushSize = dataStreamBufferFlushSize;
}

public ChecksumCombineMode getChecksumCombineMode() {
try {
return ChecksumCombineMode.valueOf(checksumCombineMode);
Expand Down Expand Up @@ -325,4 +380,12 @@ public void setFsDefaultBucketLayout(String bucketLayout) {
public String getFsDefaultBucketLayout() {
return fsDefaultBucketLayout;
}

public boolean isDatastreamPipelineMode() {
return datastreamPipelineMode;
}

public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
Expand Down Expand Up @@ -135,7 +136,7 @@ private long updateCommitInfosMap(RaftClientReply reply) {
.orElse(0L);
}

private long updateCommitInfosMap(
public long updateCommitInfosMap(
Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
// if the commitInfo map is empty, just update the commit indexes for each
// of the servers
Expand Down Expand Up @@ -382,4 +383,8 @@ public XceiverClientReply sendCommandAsync(
throw new UnsupportedOperationException(
"Operation Not supported for ratis client");
}

public DataStreamApi getDataStreamApi() {
return this.getClient().getDataStreamApi();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import java.util.Objects;

/**
* This class is used for error handling methods.
*/
public abstract class AbstractDataStreamOutput
implements ByteBufferStreamOutput {

private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
private int retryCount;
private boolean isException;

protected AbstractDataStreamOutput(
Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap) {
this.retryPolicyMap = retryPolicyMap;
this.isException = false;
this.retryCount = 0;
}

@VisibleForTesting
public int getRetryCount() {
return retryCount;
}

protected void resetRetryCount() {
retryCount = 0;
}

protected boolean isException() {
return isException;
}

/**
* Checks if the provided exception signifies retry failure in ratis client.
* In case of retry failure, ratis client throws RaftRetryFailureException
* and all succeeding operations are failed with AlreadyClosedException.
*/
protected boolean checkForRetryFailure(Throwable t) {
return t instanceof RaftRetryFailureException
|| t instanceof AlreadyClosedException;
}

// Every container specific exception from datatnode will be seen as
// StorageContainerException
protected boolean checkIfContainerToExclude(Throwable t) {
return t instanceof StorageContainerException;
}

protected void setExceptionAndThrow(IOException ioe) throws IOException {
isException = true;
throw ioe;
}

protected void handleRetry(IOException exception) throws IOException {
RetryPolicy retryPolicy = retryPolicyMap
.get(HddsClientUtils.checkForException(exception).getClass());
if (retryPolicy == null) {
retryPolicy = retryPolicyMap.get(Exception.class);
}
handleRetry(exception, retryPolicy);
}

protected void handleRetry(IOException exception, RetryPolicy retryPolicy)
throws IOException {
RetryPolicy.RetryAction action = null;
try {
action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
} catch (Exception e) {
setExceptionAndThrow(new IOException(e));
}
if (action != null &&
action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
String msg = "";
if (action.reason != null) {
msg = "Retry request failed. " + action.reason;
}
setExceptionAndThrow(new IOException(msg, exception));
}

// Throw the exception if the thread is interrupted
if (Thread.currentThread().isInterrupted()) {
setExceptionAndThrow(exception);
}
Objects.requireNonNull(action);
Preconditions.checkArgument(
action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
if (action.delayMillis > 0) {
try {
Thread.sleep(action.delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
IOException ioe = (IOException) new InterruptedIOException(
"Interrupted: action=" + action + ", retry policy=" + retryPolicy)
.initCause(e);
setExceptionAndThrow(ioe);
}
}
retryCount++;
}
}
Loading