Skip to content

Commit b65cbf5

Browse files
authored
inprocess: Support tracing message sizes guarded by flag (#11629)
1 parent 62f4098 commit b65cbf5

File tree

4 files changed

+92
-55
lines changed

4 files changed

+92
-55
lines changed

Diff for: core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java

+33-12
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ public abstract class AbstractTransportTest {
9696

9797
private static final int TIMEOUT_MS = 5000;
9898

99+
protected static final String GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES =
100+
"GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES";
101+
99102
private static final Attributes.Key<String> ADDITIONAL_TRANSPORT_ATTR_KEY =
100103
Attributes.Key.create("additional-attr");
101104

@@ -238,6 +241,13 @@ protected void advanceClock(long offset, TimeUnit unit) {
238241
throw new UnsupportedOperationException();
239242
}
240243

244+
/**
245+
* Returns true if env var is set.
246+
*/
247+
protected static boolean isEnabledSupportTracingMessageSizes() {
248+
return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES, false);
249+
}
250+
241251
/**
242252
* Returns the current time, for tests that rely on the clock.
243253
*/
@@ -850,16 +860,21 @@ public void basicStream() throws Exception {
850860
message.close();
851861
assertThat(clientStreamTracer1.nextOutboundEvent())
852862
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
853-
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
854-
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
863+
if (isEnabledSupportTracingMessageSizes()) {
864+
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
865+
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
866+
}
867+
855868
assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
856869
assertNull("no additional message expected", serverStreamListener.messageQueue.poll());
857870

858871
clientStream.halfClose();
859872
assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));
860873

861-
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
862-
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
874+
if (isEnabledSupportTracingMessageSizes()) {
875+
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
876+
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
877+
}
863878
assertThat(serverStreamTracer1.nextInboundEvent())
864879
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
865880

@@ -890,15 +905,19 @@ public void basicStream() throws Exception {
890905
assertNotNull("message expected", message);
891906
assertThat(serverStreamTracer1.nextOutboundEvent())
892907
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
893-
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
894-
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
908+
if (isEnabledSupportTracingMessageSizes()) {
909+
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
910+
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
911+
}
895912
assertTrue(clientStreamTracer1.getInboundHeaders());
896913
assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
897914
assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message));
898915
assertThat(clientStreamTracer1.nextInboundEvent())
899916
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
900-
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
901-
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
917+
if (isEnabledSupportTracingMessageSizes()) {
918+
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
919+
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
920+
}
902921

903922
message.close();
904923
assertNull("no additional message expected", clientStreamListener.messageQueue.poll());
@@ -1258,10 +1277,12 @@ public void onReady() {
12581277
serverStream.close(Status.OK, new Metadata());
12591278
assertTrue(clientStreamTracer1.getOutboundHeaders());
12601279
assertTrue(clientStreamTracer1.getInboundHeaders());
1261-
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
1262-
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
1263-
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
1264-
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
1280+
if (isEnabledSupportTracingMessageSizes()) {
1281+
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
1282+
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
1283+
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
1284+
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
1285+
}
12651286
assertNull(clientStreamTracer1.getInboundTrailers());
12661287
assertSame(status, clientStreamTracer1.getStatus());
12671288
// There is a race between client cancelling and server closing. The final status seen by the

Diff for: inprocess/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkArgument;
2020
import static com.google.common.base.Preconditions.checkNotNull;
21+
import static io.grpc.inprocess.InProcessTransport.isEnabledSupportTracingMessageSizes;
2122

2223
import com.google.errorprone.annotations.DoNotCall;
2324
import io.grpc.ChannelCredentials;
@@ -118,6 +119,9 @@ public ClientTransportFactory buildClientTransportFactory() {
118119
managedChannelImplBuilder.setStatsRecordStartedRpcs(false);
119120
managedChannelImplBuilder.setStatsRecordFinishedRpcs(false);
120121
managedChannelImplBuilder.setStatsRecordRetryMetrics(false);
122+
if (!isEnabledSupportTracingMessageSizes) {
123+
managedChannelImplBuilder.disableRetry();
124+
}
121125
}
122126

123127
@Internal

Diff for: inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java

+49-39
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@
8282
@ThreadSafe
8383
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
8484
private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
85+
static boolean isEnabledSupportTracingMessageSizes =
86+
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES", false);
8587

8688
private final InternalLogId logId;
8789
private final SocketAddress address;
@@ -485,22 +487,25 @@ private void clientCancelled(Status status) {
485487

486488
@Override
487489
public void writeMessage(InputStream message) {
488-
long messageLength;
489-
try {
490-
if (assumedMessageSize != -1) {
491-
messageLength = assumedMessageSize;
492-
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
493-
messageLength = message.available();
494-
} else {
495-
InputStream oldMessage = message;
496-
byte[] payload = ByteStreams.toByteArray(message);
497-
messageLength = payload.length;
498-
message = new ByteArrayInputStream(payload);
499-
oldMessage.close();
490+
long messageLength = 0;
491+
if (isEnabledSupportTracingMessageSizes) {
492+
try {
493+
if (assumedMessageSize != -1) {
494+
messageLength = assumedMessageSize;
495+
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
496+
messageLength = message.available();
497+
} else {
498+
InputStream oldMessage = message;
499+
byte[] payload = ByteStreams.toByteArray(message);
500+
messageLength = payload.length;
501+
message = new ByteArrayInputStream(payload);
502+
oldMessage.close();
503+
}
504+
} catch (Exception e) {
505+
throw new RuntimeException("Error processing the message length", e);
500506
}
501-
} catch (Exception e) {
502-
throw new RuntimeException("Error processing the message length", e);
503507
}
508+
504509
synchronized (this) {
505510
if (closed) {
506511
return;
@@ -509,11 +514,13 @@ public void writeMessage(InputStream message) {
509514
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
510515
clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
511516
clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
512-
statsTraceCtx.outboundUncompressedSize(messageLength);
513-
statsTraceCtx.outboundWireSize(messageLength);
514-
// messageLength should be same at receiver's end as no actual wire is involved.
515-
clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
516-
clientStream.statsTraceCtx.inboundWireSize(messageLength);
517+
if (isEnabledSupportTracingMessageSizes) {
518+
statsTraceCtx.outboundUncompressedSize(messageLength);
519+
statsTraceCtx.outboundWireSize(messageLength);
520+
// messageLength should be same at receiver's end as no actual wire is involved.
521+
clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
522+
clientStream.statsTraceCtx.inboundWireSize(messageLength);
523+
}
517524
outboundSeqNo++;
518525
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
519526
if (clientRequested > 0) {
@@ -523,7 +530,6 @@ public void writeMessage(InputStream message) {
523530
clientReceiveQueue.add(producer);
524531
}
525532
}
526-
527533
syncContext.drain();
528534
}
529535

@@ -777,21 +783,23 @@ private void serverClosed(Status serverListenerStatus, Status serverTracerStatus
777783

778784
@Override
779785
public void writeMessage(InputStream message) {
780-
long messageLength;
781-
try {
782-
if (assumedMessageSize != -1) {
783-
messageLength = assumedMessageSize;
784-
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
785-
messageLength = message.available();
786-
} else {
787-
InputStream oldMessage = message;
788-
byte[] payload = ByteStreams.toByteArray(message);
789-
messageLength = payload.length;
790-
message = new ByteArrayInputStream(payload);
791-
oldMessage.close();
786+
long messageLength = 0;
787+
if (isEnabledSupportTracingMessageSizes) {
788+
try {
789+
if (assumedMessageSize != -1) {
790+
messageLength = assumedMessageSize;
791+
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
792+
messageLength = message.available();
793+
} else {
794+
InputStream oldMessage = message;
795+
byte[] payload = ByteStreams.toByteArray(message);
796+
messageLength = payload.length;
797+
message = new ByteArrayInputStream(payload);
798+
oldMessage.close();
799+
}
800+
} catch (Exception e) {
801+
throw new RuntimeException("Error processing the message length", e);
792802
}
793-
} catch (Exception e) {
794-
throw new RuntimeException("Error processing the message length", e);
795803
}
796804
synchronized (this) {
797805
if (closed) {
@@ -801,11 +809,13 @@ public void writeMessage(InputStream message) {
801809
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
802810
serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
803811
serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
804-
statsTraceCtx.outboundUncompressedSize(messageLength);
805-
statsTraceCtx.outboundWireSize(messageLength);
806-
// messageLength should be same at receiver's end as no actual wire is involved.
807-
serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
808-
serverStream.statsTraceCtx.inboundWireSize(messageLength);
812+
if (isEnabledSupportTracingMessageSizes) {
813+
statsTraceCtx.outboundUncompressedSize(messageLength);
814+
statsTraceCtx.outboundWireSize(messageLength);
815+
// messageLength should be same at receiver's end as no actual wire is involved.
816+
serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
817+
serverStream.statsTraceCtx.inboundWireSize(messageLength);
818+
}
809819
outboundSeqNo++;
810820
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
811821
if (serverRequested > 0) {

Diff for: inprocess/src/test/java/io/grpc/inprocess/InProcessTransportTest.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,11 @@ public void basicStreamInProcess() throws Exception {
234234

235235
private void assertAssumedMessageSize(
236236
TestStreamTracer streamTracerSender, TestStreamTracer streamTracerReceiver) {
237-
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundWireSize());
238-
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundUncompressedSize());
239-
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundWireSize());
240-
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundUncompressedSize());
237+
if (isEnabledSupportTracingMessageSizes()) {
238+
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundWireSize());
239+
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundUncompressedSize());
240+
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundWireSize());
241+
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundUncompressedSize());
242+
}
241243
}
242244
}

0 commit comments

Comments
 (0)