diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java index 994115765e3a..69dcaf417986 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.CancellationException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -160,8 +161,10 @@ protected String getDocumentLink(Document doc) { protected abstract void performWorkload(BaseSubscriber baseSubscriber, long i) throws Exception; private boolean shouldContinue(long startTimeMillis, long iterationCount) { + Duration maxDurationTime = configuration.getMaxRunningTimeDuration(); int maxNumberOfOperations = configuration.getNumberOfOperations(); + if (maxDurationTime == null) { return iterationCount < maxNumberOfOperations; } @@ -181,16 +184,18 @@ void run() throws Exception { successMeter = metricsRegistry.meter("#Successful Operations"); failureMeter = metricsRegistry.meter("#Unsuccessful Operations"); + if (configuration.getOperationType() == Configuration.Operation.ReadLatency - || configuration.getOperationType() == Configuration.Operation.WriteLatency) + || configuration.getOperationType() == Configuration.Operation.WriteLatency) { latency = metricsRegistry.timer("Latency"); + } reporter.start(configuration.getPrintingInterval(), TimeUnit.SECONDS); - long startTime = System.currentTimeMillis(); AtomicLong count = new AtomicLong(0); long i; + for ( i = 0; shouldContinue(startTime, i); i++) { BaseSubscriber baseSubscriber = new BaseSubscriber() { @@ -201,7 +206,12 @@ protected void hookOnSubscribe(Subscription subscription) { @Override protected void hookOnNext(T value) { + logger.debug("hookOnNext: {}, count:{}", value, count.get()); + } + @Override + protected void hookOnCancel() { + this.hookOnError(new CancellationException()); } @Override diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java index a28aec4a2841..15371d2ca63f 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java @@ -83,8 +83,8 @@ protected void performWorkload(BaseSubscriber> baseSubscr } else { throw new IllegalArgumentException("Unsupported Operation: " + configuration.getOperationType()); } - concurrencyControlSemaphore.acquire(); + concurrencyControlSemaphore.acquire(); obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber); } } diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java index 9b242b06b19f..81edd063bf8d 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java @@ -65,6 +65,7 @@ public static void main(String[] args) throws Exception { throw new RuntimeException(cfg.getOperationType() + " is not supported"); } + LOGGER.info("Starting {}", cfg.getOperationType()); benchmark.run(); benchmark.shutdown(); @@ -73,6 +74,8 @@ public static void main(String[] args) throws Exception { System.err.println("INVALID Usage: " + e.getMessage()); System.err.println("Try '-help' for more information."); throw e; + } finally { + System.exit(0); } } } diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java index b5e231689e12..dabd3e04e546 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java @@ -54,14 +54,19 @@ protected void init() { @Override protected void performWorkload(BaseSubscriber baseSubscriber, long i) throws Exception { + Flux obs; boolean readyMyWrite = RandomUtils.nextBoolean(); + if (readyMyWrite) { + // will do a write and immediately upon success will either // do a point read // or single partition query // or cross partition query to find the write. + int j = Math.toIntExact(Math.floorMod(i, 3)); + switch (j) { case 0: // write a random document to cosmodb and update the cache. @@ -78,7 +83,7 @@ protected void performWorkload(BaseSubscriber baseSubscriber, long i) "couldn't find my write in a single partition query!")))); break; case 2: - // write a random document to cosmodb and update the cache. + // write a random document to cosmosdb and update the cache. // then try to query for the document which just was written obs = writeDocument() .flatMap(d -> xPartitionQuery(generateQuery(d)) @@ -90,12 +95,15 @@ protected void performWorkload(BaseSubscriber baseSubscriber, long i) throw new IllegalStateException(); } } else { + // will either do // a write - // a point read for a in memory cached document + // a point read for a in memory cached document4 // or single partition query for a in memory cached document // or cross partition query for a in memory cached document + int j = Math.toIntExact(Math.floorMod(i, 4)); + switch (j) { case 0: // write a random document to cosmosdb and update the cache @@ -124,8 +132,14 @@ protected void performWorkload(BaseSubscriber baseSubscriber, long i) } concurrencyControlSemaphore.acquire(); + logger.debug("concurrencyControlSemaphore: {}", concurrencyControlSemaphore); - obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber); + try { + obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber); + } catch (Throwable error) { + concurrencyControlSemaphore.release(); + logger.error("subscription failed due to ", error); + } } private void populateCache() { diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/resources/log4j.properties b/sdk/cosmos/azure-cosmos-benchmark/src/main/resources/log4j.properties index 557b05edffac..5750c0b8bf36 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/resources/log4j.properties +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/resources/log4j.properties @@ -1,14 +1,29 @@ -# this is the log4j configuration for tests +# This is the log4j configuration for benchmarks -# Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=INFO, A1 +log4j.rootLogger=INFO, Console -log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd=WARN -log4j.category.io.netty=INFO -log4j.category.io.reactivex=INFO -# A1 is set to be a ConsoleAppender. -log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.category.com.azure.cosmos=INFO +log4j.category.com.azure.cosmos.benchmark=INFO +log4j.category.com.azure.cosmos.internal=INFO +log4j.category.com.aure.cosmos.internal.caches=INFO +log4j.category.com.aure.cosmos.internal.changefeed=INFO +log4j.category.com.azure.cosmos.internal.directconnectivity=INFO +log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd=INFO +log4j.category.com.azure.cosmos.internal.http=INFO +log4j.category.com.azure.cosmos.internal.query=INFO +log4j.category.com.azure.cosmos.internal.query.aggregation=INFO +log4j.category.com.azure.cosmos.internal.query.metrics=INFO +log4j.category.com.azure.cosmos.internal.query.orderbyquery=INFO +log4j.category.com.azure.cosmos.internal.routing=INFO -# A1 uses PatternLayout. -log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n +log4j.category.com.azure.cosmos.internal.directconnectivity.RntbdTransportClient=INFO +log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd.RntbdRequestManager=INFO + +log4j.appender.Console=org.apache.log4j.ConsoleAppender +log4j.appender.Console.layout=org.apache.log4j.PatternLayout +log4j.appender.Console.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n + +log4j.appender.LogFile=org.apache.log4j.FileAppender +log4j.appender.LogFile.File=${azure.cosmos.logger.directory}/azure-cosmos-benchmark.log +log4j.appender.LogFile.layout=org.apache.log4j.PatternLayout +log4j.appender.LogFile.layout.ConversionPattern=[%d][%p][${azure.cosmos.hostname}][thread:%t][logger:%c] %m%n diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/RntbdTransportClient.java index b5ceb775e219..e8fcb01620a8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/RntbdTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/RntbdTransportClient.java @@ -3,14 +3,14 @@ package com.azure.cosmos.internal.directconnectivity; +import com.azure.cosmos.internal.Configs; +import com.azure.cosmos.internal.RxDocumentServiceRequest; +import com.azure.cosmos.internal.UserAgentContainer; import com.azure.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint; import com.azure.cosmos.internal.directconnectivity.rntbd.RntbdObjectMapper; import com.azure.cosmos.internal.directconnectivity.rntbd.RntbdRequestArgs; import com.azure.cosmos.internal.directconnectivity.rntbd.RntbdRequestRecord; import com.azure.cosmos.internal.directconnectivity.rntbd.RntbdServiceEndpoint; -import com.azure.cosmos.internal.Configs; -import com.azure.cosmos.internal.RxDocumentServiceRequest; -import com.azure.cosmos.internal.UserAgentContainer; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.annotation.JsonSerialize; @@ -26,6 +26,7 @@ import java.io.IOException; import java.net.URI; import java.time.Duration; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -70,114 +71,89 @@ public final class RntbdTransportClient extends TransportClient { // endregion - // region Accessors - - public int endpointCount() { - return this.endpointProvider.count(); - } - - public int endpointEvictionCount() { - return this.endpointProvider.evictions(); - } - - public long id() { - return this.id; - } + // region Methods public boolean isClosed() { return this.closed.get(); } - public Tag tag() { - return this.tag; - } - - // endregion - - // region Methods - @Override public void close() { - logger.debug("\n [{}] CLOSE", this); - if (this.closed.compareAndSet(false, true)) { + logger.debug("close {}", this); this.endpointProvider.close(); return; } - logger.debug("\n [{}]\n already closed", this); + logger.debug("already closed {}", this); + } + + public int endpointCount() { + return this.endpointProvider.count(); + } + + public int endpointEvictionCount() { + return this.endpointProvider.evictions(); + } + + public long id() { + return this.id; } @Override - public Mono invokeStoreAsync(final URI physicalAddress, final RxDocumentServiceRequest request) { + public Mono invokeStoreAsync(final URI address, final RxDocumentServiceRequest request) { - checkNotNull(physicalAddress, "physicalAddress"); - checkNotNull(request, "request"); + logger.debug("RntbdTransportClient.invokeStoreAsync({}, {})", address, request); + + checkNotNull(address, "expected non-null address"); + checkNotNull(request, "expected non-null request"); this.throwIfClosed(); - final RntbdRequestArgs requestArgs = new RntbdRequestArgs(request, physicalAddress); + final RntbdRequestArgs requestArgs = new RntbdRequestArgs(request, address); requestArgs.traceOperation(logger, null, "invokeStoreAsync"); - final RntbdEndpoint endpoint = this.endpointProvider.get(physicalAddress); - final RntbdRequestRecord requestRecord = endpoint.request(requestArgs); + final RntbdEndpoint endpoint = this.endpointProvider.get(address); + final RntbdRequestRecord record = endpoint.request(requestArgs); + + logger.debug("RntbdTransportClient.invokeStoreAsync({}, {}): {}", address, request, record); - return Mono.fromFuture(requestRecord).doFinally(signal -> { - if (signal == SignalType.CANCEL) { - requestRecord.cancel(false); + return Mono.fromFuture(record).doFinally(signalType -> { + logger.debug("SignalType.{} received from reactor: {\n endpoint: {},\n record: {}\n}", + signalType.name(), + endpoint, + record); + if (signalType == SignalType.CANCEL) { + record.stage(RntbdRequestRecord.Stage.CANCELLED_BY_CLIENT); } }); } + public Tag tag() { + return this.tag; + } + @Override public String toString() { return RntbdObjectMapper.toString(this); } - private void throwIfClosed() { - checkState(!this.closed.get(), "%s is closed", this); + private static Tag tag(long id) { + return Tag.of(TAG_NAME, Strings.padStart(Long.toHexString(id).toUpperCase(), 4, '0')); } // endregion // region Privates - private static Tag tag(long id) { - return Tag.of(TAG_NAME, Strings.padStart(Long.toHexString(id).toUpperCase(), 4, '0')); + private void throwIfClosed() { + checkState(!this.closed.get(), "%s is closed", this); } // endregion // region Types - static final class JsonSerializer extends StdSerializer { - - public JsonSerializer() { - super(RntbdTransportClient.class); - } - - @Override - public void serialize(RntbdTransportClient value, JsonGenerator generator, SerializerProvider provider) throws IOException { - - generator.writeStartObject(); - generator.writeNumberField("id", value.id()); - generator.writeBooleanField("isClosed", value.isClosed()); - generator.writeObjectField("configuration", value.endpointProvider.config()); - generator.writeArrayFieldStart("serviceEndpoints"); - - value.endpointProvider.list().forEach(endpoint -> { - try { - generator.writeObject(endpoint); - } catch (IOException error) { - logger.error("failed to serialize instance {} due to:", value.id(), error); - } - }); - - generator.writeEndArray(); - generator.writeEndObject(); - } - } - public static final class Options { // region Fields @@ -192,6 +168,7 @@ public static final class Options { private final int maxRequestsPerChannel; private final int partitionCount; private final Duration receiveHangDetectionTime; + private final Duration requestExpiryInterval; private final Duration requestTimeout; private final Duration sendHangDetectionTime; private final Duration shutdownTimeout; @@ -212,6 +189,7 @@ private Options(Builder builder) { this.maxRequestsPerChannel = builder.maxRequestsPerChannel; this.partitionCount = builder.partitionCount; this.receiveHangDetectionTime = builder.receiveHangDetectionTime; + this.requestExpiryInterval = builder.requestExpiryInterval; this.requestTimeout = builder.requestTimeout; this.sendHangDetectionTime = builder.sendHangDetectionTime; this.shutdownTimeout = builder.shutdownTimeout; @@ -262,6 +240,10 @@ public Duration receiveHangDetectionTime() { return this.receiveHangDetectionTime; } + public Duration requestExpiryInterval() { + return this.requestExpiryInterval; + } + public Duration requestTimeout() { return this.requestTimeout; } @@ -274,17 +256,17 @@ public Duration shutdownTimeout() { return this.shutdownTimeout; } - public UserAgentContainer userAgent() { - return this.userAgent; + @Override + public String toString() { + return RntbdObjectMapper.toJson(this); } // endregion // region Methods - @Override - public String toString() { - return RntbdObjectMapper.toJson(this); + public UserAgentContainer userAgent() { + return this.userAgent; } // endregion @@ -297,6 +279,7 @@ public static class Builder { private static final UserAgentContainer DEFAULT_USER_AGENT_CONTAINER = new UserAgentContainer(); private static final Duration FIFTEEN_SECONDS = Duration.ofSeconds(15L); + private static final Duration FIVE_SECONDS =Duration.ofSeconds(5L); private static final Duration SEVENTY_SECONDS = Duration.ofSeconds(70L); private static final Duration SIXTY_FIVE_SECONDS = Duration.ofSeconds(65L); private static final Duration TEN_SECONDS = Duration.ofSeconds(10L); @@ -311,6 +294,7 @@ public static class Builder { private int maxRequestsPerChannel = 30; private int partitionCount = 1; private Duration receiveHangDetectionTime = SIXTY_FIVE_SECONDS; + private Duration requestExpiryInterval = FIVE_SECONDS; private Duration requestTimeout; private Duration sendHangDetectionTime = TEN_SECONDS; private Duration shutdownTimeout = FIFTEEN_SECONDS; @@ -332,104 +316,117 @@ public Builder(int requestTimeoutInSeconds) { // region Methods - public Options build() { - checkState(this.bufferPageSize <= this.maxBufferCapacity, "bufferPageSize (%s) > maxBufferCapacity (%s)", - this.bufferPageSize, this.maxBufferCapacity - ); - return new Options(this); - } - public Builder bufferPageSize(final int value) { - checkArgument(value >= 4096 && (value & (value - 1)) == 0, "value: %s", value); + checkArgument(value >= 4096 && (value & (value - 1)) == 0, + "expected value to be a power of 2 >= 4096, not %s", + value); this.bufferPageSize = value; return this; } + public Options build() { + checkState(this.bufferPageSize <= this.maxBufferCapacity, + "expected bufferPageSize (%s) <= maxBufferCapacity (%s)", + this.bufferPageSize, + this.maxBufferCapacity); + return new Options(this); + } + public Builder certificateHostNameOverride(final String value) { this.certificateHostNameOverride = value; return this; } public Builder connectionTimeout(final Duration value) { - checkArgument(value == null || value.compareTo(Duration.ZERO) > 0, "value: %s", value); + checkArgument(value == null || value.compareTo(Duration.ZERO) > 0, + "expected positive value, not %s", + value); this.connectionTimeout = value; return this; } public Builder idleChannelTimeout(final Duration value) { - checkNotNull(value, "value: null"); + checkNotNull(value, "expected non-null value"); this.idleChannelTimeout = value; return this; } public Builder idleEndpointTimeout(final Duration value) { - checkArgument(value != null && value.compareTo(Duration.ZERO) > 0, "value: %s", value); + checkArgument(value != null && value.compareTo(Duration.ZERO) > 0, + "expected positive value, not %s", + value); this.idleEndpointTimeout = value; return this; } public Builder maxBufferCapacity(final int value) { - checkArgument(value > 0 && (value & (value - 1)) == 0, "value: %s", value); + checkArgument(value > 0 && (value & (value - 1)) == 0, + "expected positive value, not %s", + value); this.maxBufferCapacity = value; return this; } public Builder maxChannelsPerEndpoint(final int value) { - checkArgument(value > 0, "value: %s", value); + checkArgument(value > 0, "expected positive value, not %s", value); this.maxChannelsPerEndpoint = value; return this; } public Builder maxRequestsPerChannel(final int value) { - checkArgument(value > 0, "value: %s", value); + checkArgument(value > 0, "expected positive value, not %s", value); this.maxRequestsPerChannel = value; return this; } public Builder partitionCount(final int value) { - checkArgument(value > 0, "value: %s", value); + checkArgument(value > 0, "expected positive value, not %s", value); this.partitionCount = value; return this; } public Builder receiveHangDetectionTime(final Duration value) { - - checkNotNull(value, "value: null"); - checkArgument(value.compareTo(Duration.ZERO) > 0, "value: %s", value); - + checkArgument(value != null && value.compareTo(Duration.ZERO) > 0, + "expected positive value, not %s", + value); this.receiveHangDetectionTime = value; return this; } - public Builder requestTimeout(final Duration value) { - - checkNotNull(value, "value: null"); - checkArgument(value.compareTo(Duration.ZERO) > 0, "value: %s", value); + public Builder requestExpiryInterval(final Duration value) { + checkArgument(value != null && value.compareTo(Duration.ZERO) > 0, + "expected positive value, not %s", + value); + this.requestExpiryInterval = value; + return this; + } + public Builder requestTimeout(final Duration value) { + checkArgument(value != null && value.compareTo(Duration.ZERO) > 0, + "expected positive value, not %s", + value); this.requestTimeout = value; return this; } public Builder sendHangDetectionTime(final Duration value) { - - checkNotNull(value, "value: null"); - checkArgument(value.compareTo(Duration.ZERO) > 0, "value: %s", value); - + checkArgument(value != null && value.compareTo(Duration.ZERO) > 0, + "expected positive value, not %s", + value); this.sendHangDetectionTime = value; return this; } public Builder shutdownTimeout(final Duration value) { - - checkNotNull(value, "value: null"); - checkArgument(value.compareTo(Duration.ZERO) > 0, "value: %s", value); - + checkArgument(value != null && value.compareTo(Duration.ZERO) > 0, + "expected positive value, not %s", + value); this.shutdownTimeout = value; return this; } public Builder userAgent(final UserAgentContainer value) { - checkNotNull(value, "value: null"); + checkNotNull(value, "expected non-null value"); this.userAgent = value; return this; } @@ -440,5 +437,38 @@ public Builder userAgent(final UserAgentContainer value) { // endregion } + static final class JsonSerializer extends StdSerializer { + + public JsonSerializer() { + super(RntbdTransportClient.class); + } + + @Override + public void serialize( + + final RntbdTransportClient value, + final JsonGenerator generator, + final SerializerProvider provider + + ) throws IOException { + + generator.writeStartObject(); + generator.writeNumberField("id", value.id()); + generator.writeBooleanField("isClosed", value.isClosed()); + generator.writeObjectField("configuration", value.endpointProvider.config()); + generator.writeObjectFieldStart("serviceEndpoints"); + generator.writeNumberField("count", value.endpointCount()); + generator.writeArrayFieldStart("items"); + + for (final Iterator iterator = value.endpointProvider.list().iterator(); iterator.hasNext(); ) { + generator.writeObject(iterator.next()); + } + + generator.writeEndArray(); + generator.writeEndObject(); + generator.writeEndObject(); + } + } + // endregion } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/StoreReader.java index 1d2fcba55bc2..4004d983bc58 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/StoreReader.java @@ -7,12 +7,12 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosClientException; import com.azure.cosmos.GoneException; -import com.azure.cosmos.internal.ISessionContainer; import com.azure.cosmos.InternalServerErrorException; import com.azure.cosmos.PartitionIsMigratingException; import com.azure.cosmos.PartitionKeyRangeGoneException; import com.azure.cosmos.PartitionKeyRangeIsSplittingException; import com.azure.cosmos.internal.HttpConstants; +import com.azure.cosmos.internal.ISessionContainer; import com.azure.cosmos.internal.ISessionToken; import com.azure.cosmos.internal.Integers; import com.azure.cosmos.internal.MutableVolatile; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java index 9a2a0d1b775a..6d72ce01c43f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java @@ -3,6 +3,7 @@ package com.azure.cosmos.internal.directconnectivity.rntbd; +import com.azure.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint.Config; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; @@ -24,12 +25,12 @@ public class RntbdClientChannelHandler extends ChannelInitializer imple private static final AttributeKey REQUEST_MANAGER = AttributeKey.newInstance("requestManager"); private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelHandler.class); + private final Config config; private final ChannelHealthChecker healthChecker; - private final RntbdEndpoint.Config config; - RntbdClientChannelHandler(final RntbdEndpoint.Config config, final ChannelHealthChecker healthChecker) { - checkNotNull(healthChecker, "healthChecker"); - checkNotNull(config, "config"); + RntbdClientChannelHandler(final Config config, final ChannelHealthChecker healthChecker) { + checkNotNull(healthChecker, "expected non-null healthChecker"); + checkNotNull(config, "expected non-null config"); this.healthChecker = healthChecker; this.config = config; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index b428611e24df..60a9b2705857 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -65,7 +65,7 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck // region Constructors - public RntbdClientChannelHealthChecker(final RntbdEndpoint.Config config) { + public RntbdClientChannelHealthChecker(final Config config) { checkNotNull(config, "config: null"); @@ -96,13 +96,13 @@ public long writeDelayLimit() { public Future isHealthy(final Channel channel) { - checkNotNull(channel, "channel: null"); + checkNotNull(channel, "expected non-null channel"); final RntbdRequestManager requestManager = channel.pipeline().get(RntbdRequestManager.class); final Promise promise = channel.eventLoop().newPromise(); if (requestManager == null) { - RntbdReporter.reportIssueUnless(logger, !channel.isActive(), channel, "active with no request manager"); + reportIssueUnless(logger, !channel.isActive(), channel, "active with no request manager"); return promise.setSuccess(Boolean.FALSE); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelPool.java index bc494ac2c530..e52bac38c284 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -441,7 +441,7 @@ private boolean isServiceableOrInactiveChannel(final Channel channel) { final RntbdRequestManager requestManager = channel.pipeline().get(RntbdRequestManager.class); if (requestManager == null) { - RntbdReporter.reportIssueUnless(logger, !channel.isActive(), channel, "active with no request manager"); + reportIssueUnless(logger, !channel.isActive(), channel, "active with no request manager"); return true; // inactive } @@ -562,7 +562,7 @@ public void operationComplete(Future future) { if (completed.isSuccess()) { - RntbdReporter.reportIssueUnless(logger, this.acquired && requestManager.hasRntbdContext(), + reportIssueUnless(logger, this.acquired && requestManager.hasRntbdContext(), channel,"acquired: {}, rntbdContext: {}", this.acquired, requestManager.rntbdContext()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdMetrics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdMetrics.java index 7bff70192636..b6dd5ad9a3fc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdMetrics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdMetrics.java @@ -24,6 +24,7 @@ import io.micrometer.core.instrument.util.HierarchicalNameMapper; import io.micrometer.core.lang.Nullable; +import javax.annotation.Nonnull; import java.util.concurrent.TimeUnit; @SuppressWarnings("UnstableApiUsage") @@ -148,9 +149,9 @@ public String prefix() { consoleLoggingRegistry = new DropwizardMeterRegistry(dropwizardConfig, dropwizardRegistry, HierarchicalNameMapper.DEFAULT, Clock.SYSTEM) { @Override - @Nullable + @Nonnull protected Double nullGaugeValue() { - return null; + return Double.NaN; } }; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdObjectMapper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdObjectMapper.java index dc4c461d6a49..66ccbf51dc94 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdObjectMapper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdObjectMapper.java @@ -11,7 +11,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.ser.PropertyFilter; import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; -import com.google.common.base.Strings; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.handler.codec.CorruptedFrameException; @@ -23,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Strings.lenientFormat; public final class RntbdObjectMapper { @@ -39,9 +39,9 @@ public static String toJson(final Object value) { try { return objectWriter.writeValueAsString(value); } catch (final JsonProcessingException error) { - logger.error("could not convert {} value to JSON due to:", value.getClass(), error); + logger.debug("could not convert {} value to JSON due to:", value.getClass(), error); try { - return Strings.lenientFormat("{\"error\":%s", objectWriter.writeValueAsString(error.toString())); + return lenientFormat("{\"error\":%s}", objectWriter.writeValueAsString(error.toString())); } catch (final JsonProcessingException exception) { return "null"; } @@ -50,7 +50,7 @@ public static String toJson(final Object value) { public static String toString(final Object value) { final String name = simpleClassNames.computeIfAbsent(value.getClass(), Class::getSimpleName); - return Strings.lenientFormat("%s(%s)", name, toJson(value)); + return lenientFormat("%s(%s)", name, toJson(value)); } public static ObjectWriter writer() { @@ -77,7 +77,7 @@ static ObjectNode readTree(final ByteBuf in) { return (ObjectNode)node; } - final String cause = Strings.lenientFormat("Expected %s, not %s", JsonNodeType.OBJECT, node.getNodeType()); + final String cause = lenientFormat("Expected %s, not %s", JsonNodeType.OBJECT, node.getNodeType()); throw new CorruptedFrameException(cause); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdReporter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdReporter.java index 4057bacc4bc9..ec47719693ef 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdReporter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdReporter.java @@ -47,16 +47,16 @@ public static void reportIssueUnless( private static void doReportIssue(Logger logger, Object subject, String format, Object[] arguments) { FormattingTuple formattingTuple = MessageFormatter.arrayFormat(format, arguments); - StackTraceElement[] stackTraceElements = new Exception().getStackTrace(); + StackTraceElement[] stackTrace = new Exception().getStackTrace(); Throwable throwable = formattingTuple.getThrowable(); if (throwable == null) { logger.error("Report this {} issue to ensure it is addressed:\n[{}]\n[{}]\n[{}]", - codeSource, subject, stackTraceElements[2], formattingTuple.getMessage() + codeSource, subject, stackTrace[2], formattingTuple.getMessage() ); } else { logger.error("Report this {} issue to ensure it is addressed:\n[{}]\n[{}]\n[{}{}]", - codeSource, subject, stackTraceElements[2], formattingTuple.getMessage(), + codeSource, subject, stackTrace[2], formattingTuple.getMessage(), ExceptionUtils.getStackTrace(throwable) ); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequest.java index 6b815796f90a..8af3e272ee5f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequest.java @@ -38,7 +38,7 @@ public UUID getActivityId() { @JsonIgnore @SuppressWarnings("unchecked") public T getHeader(final RntbdRequestHeader header) { - return (T)this.headers.get(header).getValue(); + return (T) this.headers.get(header).getValue(); } public Long getTransportRequestId() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestDecoder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestDecoder.java index 8a73129fae3a..aae971ce84d4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestDecoder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestDecoder.java @@ -22,7 +22,7 @@ public void channelRead(final ChannelHandlerContext context, final Object messag if (message instanceof ByteBuf) { - final ByteBuf in = (ByteBuf)message; + final ByteBuf in = (ByteBuf) message; final int resourceOperationType = in.getInt(in.readerIndex() + Integer.BYTES); if (resourceOperationType != 0) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestEncoder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestEncoder.java index e8873dab893a..cde7b98a93d0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestEncoder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestEncoder.java @@ -39,7 +39,7 @@ public boolean acceptOutboundMessage(final Object message) { @Override protected void encode(final ChannelHandlerContext context, final Object message, final ByteBuf out) throws Exception { - final RntbdRequest request = RntbdRequest.from((RntbdRequestArgs)message); + final RntbdRequest request = RntbdRequest.from((RntbdRequestArgs) message); final int start = out.writerIndex(); try { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestManager.java index cc6b00dd6c6b..2da46db269dd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestManager.java @@ -26,7 +26,6 @@ import com.azure.cosmos.ServiceUnavailableException; import com.azure.cosmos.UnauthorizedException; import com.azure.cosmos.internal.directconnectivity.StoreResponse; -import com.google.common.base.Strings; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; @@ -40,6 +39,7 @@ import io.netty.channel.CoalescingBufferQueue; import io.netty.channel.EventLoop; import io.netty.channel.pool.ChannelHealthChecker; +import io.netty.handler.codec.CorruptedFrameException; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.IdleStateEvent; @@ -66,6 +66,7 @@ import static com.azure.cosmos.internal.directconnectivity.rntbd.RntbdConstants.RntbdResponseHeader; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Strings.lenientFormat; public final class RntbdRequestManager implements ChannelHandler, ChannelInboundHandler, ChannelOutboundHandler { @@ -87,7 +88,7 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound private final ChannelHealthChecker healthChecker; private final int pendingRequestLimit; private final ConcurrentHashMap pendingRequests; - private final RntbdClientChannelHealthChecker.Timestamps timestamps = new RntbdClientChannelHealthChecker.Timestamps(); + private final Timestamps timestamps = new Timestamps(); private boolean closingExceptionally = false; private CoalescingBufferQueue pendingWrites; @@ -171,7 +172,9 @@ public void channelRead(final ChannelHandlerContext context, final Object messag if (message instanceof RntbdResponse) { try { - this.messageReceived(context, (RntbdResponse)message); + this.messageReceived(context, (RntbdResponse) message); + } catch (CorruptedFrameException error) { + this.exceptionCaught(context, error); } catch (Throwable throwable) { reportIssue(context, "{} ", message, throwable); this.exceptionCaught(context, throwable); @@ -180,17 +183,17 @@ public void channelRead(final ChannelHandlerContext context, final Object messag } else { final IllegalStateException error = new IllegalStateException( - Strings.lenientFormat("expected message of %s, not %s: %s", - RntbdResponse.class, message.getClass(), message - ) - ); + lenientFormat("expected message of %s, not %s: %s", + RntbdResponse.class, + message.getClass(), + message)); reportIssue(context, "", error); this.exceptionCaught(context, error); } } finally { if (message instanceof ReferenceCounted) { - boolean released = ((ReferenceCounted)message).release(); + boolean released = ((ReferenceCounted) message).release(); reportIssueUnless(released, context, "failed to release message: {}", message); } } @@ -246,7 +249,7 @@ public void channelUnregistered(final ChannelHandlerContext context) { if (!this.closingExceptionally) { this.completeAllPendingRequestsExceptionally(context, ON_CHANNEL_UNREGISTERED); } else { - logger.warn("{} channelUnregistered exceptionally", context); + logger.debug("{} channelUnregistered exceptionally", context); } context.fireChannelUnregistered(); @@ -288,7 +291,7 @@ public void exceptionCaught(final ChannelHandlerContext context, final Throwable if (!this.closingExceptionally) { this.completeAllPendingRequestsExceptionally(context, cause); - logger.warn("{} closing due to:", context, cause); + logger.debug("{} closing due to:", context, cause); context.flush().close(); } } @@ -329,12 +332,12 @@ public void userEventTriggered(final ChannelHandlerContext context, final Object return; } if (event instanceof RntbdContext) { - this.contextFuture.complete((RntbdContext)event); + this.contextFuture.complete((RntbdContext) event); this.removeContextNegotiatorAndFlushPendingWrites(context); return; } if (event instanceof RntbdContextException) { - this.contextFuture.completeExceptionally((RntbdContextException)event); + this.contextFuture.completeExceptionally((RntbdContextException) event); context.pipeline().flush().close(); return; } @@ -377,7 +380,7 @@ public void close(final ChannelHandlerContext context, final ChannelPromise prom if (!this.closingExceptionally) { this.completeAllPendingRequestsExceptionally(context, ON_CLOSE); } else { - logger.warn("{} closed exceptionally", context); + logger.debug("{} closed exceptionally", context); } final SslHandler sslHandler = context.pipeline().get(SslHandler.class); @@ -423,7 +426,7 @@ public void deregister(final ChannelHandlerContext context, final ChannelPromise if (!this.closingExceptionally) { this.completeAllPendingRequestsExceptionally(context, ON_DEREGISTER); } else { - logger.warn("{} deregistered exceptionally", context); + logger.debug("{} deregistered exceptionally", context); } context.deregister(promise); @@ -478,18 +481,19 @@ public void read(final ChannelHandlerContext context) { @Override public void write(final ChannelHandlerContext context, final Object message, final ChannelPromise promise) { - // TODO: DANOBLE: Ensure that all write errors are reported with a root cause of type EncoderException - // Requires a full scan of the rntbd code - this.traceOperation(context, "write", message); if (message instanceof RntbdRequestRecord) { + RntbdRequestRecord record = (RntbdRequestRecord) message; this.timestamps.channelWriteAttempted(); - context.write(this.addPendingRequestRecord(context, (RntbdRequestRecord)message), promise).addListener(completed -> { + context.writeAndFlush(this.addPendingRequestRecord(context, record), promise).addListener(completed -> { if (completed.isSuccess()) { + record.stage(RntbdRequestRecord.Stage.SENT); this.timestamps.channelWriteCompleted(); + } else { + record.stage(RntbdRequestRecord.Stage.UNSENT); } }); @@ -498,7 +502,7 @@ public void write(final ChannelHandlerContext context, final Object message, fin if (message == RntbdHealthCheckRequest.MESSAGE) { - context.write(RntbdHealthCheckRequest.MESSAGE, promise).addListener(completed -> { + context.writeAndFlush(RntbdHealthCheckRequest.MESSAGE, promise).addListener(completed -> { if (completed.isSuccess()) { this.timestamps.channelPingCompleted(); } @@ -507,7 +511,7 @@ public void write(final ChannelHandlerContext context, final Object message, fin return; } - final IllegalStateException error = new IllegalStateException(Strings.lenientFormat("message of %s: %s", message.getClass(), message)); + final IllegalStateException error = new IllegalStateException(lenientFormat("message of %s: %s", message.getClass(), message)); reportIssue(context, "", error); this.exceptionCaught(context, error); } @@ -545,8 +549,8 @@ void pendWrite(final ByteBuf out, final ChannelPromise promise) { this.pendingWrites.add(out, promise); } - RntbdClientChannelHealthChecker.Timestamps snapshotTimestamps() { - return new RntbdClientChannelHealthChecker.Timestamps(this.timestamps); + Timestamps snapshotTimestamps() { + return new Timestamps(this.timestamps); } // endregion @@ -557,16 +561,13 @@ private RntbdRequestArgs addPendingRequestRecord(final ChannelHandlerContext con return this.pendingRequests.compute(record.transportRequestId(), (id, current) -> { - boolean predicate = current == null; - String format = "id: {}, current: {}, request: {}"; - - reportIssueUnless(predicate, context, format, record); + reportIssueUnless(current == null, context, "id: {}, current: {}, request: {}", record); + record.stage(RntbdRequestRecord.Stage.QUEUED); final Timeout pendingRequestTimeout = record.newTimeout(timeout -> { // We don't wish to complete on the timeout thread, but rather on a thread doled out by our executor - - EventExecutor executor = context.executor(); + final EventExecutor executor = context.executor(); if (executor.inEventLoop()) { record.expire(); @@ -575,17 +576,21 @@ private RntbdRequestArgs addPendingRequestRecord(final ChannelHandlerContext con } }); + boolean done = record.isDone(); + record.whenComplete((response, error) -> { this.pendingRequests.remove(id); pendingRequestTimeout.cancel(); }); - return record; + return record.stage(RntbdRequestRecord.Stage.QUEUED); }).args(); } - private void completeAllPendingRequestsExceptionally(final ChannelHandlerContext context, final Throwable throwable) { + private void completeAllPendingRequestsExceptionally( + final ChannelHandlerContext context, final Throwable throwable + ) { reportIssueUnless(!this.closingExceptionally, context, "", throwable); this.closingExceptionally = true; @@ -595,81 +600,82 @@ private void completeAllPendingRequestsExceptionally(final ChannelHandlerContext this.pendingWrites.releaseAndFailAll(context, throwable); } - if (!this.pendingRequests.isEmpty()) { - - if (!this.contextRequestFuture.isDone()) { - this.contextRequestFuture.completeExceptionally(throwable); - } - - if (!this.contextFuture.isDone()) { - this.contextFuture.completeExceptionally(throwable); - } - - final int count = this.pendingRequests.size(); - Exception contextRequestException = null; - String phrase = null; + if (this.pendingRequests.isEmpty()) { + return; + } - if (this.contextRequestFuture.isCompletedExceptionally()) { + if (!this.contextRequestFuture.isDone()) { + this.contextRequestFuture.completeExceptionally(throwable); + } - try { - this.contextRequestFuture.get(); - } catch (final CancellationException error) { - phrase = "RNTBD context request write cancelled"; - contextRequestException = error; - } catch (final Exception error) { - phrase = "RNTBD context request write failed"; - contextRequestException = error; - } catch (final Throwable error) { - phrase = "RNTBD context request write failed"; - contextRequestException = new ChannelException(error); - } + if (!this.contextFuture.isDone()) { + this.contextFuture.completeExceptionally(throwable); + } - } else if (this.contextFuture.isCompletedExceptionally()) { + final int count = this.pendingRequests.size(); + Exception contextRequestException = null; + String phrase = null; + + if (this.contextRequestFuture.isCompletedExceptionally()) { + + try { + this.contextRequestFuture.get(); + } catch (final CancellationException error) { + phrase = "RNTBD context request write cancelled"; + contextRequestException = error; + } catch (final Exception error) { + phrase = "RNTBD context request write failed"; + contextRequestException = error; + } catch (final Throwable error) { + phrase = "RNTBD context request write failed"; + contextRequestException = new ChannelException(error); + } - try { - this.contextFuture.get(); - } catch (final CancellationException error) { - phrase = "RNTBD context request read cancelled"; - contextRequestException = error; - } catch (final Exception error) { - phrase = "RNTBD context request read failed"; - contextRequestException = error; - } catch (final Throwable error) { - phrase = "RNTBD context request read failed"; - contextRequestException = new ChannelException(error); - } + } else if (this.contextFuture.isCompletedExceptionally()) { + + try { + this.contextFuture.get(); + } catch (final CancellationException error) { + phrase = "RNTBD context request read cancelled"; + contextRequestException = error; + } catch (final Exception error) { + phrase = "RNTBD context request read failed"; + contextRequestException = error; + } catch (final Throwable error) { + phrase = "RNTBD context request read failed"; + contextRequestException = new ChannelException(error); + } - } else { + } else { - phrase = "closed exceptionally"; - } + phrase = "closed exceptionally"; + } - final String message = Strings.lenientFormat("%s %s with %s pending requests", context, phrase, count); - final Exception cause; + final String message = lenientFormat("%s %s with %s pending requests", context, phrase, count); + final Exception cause; - if (throwable instanceof ClosedChannelException) { + if (throwable instanceof ClosedChannelException) { - cause = contextRequestException == null - ? (ClosedChannelException)throwable - : contextRequestException; + cause = contextRequestException == null + ? (ClosedChannelException) throwable + : contextRequestException; - } else { + } else { - cause = throwable instanceof Exception - ? (Exception)throwable - : new ChannelException(throwable); - } + cause = throwable instanceof Exception + ? (Exception) throwable + : new ChannelException(throwable); + } - for (RntbdRequestRecord record : this.pendingRequests.values()) { + for (RntbdRequestRecord record : this.pendingRequests.values()) { - final Map requestHeaders = record.args().serviceRequest().getHeaders(); - final String requestUri = record.args().physicalAddress().toString(); + final Map requestHeaders = record.args().serviceRequest().getHeaders(); + final String requestUri = record.args().physicalAddress().toString(); - final GoneException error = new GoneException(message, cause, (Map)null, requestUri); - BridgeInternal.setRequestHeaders(error, requestHeaders); + final GoneException error = new GoneException(message, cause, (Map) null, requestUri); + BridgeInternal.setRequestHeaders(error, requestHeaders); - record.completeExceptionally(error); - } + record.completeExceptionally(error); } } @@ -684,14 +690,14 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes final Long transportRequestId = response.getTransportRequestId(); if (transportRequestId == null) { - reportIssue(context, "response ignored because its transport request identifier is missing: {}", response); + reportIssue(context, "response ignored because its transportRequestId is missing: {}", response); return; } - final RntbdRequestRecord pendingRequest = this.pendingRequests.get(transportRequestId); + final RntbdRequestRecord requestRecord = this.pendingRequests.get(transportRequestId); - if (pendingRequest == null) { - logger.warn("{} response ignored because there is no matching pending request: {}", context, response); + if (requestRecord == null) { + logger.debug("response {} ignored because its requestRecord is missing: {}", transportRequestId, response); return; } @@ -701,7 +707,7 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes if (HttpResponseStatus.OK.code() <= status.code() && status.code() < HttpResponseStatus.MULTIPLE_CHOICES.code()) { final StoreResponse storeResponse = response.toStoreResponse(this.contextFuture.getNow(null)); - pendingRequest.complete(storeResponse); + requestRecord.complete(storeResponse); } else { @@ -814,7 +820,7 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes break; } - pendingRequest.completeExceptionally(cause); + requestRecord.completeExceptionally(cause); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestRecord.java index db4645fe6916..9f1f6328cd43 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestRecord.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestRecord.java @@ -6,27 +6,42 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.RequestTimeoutException; import com.azure.cosmos.internal.directconnectivity.StoreResponse; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; import io.micrometer.core.instrument.Timer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.io.IOException; import java.time.Duration; import java.util.UUID; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static com.google.common.base.Preconditions.checkNotNull; +@JsonSerialize(using = RntbdRequestRecord.JsonSerializer.class) public final class RntbdRequestRecord extends CompletableFuture { + private static final AtomicReferenceFieldUpdater + stateUpdater = AtomicReferenceFieldUpdater.newUpdater(RntbdRequestRecord.class, Stage.class, "stage"); + private final RntbdRequestArgs args; private final RntbdRequestTimer timer; + private volatile Stage stage; public RntbdRequestRecord(final RntbdRequestArgs args, final RntbdRequestTimer timer) { checkNotNull(args, "args"); checkNotNull(timer, "timer"); + this.stage = Stage.CREATED; this.args = args; this.timer = timer; } @@ -45,41 +60,103 @@ public long creationTime() { return this.args.creationTime(); } + public boolean expire() { + final RequestTimeoutException error = new RequestTimeoutException(this.toString(), this.args.physicalAddress()); + BridgeInternal.setRequestHeaders(error, this.args.serviceRequest().getHeaders()); + return this.completeExceptionally(error); + } + public Duration lifetime() { return this.args.lifetime(); } - public long transportRequestId() { - return this.args.transportRequestId(); + public Timeout newTimeout(final TimerTask task) { + return this.timer.newTimeout(task); } - // endregion - - // region Methods - - public boolean expire() { - - final long timeoutInterval = this.timer.getRequestTimeout(TimeUnit.MILLISECONDS); - final String message = String.format("Request timeout interval (%,d ms) elapsed", timeoutInterval); - final RequestTimeoutException error = new RequestTimeoutException(message, this.args.physicalAddress()); + @JsonProperty + public Stage stage() { + return stateUpdater.get(this); + } - BridgeInternal.setRequestHeaders(error, this.args.serviceRequest().getHeaders()); + public RntbdRequestRecord stage(Stage value) { + stateUpdater.set(this, value); + return this; + } - return this.completeExceptionally(error); + public long timeoutIntervalInMillis() { + return this.timer.getRequestTimeout(TimeUnit.MILLISECONDS); } - public Timeout newTimeout(final TimerTask task) { - return this.timer.newTimeout(task); + public long transportRequestId() { + return this.args.transportRequestId(); } + // endregion + + // region Methods + public long stop(Timer requests, Timer responses) { return this.args.stop(requests, responses); } @Override public String toString() { - return RntbdObjectMapper.toString(this.args); + return RntbdObjectMapper.toString(this); + } + + // endregion + + // region Types + + public enum Stage { + CREATED, QUEUED, SENT, UNSENT, CANCELLED_BY_CLIENT } + static final class JsonSerializer extends StdSerializer { + + JsonSerializer() { + super(RntbdRequestRecord.class); + } + + @Override + public void serialize(final RntbdRequestRecord value, final JsonGenerator generator, + final SerializerProvider provider) throws IOException { + + generator.writeStartObject(); + generator.writeObjectFieldStart("status"); + generator.writeBooleanField("done", value.isDone()); + generator.writeBooleanField("cancelled", value.isCancelled()); + generator.writeBooleanField("completedExceptionally", value.isCompletedExceptionally()); + + if (value.isCompletedExceptionally()) { + + try { + + value.get(); + + } catch (final ExecutionException executionException) { + + final Throwable error = executionException.getCause(); + + generator.writeObjectFieldStart("error"); + generator.writeStringField("type", error.getClass().getName()); + generator.writeObjectField("value", error); + generator.writeEndObject(); + + } catch (CancellationException | InterruptedException exception) { + + generator.writeObjectFieldStart("error"); + generator.writeStringField("type", exception.getClass().getName()); + generator.writeObjectField("value", exception); + generator.writeEndObject(); + } + } + + generator.writeEndObject(); + generator.writeObjectField("args", value.args); + generator.writeEndObject(); + } + } // endregion } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestTimer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestTimer.java index 55d03936dff5..328a0ddb0a5e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestTimer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdRequestTimer.java @@ -7,32 +7,43 @@ import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Set; import java.util.concurrent.TimeUnit; public final class RntbdRequestTimer implements AutoCloseable { private static final long FIVE_MILLISECONDS = 5000000L; + + private static final Logger logger = LoggerFactory.getLogger(RntbdRequestTimer.class); private final long requestTimeout; private final Timer timer; public RntbdRequestTimer(final long requestTimeout) { // Inspection of the HashWheelTimer code indicates that our choice of a 5 millisecond timer resolution ensures - // a request will timeout within 10 milliseconds of the specified requestTimeout interval. This is because + // a request will expire within 10 milliseconds of the specified requestTimeout interval. This is because // cancellation of a timeout takes two timer resolution units to complete. this.timer = new HashedWheelTimer(FIVE_MILLISECONDS, TimeUnit.NANOSECONDS); this.requestTimeout = requestTimeout; } - public long getRequestTimeout(TimeUnit unit) { + public long getRequestTimeout(final TimeUnit unit) { return unit.convert(requestTimeout, TimeUnit.NANOSECONDS); } @Override - public void close() throws RuntimeException { - this.timer.stop(); + public void close() { + final Set timeouts = this.timer.stop(); + if (logger.isDebugEnabled()) { + final int count = timeouts.size(); + if (count > 0) { + logger.debug("request expiration tasks cancelled: {}", count); + } + } } public Timeout newTimeout(final TimerTask task) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdResponse.java index 805a03bd3b21..6f4617c4ac12 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdResponse.java @@ -71,6 +71,7 @@ private RntbdResponse( this.content = content.retain(); } + @JsonIgnore public UUID getActivityId() { return this.frame.getActivityId(); } @@ -97,10 +98,9 @@ public Long getTransportRequestId() { static RntbdResponse decode(final ByteBuf in) { - int start = in.markReaderIndex().readerIndex(); + final int start = in.markReaderIndex().readerIndex(); final RntbdResponseStatus frame = RntbdResponseStatus.decode(in); - final RntbdResponseHeaders headers = RntbdResponseHeaders.decode(in.readSlice(frame.getHeadersLength())); final boolean hasPayload = headers.isPayloadPresent(); final ByteBuf content; @@ -120,7 +120,7 @@ static RntbdResponse decode(final ByteBuf in) { content = Unpooled.EMPTY_BUFFER; } - int end = in.readerIndex(); + final int end = in.readerIndex(); in.resetReaderIndex(); return new RntbdResponse(in.readSlice(end - start), frame, headers, content); @@ -163,9 +163,9 @@ public int refCnt() { } /** - * Decreases the reference count by {@code 1} and deallocate this object if the reference count reaches {@code 0} + * Decreases the reference count by {@code 1} and deallocate this response if the count reaches {@code 0}. * - * @return {@code true} if and only if the reference count became {@code 0} and this object is de-allocated + * @return {@code true} if and only if the reference count became {@code 0} and this response is deallocated. */ @Override public boolean release() { @@ -173,10 +173,10 @@ public boolean release() { } /** - * Decreases the reference count by {@code decrement} and de-allocates this object if the reference count reaches {@code 0} + * Decreases the reference count by {@code decrement} and deallocates this response if the count reaches {@code 0}. * - * @param decrement amount of the decrease - * @return {@code true} if and only if the reference count became {@code 0} and this object has been de-allocated + * @param decrement amount of the decrease. + * @return {@code true} if and only if the reference count became {@code 0} and this response has been deallocated. */ @Override public boolean release(final int decrement) { @@ -198,8 +198,10 @@ public boolean release(final int decrement) { this.content.release(); } - checkState(this.in == Unpooled.EMPTY_BUFFER || this.in.refCnt() == 0); - checkState(this.content == Unpooled.EMPTY_BUFFER || this.content.refCnt() == 0); + // TODO: DANOBLE: figure out why PooledUnsafeDirectByteBuf violates these expectations: + // checkState(this.in == Unpooled.EMPTY_BUFFER || this.in.refCnt() == 0); + // checkState(this.content == Unpooled.EMPTY_BUFFER || this.content.refCnt() == 0); + // Specifically, why are this.in.refCnt() and this.content.refCnt() equal to 1? } return value; @@ -280,8 +282,9 @@ public void serialize(final ByteBuf value, final JsonGenerator generator, final final int length = value.readableBytes(); generator.writeStartObject(); - generator.writeObjectField("length", length); - generator.writeObjectField("content", ByteBufUtil.hexDump(value, 0, length)); + generator.writeObjectField("lengthInBytes", length); + generator.writeObjectField("hexDump", ByteBufUtil.hexDump(value, 0, length)); + generator.writeObjectField("string", value.getCharSequence(0, length, StandardCharsets.UTF_8)); generator.writeEndObject(); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdResponseHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdResponseHeaders.java index f9fb76c3dda1..eb0258244b8b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdResponseHeaders.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdResponseHeaders.java @@ -26,6 +26,7 @@ import static com.azure.cosmos.internal.directconnectivity.rntbd.RntbdConstants.RntbdIndexingDirective; import static com.azure.cosmos.internal.directconnectivity.rntbd.RntbdConstants.RntbdResponseHeader; +@SuppressWarnings("UnstableApiUsage") @JsonFilter("RntbdToken") class RntbdResponseHeaders extends RntbdTokenStream { @@ -223,7 +224,7 @@ public Map asMap(final RntbdContext context, final UUID activity static RntbdResponseHeaders decode(final ByteBuf in) { final RntbdResponseHeaders headers = new RntbdResponseHeaders(in); - decode(headers); + RntbdTokenStream.decode(headers); return headers; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java index 365d00f077ca..e7a1847c4e99 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java @@ -31,7 +31,6 @@ import java.net.URI; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -41,6 +40,7 @@ import static com.azure.cosmos.internal.directconnectivity.RntbdTransportClient.Options; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static java.util.concurrent.TimeUnit.NANOSECONDS; @JsonSerialize(using = RntbdServiceEndpoint.JsonSerializer.class) public final class RntbdServiceEndpoint implements RntbdEndpoint { @@ -48,7 +48,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint { // region Fields private static final String TAG_NAME = RntbdServiceEndpoint.class.getSimpleName(); - private static final long QUIET_PERIOD = 2L * 1_000_000_000L; + private static final long QUIET_PERIOD = 2L * 1_000_000_000L; // 2 seconds private static final AtomicLong instanceCount = new AtomicLong(); private static final Logger logger = LoggerFactory.getLogger(RntbdServiceEndpoint.class); @@ -312,12 +312,12 @@ public static final class Provider implements RntbdEndpoint.Provider { public Provider(final RntbdTransportClient transportClient, final Options options, final SslContext sslContext) { - checkNotNull(transportClient, "provider"); - checkNotNull(options, "options"); - checkNotNull(sslContext, "sslContext"); + checkNotNull(transportClient, "expected non-null provider"); + checkNotNull(options, "expected non-null options"); + checkNotNull(sslContext, "expected non-null sslContext"); - final DefaultThreadFactory threadFactory = new DefaultThreadFactory("CosmosEventLoop", true); - final int threadCount = Runtime.getRuntime().availableProcessors(); + final DefaultThreadFactory threadFactory = new DefaultThreadFactory("cosmos-rntbd-nio", true); + final int threadCount = 2 * Runtime.getRuntime().availableProcessors(); final LogLevel wireLogLevel; if (logger.isTraceEnabled()) { @@ -349,13 +349,14 @@ public void close() { endpoint.close(); } - this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeout(), TimeUnit.NANOSECONDS).addListener(future -> { - if (future.isSuccess()) { - logger.debug("\n [{}]\n closed endpoints", this); - return; - } - logger.error("\n [{}]\n failed to close endpoints due to ", this, future.cause()); - }); + this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeout(), NANOSECONDS) + .addListener(future -> { + if (future.isSuccess()) { + logger.debug("\n [{}]\n closed endpoints", this); + return; + } + logger.error("\n [{}]\n failed to close endpoints due to ", this, future.cause()); + }); return; } @@ -390,14 +391,6 @@ public Stream list() { } private void evict(RntbdEndpoint endpoint) { - - // TODO: DANOBLE: Utilize this method of tearing down unhealthy endpoints - // Specifically, ensure that this method is called when a Read/WriteTimeoutException occurs or a health - // check request fails. This perhaps likely requires a change to RntbdClientChannelPool. - // Links: - // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/331552 - // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/331593 - if (this.endpoints.remove(endpoint.remoteAddress().toString()) != null) { this.evictions.incrementAndGet(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdToken.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdToken.java index 53d67abfe9f2..84110d7b15b6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdToken.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdToken.java @@ -10,13 +10,14 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.PropertyWriter; import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; -import com.google.common.base.Strings; import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.CorruptedFrameException; import static com.azure.cosmos.internal.directconnectivity.rntbd.RntbdConstants.RntbdHeader; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.lenientFormat; @JsonPropertyOrder({ "id", "name", "type", "present", "required", "value" }) final class RntbdToken { @@ -66,16 +67,25 @@ public RntbdTokenType getTokenType() { @JsonProperty public Object getValue() { + final RntbdTokenType.Codec codec = this.header.type().codec(); + if (this.value == null) { - return this.header.type().codec().defaultValue(); + return codec.defaultValue(); } if (this.value instanceof ByteBuf) { final ByteBuf buffer = (ByteBuf)this.value; - this.value = this.header.type().codec().read(buffer); - buffer.release(); + this.value = codec.defaultValue(); + try { + this.value = codec.read(buffer); + } catch (final CorruptedFrameException error) { + String message = lenientFormat("failed to read %s value: %s", this.getName(), error.getMessage()); + throw new CorruptedFrameException(message); + } finally { + buffer.release(); + } } else { - this.value = this.header.type().codec().convert(this.value); + this.value = codec.convert(this.value); } return this.value; @@ -137,7 +147,7 @@ public static RntbdToken create(final RntbdHeader header) { public void decode(final ByteBuf in) { - checkNotNull(in, "in"); + checkNotNull(in, "expected non-null in"); if (this.value instanceof ByteBuf) { ((ByteBuf)this.value).release(); @@ -152,7 +162,7 @@ public void encode(final ByteBuf out) { if (!this.isPresent()) { if (this.isRequired()) { - final String message = Strings.lenientFormat("Missing value for required header: %s", this); + final String message = lenientFormat("Missing value for required header: %s", this); throw new IllegalStateException(message); } return; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdTokenStream.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdTokenStream.java index 72eafe920a3a..bb06a881ddfe 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdTokenStream.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdTokenStream.java @@ -3,16 +3,17 @@ package com.azure.cosmos.internal.directconnectivity.rntbd; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.CorruptedFrameException; import java.util.stream.Collector; import static com.azure.cosmos.internal.directconnectivity.rntbd.RntbdConstants.RntbdHeader; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Strings.lenientFormat; @SuppressWarnings("UnstableApiUsage") abstract class RntbdTokenStream & RntbdHeader> { @@ -77,9 +78,8 @@ static > T decode(final T stream) { for (final RntbdToken token : stream.tokens.values()) { if (!token.isPresent() && token.isRequired()) { - final String reason = Strings.lenientFormat("Required token not found on token stream: type=%s, identifier=%s", - token.getTokenType(), token.getId()); - throw new IllegalStateException(reason); + final String message = lenientFormat("Required header not found on token stream: %s", token); + throw new CorruptedFrameException(message); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdTokenType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdTokenType.java index c34f1884c305..b2416c10aa7f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdTokenType.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/directconnectivity/rntbd/RntbdTokenType.java @@ -3,17 +3,16 @@ package com.azure.cosmos.internal.directconnectivity.rntbd; -import com.google.common.base.Strings; import com.google.common.base.Utf8; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.handler.codec.CorruptedFrameException; -import io.netty.handler.codec.DecoderException; import java.nio.charset.StandardCharsets; import java.util.UUID; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.lenientFormat; enum RntbdTokenType { @@ -93,10 +92,17 @@ public interface Codec { void write(Object value, ByteBuf out); static void checkReadableBytes(final ByteBuf in, final long length, final long maxLength) { - if (length != in.readableBytes() || length > maxLength) { - String message = Strings.lenientFormat("maxLength: %s, length: %s, readableBytes: %s", - maxLength, length, in.readableBytes()); - throw new CorruptedFrameException(message); + + if (length > maxLength) { + throw new CorruptedFrameException( + lenientFormat("value length (%s) is greater than maxLength (%s)", length, maxLength)); + } + + final int readableBytes = in.readableBytes(); + + if (length != readableBytes) { + throw new CorruptedFrameException( + lenientFormat("readableBytes (%s) does not match value length (%s)", readableBytes, length)); } } } @@ -681,6 +687,7 @@ private static class RntbdString implements Codec { private RntbdString() { } + @SuppressWarnings("UnstableApiUsage") final int computeLength(final Object value, final int maxLength) { assert this.isValid(value); @@ -696,16 +703,16 @@ final int computeLength(final Object value, final int maxLength) { final byte[] string = (byte[])value; if (!Utf8.isWellFormed(string)) { - final String reason = Strings.lenientFormat("UTF-8 byte string is ill-formed: %s", ByteBufUtil.hexDump(string)); - throw new DecoderException(reason); + final String reason = lenientFormat("UTF-8 byte string is ill-formed: %s", ByteBufUtil.hexDump(string)); + throw new CorruptedFrameException(reason); } length = string.length; } if (length > maxLength) { - final String reason = Strings.lenientFormat("UTF-8 byte string exceeds %s bytes: %s bytes", maxLength, length); - throw new DecoderException(reason); + final String reason = lenientFormat("UTF-8 byte string exceeds %s bytes: %s bytes", maxLength, length); + throw new CorruptedFrameException(reason); } return length; diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml index ba682d049594..6129590ba2d4 100644 --- a/sdk/cosmos/tests.yml +++ b/sdk/cosmos/tests.yml @@ -22,7 +22,7 @@ jobs: DisplayName: ReadMyWrites Integration Tests OSVmImage: 'windows-2019' ProfileFlag: '-Pe2e' - AdditionalArgs: '-DargLine="-Dcosmos.directModeProtocol=Https"' + AdditionalArgs: '-DargLine="-Dazure.cosmos.directModeProtocol=Https"' DESIRED_CONSISTENCY: 'Session' # 09m 23s, expect passed Single_Region_Fast: @@ -190,7 +190,7 @@ jobs: DisplayName: Single Region ReadMyWrites OSVmImage: 'windows-2019' ProfileFlag: '-Pe2e' - AdditionalArgs: '-DargLine="-Dcosmos.directModeProtocol=Https"' + AdditionalArgs: '-DargLine="-Dazure.cosmos.directModeProtocol=Https"' DESIRED_CONSISTENCY: 'Strong' # 08m 50s, expect passed Strong_Https_Fast: @@ -253,7 +253,7 @@ jobs: DisplayName: Multimaster Multi Region Long OSVmImage: 'windows-2019' ProfileFlag: '-e -Plong' - AdditionalArgs: '-DargLine="-Dcosmos.directModeProtocol=Https"' + AdditionalArgs: '-DargLine="-Dazure.cosmos.directModeProtocol=Https"' # 06m 34s, previously failing MultiMaster: DisplayName: Multimaster Multi Region Multi-Master