HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. #2452
Conversation
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java (outdated; comment resolved)
      return responseProtoHashMap;
    }

    // TODO: We need separate XceiverClientRatis and XceiverClientGrpc instances
Let's create a JIRA for this.
    </property>
    <property>
      <name>dfs.container.ratis.datastream.ipc</name>
      <value>9890</value>
These values are different from the values in the Java conf file.
I will fix the problem. Also, we should not use port 9890, since it conflicts with a port in the docker-compose setup.
    CloseContainer = 17;
    GetCommittedBlockLength = 18;

    StreamInit = 19;
Let's start this at 30.
I tried changing this to 30, but I got an NPE when starting the datanode; it seems something assumes the Type values are contiguous. Keeping it at 19, in sequence, lets the datanode start normally.
      && dispatcherContext.getStage()
-         == DispatcherContext.WriteChunkStage.WRITE_DATA);
+         == DispatcherContext.WriteChunkStage.WRITE_DATA)
+         || (cmdType == Type.StreamInit);
Are there any callers sending stream init at this time?
Not yet. As a next step, when our client initializes the DataStreamOutput, it will set the CmdType to Type.StreamInit, as we implemented in our POC code.

One more question: what is the use of stream init, and how is it different from the first data packet sent from the client to the datanode?
    // set the datastream config
    NettyConfigKeys.DataStream.setPort(properties, dataStreamPort);
    RaftConfigKeys.DataStream.setType(properties,
        SupportedDataStreamType.NETTY);
Can we do these stream-specific settings in a separate function? A possible shape is sketched below.
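For illustration, a minimal sketch of that refactoring, assuming a hypothetical helper name; the two calls are exactly the ones shown in the diff above:

```java
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.netty.NettyConfigKeys;

final class RatisStreamSetup {
  private RatisStreamSetup() { }

  /** Group the datastream-specific Ratis settings in one place. */
  static void setUpRatisStream(RaftProperties properties, int dataStreamPort) {
    // set the datastream config
    NettyConfigKeys.DataStream.setPort(properties, dataStreamPort);
    RaftConfigKeys.DataStream.setType(properties,
        SupportedDataStreamType.NETTY);
  }
}
```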
    }

    path = chunkManager
        .streamInit(kvContainer, blockID, dispatcherContext);
I feel we should set the dispatcher context to null here; a null dispatcher context during a write would signify a stream operation.
Also, stream classes should not be coupled with ChunkManager; let's maintain a clear separation. Maybe we can use a StreamHandler, and common functionality can be moved to ChunkUtils.
I will try to change the current implementation in this way. A sketch of the suggested convention follows.
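For reference, a rough sketch of the proposed convention (class and method names are assumptions, not the final design):

```java
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;

final class StreamWriteCheck {
  private StreamWriteCheck() { }

  // Regular WriteChunk calls always carry a DispatcherContext with a
  // WriteChunkStage; stream writes would pass null here by convention.
  static boolean isStreamWrite(DispatcherContext dispatcherContext) {
    return dispatcherContext == null;
  }
}
```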
    message WriteChunkRequestProto {
      required DatanodeBlockID blockID = 1;
-     required ChunkInfo chunkData = 2;
+     optional ChunkInfo chunkData = 2;
Isn't this backward-incompatible?
Changing this from required to optional is compatible. Previously, WriteChunkRequestProto always had to carry chunkData. Stream init will reuse WriteChunkRequestProto but does not need chunkData, just as we implemented in the POC code. See the sketch below.
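To illustrate the compatibility argument: in proto2 the wire encoding is unchanged by required → optional; only the presence check moves into application code. A hedged handler sketch (the method and branching are assumptions, not the committed code):

```java
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

final class WriteChunkSketch {
  private WriteChunkSketch() { }

  static void handle(ContainerProtos.ContainerCommandRequestProto request) {
    ContainerProtos.WriteChunkRequestProto writeChunk = request.getWriteChunk();
    if (request.getCmdType() == ContainerProtos.Type.StreamInit) {
      // StreamInit reuses WriteChunkRequestProto but carries only the
      // blockID; prepare a file path for the stream, write no data yet.
    } else if (writeChunk.hasChunkData()) {
      // Old-style WriteChunk: senders always set chunkData, so existing
      // clients keep working after the field becomes optional.
      ContainerProtos.ChunkInfo chunkInfo = writeChunk.getChunkData();
      // ... write the chunk as before ...
    }
  }
}
```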
    <property>
      <name>dfs.container.ratis.datastream.request.threads</name>
      <value>20</value>
      <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
Can we move these settings to DatanodeRatisServerConfig and the client config keys separately?
I have moved datastream.request.threads and datastream.write.threads to DatanodeRatisServerConfig (see the sketch below). The remaining configurations are similar to the Container-related ones, so I keep them here for easier management.
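For context, a sketch of how such a setting might look in DatanodeRatisServerConfig with Ozone's @Config framework; the key, prefix, default, and description here are assumptions, not necessarily the committed values:

```java
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;

@ConfigGroup(prefix = "hdds.ratis")  // prefix assumed from existing keys
public class DatanodeRatisServerConfigSketch {

  @Config(key = "datastream.request.threads",
      defaultValue = "20",
      type = ConfigType.INT,
      tags = {ConfigTag.OZONE, ConfigTag.DATANODE, ConfigTag.RATIS},
      description = "Maximum number of threads in the thread pool that "
          + "handle datastream requests.")
  private int streamRequestThreads;

  public int getStreamRequestThreads() {
    return streamRequestThreads;
  }

  public void setStreamRequestThreads(int streamRequestThreads) {
    this.streamRequestThreads = streamRequestThreads;
  }
}
```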
    @Override
    public DataStreamApi getDataStreamApi() {
      return null;
    }
It's better to throw an exception here, e.g.:
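A minimal sketch of that suggestion:

```java
// Fail fast instead of returning null, so misuse surfaces immediately
// rather than as an NPE at the call site.
@Override
public DataStreamApi getDataStreamApi() {
  throw new UnsupportedOperationException(
      getClass().getSimpleName() + " does not support DataStreamApi.");
}
```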
Thanks @captainzmc for working on this. I think we need to define a config option to enable/disable the feature on both the client and the server.
The stream init will be used to instantiate the streaming client instance, which will then try to write the first packet.
Stream init does two things: build the stream pipeline, and get a file path from the container for subsequent writes.
- Ratis: builds the stream pipeline according to the routing table; if no routing table is given, it falls back to the default behavior.
- HDDS Container: prepares a file for writing, but does not write any data. In our POC code, we let the ContainerStateMachine calculate the path of that file as if there were a WriteChunk request.
The first data packet, of course, writes data into that file. A client-side sketch follows.
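To make the flow concrete, a hedged client-side sketch based on the description above and the POC; the message layout and helper name are assumptions, not the final API:

```java
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamOutput;

final class StreamInitSketch {
  private StreamInitSketch() { }

  static DataStreamOutput initStream(RaftClient raftClient,
      ContainerProtos.DatanodeBlockID blockID, String datanodeUuid) {
    ContainerProtos.ContainerCommandRequestProto streamInit =
        ContainerProtos.ContainerCommandRequestProto.newBuilder()
            .setCmdType(ContainerProtos.Type.StreamInit)
            .setContainerID(blockID.getContainerID())
            .setDatanodeUuid(datanodeUuid)
            .setWriteChunk(ContainerProtos.WriteChunkRequestProto.newBuilder()
                .setBlockID(blockID))  // chunkData intentionally unset
            .build();
    // The serialized request becomes the stream header; on the server the
    // ContainerStateMachine computes the file path as if it were a WriteChunk.
    return raftClient.getDataStreamApi()
        .stream(streamInit.toByteString().asReadOnlyByteBuffer());
  }
}
```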
Thanks @bshashikant for the review and suggestions. On the server side, I think it's easy to distinguish: we can add a config and a check in XceiverServerRatis to decide whether to enable streaming in the RaftServer, roughly as sketched below.
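A rough sketch of that switch; the key name is hypothetical, patterned on the existing datastream property names, and the wiring is illustrative:

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.ratis.conf.RaftProperties;

final class StreamToggleSketch {
  // hypothetical key, mirroring the other datastream properties
  static final String DATASTREAM_ENABLED_KEY =
      "dfs.container.ratis.datastream.enabled";

  private StreamToggleSketch() { }

  static void maybeEnableStream(OzoneConfiguration conf,
      RaftProperties properties, int dataStreamPort) {
    if (conf.getBoolean(DATASTREAM_ENABLED_KEY, false)) {
      // reuse the helper sketched earlier to apply the Netty stream settings
      RatisStreamSetup.setUpRatisStream(properties, dataStreamPort);
    }
  }
}
```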
I will address the remaining review comments and CI problems.
Force-pushed from c2e0c79 to b9b1fec.
@captainzmc, I have just gone through the entire change once. The change looks great! I will review it again tomorrow. In the meantime, please take a look at the build failures. Thanks a lot.

Yes, I am fixing the CI failures now, and I will update this PR again once they are resolved.
szetszwo left a comment
Thanks for the update. Some comments inlined.
getDataStreamApi is not yet used; let's add it later, when it is actually called.
BTW, we may add the method only to XceiverClientRatis and cast the object when calling it, as below:

    XceiverClientSpi client = ...
    DataStreamApi streamApi = ((XceiverClientRatis) client).getDataStreamApi();
Yes, we can add this when we implement the client.
Let's call it RATIS_DATASTREAM.
Let's use the first node:
toRaftPeer(pipeline.getFirstNode()),
Let's remove "ipc", i.e. use "dfs.container.ratis.datastream.random.port". It is not really an IPC.
Let's rename this to "dfs.container.ratis.datastream.port".
Let's revert this whitespace change.
Force-pushed from 3cf6b7e to 6616cd5.
szetszwo left a comment
The change looks good. Just a minor comment inlined.
CI status: build-branch / acceptance (misc) (pull_request) failing after 86m.
Please also take a look at the build failure. Thanks.
    // TODO: For now, always set first node as primary. In the future we should
    // use the local or closest node in the pipeline.
    final DatanodeDetails leader = pipeline.getLeaderNode();
The TODO can be removed, since the first datanode is supposed to be the closest datanode: SCM is supposed to sort the datanodes in the pipeline according to their distance from the client. I am not sure whether SCM is currently sorting the datanodes; we should check the code.
BTW, the leader local variable is no longer needed. We should revert that change.
Hi @szetszwo @bshashikant @mukul1987, CI has been fixed. Could you take another look?
szetszwo left a comment
+1 the change looks good.
…setup. (apache#2452) (cherry picked from commit c69955e)

What changes were proposed in this pull request?
Before we add the new BlockDataStreamOutput, we need to support stream setup. This way, we can get a DataStreamOutput instance through the XceiverClient and use it as we did in our POC code.
What is the link to the Apache JIRA?
https://issues.apache.org/jira/browse/HDDS-5480
How was this patch tested?
CI tests will be added after the DataStreamOutput client is finished.