From 86dbee8807cf5f049ef11278ddd8f63cb0801819 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 28 Sep 2020 17:45:50 +0200 Subject: [PATCH 1/5] HDDS-4285. Read is slow due to the frequent usage of UGI.getCurrentUserCall() --- .../hdds/scm/storage/BlockInputStream.java | 9 ++++-- .../hdds/scm/storage/BlockOutputStream.java | 13 ++++++-- .../hdds/scm/storage/ChunkInputStream.java | 11 +++++-- .../scm/storage/DummyChunkInputStream.java | 4 ++- .../scm/storage/ContainerProtocolCalls.java | 30 +++++++++++++------ .../ozone/scm/TestXceiverClientGrpc.java | 6 ++-- 6 files changed, 54 insertions(+), 19 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index bcc3cead9715..219451a2a979 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; +import org.apache.hadoop.security.token.TokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +42,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.function.Function; @@ -96,6 +98,7 @@ public class BlockInputStream extends InputStream implements Seekable { private int chunkIndexOfPrevPosition; private Function refreshPipelineFunction; + private Collection> tokens; public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, boolean verifyChecksum, @@ -128,6 +131,8 @@ public synchronized void initialize() throws IOException { return; } + tokens = UserGroupInformation.getCurrentUser().getTokens(); + List chunks = null; try { chunks = getChunkInfos(); @@ -196,7 +201,7 @@ protected List getChunkInfos() throws IOException { DatanodeBlockID datanodeBlockID = blockID .getDatanodeBlockIDProtobuf(); GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClient, datanodeBlockID); + .getBlock(xceiverClient, datanodeBlockID, tokens); chunks = response.getBlockData().getChunksList(); success = true; @@ -216,7 +221,7 @@ protected List getChunkInfos() throws IOException { */ protected synchronized void addStream(ChunkInfo chunkInfo) { chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, - xceiverClient, verifyChecksum)); + xceiverClient, verifyChecksum, tokens)); } public synchronized long getRemaining() throws IOException { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 1a16caf23537..b0ac5b894946 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -51,6 +52,10 @@ import com.google.common.base.Preconditions; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,6 +129,7 @@ public class BlockOutputStream extends OutputStream { private int currentBufferRemaining; //current buffer allocated to write private ChunkBuffer currentBuffer; + private final Collection> tokens; /** * Creates a new BlockOutputStream. @@ -176,6 +182,7 @@ public BlockOutputStream(BlockID blockID, failedServers = new ArrayList<>(0); ioException = new AtomicReference<>(null); checksum = new Checksum(checksumType, bytesPerChecksum); + tokens = UserGroupInformation.getCurrentUser().getTokens(); } private void refreshCurrentBuffer(BufferPool pool) { @@ -425,7 +432,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close, try { BlockData blockData = containerBlockData.build(); XceiverClientReply asyncReply = - putBlockAsync(xceiverClient, blockData, close); + putBlockAsync(xceiverClient, blockData, close, tokens); CompletableFuture future = asyncReply.getResponse(); flushFuture = future.thenApplyAsync(e -> { @@ -663,8 +670,8 @@ private void writeChunkToContainer(ChunkBuffer chunk) throws IOException { } try { - XceiverClientReply asyncReply = - writeChunkAsync(xceiverClient, chunkInfo, blockID.get(), data); + XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo, + blockID.get(), data, tokens); CompletableFuture future = asyncReply.getResponse(); future.thenApplyAsync(e -> { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index b1f7126ff83e..a77701a595a8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -31,12 +31,15 @@ import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.List; /** @@ -72,15 +75,19 @@ public class ChunkInputStream extends InputStream implements Seekable { // position. Once the chunk is read, this variable is reset. private long chunkPosition = -1; + private final Collection> tokens; + private static final int EOF = -1; ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId, - XceiverClientSpi xceiverClient, boolean verifyChecksum) { + XceiverClientSpi xceiverClient, boolean verifyChecksum, + Collection> tokens) { this.chunkInfo = chunkInfo; this.length = chunkInfo.getLen(); this.blockID = blockId; this.xceiverClient = xceiverClient; this.verifyChecksum = verifyChecksum; + this.tokens = tokens; } public synchronized long getRemaining() throws IOException { @@ -332,7 +339,7 @@ protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException { validators.add(validator); readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, - readChunkInfo, blockID, validators); + readChunkInfo, blockID, validators, tokens); } catch (IOException e) { if (e instanceof StorageContainerException) { diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java index 8405f4349be4..3035056bd97b 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java @@ -26,6 +26,8 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import static java.util.Collections.emptyList; + /** * A dummy ChunkInputStream to mock read chunk calls to DN. */ @@ -42,7 +44,7 @@ public DummyChunkInputStream(TestChunkInputStream testChunkInputStream, XceiverClientSpi xceiverClient, boolean verifyChecksum, byte[] data) { - super(chunkInfo, blockId, xceiverClient, verifyChecksum); + super(chunkInfo, blockId, xceiverClient, verifyChecksum, emptyList()); this.chunkData = data; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index ea5fc825ca81..84d067a9485d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,6 +62,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; /** @@ -84,7 +86,8 @@ private ContainerProtocolCalls() { * @throws IOException if there is an I/O error while performing the call */ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, - DatanodeBlockID datanodeBlockID) throws IOException { + DatanodeBlockID datanodeBlockID, + Collection> tokens) throws IOException { GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto .newBuilder() .setBlockID(datanodeBlockID); @@ -96,7 +99,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, .setContainerID(datanodeBlockID.getContainerID()) .setDatanodeUuid(id) .setGetBlock(readBlockRequest); - String encodedToken = getEncodedBlockToken(getService(datanodeBlockID)); + String encodedToken = getEncodedBlockToken(getService(datanodeBlockID), + tokens); if (encodedToken != null) { builder.setEncodedToken(encodedToken); } @@ -187,7 +191,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock( * @throws ExecutionException */ public static XceiverClientReply putBlockAsync( - XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof) + XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof, + Collection> tokens) throws IOException, InterruptedException, ExecutionException { PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder() @@ -200,7 +205,8 @@ public static XceiverClientReply putBlockAsync( .setDatanodeUuid(id) .setPutBlock(createBlockRequest); String encodedToken = - getEncodedBlockToken(getService(containerBlockData.getBlockID())); + getEncodedBlockToken(getService(containerBlockData.getBlockID()), + tokens); if (encodedToken != null) { builder.setEncodedToken(encodedToken); } @@ -220,7 +226,8 @@ public static XceiverClientReply putBlockAsync( */ public static ContainerProtos.ReadChunkResponseProto readChunk( XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, - List validators) throws IOException { + List validators, + Collection> tokens) throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto.newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) @@ -231,7 +238,7 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id).setReadChunk(readChunkRequest); String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); + getContainerBlockID().toString()), tokens); if (encodedToken != null) { builder.setEncodedToken(encodedToken); } @@ -285,7 +292,7 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, */ public static XceiverClientReply writeChunkAsync( XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, - ByteString data) + ByteString data, Collection> tokens) throws IOException, ExecutionException, InterruptedException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto.newBuilder() @@ -297,7 +304,7 @@ public static XceiverClientReply writeChunkAsync( .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id).setWriteChunk(writeChunkRequest); String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); + getContainerBlockID().toString()), tokens); if (encodedToken != null) { builder.setEncodedToken(encodedToken); } @@ -540,8 +547,13 @@ public static void validateContainerResponse( private static String getEncodedBlockToken(Text service) throws IOException { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + return getEncodedBlockToken(service, ugi.getTokens()); + } + + private static String getEncodedBlockToken(Text service, + Collection> tokens) throws IOException { Token token = - OzoneBlockTokenSelector.selectBlockToken(service, ugi.getTokens()); + OzoneBlockTokenSelector.selectBlockToken(service, tokens); if (token != null) { return token.encodeToUrlString(); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java index 32a0b1d00272..5b9bc4122655 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java @@ -41,6 +41,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; +import static java.util.Collections.emptyList; + /** * Tests for TestXceiverClientGrpc, to ensure topology aware reads work * select the closest node, and connections are re-used after a getBlock call. @@ -156,7 +158,7 @@ private void invokeXceiverClientGetBlock(XceiverClientSpi client) .setContainerID(1) .setLocalID(1) .setBlockCommitSequenceId(1) - .build()); + .build(), emptyList()); } private void invokeXceiverClientReadChunk(XceiverClientSpi client) @@ -174,7 +176,7 @@ private void invokeXceiverClientReadChunk(XceiverClientSpi client) .setOffset(100) .build(), bid, - null); + null, emptyList()); } private void invokeXceiverClientReadSmallFile(XceiverClientSpi client) From 32387fcaf07af13a71fa4fca000868fc3eaac920 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 28 Sep 2020 20:16:34 +0200 Subject: [PATCH 2/5] Fix read path --- .../org/apache/hadoop/hdds/scm/storage/BlockInputStream.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 219451a2a979..74202405e980 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -131,8 +131,6 @@ public synchronized void initialize() throws IOException { return; } - tokens = UserGroupInformation.getCurrentUser().getTokens(); - List chunks = null; try { chunks = getChunkInfos(); @@ -198,6 +196,7 @@ protected List getChunkInfos() throws IOException { if (token != null) { UserGroupInformation.getCurrentUser().addToken(token); } + tokens = UserGroupInformation.getCurrentUser().getTokens(); DatanodeBlockID datanodeBlockID = blockID .getDatanodeBlockIDProtobuf(); GetBlockResponseProto response = ContainerProtocolCalls From 980842eb0f1377d87decf3fe638fb4874a658830 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Tue, 29 Sep 2020 10:53:01 +0200 Subject: [PATCH 3/5] trigger new CI check From 3fd7d07b7d35d2060db5ea5ce1bdfe3fac8e63c1 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Wed, 30 Sep 2020 11:15:23 +0200 Subject: [PATCH 4/5] Add javadoc for new param --- .../hadoop/hdds/scm/storage/ContainerProtocolCalls.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 84d067a9485d..d52bb1bc81ce 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -82,6 +82,8 @@ private ContainerProtocolCalls() { * * @param xceiverClient client to perform call * @param datanodeBlockID blockID to identify container + * @param tokens list of tokens the current user has, possibly including a + * token for this block * @return container protocol get block response * @throws IOException if there is an I/O error while performing the call */ @@ -185,6 +187,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock( * @param xceiverClient client to perform call * @param containerBlockData block data to identify container * @param eof whether this is the last putBlock for the same block + * @param tokens list of tokens the current user has, possibly including a + * token for this block * @return putBlockResponse * @throws IOException if there is an error while performing the call * @throws InterruptedException @@ -221,6 +225,8 @@ public static XceiverClientReply putBlockAsync( * @param chunk information about chunk to read * @param blockID ID of the block * @param validators functions to validate the response + * @param tokens list of tokens the current user has, possibly including a + * token for this block * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ @@ -288,6 +294,8 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, * @param chunk information about chunk to write * @param blockID ID of the block * @param data the data of the chunk to write + * @param tokens list of tokens the current user has, possibly including a + * token for this block * @throws IOException if there is an I/O error while performing the call */ public static XceiverClientReply writeChunkAsync( From e05eb00f091c77920d1e6e5f0e06762bca398e3f Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Thu, 8 Oct 2020 10:58:23 +0200 Subject: [PATCH 5/5] HDDS-4285. Directly pass block token --- .../hdds/scm/storage/BlockInputStream.java | 12 +- .../hdds/scm/storage/BlockOutputStream.java | 14 +- .../hdds/scm/storage/ChunkInputStream.java | 9 +- .../scm/storage/DummyChunkInputStream.java | 4 +- .../TestBlockOutputStreamCorrectness.java | 2 +- .../scm/storage/ContainerProtocolCalls.java | 151 ++++++------------ .../client/io/BlockOutputStreamEntry.java | 8 +- .../client/io/BlockOutputStreamEntryPool.java | 5 +- .../ozone/om/helpers/OmKeyLocationInfo.java | 6 +- .../ozone/scm/TestContainerSmallFile.java | 20 +-- .../TestGetCommittedBlockLengthAndPutKey.java | 4 +- .../ozone/scm/TestXceiverClientGrpc.java | 8 +- .../hadoop/ozone/debug/ChunkKeyHandler.java | 11 +- 13 files changed, 87 insertions(+), 167 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 74202405e980..17b513f9beb0 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -33,7 +32,6 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; -import org.apache.hadoop.security.token.TokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +40,6 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.function.Function; @@ -98,7 +95,6 @@ public class BlockInputStream extends InputStream implements Seekable { private int chunkIndexOfPrevPosition; private Function refreshPipelineFunction; - private Collection> tokens; public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, boolean verifyChecksum, @@ -193,14 +189,10 @@ protected List getChunkInfos() throws IOException { blockID.getContainerID()); } - if (token != null) { - UserGroupInformation.getCurrentUser().addToken(token); - } - tokens = UserGroupInformation.getCurrentUser().getTokens(); DatanodeBlockID datanodeBlockID = blockID .getDatanodeBlockIDProtobuf(); GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClient, datanodeBlockID, tokens); + .getBlock(xceiverClient, datanodeBlockID, token); chunks = response.getBlockData().getChunksList(); success = true; @@ -220,7 +212,7 @@ protected List getChunkInfos() throws IOException { */ protected synchronized void addStream(ChunkInfo chunkInfo) { chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, - xceiverClient, verifyChecksum, tokens)); + xceiverClient, verifyChecksum, token)); } public synchronized long getRemaining() throws IOException { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index b0ac5b894946..75225d8b1669 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -53,7 +52,6 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; @@ -129,7 +127,7 @@ public class BlockOutputStream extends OutputStream { private int currentBufferRemaining; //current buffer allocated to write private ChunkBuffer currentBuffer; - private final Collection> tokens; + private final Token token; /** * Creates a new BlockOutputStream. @@ -142,6 +140,7 @@ public class BlockOutputStream extends OutputStream { * @param streamBufferMaxSize max size of the currentBuffer * @param checksumType checksum type * @param bytesPerChecksum Bytes per checksum + * @param token a token for this block (may be null) */ @SuppressWarnings("parameternumber") public BlockOutputStream(BlockID blockID, @@ -149,7 +148,8 @@ public BlockOutputStream(BlockID blockID, int streamBufferSize, long streamBufferFlushSize, boolean streamBufferFlushDelay, long streamBufferMaxSize, BufferPool bufferPool, ChecksumType checksumType, - int bytesPerChecksum) throws IOException { + int bytesPerChecksum, Token token) + throws IOException { this.blockID = new AtomicReference<>(blockID); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); @@ -164,6 +164,7 @@ public BlockOutputStream(BlockID blockID, this.streamBufferFlushDelay = streamBufferFlushDelay; this.bufferPool = bufferPool; this.bytesPerChecksum = bytesPerChecksum; + this.token = token; //number of buffers used before doing a flush refreshCurrentBuffer(bufferPool); @@ -182,7 +183,6 @@ public BlockOutputStream(BlockID blockID, failedServers = new ArrayList<>(0); ioException = new AtomicReference<>(null); checksum = new Checksum(checksumType, bytesPerChecksum); - tokens = UserGroupInformation.getCurrentUser().getTokens(); } private void refreshCurrentBuffer(BufferPool pool) { @@ -432,7 +432,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close, try { BlockData blockData = containerBlockData.build(); XceiverClientReply asyncReply = - putBlockAsync(xceiverClient, blockData, close, tokens); + putBlockAsync(xceiverClient, blockData, close, token); CompletableFuture future = asyncReply.getResponse(); flushFuture = future.thenApplyAsync(e -> { @@ -671,7 +671,7 @@ private void writeChunkToContainer(ChunkBuffer chunk) throws IOException { try { XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo, - blockID.get(), data, tokens); + blockID.get(), data, token); CompletableFuture future = asyncReply.getResponse(); future.thenApplyAsync(e -> { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index a77701a595a8..cfb3a21f62be 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -39,7 +39,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.List; /** @@ -75,19 +74,19 @@ public class ChunkInputStream extends InputStream implements Seekable { // position. Once the chunk is read, this variable is reset. private long chunkPosition = -1; - private final Collection> tokens; + private final Token token; private static final int EOF = -1; ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId, XceiverClientSpi xceiverClient, boolean verifyChecksum, - Collection> tokens) { + Token token) { this.chunkInfo = chunkInfo; this.length = chunkInfo.getLen(); this.blockID = blockId; this.xceiverClient = xceiverClient; this.verifyChecksum = verifyChecksum; - this.tokens = tokens; + this.token = token; } public synchronized long getRemaining() throws IOException { @@ -339,7 +338,7 @@ protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException { validators.add(validator); readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, - readChunkInfo, blockID, validators, tokens); + readChunkInfo, blockID, validators, token); } catch (IOException e) { if (e instanceof StorageContainerException) { diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java index 3035056bd97b..e654d119f75d 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java @@ -26,8 +26,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import static java.util.Collections.emptyList; - /** * A dummy ChunkInputStream to mock read chunk calls to DN. */ @@ -44,7 +42,7 @@ public DummyChunkInputStream(TestChunkInputStream testChunkInputStream, XceiverClientSpi xceiverClient, boolean verifyChecksum, byte[] data) { - super(chunkInfo, blockId, xceiverClient, verifyChecksum, emptyList()); + super(chunkInfo, blockId, xceiverClient, verifyChecksum, null); this.chunkData = data; } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index 141a1d81e832..545c5889bceb 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -107,7 +107,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) 32 * 1024 * 1024, bufferPool, ChecksumType.NONE, - 256 * 1024); + 256 * 1024, null); return outputStream; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index d52bb1bc81ce..eecc97595090 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -55,11 +54,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; -import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector; -import org.apache.hadoop.io.Text; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -82,14 +78,13 @@ private ContainerProtocolCalls() { * * @param xceiverClient client to perform call * @param datanodeBlockID blockID to identify container - * @param tokens list of tokens the current user has, possibly including a - * token for this block + * @param token a token for this block (may be null) * @return container protocol get block response * @throws IOException if there is an I/O error while performing the call */ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, DatanodeBlockID datanodeBlockID, - Collection> tokens) throws IOException { + Token token) throws IOException { GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto .newBuilder() .setBlockID(datanodeBlockID); @@ -101,10 +96,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, .setContainerID(datanodeBlockID.getContainerID()) .setDatanodeUuid(id) .setGetBlock(readBlockRequest); - String encodedToken = getEncodedBlockToken(getService(datanodeBlockID), - tokens); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); @@ -122,12 +115,14 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, * * @param xceiverClient client to perform call * @param blockID blockId for the Block + * @param token a token for this block (may be null) * @return container protocol getLastCommittedBlockLength response * @throws IOException if there is an I/O error while performing the call */ public static ContainerProtos.GetCommittedBlockLengthResponseProto getCommittedBlockLength( - XceiverClientSpi xceiverClient, BlockID blockID) + XceiverClientSpi xceiverClient, BlockID blockID, + Token token) throws IOException { ContainerProtos.GetCommittedBlockLengthRequestProto.Builder getBlockLengthRequestBuilder = @@ -140,10 +135,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id) .setGetCommittedBlockLength(getBlockLengthRequestBuilder); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = @@ -156,11 +149,13 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, * * @param xceiverClient client to perform call * @param containerBlockData block data to identify container + * @param token a token for this block (may be null) * @return putBlockResponse * @throws IOException if there is an I/O error while performing the call */ public static ContainerProtos.PutBlockResponseProto putBlock( - XceiverClientSpi xceiverClient, BlockData containerBlockData) + XceiverClientSpi xceiverClient, BlockData containerBlockData, + Token token) throws IOException { PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder().setBlockData(containerBlockData); @@ -170,10 +165,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock( .setContainerID(containerBlockData.getBlockID().getContainerID()) .setDatanodeUuid(id) .setPutBlock(createBlockRequest); - String encodedToken = - getEncodedBlockToken(getService(containerBlockData.getBlockID())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = @@ -187,8 +180,7 @@ public static ContainerProtos.PutBlockResponseProto putBlock( * @param xceiverClient client to perform call * @param containerBlockData block data to identify container * @param eof whether this is the last putBlock for the same block - * @param tokens list of tokens the current user has, possibly including a - * token for this block + * @param token a token for this block (may be null) * @return putBlockResponse * @throws IOException if there is an error while performing the call * @throws InterruptedException @@ -196,7 +188,7 @@ public static ContainerProtos.PutBlockResponseProto putBlock( */ public static XceiverClientReply putBlockAsync( XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof, - Collection> tokens) + Token token) throws IOException, InterruptedException, ExecutionException { PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder() @@ -208,11 +200,8 @@ public static XceiverClientReply putBlockAsync( .setContainerID(containerBlockData.getBlockID().getContainerID()) .setDatanodeUuid(id) .setPutBlock(createBlockRequest); - String encodedToken = - getEncodedBlockToken(getService(containerBlockData.getBlockID()), - tokens); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); return xceiverClient.sendCommandAsync(request); @@ -225,15 +214,14 @@ public static XceiverClientReply putBlockAsync( * @param chunk information about chunk to read * @param blockID ID of the block * @param validators functions to validate the response - * @param tokens list of tokens the current user has, possibly including a - * token for this block + * @param token a token for this block (may be null) * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ public static ContainerProtos.ReadChunkResponseProto readChunk( XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, List validators, - Collection> tokens) throws IOException { + Token token) throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto.newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) @@ -243,10 +231,8 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk) .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id).setReadChunk(readChunkRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString()), tokens); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto reply = @@ -261,10 +247,12 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( * @param chunk information about chunk to write * @param blockID ID of the block * @param data the data of the chunk to write + * @param token a token for this block (may be null) * @throws IOException if there is an error while performing the call */ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, - BlockID blockID, ByteString data) + BlockID blockID, ByteString data, + Token token) throws IOException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto .newBuilder() @@ -278,10 +266,8 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id) .setWriteChunk(writeChunkRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); xceiverClient.sendCommand(request, getValidatorList()); @@ -294,13 +280,12 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, * @param chunk information about chunk to write * @param blockID ID of the block * @param data the data of the chunk to write - * @param tokens list of tokens the current user has, possibly including a - * token for this block + * @param token a token for this block (may be null) * @throws IOException if there is an I/O error while performing the call */ public static XceiverClientReply writeChunkAsync( XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, - ByteString data, Collection> tokens) + ByteString data, Token token) throws IOException, ExecutionException, InterruptedException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto.newBuilder() @@ -311,10 +296,8 @@ public static XceiverClientReply writeChunkAsync( ContainerCommandRequestProto.newBuilder().setCmdType(Type.WriteChunk) .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id).setWriteChunk(writeChunkRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString()), tokens); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); return xceiverClient.sendCommandAsync(request); @@ -329,12 +312,13 @@ public static XceiverClientReply writeChunkAsync( * @param client - client that communicates with the container. * @param blockID - ID of the block * @param data - Data to be written into the container. + * @param token a token for this block (may be null) * @return container protocol writeSmallFile response * @throws IOException */ public static PutSmallFileResponseProto writeSmallFile( - XceiverClientSpi client, BlockID blockID, byte[] data) - throws IOException { + XceiverClientSpi client, BlockID blockID, byte[] data, + Token token) throws IOException { BlockData containerBlockData = BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) @@ -369,10 +353,8 @@ public static PutSmallFileResponseProto writeSmallFile( .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id) .setPutSmallFile(putSmallFileRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = @@ -492,11 +474,13 @@ public static ReadContainerResponseProto readContainer( * * @param client * @param blockID - ID of the block + * @param token a token for this block (may be null) * @return GetSmallFileResponseProto * @throws IOException */ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, - BlockID blockID) throws IOException { + BlockID blockID, + Token token) throws IOException { GetBlockRequestProto.Builder getBlock = GetBlockRequestProto .newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()); @@ -512,10 +496,8 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id) .setGetSmallFile(getSmallFileRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = @@ -546,37 +528,6 @@ public static void validateContainerResponse( response.getMessage(), response.getResult()); } - /** - * Returns a url encoded block token. Service param should match the service - * field of token. - * @param service - * - * */ - private static String getEncodedBlockToken(Text service) - throws IOException { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - return getEncodedBlockToken(service, ugi.getTokens()); - } - - private static String getEncodedBlockToken(Text service, - Collection> tokens) throws IOException { - Token token = - OzoneBlockTokenSelector.selectBlockToken(service, tokens); - if (token != null) { - return token.encodeToUrlString(); - } - return null; - } - - private static Text getService(DatanodeBlockID blockId) { - return new Text(new StringBuffer() - .append("conID: ") - .append(blockId.getContainerID()) - .append(" locID: ") - .append(blockId.getLocalID()) - .toString()); - } - public static List getValidatorList() { List validators = new ArrayList<>(1); CheckedBiFunction getValidatorList() { public static HashMap getBlockFromAllNodes( XceiverClientSpi xceiverClient, - DatanodeBlockID datanodeBlockID) throws IOException, - InterruptedException { + DatanodeBlockID datanodeBlockID, + Token token) + throws IOException, InterruptedException { GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto .newBuilder() .setBlockID(datanodeBlockID); @@ -598,14 +550,13 @@ public static List getValidatorList() { = new HashMap<>(); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.GetBlock) - .setContainerID(datanodeBlockID.getContainerID()) - .setDatanodeUuid(id) - .setGetBlock(readBlockRequest); - String encodedToken = getEncodedBlockToken(getService(datanodeBlockID)); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + .newBuilder() + .setCmdType(Type.GetBlock) + .setContainerID(datanodeBlockID.getContainerID()) + .setDatanodeUuid(id) + .setGetBlock(readBlockRequest); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); Map responses = diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 8e1e6405e770..49c608c45ef2 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import java.util.Collection; @@ -53,7 +52,7 @@ public final class BlockOutputStreamEntry extends OutputStream { private final long length; // the current position of this stream 0 <= currentPosition < length private long currentPosition; - private Token token; + private final Token token; private final int streamBufferSize; private final long streamBufferFlushSize; @@ -110,14 +109,11 @@ long getRemaining() { */ private void checkStream() throws IOException { if (this.outputStream == null) { - if (getToken() != null) { - UserGroupInformation.getCurrentUser().addToken(getToken()); - } this.outputStream = new BlockOutputStream(blockID, xceiverClientManager, pipeline, streamBufferSize, streamBufferFlushSize, streamBufferFlushDelay, streamBufferMaxSize, bufferPool, - checksumType, bytesPerChecksum); + checksumType, bytesPerChecksum, token); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 712d1199a335..7d33c45b1fdc 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -36,7 +36,6 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,10 +176,8 @@ public void addPreallocateBlocks(OmKeyLocationInfoGroup version, } } - private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) - throws IOException { + private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) { Preconditions.checkNotNull(subKeyInfo.getPipeline()); - UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); BlockOutputStreamEntry.Builder builder = new BlockOutputStreamEntry.Builder() .setBlockID(subKeyInfo.getBlockID()) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java index 70c71d6d7f32..6986b4a90d7a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java @@ -148,11 +148,7 @@ public Builder setToken(Token bToken) { } public OmKeyLocationInfo build() { - if (token == null) { - return new OmKeyLocationInfo(blockID, pipeline, length, offset); - } else { - return new OmKeyLocationInfo(blockID, pipeline, length, offset, token); - } + return new OmKeyLocationInfo(blockID, pipeline, length, offset, token); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index 7ebe18939555..316951343014 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -98,9 +98,9 @@ public void testAllocateWrite() throws Exception { BlockID blockID = ContainerTestHelper.getTestBlockID( container.getContainerInfo().getContainerID()); ContainerProtocolCalls.writeSmallFile(client, blockID, - "data123".getBytes()); + "data123".getBytes(), null); ContainerProtos.GetSmallFileResponseProto response = - ContainerProtocolCalls.readSmallFile(client, blockID); + ContainerProtocolCalls.readSmallFile(client, blockID, null); String readData = response.getData().getData().toStringUtf8(); Assert.assertEquals("data123", readData); xceiverClientManager.releaseClient(client, false); @@ -124,7 +124,7 @@ public void testInvalidBlockRead() throws Exception { container.getContainerInfo().getContainerID()); // Try to read a Key Container Name ContainerProtos.GetSmallFileResponseProto response = - ContainerProtocolCalls.readSmallFile(client, blockID); + ContainerProtocolCalls.readSmallFile(client, blockID, null); xceiverClientManager.releaseClient(client, false); } @@ -142,7 +142,7 @@ public void testInvalidContainerRead() throws Exception { BlockID blockID = ContainerTestHelper.getTestBlockID( container.getContainerInfo().getContainerID()); ContainerProtocolCalls.writeSmallFile(client, blockID, - "data123".getBytes()); + "data123".getBytes(), null); thrown.expect(StorageContainerException.class); thrown.expectMessage("ContainerID 8888 does not exist"); @@ -151,7 +151,7 @@ public void testInvalidContainerRead() throws Exception { ContainerProtos.GetSmallFileResponseProto response = ContainerProtocolCalls.readSmallFile(client, ContainerTestHelper.getTestBlockID( - nonExistContainerID)); + nonExistContainerID), null); xceiverClientManager.releaseClient(client, false); } @@ -170,14 +170,14 @@ public void testReadWriteWithBCSId() throws Exception { container.getContainerInfo().getContainerID()); ContainerProtos.PutSmallFileResponseProto responseProto = ContainerProtocolCalls - .writeSmallFile(client, blockID1, "data123".getBytes()); + .writeSmallFile(client, blockID1, "data123".getBytes(), null); long bcsId = responseProto.getCommittedBlockLength().getBlockID() .getBlockCommitSequenceId(); try { blockID1.setBlockCommitSequenceId(bcsId + 1); //read a file with higher bcsId than the container bcsId ContainerProtocolCalls - .readSmallFile(client, blockID1); + .readSmallFile(client, blockID1, null); Assert.fail("Expected exception not thrown"); } catch (StorageContainerException sce) { Assert @@ -188,12 +188,12 @@ public void testReadWriteWithBCSId() throws Exception { BlockID blockID2 = ContainerTestHelper .getTestBlockID(container.getContainerInfo().getContainerID()); ContainerProtocolCalls - .writeSmallFile(client, blockID2, "data123".getBytes()); + .writeSmallFile(client, blockID2, "data123".getBytes(), null); try { blockID1.setBlockCommitSequenceId(bcsId + 1); //read a file with higher bcsId than the committed bcsId for the block - ContainerProtocolCalls.readSmallFile(client, blockID1); + ContainerProtocolCalls.readSmallFile(client, blockID1, null); Assert.fail("Expected exception not thrown"); } catch (StorageContainerException sce) { Assert @@ -201,7 +201,7 @@ public void testReadWriteWithBCSId() throws Exception { } blockID1.setBlockCommitSequenceId(bcsId); ContainerProtos.GetSmallFileResponseProto response = - ContainerProtocolCalls.readSmallFile(client, blockID1); + ContainerProtocolCalls.readSmallFile(client, blockID1, null); String readData = response.getData().getData().toStringUtf8(); Assert.assertEquals("data123", readData); xceiverClientManager.releaseClient(client, false); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index 7b4fc5367bb7..5efb57e10402 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -113,7 +113,7 @@ public void tesGetCommittedBlockLength() throws Exception { .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk()); client.sendCommand(putKeyRequest); response = ContainerProtocolCalls - .getCommittedBlockLength(client, blockID); + .getCommittedBlockLength(client, blockID, null); // make sure the block ids in the request and response are same. Assert.assertTrue( BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); @@ -137,7 +137,7 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception { try { // There is no block written inside the container. The request should // fail. - ContainerProtocolCalls.getCommittedBlockLength(client, blockID); + ContainerProtocolCalls.getCommittedBlockLength(client, blockID, null); Assert.fail("Expected exception not thrown"); } catch (StorageContainerException sce) { Assert.assertTrue(sce.getMessage().contains("Unable to find the block")); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java index 5b9bc4122655..74a7537ab171 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java @@ -41,8 +41,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; -import static java.util.Collections.emptyList; - /** * Tests for TestXceiverClientGrpc, to ensure topology aware reads work * select the closest node, and connections are re-used after a getBlock call. @@ -158,7 +156,7 @@ private void invokeXceiverClientGetBlock(XceiverClientSpi client) .setContainerID(1) .setLocalID(1) .setBlockCommitSequenceId(1) - .build(), emptyList()); + .build(), null); } private void invokeXceiverClientReadChunk(XceiverClientSpi client) @@ -176,14 +174,14 @@ private void invokeXceiverClientReadChunk(XceiverClientSpi client) .setOffset(100) .build(), bid, - null, emptyList()); + null, null); } private void invokeXceiverClientReadSmallFile(XceiverClientSpi client) throws IOException { BlockID bid = new BlockID(1, 1); bid.setBlockCommitSequenceId(1); - ContainerProtocolCalls.readSmallFile(client, bid); + ContainerProtocolCalls.readSmallFile(client, bid, null); } private XceiverClientReply buildValidResponse() { diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ChunkKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ChunkKeyHandler.java index 4f69da78b905..898aca46f6bb 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ChunkKeyHandler.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ChunkKeyHandler.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientException; @@ -51,8 +50,6 @@ import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.shell.OzoneAddress; import org.apache.hadoop.ozone.shell.keys.KeyHandler; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.kohsuke.MetaInfServices; import picocli.CommandLine.Command; import picocli.CommandLine.Parameters; @@ -121,7 +118,6 @@ protected void execute(OzoneClient client, OzoneAddress address) ContainerChunkInfo containerChunkInfo = new ContainerChunkInfo(); long containerId = keyLocation.getContainerID(); chunkPaths.clear(); - Token token = keyLocation.getToken(); Pipeline pipeline = keyLocation.getPipeline(); if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) { pipeline = Pipeline.newBuilder(pipeline) @@ -131,17 +127,14 @@ protected void execute(OzoneClient client, OzoneAddress address) .acquireClientForReadData(pipeline); // Datanode is queried to get chunk information.Thus querying the // OM,SCM and datanode helps us get chunk location information - if (token != null) { - UserGroupInformation.getCurrentUser().addToken(token); - } ContainerProtos.DatanodeBlockID datanodeBlockID = keyLocation.getBlockID() .getDatanodeBlockIDProtobuf(); // doing a getBlock on all nodes HashMap responses = null; try { - responses = ContainerProtocolCalls - .getBlockFromAllNodes(xceiverClient, datanodeBlockID); + responses = ContainerProtocolCalls.getBlockFromAllNodes( + xceiverClient, datanodeBlockID, keyLocation.getToken()); } catch (InterruptedException e) { LOG.error("Execution interrupted due to " + e); }