HDDS-5869. [Ozone-Streaming] Added support for stream on S3Gateway wr… #2858
Hi @szetszwo @captainzmc , can you help review this patch?
szetszwo
left a comment
@guohao-rosicky , thanks for working on this.
This is tricky since we want to have zero buffer copying. Let's study how to have zero buffer copying with JAX-RS.
Typo? Why hadoop.conf but not hadoop.hdds.conf?
We should make streaming optional. Add a new QueryParam with DefaultValue not using streaming.
+ @QueryParam("streaming") @DefaultValue("false") boolean streaming,
Use streaming only if it is true.
@szetszwo , when @QueryParam("streaming") is true, use streaming? aws s3 should not have this option.
see: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
when @QueryParam("streaming") is true, use streaming?
Yes.
... aws s3 should not have this option.
Similar to partNumber and uploadId, this parameter is Ozone specific. We may change the default to true later on when the streaming feature is ready.
@szetszwo I can add a configuration to s3 Gateway
@szetszwo and @guohao-rosicky I do not have much context on streaming, but my comment is from the perspective of S3.
Similar to partNumber and uploadId, this parameter is Ozone specific. We may change the default to true later on when the streaming feature is ready.
This is also part of AWS S3 protocol for multipart upload.
Adding query param streaming to this API and then passing query param from AWS S3 SDK will not be possible (As Ozone S3 talks S3 protocol only). I think based on the config selecting streaming or non-streaming would be the way to go here.
This is also part of AWS S3 protocol for multipart upload.
@bharatviswa504 , thanks for the info. You are right that partNumber and uploadId are also in AWS; see https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
Adding query param streaming to this API and then passing query param from AWS S3 SDK will not be possible ...
It is still better to add a query param and set the default to false (non-streaming) for now. After streaming is stable, we will change the default to true. We need this param for testing and benchmarking without restarting the machines. We should set the param name to something like "internalOzoneParam-streaming" to emphasize that it is not in the AWS S3 protocol.
AWS S3 SDK will always use the default.
We may have a conf to override the default value if such conf is useful.
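The resolution order described above (explicit query param first, otherwise a default that a conf may override) could be sketched roughly as follows. This is a plain-Java illustration, not the S3 Gateway code: the class name, the `useStreaming` helper, and the `confDefault` argument are hypothetical, and the param name is only the one floated in this comment.

```java
import java.util.Map;

public class StreamingFlagSketch {
  // Hypothetical name, following the "internalOzoneParam-streaming" suggestion in the thread.
  static final String PARAM = "internalOzoneParam-streaming";

  /**
   * Decide whether to use streaming for one request.
   * An explicit query param wins; otherwise fall back to the configured default.
   */
  static boolean useStreaming(Map<String, String> queryParams, boolean confDefault) {
    String raw = queryParams.get(PARAM);
    if (raw == null) {
      // An AWS S3 SDK never sends this param, so it always lands here.
      return confDefault;
    }
    return Boolean.parseBoolean(raw);
  }

  public static void main(String[] args) {
    System.out.println(useStreaming(Map.of(), false));
    System.out.println(useStreaming(Map.of(PARAM, "true"), false));
    System.out.println(useStreaming(Map.of(), true));
  }
}
```

With this shape, a test or benchmark client can flip the behavior per request, while standard S3 clients only ever see the default.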
We should avoid buffer copying. I wonder if we should use another body type (instead of InputStream) in order to support zero buffer copying; see https://docs.oracle.com/cd/E19798-01/821-1841/6nmq2cp2b/index.html#gkccg
@szetszwo I looked at this document. Is java.io.File with all media types (*/*) usable here?
Reading files is similar to the implementation here: https://github.com/apache/ozone/blob/HDDS-4454/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java#L134
@szetszwo I submitted the new code, is it ok to change it like this, please take a look.
Such a change is a little big, and the previous UT needs to be changed
... Is java.io.File with all media types (*/*) usable here?
We should try it.
Such a change is a little big, and the previous UT needs to be changed
I suggest we should change the case when uploadId == null (non-createMultipartKey) first. Then, change the createMultipartKey case in a separate JIRA.
@szetszwo
like this
  public Response put(
      @PathParam("bucket") String bucketName,
      @PathParam("path") String keyPath,
      @HeaderParam("Content-Length") long length,
      @QueryParam("partNumber") int partNumber,
      @QueryParam("uploadId") @DefaultValue("") String uploadID,
      File bodyFile) throws IOException, OS3Exception {
If we change InputStream to File, none of the existing unit tests for put can be used. Is there any way to solve this problem?
For example here
org.apache.hadoop.ozone.s3.endpoint.TestObjectPut
  @Test
  public void testPutObject() throws IOException, OS3Exception {
    //GIVEN
    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
    ByteArrayInputStream body =
        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
    objectEndpoint.setHeaders(headers);

    //WHEN
    Response response = objectEndpoint.put(bucketName, keyName,
        CONTENT.length(), 1, null, body);

    //THEN
    OzoneInputStream ozoneInputStream =
        clientStub.getObjectStore().getS3Bucket(bucketName)
            .readKey(keyName);
    String keyContent =
        IOUtils.toString(ozoneInputStream, UTF_8);

    Assert.assertEquals(200, response.getStatus());
    Assert.assertEquals(CONTENT, keyContent);
  }
Here the put method is called with an InputStream parameter.
If streaming is enabled through configuration, do you need to retain the InputStream?
We may add another put method as below:
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -155,6 +153,24 @@ public Response put(
@HeaderParam("Content-Length") long length,
@QueryParam("partNumber") int partNumber,
@QueryParam("uploadId") @DefaultValue("") String uploadID,
+ @QueryParam("ozoneInternalParam_streaming") @DefaultValue("false")
+ boolean ozoneInternalParam_streaming,
+ File body) throws IOException, OS3Exception {
+ if (!ozoneInternalParam_streaming) {
+ final BufferedInputStream in = new BufferedInputStream(new FileInputStream(body));
+ return put(bucketName, keyPath, length, partNumber, uploadID, in);
+ }
+
+ // new code for Ratis streaming
+ ...
+ }
+
+ public Response put(
+ String bucketName,
+ String keyPath,
+ long length,
+ int partNumber,
+ String uploadID,
InputStream body) throws IOException, OS3Exception {
OzoneOutputStream output = null;
Then, the existing tests will remain working. Of course, we need to create new tests for the new code.
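The delegation idea can be tried out in isolation. The following is a minimal, self-contained sketch using only the JDK; `PutOverloadSketch`, the string return values, and the `demo()` helper are stand-ins invented for illustration and are not the real `ObjectEndpoint` API.

```java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class PutOverloadSketch {

  // New entry point: takes a File body plus a (hypothetical) streaming flag.
  static String put(String key, boolean streaming, File body) throws IOException {
    if (!streaming) {
      // Non-streaming: wrap the file and delegate to the existing InputStream method.
      try (InputStream in = new BufferedInputStream(Files.newInputStream(body.toPath()))) {
        return put(key, in);
      }
    }
    // Placeholder for the new streaming path; only reports the file length here.
    return "streaming:" + key + ":" + body.length();
  }

  // Existing method, unchanged, so tests that pass an InputStream keep compiling.
  static String put(String key, InputStream body) throws IOException {
    return "buffered:" + key + ":" + body.readAllBytes().length;
  }

  // Exercises both paths against a small temporary file.
  static String demo() {
    try {
      File tmp = File.createTempFile("put-sketch", ".bin");
      tmp.deleteOnExit();
      Files.write(tmp.toPath(), "hello".getBytes(StandardCharsets.UTF_8));
      return put("k", false, tmp) + "|" + put("k", true, tmp);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because the InputStream overload keeps its signature, a test written against it needs no changes; only the File path needs new tests.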
Okay, I'm going to implement a version, and you can take a look
@guohao-rosicky , the compilation failed. Please take a look. Thanks.
@szetszwo This PR is implemented using buffer copy; I'll then open another PR to optimize for zero copy.
Added a configuration on the Gateway side to enable streaming. Please take a look. @szetszwo
szetszwo
left a comment
@guohao-rosicky , thanks for the update. Some comments inlined.
Please add some new tests. Otherwise, we don't know if the new code is working.
Please reuse OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLE instead of adding a new conf.
Let's call it "datastreamEnabled".
Let's move the new code to a new class, say ObjectEndpointStreaming. Otherwise, there are many changes here and it becomes hard to maintain.
  @PUT
  public Response put(
      @PathParam("bucket") String bucketName,
      @PathParam("path") String keyPath,
      @HeaderParam("Content-Length") long length,
      @QueryParam("partNumber") int partNumber,
      @QueryParam("uploadId") @DefaultValue("") String uploadID,
      InputStream body) throws IOException, OS3Exception {
    if (datastreamEnabled) {
      return objectEndpointStreaming.put(bucketName, keyPath, length, partNumber, uploadID, body);
    }
    ...
  }
Thanks @guohao-rosicky for updating this. @szetszwo Can you help take another look?
szetszwo
left a comment
@guohao-rosicky , thanks a lot for working on this. Are the crypto changes related to S3Gateway? Could we separate them out since this is very big?
Also, please revert all the whitespace changes, especially for the files existing in the master branch. Otherwise, the merge will be harder.
Please don't have whitespace change like this. Since this is an existing file in the master branch, it will make the merge harder.
Please make the new fields final.
Indeed, we should create an ObjectEndpointStreaming streaming object and use streaming != null for checking if streaming is enabled.
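One way to read this suggestion, combined with the earlier request to make new fields final: hold the streaming helper in a final field that is null when streaming is disabled, and branch on that. The sketch below is plain Java with invented names (`EndpointSketch`, the nested `Streaming` class, string results); it only illustrates the pattern and does not reflect the actual `ObjectEndpointStreaming` class.

```java
public class EndpointSketch {

  // Stand-in for the streaming helper; hypothetical, for illustration only.
  static final class Streaming {
    String put(String key) {
      return "streaming:" + key;
    }
  }

  // Null means streaming is disabled; no separate boolean flag is needed.
  private final Streaming streaming;

  EndpointSketch(boolean streamingEnabled) {
    this.streaming = streamingEnabled ? new Streaming() : null;
  }

  String put(String key) {
    if (streaming != null) {
      return streaming.put(key);
    }
    // Existing non-streaming path.
    return "buffered:" + key;
  }

  public static void main(String[] args) {
    System.out.println(new EndpointSketch(true).put("k"));
    System.out.println(new EndpointSketch(false).put("k"));
  }
}
```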
Please add a local variable. Otherwise, it will affect other calls.
Hi, @szetszwo I can work on this in https://issues.apache.org/jira/browse/HDDS-5892
@guohao-rosicky , @captainzmc , this pull request has been around for quite a long time. We should plan how to get this committed. One problem is that this involves too much code change. Let's separate the individual methods put(..) and createMultipartKey(..) into different pull requests. Also, we should refactor the ObjectEndpoint code in master first. Otherwise, merging becomes a pain. Let me work on the refactoring.
szetszwo
left a comment
@guohao-rosicky , I really appreciate that you have worked on this over a long time! Some comments inlined.
    boolean enableEC = false;
    if ((replicationConfig != null &&
        replicationConfig.getReplicationType() == EC) ||
        bucket.getReplicationConfig() instanceof ECReplicationConfig) {
      enableEC = true;
    }
This is incorrect when replicationConfig != null && replicationConfig.getReplicationType() != EC. Add an isEc() method as below.
  static boolean isEc(ReplicationConfig replication, OzoneBucket bucket) {
    if (replication != null) {
      return replication.getReplicationType() == EC;
    } else {
      return bucket.getReplicationConfig() instanceof ECReplicationConfig;
    }
  }
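The difference between the two checks shows up in exactly one case: an explicit non-EC replication is requested for a key in an EC bucket. The sketch below reproduces both versions with simplified stand-ins (a local `ReplicationType` enum and a `bucketIsEc` boolean replace the Ozone `ReplicationConfig` and `OzoneBucket` types, purely for illustration).

```java
public class IsEcSketch {

  // Simplified stand-in for the Ozone replication type; not the real class.
  enum ReplicationType { RATIS, EC }

  // The check as originally written in the PR: the bucket default can override
  // an explicitly requested non-EC replication.
  static boolean originalCheck(ReplicationType requested, boolean bucketIsEc) {
    return (requested != null && requested == ReplicationType.EC) || bucketIsEc;
  }

  // The suggested isEc(): an explicit request takes precedence, and the bucket
  // default is consulted only when nothing was requested.
  static boolean isEc(ReplicationType requested, boolean bucketIsEc) {
    if (requested != null) {
      return requested == ReplicationType.EC;
    } else {
      return bucketIsEc;
    }
  }

  public static void main(String[] args) {
    // Explicit RATIS request on an EC bucket: the two checks disagree.
    System.out.println(originalCheck(ReplicationType.RATIS, true));
    System.out.println(isEc(ReplicationType.RATIS, true));
  }
}
```

For every other combination of inputs the two methods return the same value.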
Hi @szetszwo , I think we'd better add a separate switch for S3G, like we did for OFS/O3FS, instead of using DFS_CONTAINER_RATIS_DATASTREAM_ENABLE to control both S3G and HDDS. This allows both the client and HDDS to have a switch that controls whether or not streaming is used.
Otherwise, we cannot meet the requirement of ofs/o3fs using streaming while S3 uses async.
Sure, a separate conf sounds good.
  private List<String> customizableGetHeaders = new ArrayList<>();
  private int bufferSize;
  private int chunkSize;
Move chunkSize to ObjectEndpointStreaming
Indeed, we should create an ObjectEndpointStreaming streaming object and use streaming != null for checking if streaming is enabled.
    String copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
    String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
    ReplicationConfig replicationConfig =
        getReplicationConfig(ozoneBucket, storageType);

    boolean enableEC = false;
    if ((replicationConfig != null &&
        replicationConfig.getReplicationType() == EC) ||
        ozoneBucket.getReplicationConfig() instanceof ECReplicationConfig) {
      enableEC = true;
    }

    try {
      if (datastreamEnabled && !enableEC && copyHeader != null) {
        Pair<String, String> result = parseSourceHeader(copyHeader);
        String sourceBucket = result.getLeft();
        String sourceKey = result.getRight();
        OzoneBucket sourceOzoneBucket = getBucket(sourceBucket);
        return ObjectEndpointStreaming
            .copyMultipartKey(Pair.of(sourceOzoneBucket, sourceKey),
                Pair.of(ozoneBucket, key), length, partNumber, uploadID,
                chunkSize, headers);
      } else if (datastreamEnabled && !enableEC) {
        return ObjectEndpointStreaming
            .createMultipartKey(ozoneBucket, key, length, partNumber,
                uploadID, chunkSize, body);
      }
Move the logic to ObjectEndpointStreaming.
Filed https://issues.apache.org/jira/browse/HDDS-6973 for the refactoring.
Hi @guohao-rosicky, I have just updated HDDS-4454; you can continue with this.
@szetszwo should this PR be revisited?
@guohao-rosicky Can you rebase this branch with master? Close this PR & open a new PR with master if you want the changes to go in. HDDS-4454 has been merged with master. Correct me if I am wrong @szetszwo @kerneltime
Sure.
What changes were proposed in this pull request?
Added support for stream on S3Gateway write path
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-5869