Skip to content

Commit

Permalink
Merge aebdd91 into 1.1.13
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 13, 2023
2 parents fcbdbb7 + aebdd91 commit 162b96e
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public void channelActive(ChannelHandlerContext ctx) {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelActive();
Expand All @@ -78,10 +78,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelInactive();
Expand All @@ -97,9 +97,9 @@ public void channelRegistered(ChannelHandlerContext ctx) {
}
if (ctx.pipeline().get(NettyPipeline.SslHandler) != null) {
ctx.pipeline()
.addBefore(NettyPipeline.SslHandler,
NettyPipeline.TlsMetricsHandler,
tlsMetricsHandler());
.addBefore(NettyPipeline.SslHandler,
NettyPipeline.TlsMetricsHandler,
tlsMetricsHandler());
}

ctx.fireChannelRegistered();
Expand All @@ -123,10 +123,10 @@ else if (msg instanceof DatagramPacket) {
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireChannelRead(msg);
Expand All @@ -151,10 +151,10 @@ else if (msg instanceof DatagramPacket) {
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}

//"FutureReturnValueIgnored" this is deliberate
Expand All @@ -167,10 +167,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress());
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireExceptionCaught(cause);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,12 +91,13 @@ static class TlsMetricsHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
long tlsHandshakeTimeStart = System.nanoTime();
ctx.pipeline().get(SslHandler.class)
.handshakeFuture()
.addListener(f -> {
ctx.pipeline().remove(this);
recordTlsHandshakeTime(ctx, tlsHandshakeTimeStart, f.isSuccess() ? SUCCESS : ERROR);
});
ctx.pipeline()
.get(SslHandler.class)
.handshakeFuture()
.addListener(f -> {
ctx.pipeline().remove(this);
recordTlsHandshakeTime(ctx, tlsHandshakeTimeStart, f.isSuccess() ? SUCCESS : ERROR);
});
ctx.fireChannelActive();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -259,16 +259,17 @@ public void channelActive(ChannelHandlerContext ctx) {
observation = Observation.createNotStarted(recorder.name() + TLS_HANDSHAKE_TIME, this, OBSERVATION_REGISTRY);
parentContextView = updateChannelContext(ctx.channel(), observation);
observation.start();
ctx.pipeline().get(SslHandler.class)
.handshakeFuture()
.addListener(f -> {
ctx.pipeline().remove(this);
status = f.isSuccess() ? SUCCESS : ERROR;
observation.stop();

ReactorNetty.setChannelContext(ctx.channel(), parentContextView);
parentContextView = null;
});
ctx.pipeline()
.get(SslHandler.class)
.handshakeFuture()
.addListener(f -> {
ctx.pipeline().remove(this);
status = f.isSuccess() ? SUCCESS : ERROR;
observation.stop();

ReactorNetty.setChannelContext(ctx.channel(), parentContextView);
parentContextView = null;
});

ctx.fireChannelActive();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static reactor.netty.Metrics.REMOTE_ADDRESS;
import static reactor.netty.Metrics.STATUS;
import static reactor.netty.Metrics.TLS_HANDSHAKE_TIME;
import static reactor.netty.Metrics.formatSocketAddress;

/**
* A {@link ChannelMetricsRecorder} implementation for integration with Micrometer.
Expand Down Expand Up @@ -72,7 +73,7 @@ public MicrometerChannelMetricsRecorder(String name, String protocol) {

@Override
public void recordDataReceived(SocketAddress remoteAddress, long bytes) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
DistributionSummary ds = MapUtils.computeIfAbsent(dataReceivedCache, address,
key -> filter(DistributionSummary.builder(name + DATA_RECEIVED)
.baseUnit(ChannelMeters.DATA_RECEIVED.getBaseUnit())
Expand All @@ -86,7 +87,7 @@ public void recordDataReceived(SocketAddress remoteAddress, long bytes) {

@Override
public void recordDataSent(SocketAddress remoteAddress, long bytes) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
DistributionSummary ds = MapUtils.computeIfAbsent(dataSentCache, address,
key -> filter(DistributionSummary.builder(name + DATA_SENT)
.baseUnit(ChannelMeters.DATA_SENT.getBaseUnit())
Expand All @@ -100,7 +101,7 @@ public void recordDataSent(SocketAddress remoteAddress, long bytes) {

@Override
public void incrementErrorsCount(SocketAddress remoteAddress) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
Counter c = MapUtils.computeIfAbsent(errorsCache, address,
key -> filter(Counter.builder(name + ERRORS)
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
Expand All @@ -113,7 +114,7 @@ public void incrementErrorsCount(SocketAddress remoteAddress) {

@Override
public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
Timer timer = getTlsHandshakeTimer(name + TLS_HANDSHAKE_TIME, address, status);
if (timer != null) {
timer.record(time);
Expand All @@ -125,13 +126,13 @@ public final Timer getTlsHandshakeTimer(String name, String address, String stat
MeterKey meterKey = new MeterKey(null, address, null, status);
return MapUtils.computeIfAbsent(tlsHandshakeTimeCache, meterKey,
key -> filter(Timer.builder(name)
.tags(REMOTE_ADDRESS, address, STATUS, status)
.register(REGISTRY)));
.tags(REMOTE_ADDRESS, address, STATUS, status)
.register(REGISTRY)));
}

@Override
public void recordConnectTime(SocketAddress remoteAddress, Duration time, String status) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
Timer timer = getConnectTimer(name + CONNECT_TIME, address, status);
if (timer != null) {
timer.record(time);
Expand All @@ -143,13 +144,13 @@ final Timer getConnectTimer(String name, String address, String status) {
MeterKey meterKey = new MeterKey(null, address, null, status);
return MapUtils.computeIfAbsent(connectTimeCache, meterKey,
key -> filter(Timer.builder(name)
.tags(REMOTE_ADDRESS, address, STATUS, status)
.register(REGISTRY)));
.tags(REMOTE_ADDRESS, address, STATUS, status)
.register(REGISTRY)));
}

@Override
public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, String status) {
String address = reactor.netty.Metrics.formatSocketAddress(remoteAddress);
String address = formatSocketAddress(remoteAddress);
Timer timer = getResolveAddressTimer(name + ADDRESS_RESOLVER, address, status);
if (timer != null) {
timer.record(time);
Expand All @@ -161,8 +162,8 @@ public final Timer getResolveAddressTimer(String name, String address, String st
MeterKey meterKey = new MeterKey(null, address, null, status);
return MapUtils.computeIfAbsent(addressResolverTimeCache, meterKey,
key -> filter(Timer.builder(name)
.tags(REMOTE_ADDRESS, address, STATUS, status)
.register(REGISTRY)));
.tags(REMOTE_ADDRESS, address, STATUS, status)
.register(REGISTRY)));
}

@Override
Expand Down Expand Up @@ -201,7 +202,7 @@ public String protocol() {

@Nullable
LongAdder getTotalConnectionsAdder(SocketAddress serverAddress) {
String address = reactor.netty.Metrics.formatSocketAddress(serverAddress);
String address = formatSocketAddress(serverAddress);
return MapUtils.computeIfAbsent(totalConnectionsCache, address,
key -> {
LongAdder totalConnectionsAdder = new LongAdder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,19 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
});
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}

//"FutureReturnValueIgnored" this is deliberate
Expand All @@ -140,7 +140,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {

if (msg instanceof LastHttpContent) {
// Detect if we have received an early response before the request has been fully flushed.
// In this case, invoke recordwrite now (because next we will reset all class fields).
// In this case, invoke #recordWrite now (because next we will reset all class fields).
lastReadSeq = (lastReadSeq + 1) & 0x7F_FF_FF_FF;
if ((lastReadSeq > lastWriteSeq) || (lastReadSeq == 0 && lastWriteSeq == Integer.MAX_VALUE)) {
lastWriteSeq = (lastWriteSeq + 1) & 0x7F_FF_FF_FF;
Expand All @@ -151,10 +151,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireChannelRead(msg);
Expand All @@ -166,10 +166,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
recordException(ctx);
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireExceptionCaught(cause);
Expand Down Expand Up @@ -207,20 +207,17 @@ protected void recordException(ChannelHandlerContext ctx) {

protected void recordRead(Channel channel) {
SocketAddress address = channel.remoteAddress();
recorder().recordDataReceivedTime(address,
path, method, status,
recorder().recordDataReceivedTime(address, path, method, status,
Duration.ofNanos(System.nanoTime() - dataReceivedTime));

recorder().recordResponseTime(address,
path, method, status,
recorder().recordResponseTime(address, path, method, status,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder().recordDataReceived(address, path, dataReceived);
}

protected void recordWrite(SocketAddress address) {
recorder().recordDataSentTime(address,
path, method,
recorder().recordDataSentTime(address, path, method,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder().recordDataSent(address, path, dataSent);
Expand All @@ -235,7 +232,7 @@ protected void reset() {
dataSent = 0;
dataReceivedTime = 0;
dataSentTime = 0;
// don't reset lastWriteSeq and lastReadSeq, which must be incremented for ever
// don't reset lastWriteSeq and lastReadSeq, which must be incremented forever
}

protected void startRead(HttpResponse msg) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,8 +60,7 @@ protected void recordException(ChannelHandlerContext ctx) {
@Override
protected void recordWrite(SocketAddress address) {
if (contextView != null) {
recorder.recordDataSentTime(contextView, address,
path, method,
recorder.recordDataSentTime(contextView, address, path, method,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder.recordDataSent(contextView, address, path, dataSent);
Expand All @@ -75,12 +74,10 @@ protected void recordWrite(SocketAddress address) {
protected void recordRead(Channel channel) {
if (contextView != null) {
SocketAddress address = channel.remoteAddress();
recorder.recordDataReceivedTime(contextView, address,
path, method, status,
recorder.recordDataReceivedTime(contextView, address, path, method, status,
Duration.ofNanos(System.nanoTime() - dataReceivedTime));

recorder.recordResponseTime(contextView, address,
path, method, status,
recorder.recordResponseTime(contextView, address, path, method, status,
Duration.ofNanos(System.nanoTime() - dataSentTime));

recorder.recordDataReceived(contextView, address, path, dataReceived);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tags;
import reactor.netty.Metrics;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;

import java.net.SocketAddress;
Expand All @@ -31,6 +30,7 @@
import static reactor.netty.http.client.Http2ConnectionProviderMeters.Http2ConnectionProviderMetersTags.REMOTE_ADDRESS;
import static reactor.netty.http.client.Http2ConnectionProviderMeters.IDLE_CONNECTIONS;
import static reactor.netty.http.client.Http2ConnectionProviderMeters.PENDING_STREAMS;
import static reactor.netty.Metrics.formatSocketAddress;

final class MicrometerHttp2ConnectionProviderMeterRegistrar {

Expand All @@ -41,7 +41,7 @@ private MicrometerHttp2ConnectionProviderMeterRegistrar() {
}

void registerMetrics(String poolName, String id, SocketAddress remoteAddress, InstrumentedPool.PoolMetrics metrics) {
String addressAsString = Metrics.formatSocketAddress(remoteAddress);
String addressAsString = formatSocketAddress(remoteAddress);
Tags tags = Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), addressAsString, NAME.asString(), poolName);

Gauge.builder(ACTIVE_CONNECTIONS.getName(), metrics, InstrumentedPool.PoolMetrics::acquiredSize)
Expand All @@ -62,7 +62,7 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In
}

void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) {
String addressAsString = Metrics.formatSocketAddress(remoteAddress);
String addressAsString = formatSocketAddress(remoteAddress);
Tags tags = Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), addressAsString, NAME.asString(), poolName);

REGISTRY.remove(new Meter.Id(ACTIVE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
Expand Down
Loading

0 comments on commit 162b96e

Please sign in to comment.