@@ -515,8 +515,11 @@ public CompletableFuture<DataStream> stream(RaftClientRequest request) {

ContainerCommandResponseProto response = runCommand(
requestProto, context);
-      String path = response.getMessage();
-      return new LocalStream(new StreamDataChannel(Paths.get(path)));
+      final StreamDataChannel channel = new StreamDataChannel(
+          Paths.get(response.getMessage()));
+      final ExecutorService chunkExecutor = requestProto.hasWriteChunk() ?
+          getChunkExecutor(requestProto.getWriteChunk()) : null;
+      return new LocalStream(channel, chunkExecutor);
@captainzmc (Member) commented on Nov 1, 2021:

Hi @szetszwo, the size of the chunkExecutors is determined by dfs.container.ratis.num.write.chunk.threads.per.volume. So I'm wondering if we need to delete datastream.write.threads in DatanodeRatisServerConfig? Now that we're passing in a chunkExecutor every time, that configuration should be useless, right?

@szetszwo (Contributor, PR author) replied:

@captainzmc, you are right. Let's remove it.

} catch (IOException e) {
throw new CompletionException("Failed to create data stream", e);
}
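A note on the review thread above: each volume already gets its own write-chunk executor, sized by dfs.container.ratis.num.write.chunk.threads.per.volume, and the stream now carries that executor with it, so a separate global datastream.write.threads pool would have nothing left to schedule. A minimal, self-contained sketch of the per-volume executor idea (hypothetical class and method names, keyed by volume only for illustration; not the Ozone implementation):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not Ozone code: one fixed-size executor per volume,
// sized by the per-volume write-chunk thread count. Because each stream is
// handed the executor of its target volume, no extra datastream write pool
// is needed.
public final class PerVolumeChunkExecutors {
  private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();
  private final int threadsPerVolume;

  public PerVolumeChunkExecutors(int threadsPerVolume) {
    this.threadsPerVolume = threadsPerVolume;
  }

  /** Return (creating on first use) the executor dedicated to the given volume. */
  public ExecutorService get(String volume) {
    return executors.computeIfAbsent(volume,
        v -> Executors.newFixedThreadPool(threadsPerVolume));
  }
}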
@@ -23,12 +23,15 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;

class LocalStream implements StateMachine.DataStream {
  private final StateMachine.DataChannel dataChannel;
+  private final Executor executor;

-  LocalStream(StateMachine.DataChannel dataChannel) {
+  LocalStream(StateMachine.DataChannel dataChannel, Executor executor) {
    this.dataChannel = dataChannel;
+    this.executor = executor;
  }

@Override
@@ -47,4 +50,9 @@ public CompletableFuture<?> cleanUp() {
}
});
}

+  @Override
+  public Executor getExecutor() {
+    return executor;
+  }
}
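For context on the LocalStream change above: exposing getExecutor() lets the server run this stream's work on the stream's own (per-volume) executor instead of a shared write pool. A minimal, self-contained sketch of that dispatch pattern (hypothetical types and fallback behavior; not the Ratis API):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Hypothetical sketch of the dispatch pattern, not Ratis code: a stream that
// exposes its own executor gets its write task scheduled there; a stream that
// returns null (as LocalStream does when the request carries no WriteChunk)
// would fall back to a shared default pool.
public final class StreamWriteDispatch {
  interface Stream {
    Executor getExecutor();
  }

  private static final Executor DEFAULT_POOL = Executors.newFixedThreadPool(4);

  static CompletableFuture<Void> submitWrite(Stream stream, Runnable writeTask) {
    final Executor executor = stream.getExecutor();
    return CompletableFuture.runAsync(writeTask,
        executor != null ? executor : DEFAULT_POOL);
  }
}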
@@ -239,11 +239,6 @@ private void setUpRatisStream(RaftProperties properties) {
.getStreamRequestThreads();
RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties,
dataStreamAsyncRequestThreadPoolSize);
-    int dataStreamWriteRequestThreadPoolSize =
-        conf.getObject(DatanodeRatisServerConfig.class)
-            .getStreamWriteThreads();
-    RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties,
-        dataStreamWriteRequestThreadPoolSize);
int dataStreamClientPoolSize =
conf.getObject(DatanodeRatisServerConfig.class)
.getClientPoolSize();
@@ -141,23 +141,6 @@ public void setStreamRequestThreads(int streamRequestThreads) {
this.streamRequestThreads = streamRequestThreads;
}

@Config(key = "datastream.write.threads",
defaultValue = "20",
type = ConfigType.INT,
tags = {OZONE, DATANODE, RATIS, DATASTREAM},
description = "Maximum number of threads in the thread pool for " +
"datastream write."
)
private int streamWriteThreads;

public int getStreamWriteThreads() {
return streamWriteThreads;
}

public void setStreamWriteThreads(int streamWriteThreads) {
this.streamWriteThreads = streamWriteThreads;
}

@Config(key = "datastream.client.pool.size",
defaultValue = "10",
type = ConfigType.INT,