HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file. #2860
Conversation
@szetszwo @captainzmc please take a look. PutSmallFile contains writeChunk and putBlock, which are transmitted to the Datanode through a stream write; there is no applyTransaction going through ContainerStateMachine. Please check my PR; I don't know how to get the BCSID.
szetszwo left a comment:
@guohao-rosicky , thanks for the update. The change looks mostly good. Some comments:
- Let's move the common code in KeyValueStreamDataChannel and SmallFileStreamDataChannel to a common base class, say StreamDataChannelBase.
- Please add some tests.
See also https://issues.apache.org/jira/secure/attachment/13036532/2860_review.patch .
Move randomAccessFile, file, containerData, metrics, and the shared code to a new base class.
Let's call it StreamDataChannel.
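(Not part of the patch, just a minimal sketch of the shape such a base class could take, using plain java.io/java.nio types. The containerData and metrics fields mentioned above are omitted to keep it self-contained, and the real channel would implement the Ratis data-channel interface rather than WritableByteChannel.)

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Shared file handle and write path for the stream data channels. */
abstract class StreamDataChannel implements WritableByteChannel {
  private final File file;
  private final RandomAccessFile randomAccessFile;

  StreamDataChannel(File file) throws IOException {
    this.file = file;
    this.randomAccessFile = new RandomAccessFile(file, "rw");
  }

  protected File getFile() {
    return file;
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    // Both subclasses append the incoming buffer to the backing file.
    return randomAccessFile.getChannel().write(src);
  }

  @Override
  public boolean isOpen() {
    return randomAccessFile.getChannel().isOpen();
  }

  @Override
  public void close() throws IOException {
    randomAccessFile.close();
  }
}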
Why change data to optional?
Data in PutSmallFileRequestProto is null in setupStream:
ContainerProtos.PutSmallFileRequestProto putSmallFileRequest =
    ContainerProtos.PutSmallFileRequestProto.newBuilder()
        .setChunkInfo(chunk)
        .setBlock(createBlockRequest) // data is empty
        .build();
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerProtos.ContainerCommandRequestProto.Builder builder =
    ContainerProtos.ContainerCommandRequestProto.newBuilder()
        .setCmdType(ContainerProtos.Type.StreamInit)
        .setContainerID(blockID.get().getContainerID())
        .setDatanodeUuid(id)
        .setPutSmallFile(putSmallFileRequest);
I see. Thanks.
Is this backward compatible?
Yes, changing required to optional is backward compatible since the old code always provides the value and the new code can handle it.
However, it is not forward compatible since the old code may not be able to handle the case that the value is missing.
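(For illustration only: once the field is optional, readers should check presence instead of assuming the value is set. In proto2-generated Java code an optional field gets a has*() accessor; the import paths below are assumptions, and in Ozone the ByteString type may be the shaded ratis-thirdparty variant.)

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; // assumed package
import com.google.protobuf.ByteString; // possibly shaded in Ozone

final class SmallFileDataAccess {
  private SmallFileDataAccess() {
  }

  // Sketch: old clients still set 'data' inline; streaming clients leave it
  // unset and send the payload over the data stream instead.
  static ByteString getInlineData(
      ContainerProtos.PutSmallFileRequestProto request) {
    return request.hasData() ? request.getData() : ByteString.EMPTY;
  }
}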
@guohao-rosicky , the change looks good but the new TestSmallFileDataStreamOutput failed. Please take a look. Thanks.
Thanks @guohao-rosicky for the contribution,
@captainzmc , this change is an optimization of createStreamKey(..) and createStreamFile(..) in RpcClient for the case that the given size is smaller than the chunk size. Using the Async API will have fewer RPC calls, and we should implement it. However, the data has to go through the leader, so the network path won't be optimal and the leader may become a hotspot.
Thanks @szetszwo for your explanation, I agree with you. Let's use Streaming instead of async write.
szetszwo left a comment:
@guohao-rosicky , we need to support zero buffer copying, so we should not put the data inside PutSmallFileRequestProto. We should send the header first and then send the raw data in the stream.
This method copies the buffer twice, so it is not zero buffer copying.
We cannot call array() since the ByteBuffer may not have an array.
Also, using an array means there must be buffer copying inside the code. This is why we use ByteBuffer instead of byte[].
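(Illustration, not part of the patch: array() only works for accessible heap buffers, so the general way to get a byte[] out of an arbitrary ByteBuffer is an explicit copy, which is exactly the copy that zero-copy streaming tries to avoid.)

import java.nio.ByteBuffer;

final class ByteBufferExample {
  private ByteBufferExample() {
  }

  // ByteBuffer.array() throws UnsupportedOperationException for direct
  // buffers and ReadOnlyBufferException for read-only buffers, so the
  // only general way to obtain a byte[] is an explicit copy.
  static byte[] copyToArray(ByteBuffer buffer) {
    final byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes); // duplicate() keeps the caller's position untouched
    return bytes;
  }
}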
Do not set the data inside the proto. We should send only the header in the proto and then send the raw data to the stream.
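(A minimal sketch of that ordering under stated assumptions: the stream interface and all names below are hypothetical stand-ins, not the actual Ozone or Ratis API.)

import java.io.IOException;
import java.nio.ByteBuffer;

final class SmallFileStreamSketch {
  /** Hypothetical stand-in for the client-side raw data stream. */
  interface RawDataStream extends AutoCloseable {
    void write(ByteBuffer data) throws IOException;
    @Override
    void close() throws IOException;
  }

  private SmallFileStreamSketch() {
  }

  static void writeSmallFile(RawDataStream stream, ByteBuffer payload)
      throws IOException {
    // The header (chunk info, block data) travels in the proto sent at
    // stream setup, with the data field left unset, so no copy happens there.
    // The payload buffer then goes straight into the stream, untouched.
    stream.write(payload);
    // Closing the stream is what triggers putBlock/commit on the datanode.
    stream.close();
  }
}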
@guohao-rosicky , can you please rebase?
done
captainzmc left a comment:
Thanks @guohao-rosicky for the update. The change looks good.
@szetszwo @captainzmc please take a look.
@guohao-rosicky , the change grew from 13kB to 65kB, so it has become hard to review. The SmallFileDataStreamOutput class is really long. How about moving the refactoring to a separate JIRA?
How about splitting it into two parts, @szetszwo?
@guohao-rosicky , sure, please do it. Thanks.
@szetszwo please take a look. Thanks.
The CI run was successful. @szetszwo , could you help take another look?
@captainzmc , @guohao-rosicky , sure, I am reviewing this.
szetszwo left a comment:
With HDDS-6137, we may not need to add much code to implement WriteSmallFile. The data is buffered at the client side. When the data size is small, all the data can be sent in a single write call with close. What do you think?
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamRoutingTable.java (outdated review thread, resolved)
public static byte[] getFixedLengthBytes(int length) {
  byte[] bytes = new byte[length];
  Random random = new Random();
  random.nextBytes(bytes);
  return bytes;
}
Use ThreadLocalRandom and support non-random data as below:
public static byte[] generateData(int length, boolean random) {
  final byte[] data = new byte[length];
  if (random) {
    ThreadLocalRandom.current().nextBytes(data);
  } else {
    for (int i = 0; i < length; i++) {
      data[i] = (byte) i;
    }
  }
  return data;
}
private CompletableFuture<ContainerCommandResponseProto> link(
    LogEntryProto entry, SmallFileStreamDataChannel smallFileChannel) {
  return CompletableFuture.supplyAsync(() -> {
    final DispatcherContext context = new DispatcherContext.Builder()
        .setTerm(entry.getTerm())
        .setLogIndex(entry.getIndex())
        .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
        .setContainer2BCSIDMap(container2BCSIDMap)
        .build();

    return runCommand(smallFileChannel.getPutBlockRequest(), context);
  }, executor);
}
Pass ContainerCommandRequestProto instead and rename it to runCommandAsync(..)
private CompletableFuture<ContainerCommandResponseProto> runCommandAsync(
    ContainerCommandRequestProto requestProto, LogEntryProto entry) {
  return CompletableFuture.supplyAsync(() -> {
    final DispatcherContext context = new DispatcherContext.Builder()
        .setTerm(entry.getTerm())
        .setLogIndex(entry.getIndex())
        .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
        .setContainer2BCSIDMap(container2BCSIDMap)
        .build();
    return runCommand(requestProto, context);
  }, executor);
}
 * <p>
 * TODO: currently does not support multi-thread access.
 */
public class SmallFileDataStreamOutput implements ByteBufferStreamOutput
It seems that the code in this class is copied from BlockDataStreamOutputEntryPool and KeyDataStreamOutput. We should reuse the code, not copy it; otherwise it is very hard to maintain.
szetszwo left a comment:
@guohao-rosicky , have you seen this comment #2860 (review)?
@szetszwo
That's why KeyDataStreamOutput is more powerful than SmallFileStreamOutput since it works even if the data size is unknown.
@szetszwo OK, how can we better do the follow-up work? Does the code in this PR help us achieve this? I can split it.
Yes, I actually suggested splitting it and moving the common code to #3195 in this comment: #3195 (comment)
Hi @szetszwo , will you take this task? If so, I will close the PR.
@captainzmc , I was thinking of supporting a BufferedDataStreamOutput in Ratis (similar to java.io.BufferedOutputStream). Then Ozone could use it; it would require only a small change in Ozone. We may close this pull request and open a new pull request later.
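(A rough sketch of the idea, assuming a plain WritableByteChannel as the underlying stream; the class and method names are illustrative, not the eventual Ratis API. The point is that the small-file case falls out of buffering: if everything still fits in the buffer at close(), it is sent in one final write, matching the earlier HDDS-6137 comment.)

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/**
 * Illustrative sketch of a buffered stream output analogous to
 * java.io.BufferedOutputStream: small writes accumulate in a client-side
 * buffer and are sent as a single write when the buffer fills or on close.
 */
final class BufferedDataStreamOutputSketch implements WritableByteChannel {
  private final WritableByteChannel out;
  private final ByteBuffer buffer;

  BufferedDataStreamOutputSketch(WritableByteChannel out, int bufferSize) {
    this.out = out;
    this.buffer = ByteBuffer.allocateDirect(bufferSize);
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    final int n = src.remaining();
    if (n > buffer.remaining()) {
      flushBuffer(); // make room for the incoming data
    }
    if (n > buffer.capacity()) {
      // Too large to buffer at all: write it through directly.
      while (src.hasRemaining()) {
        out.write(src);
      }
      return n;
    }
    buffer.put(src);
    return n;
  }

  private void flushBuffer() throws IOException {
    buffer.flip();
    while (buffer.hasRemaining()) {
      out.write(buffer);
    }
    buffer.clear();
  }

  @Override
  public boolean isOpen() {
    return out.isOpen();
  }

  @Override
  public void close() throws IOException {
    // For a small file everything is still buffered here, so it goes out
    // in one final write before the close.
    flushBuffer();
    out.close();
  }
}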
OK, let's close this PR. |
Hi @szetszwo , is it this Ratis JIRA? https://issues.apache.org/jira/browse/RATIS-1157
jira: https://issues.apache.org/jira/browse/HDDS-4474