Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions hadoop-hdds/client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<version>${spotbugs.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
Expand All @@ -46,6 +45,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -59,7 +59,7 @@
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;

/**
* An {@link ByteBufStreamOutput} used by the REST service in combination
* An {@link ByteBufferStreamOutput} used by the REST service in combination
* with the SCMClient to write the value of a key to a sequence
* of container chunks. Writes are buffered locally and periodically written to
* the container as a new chunk. In order to preserve the semantics that
Expand All @@ -74,7 +74,7 @@
* This class encapsulates all state management for buffering and writing
* through to the container.
*/
public class BlockDataStreamOutput implements ByteBufStreamOutput {
public class BlockDataStreamOutput implements ByteBufferStreamOutput {
public static final Logger LOG =
LoggerFactory.getLogger(BlockDataStreamOutput.class);
public static final String EXCEPTION_MSG =
Expand Down Expand Up @@ -209,16 +209,16 @@ public IOException getIoException() {
}

@Override
public void write(ByteBuf buf) throws IOException {
public void write(ByteBuffer b, int off, int len) throws IOException {
checkOpen();
if (buf == null) {
if (b == null) {
throw new NullPointerException();
}
final int len = buf.readableBytes();
if (len == 0) {
return;
}
writeChunkToContainer(buf);
writeChunkToContainer(
(ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len));

writtenDataLength += len;
}
Expand Down Expand Up @@ -476,15 +476,17 @@ private boolean needSync(long position) {
* Writes buffered data as a new chunk to the container and saves chunk
* information to be used later in putKey call.
*
* @param buf chunk data to write, from position to limit
* @throws IOException if there is an I/O error while performing the call
* @throws OzoneChecksumException if there is an error while computing
* checksum
*/
private void writeChunkToContainer(ByteBuf buf)
private void writeChunkToContainer(ByteBuffer buf)
throws IOException {
ChecksumData checksumData = checksum.computeChecksum(buf.nioBuffer());
int effectiveChunkSize = buf.readableBytes();
final int effectiveChunkSize = buf.remaining();
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
ChecksumData checksumData =
checksum.computeChecksum(buf.asReadOnlyBuffer());
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
.setOffset(offset)
Expand All @@ -499,8 +501,8 @@ private void writeChunkToContainer(ByteBuf buf)

CompletableFuture<DataStreamReply> future =
(needSync(offset + effectiveChunkSize) ?
out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) :
out.writeAsync(buf.nioBuffer()))
out.writeAsync(buf, StandardWriteOption.SYNC) :
out.writeAsync(buf))
.whenCompleteAsync((r, e) -> {
if (e != null || !r.isSuccess()) {
if (e == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@

package org.apache.hadoop.hdds.scm.storage;

import io.netty.buffer.ByteBuf;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
* This interface is for writing an output stream of ByteBuffers.
* An ByteBufStreamOutput accepts Netty ByteBuf and sends them to some sink.
* An ByteBufferStreamOutput accepts nio ByteBuffer and sends them to some sink.
*/
public interface ByteBufStreamOutput extends Closeable {
public interface ByteBufferStreamOutput extends Closeable {
/**
* Try to write all the bytes in ByteBuf b to DataStream.
*
* @param b the data.
* @exception IOException if an I/O error occurs.
*/
void write(ByteBuf b) throws IOException;
default void write(ByteBuffer b) throws IOException {
write(b, b.position(), b.remaining());
}

/**
* Try to write the [off:off + len) slice in ByteBuf b to DataStream.
Expand All @@ -44,9 +45,7 @@ public interface ByteBufStreamOutput extends Closeable {
* @param len the number of bytes to write.
* @exception IOException if an I/O error occurs.
*/
default void write(ByteBuf b, int off, int len) throws IOException {
write(b.slice(off, len));
}
void write(ByteBuffer b, int off, int len) throws IOException;

/**
* Flushes this DataStream output and forces any buffered output bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,29 @@
package org.apache.hadoop.ozone.client.io;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;

/**
* Helper class used inside {@link BlockDataStreamOutput}.
* */
public final class BlockDataStreamOutputEntry
implements ByteBufStreamOutput {
implements ByteBufferStreamOutput {

private final OzoneClientConfig config;
private ByteBufStreamOutput byteBufStreamOutput;
private ByteBufferStreamOutput byteBufferStreamOutput;
private BlockID blockID;
private final String key;
private final XceiverClientFactory xceiverClientManager;
Expand All @@ -61,7 +61,7 @@ private BlockDataStreamOutputEntry(
OzoneClientConfig config
) {
this.config = config;
this.byteBufStreamOutput = null;
this.byteBufferStreamOutput = null;
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
Expand Down Expand Up @@ -90,63 +90,62 @@ long getRemaining() {
* @throws IOException if xceiverClient initialization fails
*/
private void checkStream() throws IOException {
if (this.byteBufStreamOutput == null) {
this.byteBufStreamOutput =
if (this.byteBufferStreamOutput == null) {
this.byteBufferStreamOutput =
new BlockDataStreamOutput(blockID, xceiverClientManager,
pipeline, config, token);
}
}

@Override
public void write(ByteBuf b) throws IOException {
public void write(ByteBuffer b, int off, int len) throws IOException {
checkStream();
final int len = b.readableBytes();
byteBufStreamOutput.write(b);
byteBufferStreamOutput.write(b, off, len);
this.currentPosition += len;
}

@Override
public void flush() throws IOException {
if (this.byteBufStreamOutput != null) {
this.byteBufStreamOutput.flush();
if (this.byteBufferStreamOutput != null) {
this.byteBufferStreamOutput.flush();
}
}

@Override
public void close() throws IOException {
if (this.byteBufStreamOutput != null) {
this.byteBufStreamOutput.close();
if (this.byteBufferStreamOutput != null) {
this.byteBufferStreamOutput.close();
// after closing the chunkOutPutStream, blockId would have been
// reconstructed with updated bcsId
this.blockID =
((BlockDataStreamOutput) byteBufStreamOutput).getBlockID();
((BlockDataStreamOutput) byteBufferStreamOutput).getBlockID();
}
}

boolean isClosed() {
if (byteBufStreamOutput != null) {
return ((BlockDataStreamOutput) byteBufStreamOutput).isClosed();
if (byteBufferStreamOutput != null) {
return ((BlockDataStreamOutput) byteBufferStreamOutput).isClosed();
}
return false;
}

Collection<DatanodeDetails> getFailedServers() {
if (byteBufStreamOutput != null) {
if (byteBufferStreamOutput != null) {
BlockDataStreamOutput out =
(BlockDataStreamOutput) this.byteBufStreamOutput;
(BlockDataStreamOutput) this.byteBufferStreamOutput;
return out.getFailedServers();
}
return Collections.emptyList();
}

long getWrittenDataLength() {
if (byteBufStreamOutput != null) {
if (byteBufferStreamOutput != null) {
BlockDataStreamOutput out =
(BlockDataStreamOutput) this.byteBufStreamOutput;
(BlockDataStreamOutput) this.byteBufferStreamOutput;
return out.getWrittenDataLength();
} else {
// For a pre allocated block for which no write has been initiated,
// the ByteBufStreamOutput will be null here.
// the ByteBufferStreamOutput will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
Expand All @@ -155,15 +154,15 @@ long getWrittenDataLength() {
void cleanup(boolean invalidateClient) throws IOException {
checkStream();
BlockDataStreamOutput out =
(BlockDataStreamOutput) this.byteBufStreamOutput;
(BlockDataStreamOutput) this.byteBufferStreamOutput;
out.cleanup(invalidateClient);

}

void writeOnRetry(long len) throws IOException {
checkStream();
BlockDataStreamOutput out =
(BlockDataStreamOutput) this.byteBufStreamOutput;
(BlockDataStreamOutput) this.byteBufferStreamOutput;
out.writeOnRetry(len);
this.currentPosition += len;

Expand Down Expand Up @@ -231,8 +230,8 @@ public BlockDataStreamOutputEntry build() {
}

@VisibleForTesting
public ByteBufStreamOutput getByteBufStreamOutput() {
return byteBufStreamOutput;
public ByteBufferStreamOutput getByteBufStreamOutput() {
return byteBufferStreamOutput;
}

public BlockID getBlockID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.client.ReplicationConfig;
Expand All @@ -32,7 +31,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
Expand All @@ -48,6 +47,7 @@

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -63,7 +63,7 @@
*
* TODO : currently not support multi-thread access.
*/
public class KeyDataStreamOutput implements ByteBufStreamOutput {
public class KeyDataStreamOutput implements ByteBufferStreamOutput {

private OzoneClientConfig config;

Expand Down Expand Up @@ -185,17 +185,16 @@ public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
}

@Override
public void write(ByteBuf b) throws IOException {
public void write(ByteBuffer b, int off, int len) throws IOException {
checkNotClosed();
if (b == null) {
throw new NullPointerException();
}
final int len = b.readableBytes();
handleWrite(b, b.readerIndex(), len, false);
handleWrite(b, off, len, false);
writeOffset += len;
}

private void handleWrite(ByteBuf b, int off, long len, boolean retry)
private void handleWrite(ByteBuffer b, int off, long len, boolean retry)
throws IOException {
while (len > 0) {
try {
Expand Down Expand Up @@ -227,7 +226,7 @@ private void handleWrite(ByteBuf b, int off, long len, boolean retry)
}

private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
boolean retry, long len, ByteBuf b, int writeLen, int off,
boolean retry, long len, ByteBuffer b, int writeLen, int off,
long currentPos) throws IOException {
try {
if (retry) {
Expand Down
Loading