Merged
HddsDispatcher.java:
@@ -66,6 +66,7 @@
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;

import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -669,4 +670,21 @@ private boolean isAllowed(String action) {
default: return false;
}
}

Review comment (Contributor): Add @Override.

@Override
public StateMachine.DataChannel getStreamDataChannel(
ContainerCommandRequestProto msg)
throws StorageContainerException {
long containerID = msg.getContainerID();
Container container = getContainer(containerID);
if (container != null) {
Handler handler = getHandler(getContainerType(container));
return handler.getStreamDataChannel(container, msg);
} else {
throw new StorageContainerException(
"ContainerID " + containerID + " does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
}
}

}
ContainerDispatcher.java:
@@ -25,6 +25,7 @@
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.ratis.statemachine.StateMachine;

import java.util.Map;

@@ -84,4 +85,13 @@ void validateContainerCommand(
* @param clusterId
*/
void setClusterId(String clusterId);

/**
* Gets the StreamDataChannel used when uploading data via streaming.
*/
default StateMachine.DataChannel getStreamDataChannel(
ContainerCommandRequestProto msg) throws StorageContainerException {
throw new UnsupportedOperationException(
"getStreamDataChannel not supported.");
}
}
Handler.java:
@@ -37,6 +37,7 @@
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.ratis.statemachine.StateMachine;

/**
* Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -81,6 +82,10 @@ public static Handler getHandlerForContainerType(
}
}

public abstract StateMachine.DataChannel getStreamDataChannel(
Container container, ContainerCommandRequestProto msg)
throws StorageContainerException;

/**
* Returns the Id of this datanode.
*
ContainerStateMachine.java:
@@ -23,7 +23,6 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
- import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -78,6 +77,7 @@
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -501,6 +501,19 @@ private CompletableFuture<Message> handleWriteChunk(
return raftFuture;
}

private StateMachine.DataChannel getStreamDataChannel(
ContainerCommandRequestProto requestProto,
DispatcherContext context) throws StorageContainerException {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: getStreamDataChannel {} containerID={} pipelineID={} " +
"traceID={}", gid, requestProto.getCmdType(),
requestProto.getContainerID(), requestProto.getPipelineID(),
requestProto.getTraceID());
}
runCommand(requestProto, context); // stream init
return dispatcher.getStreamDataChannel(requestProto);
}

@Override
public CompletableFuture<DataStream> stream(RaftClientRequest request) {
return CompletableFuture.supplyAsync(() -> {
@@ -512,11 +525,7 @@ public CompletableFuture<DataStream> stream(RaftClientRequest request) {
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();

- ContainerCommandResponseProto response = runCommand(
-     requestProto, context);
- final StreamDataChannel channel = new StreamDataChannel(
-     Paths.get(response.getMessage()));
+ DataChannel channel = getStreamDataChannel(requestProto, context);
final ExecutorService chunkExecutor = requestProto.hasWriteChunk() ?
getChunkExecutor(requestProto.getWriteChunk()) : null;
return new LocalStream(channel, chunkExecutor);

StreamDataChannel.java: This file was deleted.
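The deleted StreamDataChannel was constructed directly from the file path carried in the command response; after this change the channel is produced by the dispatcher chain instead. The contract Ratis expects from a StateMachine.DataChannel is small: write(ByteBuffer), force(boolean), isOpen(), and close(). Below is a minimal, self-contained sketch of a file-backed channel, for illustration only; FileBackedDataChannel is a made-up name and this class is not part of the PR.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.apache.ratis.statemachine.StateMachine;

/**
 * Illustrative file-backed DataChannel: the Ratis streaming path pushes
 * ByteBuffers into write(), calls force() to sync, and close() at the end.
 */
public final class FileBackedDataChannel implements StateMachine.DataChannel {
  private final FileChannel channel;

  FileBackedDataChannel(String path) throws IOException {
    this.channel = FileChannel.open(Paths.get(path),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE);
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    // A production channel (KeyValueStreamDataChannel below) also updates
    // container metrics and write stats here.
    return channel.write(src);
  }

  @Override
  public void force(boolean metadata) throws IOException {
    channel.force(metadata); // sync data; also file metadata when requested
  }

  @Override
  public boolean isOpen() {
    return channel.isOpen();
  }

  @Override
  public void close() throws IOException {
    channel.close();
  }

  public static void main(String[] args) throws IOException {
    try (StateMachine.DataChannel ch = new FileBackedDataChannel("demo.bin")) {
      ch.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
      ch.force(false);
    }
  }
}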

KeyValueHandler.java:
@@ -105,6 +105,7 @@
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;

import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,6 +176,17 @@ public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() {
return volumeChoosingPolicy;
}

Review comment (Contributor): Add @Override.

@Override
public StateMachine.DataChannel getStreamDataChannel(
Container container, ContainerCommandRequestProto msg)
throws StorageContainerException {
KeyValueContainer kvContainer = (KeyValueContainer) container;
checkContainerOpen(kvContainer);
BlockID blockID = BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
return chunkManager.getStreamDataChannel(kvContainer,
blockID, metrics);
}

@Override
public void stop() {
}
ChunkManagerDispatcher.java:
@@ -26,6 +26,7 @@
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -34,6 +35,7 @@
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.common.interfaces.Container;

import org.apache.ratis.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -80,6 +82,14 @@ public String streamInit(Container container, BlockID blockID)
.streamInit(container, blockID);
}

@Override
public StateMachine.DataChannel getStreamDataChannel(
Container container, BlockID blockID, ContainerMetrics metrics)
throws StorageContainerException {
return selectHandler(container)
.getStreamDataChannel(container, blockID, metrics);
}

@Override
public void finishWriteChunks(KeyValueContainer kvContainer,
BlockData blockData) throws IOException {
FilePerBlockStrategy.java:
@@ -32,6 +32,7 @@
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
@@ -42,6 +43,7 @@
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.common.interfaces.Container;

import org.apache.ratis.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -97,6 +99,16 @@ public String streamInit(Container container, BlockID blockID)
return chunkFile.getAbsolutePath();
}

@Override
public StateMachine.DataChannel getStreamDataChannel(
Container container, BlockID blockID, ContainerMetrics metrics)
throws StorageContainerException {
checkLayoutVersion(container);
File chunkFile = getChunkFile(container, blockID, null);
return new KeyValueStreamDataChannel(chunkFile,
container.getContainerData(), metrics);
}

@Override
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
ChunkBuffer data, DispatcherContext dispatcherContext)
KeyValueStreamDataChannel.java (new file):
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.container.keyvalue.impl;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.ratis.statemachine.StateMachine;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;

/**
* This class is used to get the DataChannel for streaming.
*/
class KeyValueStreamDataChannel implements StateMachine.DataChannel {
private final RandomAccessFile randomAccessFile;
private final File file;

private final ContainerData containerData;
private final ContainerMetrics metrics;

KeyValueStreamDataChannel(File file, ContainerData containerData,
ContainerMetrics metrics)
throws StorageContainerException {
try {
this.file = file;
this.randomAccessFile = new RandomAccessFile(file, "rw");
} catch (FileNotFoundException e) {
throw new StorageContainerException("BlockFile not exists with " +
"container Id " + containerData.getContainerID() +
" file " + file.getAbsolutePath(),
ContainerProtos.Result.IO_EXCEPTION);
}
this.containerData = containerData;
this.metrics = metrics;
}

@Override
public void force(boolean metadata) throws IOException {
randomAccessFile.getChannel().force(metadata);
}

@Override
public int write(ByteBuffer src) throws IOException {
int writeBytes = randomAccessFile.getChannel().write(src);
metrics
.incContainerBytesStats(ContainerProtos.Type.StreamWrite, writeBytes);
containerData.updateWriteStats(writeBytes, false);
Review comment (Member): Now we update both metrics.incContainerBytesStats and containerData.updateWriteStats. Can you also add a test case? Just to make sure that future changes do not affect this logic.

Reply (Contributor, author): @captainzmc, this has been addressed.

return writeBytes;
}

@Override
public boolean isOpen() {
return randomAccessFile.getChannel().isOpen();
}

@Override
public void close() throws IOException {
randomAccessFile.close();
}

@Override
public String toString() {
return "KeyValueStreamDataChannel{" +
"File=" + file.getAbsolutePath() +
", containerID=" + containerData.getContainerID() +
'}';
}
}
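Picking up the reviewer's request above, here is a minimal test sketch that pins down the two side effects of write(). It is not part of this PR: the test class name is made up, and it assumes JUnit 4 and Mockito on the classpath, a location in the same package as the package-private KeyValueStreamDataChannel, and that ContainerData and ContainerMetrics can be mocked.

package org.apache.hadoop.ozone.container.keyvalue.impl;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.junit.Test;

/** Hypothetical regression test for the streaming write path. */
public class TestKeyValueStreamDataChannel {
  @Test
  public void writeUpdatesMetricsAndStats() throws Exception {
    File file = File.createTempFile("stream", ".block");
    file.deleteOnExit();
    ContainerData containerData = mock(ContainerData.class);
    ContainerMetrics metrics = mock(ContainerMetrics.class);

    KeyValueStreamDataChannel channel =
        new KeyValueStreamDataChannel(file, containerData, metrics);
    int written = channel.write(ByteBuffer.wrap(
        "stream-bytes".getBytes(StandardCharsets.UTF_8)));
    channel.close();

    // The two statements the reviewer asked to guard against regressions.
    verify(metrics).incContainerBytesStats(
        ContainerProtos.Type.StreamWrite, written);
    verify(containerData).updateWriteStats(written, false);
  }
}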
ChunkManager.java:
@@ -25,9 +25,11 @@
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.ratis.statemachine.StateMachine;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -109,6 +111,12 @@ default String streamInit(Container container, BlockID blockID)
return null;
}

default StateMachine.DataChannel getStreamDataChannel(
Container container, BlockID blockID, ContainerMetrics metrics)
throws StorageContainerException {
return null;
}

static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo,
long defaultReadBufferCapacity) {
long bufferCapacity = 0;
DatanodeClientProtocol.proto:
@@ -102,6 +102,7 @@ enum Type {
GetCommittedBlockLength = 18;

StreamInit = 19;
StreamWrite = 20;
}

