-
Notifications
You must be signed in to change notification settings - Fork 593
HDDS-6137. [Ozone-Streaming] Refactor KeyDataStreamOutput. #3195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HDDS-6137. [Ozone-Streaming] Refactor KeyDataStreamOutput. #3195
Conversation
942e87f to
b1df777
Compare
captainzmc
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @guohao-rosicky Improve this. LGTM over all. TestBlockDataStreamOutput has some error, can you confirm?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guohao-rosicky, Writing an empty buffer may not work as expected. In the streaming model, data is continuous but not block-by-block. Both sender and receiver must consider that the data is a stream of bytes.
For example, the sender may write 3 times with lengths 3, 4, 5. Since there could be buffers/encoder/decoder in between, the receiver may receive a single buffer with length 12.
Let me think about how to make it work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your comment @szetszwo . I wrote a piece of pseudo-code, Ideas about the modification of org.apache.ratis.netty.server.DataStreamManagement#writeTo.
What do you think of this idea? There is no need to send this empty buffer rpc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guohao-rosicky , this is a standard problem in streaming. The standard solution is to write the length with the data. Let's don't change the API for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @szetszwo
I have a question, about
For example, the sender may write 3 times with lengths 3, 4, 5. Since there could be buffers/encoder/decoder in between, the receiver may receive a single buffer with length 12.
I noticed that in HDFS, marking the end of stream is also represented by an empty package.
In Ratis Stream, sticky packets have already been dealt with.
org.apache.ratis.netty.NettyDataStreamUtils#encodeDataStreamRequestHeader
https://github.com/apache/ratis/blob/master/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java#L90
@captainzmc tested the code from some time ago and did not find this problem
|
@guohao-rosicky , the idea is to write the proto length at the end as shown below; see also https://issues.apache.org/jira/secure/attachment/13041230/3195_review.patch |
38b64ff to
35b89e5
Compare
Thanks @szetszwo, Patch has been applied |
szetszwo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guohao-rosicky , please fix the checkstyle warnings and test the change manually. I have not tested it. Thanks.
ba84300 to
c2e7646
Compare
|
Thanks @guohao-rosicky @szetszwo update this. I used this PR test and found that the file failed to write. In DataNode IndexOutOfBoundsException will appear. |
|
@captainzmc , thanks. Let me test the serialization/deserialization code. |
|
Found the bug: ByteBu.nioBuffers(index, length) somehow is not working. Let me fix it. |
Thanks @szetszwo for the update. Using new patch I found IndexOutOfBoundsException still appear. |
c2e7646 to
cc72e81
Compare
|
@captainzmc , thanks a lot for testing it! I added one more test for the serialization and buffers. It worked fine. See https://issues.apache.org/jira/secure/attachment/13041275/3195_testBuffers.patch
It seems there are some synchronization problems. Let me check. |
|
@captainzmc , could you post the test code? |
|
@szetszwo Here is my datanode log. https://issues.apache.org/jira/secure/attachment/13041276/ozone-root-datanode-9-29-173-57.log |
|
@captainzmc , @guohao-rosicky , just found a bug in BlockDataStreamOutput. Could you test https://issues.apache.org/jira/secure/attachment/13041279/3195_bugfix.patch to see if it can fix the bug? |
Thanks @szetszwo for update this. I had just test this patch, we still get same error. Just as the same as this. |
cc72e81 to
539eea6
Compare
szetszwo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add an abstract class instead static methods. Then, KeyDataStreamOutput can extends it. If we need to add SmallFileDataStreamOutput, it can extends it too.
package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public abstract class AbstractDataStreamOutput implements ByteBufferStreamOutput {
private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
private int retryCount;
private boolean isException;
protected AbstractDataStreamOutput() {
this.retryPolicyMap = HddsClientUtils.getExceptionList()
.stream()
.collect(Collectors.toMap(Function.identity(),
e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
}
@VisibleForTesting
public int getRetryCount() {
return retryCount;
}
void resetRetryCount() {
retryCount = 0;
}
boolean isException() {
return isException;
}
/**
* Checks if the provided exception signifies retry failure in ratis client.
* In case of retry failure, ratis client throws RaftRetryFailureException
* and all succeeding operations are failed with AlreadyClosedException.
*/
boolean checkForRetryFailure(Throwable t) {
return t instanceof RaftRetryFailureException
|| t instanceof AlreadyClosedException;
}
// Every container specific exception from datatnode will be seen as
// StorageContainerException
boolean checkIfContainerToExclude(Throwable t) {
return t instanceof StorageContainerException;
}
private void setExceptionAndThrow(IOException ioe) throws IOException {
isException = true;
throw ioe;
}
void handleRetry(IOException exception) throws IOException {
RetryPolicy retryPolicy = retryPolicyMap
.get(HddsClientUtils.checkForException(exception).getClass());
if (retryPolicy == null) {
retryPolicy = retryPolicyMap.get(Exception.class);
}
handleRetry(exception, retryPolicy);
}
private void handleRetry(IOException exception, RetryPolicy retryPolicy)
throws IOException {
RetryPolicy.RetryAction action = null;
try {
action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
} catch (Exception e) {
setExceptionAndThrow(new IOException(e));
}
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
String msg = "";
if (action.reason != null) {
msg = "Retry request failed. " + action.reason;
//LOG.error(msg, exception);
}
setExceptionAndThrow(new IOException(msg, exception));
}
// Throw the exception if the thread is interrupted
if (Thread.currentThread().isInterrupted()) {
//LOG.warn("Interrupted while trying for retry");
setExceptionAndThrow(exception);
}
Preconditions.checkArgument(
action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
if (action.delayMillis > 0) {
try {
Thread.sleep(action.delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
IOException ioe = (IOException) new InterruptedIOException(
"Interrupted: action=" + action + ", retry policy=" + retryPolicy)
.initCause(e);
setExceptionAndThrow(ioe);
}
}
retryCount++;
}
}
|
Ok, I'll change it |
7c6bf0d to
65ce44e
Compare
|
Has been modified. @szetszwo Please take a look.Thanks. |
|
@guohao-rosicky , please review it yourself -- go through the change and remove the unused code (e.g. ConsumerWithIOException). Thanks. |
c630ed9 to
d0cab2b
Compare
szetszwo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guohao-rosicky , thanks for the update. Some comments inlined.
| }); | ||
| } | ||
|
|
||
| private CompletableFuture<ContainerCommandResponseProto> runCommandAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
runCommandAsync is not used. Let's add it later.
| LOG.error( | ||
| "Get stream data channel error Malformed request " + | ||
| "containerID: {} msg: {}", | ||
| container.getContainerData().getContainerID(), msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove LOG.error(..) since it already throws an exception.
| ContainerCommandRequestProto request, KeyValueContainer kvContainer, | ||
| DispatcherContext dispatcherContext) { | ||
| if (!request.hasWriteChunk()) { | ||
| BlockID blockID; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add final.
| * A constructor for testing purpose only. | ||
| */ | ||
| @VisibleForTesting | ||
| public KeyDataStreamOutput() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not used anywhere. Let's remove it.
| this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException( | ||
| config.getMaxRetryCount(), config.getRetryInterval()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should pass the retryPolicyMap to the super constructor.
| super(HddsClientUtils.getExceptionList() | ||
| .stream() | ||
| .collect(Collectors.toMap(Function.identity(), | ||
| e -> RetryPolicies.TRY_ONCE_THEN_FAIL))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The map is passed incorrectly. Below is the original map.
- this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
- config.getMaxRetryCount(), config.getRetryInterval());
captainzmc
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 Thanks @guohao-rosicky. The change looks good.
szetszwo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 the change looks good.
(cherry picked from commit 06bc968)



What changes were proposed in this pull request?
Optimized the putBlock method of BlockDataStreamOutput
2.Mark the stream data boundary by sending an empty stream packet
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-6137
How was this patch tested?
Use the existing UT