Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ public MetricDataMapper(BiConsumer<AbstractTelemetryBuilder, Resource> telemetry
this.captureHttpServer4xxAsError = captureHttpServer4xxAsError;
}

public void mapMetrics(MetricData metricData, Consumer<TelemetryItem> consumer) {
MetricDataType type = metricData.getType();
if (type == DOUBLE_SUM || type == DOUBLE_GAUGE || type == LONG_SUM || type == LONG_GAUGE || type == HISTOGRAM) {

// DO NOT emit unstable metrics from the OpenTelemetry auto instrumentation libraries
// custom metrics are always emitted
if (OTEL_UNSTABLE_METRICS_TO_EXCLUDE.contains(metricData.getName())
&& metricData.getInstrumentationScopeInfo().getName().startsWith(OTEL_INSTRUMENTATION_NAME_PREFIX)) {
return;
}
List<TelemetryItem> stableOtelMetrics = convertOtelMetricToAzureMonitorMetric(metricData, false);
stableOtelMetrics.forEach(consumer::accept);
} else {
logger.warning("metric data type {} is not supported yet.", metricData.getType());
}
}

public void map(MetricData metricData, Consumer<TelemetryItem> consumer) {
MetricDataType type = metricData.getType();
if (type == DOUBLE_SUM || type == DOUBLE_GAUGE || type == LONG_SUM || type == LONG_GAUGE || type == HISTOGRAM) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpRequest;
import com.azure.monitor.opentelemetry.exporter.implementation.MetricDataMapper;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.HostName;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings;
Expand All @@ -20,13 +21,15 @@

public class QuickPulse {

static final int QP_INVARIANT_VERSION = 1;
// 6 represents filtering support for Otel metrics only is enabled
static final int QP_INVARIANT_VERSION = 6;

private volatile QuickPulseDataCollector collector;

public static QuickPulse create(HttpPipeline httpPipeline, Supplier<URL> endpointUrl,
Supplier<String> instrumentationKey, @Nullable String roleName, @Nullable String roleInstance,
String sdkVersion) {
boolean useNormalizedValueForNonNormalizedCpuPercentage, QuickPulseMetricReader quickPulseMetricReader,
MetricDataMapper metricDataMapper, String sdkVersion) {

QuickPulse quickPulse = new QuickPulse();

Expand All @@ -40,7 +43,8 @@ public static QuickPulse create(HttpPipeline httpPipeline, Supplier<URL> endpoin
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
quickPulse.initialize(httpPipeline, endpointUrl, instrumentationKey, roleName, roleInstance, sdkVersion);
quickPulse.initialize(httpPipeline, endpointUrl, instrumentationKey, roleName, roleInstance,
useNormalizedValueForNonNormalizedCpuPercentage, quickPulseMetricReader, metricDataMapper, sdkVersion);
});
// the condition below will always be false, but by referencing the executor it ensures the
// executor can't become unreachable in the middle of the execute() method execution above
Expand All @@ -64,12 +68,16 @@ public void add(TelemetryItem telemetryItem) {
}

private void initialize(HttpPipeline httpPipeline, Supplier<URL> endpointUrl, Supplier<String> instrumentationKey,
@Nullable String roleName, @Nullable String roleInstance, String sdkVersion) {
@Nullable String roleName, @Nullable String roleInstance,
boolean useNormalizedValueForNonNormalizedCpuPercentage, QuickPulseMetricReader quickPulseMetricReader,
MetricDataMapper metricDataMapper, String sdkVersion) {

String quickPulseId = UUID.randomUUID().toString().replace("-", "");
ArrayBlockingQueue<HttpRequest> sendQueue = new ArrayBlockingQueue<>(256, true);
QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration();

QuickPulseDataSender quickPulseDataSender = new QuickPulseDataSender(httpPipeline, sendQueue);
QuickPulseDataSender quickPulseDataSender
= new QuickPulseDataSender(httpPipeline, sendQueue, quickPulseConfiguration);

String instanceName = roleInstance;
String machineName = HostName.get();
Expand All @@ -81,12 +89,13 @@ private void initialize(HttpPipeline httpPipeline, Supplier<URL> endpointUrl, Su
instanceName = "Unknown host";
}

QuickPulseDataCollector collector = new QuickPulseDataCollector();
QuickPulseDataCollector collector
= new QuickPulseDataCollector(useNormalizedValueForNonNormalizedCpuPercentage, quickPulseConfiguration);

QuickPulsePingSender quickPulsePingSender = new QuickPulsePingSender(httpPipeline, endpointUrl,
instrumentationKey, roleName, instanceName, machineName, quickPulseId, sdkVersion);
instrumentationKey, roleName, instanceName, machineName, quickPulseId, sdkVersion, quickPulseConfiguration);
QuickPulseDataFetcher quickPulseDataFetcher = new QuickPulseDataFetcher(collector, sendQueue, endpointUrl,
instrumentationKey, roleName, instanceName, machineName, quickPulseId);
instrumentationKey, roleName, instanceName, machineName, quickPulseId, quickPulseConfiguration);

QuickPulseCoordinatorInitData coordinatorInitData
= new QuickPulseCoordinatorInitDataBuilder().withPingSender(quickPulsePingSender)
Expand All @@ -97,6 +106,14 @@ private void initialize(HttpPipeline httpPipeline, Supplier<URL> endpointUrl, Su

QuickPulseCoordinator coordinator = new QuickPulseCoordinator(coordinatorInitData);

QuickPulseMetricReceiver quickPulseMetricReceiver
= new QuickPulseMetricReceiver(quickPulseMetricReader, metricDataMapper, collector);

Thread metricReceiverThread
= new Thread(quickPulseMetricReceiver, QuickPulseMetricReceiver.class.getSimpleName());
metricReceiverThread.setDaemon(true);
metricReceiverThread.start();

Thread senderThread = new Thread(quickPulseDataSender, QuickPulseDataSender.class.getSimpleName());
senderThread.setDaemon(true);
senderThread.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse;

import com.azure.core.http.HttpResponse;
import com.azure.core.util.logging.ClientLogger;
import com.azure.json.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class QuickPulseConfiguration {
private static final ClientLogger logger = new ClientLogger(QuickPulseDataFetcher.class);
private AtomicReference<String> etag = new AtomicReference<>();
private ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> derivedMetrics = new ConcurrentHashMap<>();

public synchronized String getEtag() {
return this.etag.get();
}

public synchronized void setEtag(String etag) {
this.etag.set(etag);
}

public synchronized ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> getDerivedMetrics() {
return this.derivedMetrics;
}

public synchronized void setDerivedMetrics(ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> metrics) {
this.derivedMetrics = metrics;
}

public synchronized void updateConfig(String etagValue,
ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> otelMetrics) {
if (!Objects.equals(this.getEtag(), etagValue)) {
this.setEtag(etagValue);
this.setDerivedMetrics(otelMetrics);
}

}

public ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> parseDerivedMetrics(HttpResponse response)
throws IOException {

ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> requestedMetrics = new ConcurrentHashMap<>();
try {

String responseBody = response.getBodyAsString().block();
if (responseBody == null || responseBody.isEmpty()) {
return new ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>>();
}

try (JsonReader jsonReader = JsonProviders.createReader(responseBody)) {
jsonReader.nextToken();
while (jsonReader.nextToken() != JsonToken.END_OBJECT) {
if ("Metrics".equals(jsonReader.getFieldName())) {
jsonReader.nextToken();

while (jsonReader.nextToken() != JsonToken.END_ARRAY) {
DerivedMetricInfo metric = new DerivedMetricInfo();

while (jsonReader.nextToken() != JsonToken.END_OBJECT) {

String fieldName = jsonReader.getFieldName();
jsonReader.nextToken();

switch (fieldName) {
case "Id":
metric.setId(jsonReader.getString());
break;

case "Aggregation":
metric.setAggregation(jsonReader.getString());
break;

case "TelemetryType":
metric.setTelemetryType(jsonReader.getString());
break;

case "Projection":
metric.setProjection(jsonReader.getString());
break;

case "FilterGroups":
// Handle "FilterGroups" field
if (jsonReader.currentToken() == JsonToken.START_ARRAY) {
while (jsonReader.nextToken() != JsonToken.END_ARRAY) {
if (jsonReader.currentToken() == JsonToken.START_OBJECT) {
while (jsonReader.nextToken() != JsonToken.END_OBJECT) {
if (jsonReader.currentToken() == JsonToken.FIELD_NAME
&& jsonReader.getFieldName().equals("Filters")) {
jsonReader.nextToken();
if (jsonReader.currentToken() == JsonToken.START_ARRAY) {
while (jsonReader.nextToken() != JsonToken.END_ARRAY) {
if (jsonReader.currentToken()
== JsonToken.START_OBJECT) {
String innerFieldName = "";
String predicate = "";
String comparand = "";

while (jsonReader.nextToken()
!= JsonToken.END_OBJECT) {
String filterFieldName
= jsonReader.getFieldName();
jsonReader.nextToken();

switch (filterFieldName) {
case "FieldName":
innerFieldName
= jsonReader.getString();
if (innerFieldName.contains(".")) {
innerFieldName = innerFieldName
.split("\\.")[1];
}
break;

case "Predicate":
predicate = jsonReader.getString();
break;

case "Comparand":
comparand = jsonReader.getString();
break;
}
}

if (!innerFieldName.isEmpty()
&& !innerFieldName.equals("undefined")
&& !predicate.isEmpty()
&& !comparand.isEmpty()) {
metric.addFilterGroup(innerFieldName,
predicate, comparand);
}
}
}
}
}
}
}
}
}
break;

default:
jsonReader.skipChildren();
break;
}
}
requestedMetrics.computeIfAbsent(metric.getTelemetryType(), k -> new ArrayList<>())
.add(metric);
}
} else {
jsonReader.skipChildren();

}
}
}
return requestedMetrics;
} catch (Exception e) {
logger.verbose("Failed to parse metrics from response: %s", e.getMessage());
}
return new ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>>();
}

public class DerivedMetricInfo {
private String id;
private String projection;
private String telemetryType;
private String aggregation;
private ArrayList<FilterGroup> filterGroups = new ArrayList<FilterGroup>();

public String getId() {
return this.id;
}

public void setId(String id) {
this.id = id;
}

public String getProjection() {
return projection;
}

public void setTelemetryType(String telemetryType) {
this.telemetryType = telemetryType;
}

public String getTelemetryType() {
return this.telemetryType;
}

public void setProjection(String projection) {
this.projection = projection;
}

public String getAggregation() {
return this.aggregation;
}

public void setAggregation(String aggregation) {
this.aggregation = aggregation;
}

public ArrayList<FilterGroup> getFilterGroups() {
return this.filterGroups;
}

public void addFilterGroup(String fieldName, String predicate, String comparand) {
this.filterGroups.add(new FilterGroup(fieldName, predicate, comparand));
}
}

class FilterGroup {
private String fieldName;
private String operator;
private String comparand;

public FilterGroup(String fieldName, String predicate, String comparand) {
this.setFieldName(fieldName);
this.setOperator(predicate);
this.setComparand(comparand);
}

public String getFieldName() {
return this.fieldName;
}

private void setFieldName(String fieldName) {
this.fieldName = fieldName;
}

public String getOperator() {
return this.operator;
}

private void setOperator(String operator) {
this.operator = operator;
}

public String getComparand() {
return this.comparand;
}

public void setComparand(String comparand) {
this.comparand = comparand;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ private long sendData() {

case QP_IS_OFF:
pingMode = true;
collector.flushOtelMetrics();
QuickPulseMetricReceiver.setQuickPulseHeaderInfo(currentQuickPulseHeaderInfo);
return qpsServicePollingIntervalHintMillis > 0
? qpsServicePollingIntervalHintMillis
: waitBetweenPingsInMillis;

case QP_IS_ON:
QuickPulseMetricReceiver.setQuickPulseHeaderInfo(currentQuickPulseHeaderInfo);
return waitBetweenPostsInMillis;
}

Expand Down
Loading
Loading