Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
efa0a9e
Corrected an error in RequestRateTooLargeException, a regression that…
Oct 9, 2019
c307ef3
Corrected ServiceUnavailableException status code, optimized imports,…
Oct 10, 2019
d774841
Refactored DocumentClientExceptionTest to spread it out between commo…
Oct 11, 2019
016d344
Bumped version number to 2.6.3-SNAPSHOT
Oct 11, 2019
1ec926c
Merge branch 'master' of https://github.com/Azure/azure-cosmosdb-java…
Oct 12, 2019
e4cc408
Bumped netty version number to pickup this and other fixes: https://g…
Oct 12, 2019
acb3af1
Improved error handling for RntbdResponseHeaders
Oct 13, 2019
4078cbb
Improved an error message
Oct 14, 2019
cec103a
Resolved #275
Oct 15, 2019
77f1ddd
Minor code cleanup in the vicinity of one possible source of this issue
Oct 15, 2019
e77b52b
Tweaks
Oct 15, 2019
214eeae
Added logger.info that reports the ssl provider (OPENSSL or JVM)
Oct 15, 2019
8440317
Improved diagnostics
Oct 17, 2019
f25abcd
Added state tracking to RntbdRequestRecord so that the logs reveal th…
Oct 18, 2019
c9d02b0
Tweaked text of RequestTimeoutException produced by RntbdRequestRecor…
Oct 18, 2019
6e0e934
Added RntbdRequestRecord.State.UNSENT to denote failed writes
Oct 18, 2019
379b218
Sorted methods and improved checkArgument messages
Oct 20, 2019
d7b2b9e
Tweaks
Oct 21, 2019
7972e21
Added logger for tracking hashed wheel timer stops
Oct 22, 2019
578b6b3
Added debug logger message
Oct 23, 2019
ab1a01c
Tweaked comments and line spacing
Oct 23, 2019
6f0a479
Added final modifier
Oct 23, 2019
ced79ef
Added requestExpiryInterval to RntbdTransportClient.Options
Oct 24, 2019
0cc3cee
Version bump
Oct 28, 2019
c1a45cd
Tweaks for performance testing
Nov 7, 2019
d6f8d6c
Port from azure-cosmos-4.X
Nov 8, 2019
795ca0b
Removed extraneous file
Nov 8, 2019
30deacd
Merge branch 'master' of https://github.com/Azure/azure-cosmosdb-java…
Nov 8, 2019
e254277
Updated changelog to force a rebuild
Nov 8, 2019
3032fa4
Fixed a test break
Nov 8, 2019
84a7b94
Corrected package names in logger configuration file
Nov 8, 2019
4566585
Merge branch 'master' into issue/#286/port-from-v4
Nov 9, 2019
76648c3
Port from V4
Nov 9, 2019
26c1022
Updated changelog for properly recording unreleased log of changes
Nov 11, 2019
dd19243
Fixed a couple of omissions
Nov 11, 2019
9bd9dac
Updated changelog with info on this unreleased PR: https://github.com…
Nov 12, 2019
6e10a54
Removed unnecessary try/catch clause
Nov 27, 2019
2253ba3
Checkpoint for safe keeping
Nov 27, 2019
c48b564
Checkpoint for safe keeping
Nov 27, 2019
639f873
Checkpoint for safe keeping
Nov 27, 2019
e57cb18
Checkpoint for safe keeping
Nov 27, 2019
68ce1c2
Checkpoint for safe keeping (some testing successfully completed)
Dec 1, 2019
2c1e91e
Merge branch 'master' of github.com:Azure/azure-cosmosdb-java into is…
Dec 4, 2019
96757e4
Updated changelog
Dec 13, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,11 @@ protected void performWorkload(Subscriber<Document> subs, long i) throws Excepti
}

concurrencyControlSemaphore.acquire();
logger.debug("concurrencyControlSemaphore: {}", concurrencyControlSemaphore);

try {
obs.subscribeOn(Schedulers.computation()).subscribe(subs);
} catch (Throwable error) {
concurrencyControlSemaphore.release();
logger.error("subscription failed due to ", error);
}
logger.debug("concurrencyControlSemaphore: {}", concurrencyControlSemaphore);
obs.subscribeOn(Schedulers.computation()).subscribe(subs);

concurrencyControlSemaphore.release();
}

private void populateCache() {
Expand Down
2 changes: 2 additions & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- Added support for recording important events in the lifetime of a request ([#296](https://github.com/Azure/azure-cosmosdb-java/pull/296/))
- Addressed a memory leak in `RntbdRequestTimer` memory leak ([#289](https://github.com/Azure/azure-cosmosdb-java/pull/289))
- Improved diagnostics in Direct TCP transport client ([#287](https://github.com/Azure/azure-cosmosdb-java/pull/287))
- Bumped netty.version to 4.1.42.Final and netty-tcnative.version to 2.0.26.Final to address a Direct TCP SSL issue ([#274](https://github.com/Azure/azure-cosmosdb-java/pull/274))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* The MIT License (MIT)
* Copyright (c) 2018 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.microsoft.azure.cosmosdb.internal.directconnectivity;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.google.common.collect.ImmutableList;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdObjectMapper;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.rntbd.RntbdRequestRecord;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Iterator;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Represents the time and duration of important events in the lifetime of a request.
*/
public final class RequestTimeline implements Iterable<RequestTimeline.Event> {

private static final RequestTimeline EMPTY = new RequestTimeline();
private final ImmutableList<Event> events;

private RequestTimeline() {
this.events = ImmutableList.of();
}

private RequestTimeline(final ImmutableList<Event> events) {
checkNotNull(events, "expected non-null events");
this.events = events;
}

public static RequestTimeline empty() {
return EMPTY;
}

public static RequestTimeline from(RntbdRequestRecord requestRecord) {

checkNotNull(requestRecord, "expected non-null requestRecord");

OffsetDateTime now = OffsetDateTime.now();

OffsetDateTime timeCreated = requestRecord.timeCreated();
OffsetDateTime timeQueued = requestRecord.timeQueued();
OffsetDateTime timePipelined = requestRecord.timePipelined();
OffsetDateTime timeSent = requestRecord.timeSent();
OffsetDateTime timeCompleted = requestRecord.timeCompleted();
OffsetDateTime timeCompletedOrNow = timeCompleted == null ? now : timeCompleted;

return RequestTimeline.of(
new Event("created",
timeCreated, timeQueued == null ? timeCompletedOrNow : timeQueued),
new Event("queued",
timeQueued, timePipelined == null ? timeCompletedOrNow : timePipelined),
new Event("pipelined",
timePipelined, timeSent == null ? timeCompletedOrNow : timeSent),
new Event("sent",
timeSent, timeCompletedOrNow),
new Event("completed",
timeCompleted, now));
}

@Override
public Iterator<Event> iterator() {
return this.events.iterator();
}

public static RequestTimeline of() {
return EMPTY;
}

public static RequestTimeline of(final Event event) {
return new RequestTimeline(ImmutableList.of(event));
}

public static RequestTimeline of(final Event e1, final Event e2) {
return new RequestTimeline(ImmutableList.of(e1, e2));
}

public static RequestTimeline of(final Event e1, final Event e2, final Event e3) {
return new RequestTimeline(ImmutableList.of(e1, e2, e3));
}

public static RequestTimeline of(final Event e1, final Event e2, final Event e3, final Event e4) {
return new RequestTimeline(ImmutableList.of(e1, e2, e3, e4));
}

public static RequestTimeline of(final Event e1, final Event e2, final Event e3, final Event e4, final Event e5) {
return new RequestTimeline(ImmutableList.of(e1, e2, e3, e4, e5));
}

public static RequestTimeline of(final Event... events) {
return new RequestTimeline(ImmutableList.copyOf(events));
}

@Override
public String toString() {
return RntbdObjectMapper.toString(this);
}

@JsonPropertyOrder({ "name", "time", "duration" })
public static final class Event {

private final Duration duration;
private final String name;
private final OffsetDateTime time;

public Event(final String name, final OffsetDateTime from, final OffsetDateTime to) {

checkNotNull(name, "expected non-null name");

this.name = name;
this.time = from;

this.duration = from == null ? null : to == null ? Duration.ZERO : Duration.between(from, to);
}

@JsonSerialize(using = ToStringSerializer.class)
public Duration getDuration() {
return this.duration;
}

public String getName() {
return name;
}

@JsonSerialize(using = ToStringSerializer.class)
public OffsetDateTime getTime() {
return time;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public Single<StoreResponse> invokeStoreAsync(
record.whenComplete((response, error) -> {

requestArgs.traceOperation(logger, null, "emitSingle", response, error);
record.stage(RntbdRequestRecord.Stage.COMPLETED);

if (error == null) {
emitter.onSuccess(response);
Expand Down Expand Up @@ -288,17 +289,17 @@ public Duration shutdownTimeout() {
return this.shutdownTimeout;
}

@Override
public String toString() {
return RntbdObjectMapper.toJson(this);
public UserAgentContainer userAgent() {
return this.userAgent;
}

// endregion

// region Methods

public UserAgentContainer userAgent() {
return this.userAgent;
@Override
public String toString() {
return RntbdObjectMapper.toJson(this);
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ protected void initChannel(final Channel channel) {
checkNotNull(channel);

final RntbdRequestManager requestManager = new RntbdRequestManager(this.healthChecker, this.config.maxRequestsPerChannel());
final long readerIdleTime = this.config.receiveHangDetectionTime();
final long writerIdleTime = this.config.sendHangDetectionTime();
final long allIdleTime = this.config.idleConnectionTimeout();
final long readerIdleTime = this.config.receiveHangDetectionTimeInNanos();
final long writerIdleTime = this.config.sendHangDetectionTimeInNanos();
final long allIdleTime = this.config.idleConnectionTimeoutInNanos();
final ChannelPipeline pipeline = channel.pipeline();

pipeline.addFirst(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,34 +53,34 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck
private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelHealthChecker.class);

// A channel will be declared healthy if a read succeeded recently as defined by this value.
private static final long recentReadWindow = 1_000_000_000L;
private static final long recentReadWindowInNanos = 1_000_000_000L;

// A channel should not be declared unhealthy if a write succeeded recently. As such gaps between
// Timestamps.lastChannelWrite and Timestamps.lastChannelRead lower than this value are ignored.
// Guidance: The grace period should be large enough to accommodate the round trip time of the slowest server
// request. Assuming 1s of network RTT, a 2 MB request, a 2 MB response, a connection that can sustain 1 MB/s
// both ways, and a 5-second deadline at the server, 10 seconds should be enough.
private static final long readHangGracePeriod = 10L * 1_000_000_000L;
private static final long readHangGracePeriodInNanos = 10L * 1_000_000_000L;

// A channel will not be declared unhealthy if a write was attempted recently. As such gaps between
// Timestamps.lastChannelWriteAttempt and Timestamps.lastChannelWrite lower than this value are ignored.
// Guidance: The grace period should be large enough to accommodate slow writes. For example, a value of 2s requires
// that the client can sustain data rates of at least 1 MB/s when writing 2 MB documents.
private static final long writeHangGracePeriod = 2L * 1_000_000_000L;
private static final long writeHangGracePeriodInNanos = 2L * 1_000_000_000L;

// A channel is considered idle if:
// idleConnectionTimeout > 0L && System.nanoTime() - Timestamps.lastChannelRead() >= idleConnectionTimeout
private final long idleConnectionTimeout;
private final long idleConnectionTimeoutInNanos;

// A channel will be declared unhealthy if the gap between Timestamps.lastChannelWrite and Timestamps.lastChannelRead
// grows beyond this value.
// Constraint: readDelayLimit > readHangGracePeriod
private final long readDelayLimit;
private final long readDelayLimitInNanos;

// A channel will be declared unhealthy if the gap between Timestamps.lastChannelWriteAttempt and Timestamps.lastChannelWrite
// grows beyond this value.
// Constraint: writeDelayLimit > writeHangGracePeriod
private final long writeDelayLimit;
private final long writeDelayLimitInNanos;

// endregion

Expand All @@ -90,29 +90,29 @@ public RntbdClientChannelHealthChecker(final Config config) {

checkNotNull(config, "config: null");

this.idleConnectionTimeout = config.idleConnectionTimeout();
this.idleConnectionTimeoutInNanos = config.idleConnectionTimeoutInNanos();

this.readDelayLimit = config.receiveHangDetectionTime();
checkArgument(this.readDelayLimit > readHangGracePeriod, "config.receiveHangDetectionTime: %s", this.readDelayLimit);
this.readDelayLimitInNanos = config.receiveHangDetectionTimeInNanos();
checkArgument(this.readDelayLimitInNanos > readHangGracePeriodInNanos, "config.receiveHangDetectionTimeInNanos: %s", this.readDelayLimitInNanos);

this.writeDelayLimit = config.sendHangDetectionTime();
checkArgument(this.writeDelayLimit > writeHangGracePeriod, "config.sendHangDetectionTime: %s", this.writeDelayLimit);
this.writeDelayLimitInNanos = config.sendHangDetectionTimeInNanos();
checkArgument(this.writeDelayLimitInNanos > writeHangGracePeriodInNanos, "config.sendHangDetectionTimeInNanos: %s", this.writeDelayLimitInNanos);
}

// endregion

// region Methods

public long idleConnectionTimeout() {
return this.idleConnectionTimeout;
public long idleConnectionTimeoutInNanos() {
return this.idleConnectionTimeoutInNanos;
}

public long readDelayLimit() {
return this.readDelayLimit;
public long readDelayLimitInNanos() {
return this.readDelayLimitInNanos;
}

public long writeDelayLimit() {
return this.writeDelayLimit;
public long writeDelayLimitInNanos() {
return this.writeDelayLimitInNanos;
}

public Future<Boolean> isHealthy(final Channel channel) {
Expand All @@ -130,7 +130,7 @@ public Future<Boolean> isHealthy(final Channel channel) {
final Timestamps timestamps = requestManager.snapshotTimestamps();
final long currentTime = System.nanoTime();

if (currentTime - timestamps.lastChannelRead() < recentReadWindow) {
if (currentTime - timestamps.lastChannelRead() < recentReadWindowInNanos) {
return promise.setSuccess(Boolean.TRUE); // because we recently received data
}

Expand All @@ -140,15 +140,15 @@ public Future<Boolean> isHealthy(final Channel channel) {

final long writeDelay = timestamps.lastChannelWriteAttempt() - timestamps.lastChannelWrite();

if (writeDelay > this.writeDelayLimit && currentTime - timestamps.lastChannelWriteAttempt() > writeHangGracePeriod) {
if (writeDelay > this.writeDelayLimitInNanos && currentTime - timestamps.lastChannelWriteAttempt() > writeHangGracePeriodInNanos) {

final Optional<RntbdContext> rntbdContext = requestManager.rntbdContext();
final int pendingRequestCount = requestManager.pendingRequestCount();

logger.warn("{} health check failed due to hung write: {lastChannelWriteAttempt: {}, lastChannelWrite: {}, "
+ "writeDelay: {}, writeDelayLimit: {}, rntbdContext: {}, pendingRequestCount: {}}", channel,
timestamps.lastChannelWriteAttempt(), timestamps.lastChannelWrite(), writeDelay,
this.writeDelayLimit, rntbdContext, pendingRequestCount);
this.writeDelayLimitInNanos, rntbdContext, pendingRequestCount);

return promise.setSuccess(Boolean.FALSE);
}
Expand All @@ -159,21 +159,21 @@ public Future<Boolean> isHealthy(final Channel channel) {

final long readDelay = timestamps.lastChannelWrite() - timestamps.lastChannelRead();

if (readDelay > this.readDelayLimit && currentTime - timestamps.lastChannelWrite() > readHangGracePeriod) {
if (readDelay > this.readDelayLimitInNanos && currentTime - timestamps.lastChannelWrite() > readHangGracePeriodInNanos) {

final Optional<RntbdContext> rntbdContext = requestManager.rntbdContext();
final int pendingRequestCount = requestManager.pendingRequestCount();

logger.warn("{} health check failed due to hung read: {lastChannelWrite: {}, lastChannelRead: {}, "
+ "readDelay: {}, readDelayLimit: {}, rntbdContext: {}, pendingRequestCount: {}}", channel,
timestamps.lastChannelWrite(), timestamps.lastChannelRead(), readDelay,
this.readDelayLimit, rntbdContext, pendingRequestCount);
this.readDelayLimitInNanos, rntbdContext, pendingRequestCount);

return promise.setSuccess(Boolean.FALSE);
}

if (this.idleConnectionTimeout > 0L) {
if (currentTime - timestamps.lastChannelRead() > this.idleConnectionTimeout) {
if (this.idleConnectionTimeoutInNanos > 0L) {
if (currentTime - timestamps.lastChannelRead() > this.idleConnectionTimeoutInNanos) {
return promise.setSuccess(Boolean.FALSE);
}
}
Expand Down Expand Up @@ -208,9 +208,9 @@ static final class JsonSerializer extends StdSerializer<RntbdClientChannelHealth
@Override
public void serialize(RntbdClientChannelHealthChecker value, JsonGenerator generator, SerializerProvider provider) throws IOException {
generator.writeStartObject();
generator.writeNumberField("idleConnectionTimeout", value.idleConnectionTimeout());
generator.writeNumberField("readDelayLimit", value.readDelayLimit());
generator.writeNumberField("writeDelayLimit", value.writeDelayLimit());
generator.writeNumberField("idleConnectionTimeoutInNanos", value.idleConnectionTimeoutInNanos());
generator.writeNumberField("readDelayLimitInNanos", value.readDelayLimitInNanos());
generator.writeNumberField("writeDelayLimitInNanos", value.writeDelayLimitInNanos());
generator.writeEndObject();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void onTimeout(AcquireTask task) {
}
}

final long idleEndpointTimeout = config.idleEndpointTimeout();
final long idleEndpointTimeout = config.idleEndpointTimeoutInNanos();

this.idleStateDetectionScheduledFuture = this.executor.scheduleAtFixedRate(
() -> {
Expand Down Expand Up @@ -678,9 +678,9 @@ public void serialize(final RntbdClientChannelPool value, final JsonGenerator ge
generator.writeObjectFieldStart("configuration");
generator.writeNumberField("maxChannels", value.maxChannels());
generator.writeNumberField("maxRequestsPerChannel", value.maxRequestsPerChannel());
generator.writeNumberField("idleConnectionTimeout", healthChecker.idleConnectionTimeout());
generator.writeNumberField("readDelayLimit", healthChecker.readDelayLimit());
generator.writeNumberField("writeDelayLimit", healthChecker.writeDelayLimit());
generator.writeNumberField("idleConnectionTimeout", healthChecker.idleConnectionTimeoutInNanos());
generator.writeNumberField("readDelayLimit", healthChecker.readDelayLimitInNanos());
generator.writeNumberField("writeDelayLimit", healthChecker.writeDelayLimitInNanos());
generator.writeEndObject();
generator.writeObjectFieldStart("state");
generator.writeNumberField("channelsAcquired", value.channelsAcquired());
Expand Down
Loading