From eafcd3fe40d060d98e5fec007d919d147a6ce636 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Thu, 26 Oct 2023 12:06:39 -0400 Subject: [PATCH 1/4] Deep copy RpcLogDetails' param field --- .../hadoop/hbase/client/SlowLogParams.java | 2 +- .../hbase/namequeues/RpcLogDetails.java | 44 ++++++++++++++++++- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java index 6af7c42c26dd..92405fbc06b7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java @@ -82,7 +82,7 @@ public boolean equals(Object o) { } SlowLogParams that = (SlowLogParams) o; return new EqualsBuilder().append(regionName, that.regionName).append(params, that.params) - .append("scan", scan).isEquals(); + .append(scan, that.scan).isEquals(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java index eb35d886bbb0..baed0289e4c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java @@ -21,9 +21,14 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * RpcCall details that would be passed on to ring buffer of slow log responses */ @@ -32,8 +37,10 @@ public class RpcLogDetails extends NamedQueuePayload { public static final int SLOW_LOG_EVENT = 0; + private static final Logger LOG = LoggerFactory.getLogger(RpcLogDetails.class.getName()); + private final RpcCall rpcCall; - private final Message param; + private Message param; private final String clientAddress; private final long responseSize; private final long blockBytesScanned; @@ -47,7 +54,6 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long long blockBytesScanned, String className, boolean isSlowLog, boolean isLargeLog) { super(SLOW_LOG_EVENT); this.rpcCall = rpcCall; - this.param = param; this.clientAddress = clientAddress; this.responseSize = responseSize; this.blockBytesScanned = blockBytesScanned; @@ -60,6 +66,40 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long // would result in corrupted attributes this.connectionAttributes = rpcCall.getConnectionAttributes(); this.requestAttributes = rpcCall.getRequestAttributes(); + + // We also need to deep copy the message because the CodedInputStream may be + // overwritten before this slow log is consumed. Such overwriting could + // cause the slow log payload to be corrupt + try { + if (param instanceof ClientProtos.ScanRequest) { + ClientProtos.ScanRequest scanRequest = (ClientProtos.ScanRequest) param; + this.param = ClientProtos.ScanRequest.parseFrom(scanRequest.toByteArray()); + } else if (param instanceof ClientProtos.MutationProto) { + ClientProtos.MutationProto mutationProto = (ClientProtos.MutationProto) param; + this.param = ClientProtos.MutationProto.parseFrom(mutationProto.toByteArray()); + } else if (param instanceof ClientProtos.GetRequest) { + ClientProtos.GetRequest getRequest = (ClientProtos.GetRequest) param; + this.param = ClientProtos.GetRequest.parseFrom(getRequest.toByteArray()); + } else if (param instanceof ClientProtos.MultiRequest) { + ClientProtos.MultiRequest multiRequest = (ClientProtos.MultiRequest) param; + this.param = ClientProtos.MultiRequest.parseFrom(multiRequest.toByteArray()); + } else if (param instanceof ClientProtos.MutateRequest) { + ClientProtos.MutateRequest mutateRequest = (ClientProtos.MutateRequest) param; + this.param = ClientProtos.MutateRequest.parseFrom(mutateRequest.toByteArray()); + } else if (param instanceof ClientProtos.CoprocessorServiceRequest) { + ClientProtos.CoprocessorServiceRequest coprocessorServiceRequest = + (ClientProtos.CoprocessorServiceRequest) param; + this.param = + ClientProtos.CoprocessorServiceRequest.parseFrom(coprocessorServiceRequest.toByteArray()); + } else { + this.param = param; + } + } catch (InvalidProtocolBufferException e) { + LOG.error("Failed to parse protobuf for message {}", param, e); + if (this.param == null) { + this.param = param; + } + } } public RpcCall getRpcCall() { From 814d5dfb095fc032e4f00503abb161875093f584 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Thu, 2 Nov 2023 10:40:17 -0400 Subject: [PATCH 2/4] simplify constructor, test byte buffer corruption --- .../hbase/namequeues/RpcLogDetails.java | 26 +- .../hbase/namequeues/TestRpcLogDetails.java | 259 ++++++++++++++++++ 2 files changed, 260 insertions(+), 25 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java index baed0289e4c3..c2ea1b4697a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java @@ -27,8 +27,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; - /** * RpcCall details that would be passed on to ring buffer of slow log responses */ @@ -71,29 +69,7 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long // overwritten before this slow log is consumed. Such overwriting could // cause the slow log payload to be corrupt try { - if (param instanceof ClientProtos.ScanRequest) { - ClientProtos.ScanRequest scanRequest = (ClientProtos.ScanRequest) param; - this.param = ClientProtos.ScanRequest.parseFrom(scanRequest.toByteArray()); - } else if (param instanceof ClientProtos.MutationProto) { - ClientProtos.MutationProto mutationProto = (ClientProtos.MutationProto) param; - this.param = ClientProtos.MutationProto.parseFrom(mutationProto.toByteArray()); - } else if (param instanceof ClientProtos.GetRequest) { - ClientProtos.GetRequest getRequest = (ClientProtos.GetRequest) param; - this.param = ClientProtos.GetRequest.parseFrom(getRequest.toByteArray()); - } else if (param instanceof ClientProtos.MultiRequest) { - ClientProtos.MultiRequest multiRequest = (ClientProtos.MultiRequest) param; - this.param = ClientProtos.MultiRequest.parseFrom(multiRequest.toByteArray()); - } else if (param instanceof ClientProtos.MutateRequest) { - ClientProtos.MutateRequest mutateRequest = (ClientProtos.MutateRequest) param; - this.param = ClientProtos.MutateRequest.parseFrom(mutateRequest.toByteArray()); - } else if (param instanceof ClientProtos.CoprocessorServiceRequest) { - ClientProtos.CoprocessorServiceRequest coprocessorServiceRequest = - (ClientProtos.CoprocessorServiceRequest) param; - this.param = - ClientProtos.CoprocessorServiceRequest.parseFrom(coprocessorServiceRequest.toByteArray()); - } else { - this.param = param; - } + this.param = param.newBuilderForType().mergeFrom(param.toByteArray()).build(); } catch (InvalidProtocolBufferException e) { LOG.error("Failed to parse protobuf for message {}", param, e); if (this.param == null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java new file mode 100644 index 000000000000..f0f50a7ebd3a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcCallback; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRpcLogDetails { + + private final ClientProtos.Scan scan = + ClientProtos.Scan.newBuilder().setStartRow(ByteString.copyFrom(Bytes.toBytes("abc"))) + .setStopRow(ByteString.copyFrom(Bytes.toBytes("xyz"))).build(); + private final ClientProtos.Scan otherScan = + ClientProtos.Scan.newBuilder().setStartRow(ByteString.copyFrom(Bytes.toBytes("def"))) + .setStopRow(ByteString.copyFrom(Bytes.toBytes("uvw"))).build(); + private final ClientProtos.ScanRequest scanRequest = ClientProtos.ScanRequest + .newBuilder(ClientProtos.ScanRequest.getDefaultInstance()).setScan(scan).build(); + private final ClientProtos.ScanRequest otherScanRequest = ClientProtos.ScanRequest + .newBuilder(ClientProtos.ScanRequest.getDefaultInstance()).setScan(otherScan).build(); + + @Test + public void itDeepCopiesRpcLogDetailsParams() throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(scanRequest.toByteArray().length); + CodedInputStream cis = UnsafeByteOperations.unsafeWrap(buffer).newCodedInput(); + cis.enableAliasing(true); + buffer.put(scanRequest.toByteArray()); + Message.Builder messageBuilder = ClientProtos.ScanRequest.newBuilder(); + ProtobufUtil.mergeFrom(messageBuilder, cis, buffer.capacity()); + Message message = messageBuilder.build(); + RpcLogDetails rpcLogDetails = + new RpcLogDetails(getRpcCall(message), message, null, 0L, 0L, null, true, false); + + // log's scan should be equal + ClientProtos.Scan logScan = ((ClientProtos.ScanRequest) rpcLogDetails.getParam()).getScan(); + assertEquals(logScan, scan); + + // ensure we have a different byte array for testing + assertFalse(Arrays.equals(scanRequest.toByteArray(), otherScanRequest.toByteArray())); + + // corrupt the underlying buffer + buffer.position(0); + buffer.put(otherScanRequest.toByteArray(), 0, otherScanRequest.toByteArray().length); + assertArrayEquals(otherScanRequest.toByteArray(), buffer.array()); + + // log scan should still be original scan + assertEquals(logScan, scan); + } + + @SuppressWarnings("checkstyle:methodlength") + private static RpcCall getRpcCall(Message message) { + RpcCall rpcCall = new RpcCall() { + @Override + public BlockingService getService() { + return null; + } + + @Override + public Descriptors.MethodDescriptor getMethod() { + return null; + } + + @Override + public Message getParam() { + return message; + } + + @Override + public CellScanner getCellScanner() { + return null; + } + + @Override + public long getReceiveTime() { + return 0; + } + + @Override + public long getStartTime() { + return 0; + } + + @Override + public void setStartTime(long startTime) { + } + + @Override + public int getTimeout() { + return 0; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public long getDeadline() { + return 0; + } + + @Override + public long getSize() { + return 0; + } + + @Override + public RPCProtos.RequestHeader getHeader() { + return null; + } + + @Override + public Map getConnectionAttributes() { + return Collections.emptyMap(); + } + + @Override + public Map getRequestAttributes() { + return Collections.emptyMap(); + } + + @Override + public byte[] getRequestAttribute(String key) { + return null; + } + + @Override + public int getRemotePort() { + return 0; + } + + @Override + public void setResponse(Message param, CellScanner cells, Throwable errorThrowable, + String error) { + } + + @Override + public void sendResponseIfReady() throws IOException { + } + + @Override + public void cleanup() { + } + + @Override + public String toShortString() { + return null; + } + + @Override + public long disconnectSince() { + return 0; + } + + @Override + public boolean isClientCellBlockSupported() { + return false; + } + + @Override + public Optional getRequestUser() { + return null; + } + + @Override + public InetAddress getRemoteAddress() { + return null; + } + + @Override + public HBaseProtos.VersionInfo getClientVersionInfo() { + return null; + } + + @Override + public void setCallBack(RpcCallback callback) { + } + + @Override + public boolean isRetryImmediatelySupported() { + return false; + } + + @Override + public long getResponseCellSize() { + return 0; + } + + @Override + public void incrementResponseCellSize(long cellSize) { + } + + @Override + public long getBlockBytesScanned() { + return 0; + } + + @Override + public void incrementBlockBytesScanned(long blockSize) { + } + + @Override + public long getResponseExceptionSize() { + return 0; + } + + @Override + public void incrementResponseExceptionSize(long exceptionSize) { + } + }; + return rpcCall; + } + +} From 41b36db5c671d2fe9460bd48910c52628a8fcde7 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Thu, 2 Nov 2023 14:59:06 -0400 Subject: [PATCH 3/4] add classrule --- .../hadoop/hbase/namequeues/TestRpcLogDetails.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java index f0f50a7ebd3a..8a93f2d0ff54 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java @@ -29,12 +29,14 @@ import java.util.Map; import java.util.Optional; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.ipc.RpcCallback; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -50,9 +52,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; -@Category({ RegionServerTests.class, MediumTests.class }) +@Category({ RegionServerTests.class, SmallTests.class }) public class TestRpcLogDetails { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRpcLogDetails.class); + private final ClientProtos.Scan scan = ClientProtos.Scan.newBuilder().setStartRow(ByteString.copyFrom(Bytes.toBytes("abc"))) .setStopRow(ByteString.copyFrom(Bytes.toBytes("xyz"))).build(); From c60240a3afbcf05af6ad99df2b5f0717c797071e Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Mon, 6 Nov 2023 09:47:00 -0500 Subject: [PATCH 4/4] simplify RpcLogDetails constructor catch --- .../org/apache/hadoop/hbase/namequeues/RpcLogDetails.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java index c2ea1b4697a0..235d82302d64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java @@ -72,9 +72,7 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long this.param = param.newBuilderForType().mergeFrom(param.toByteArray()).build(); } catch (InvalidProtocolBufferException e) { LOG.error("Failed to parse protobuf for message {}", param, e); - if (this.param == null) { - this.param = param; - } + this.param = param; } }