diff --git a/.idea/misc-for-inspection.xml b/.idea/misc-for-inspection.xml index 027c62e0f8b3..373735d4e92c 100644 --- a/.idea/misc-for-inspection.xml +++ b/.idea/misc-for-inspection.xml @@ -87,4 +87,4 @@ - + \ No newline at end of file diff --git a/docs/design/extensions-contrib/dropwizard.md b/docs/design/extensions-contrib/dropwizard.md index f14e883adf0f..2c970712222b 100644 --- a/docs/design/extensions-contrib/dropwizard.md +++ b/docs/design/extensions-contrib/dropwizard.md @@ -160,7 +160,15 @@ Latest default metrics mapping can be found [here] (https://github.com/apache/dr "type" ], "type": "timer", - "timeUnit": "NANOSECONDS" + "timeUnit": "MICROSECONDS" + }, + "query/brokerCpuTime": { + "dimensions": [ + "dataSource", + "type" + ], + "type": "timer", + "timeUnit": "MICROSECONDS" }, "query/cache/delta/numEntries": { "dimensions": [], diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 6b429b8e2e0f..9acee1b98853 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -54,6 +54,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/time`|Milliseconds taken to complete a query.|

Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.

Aggregation Queries: `numMetrics`, `numComplexMetrics`.

GroupBy: `numDimensions`.

TopN: `threshold`, `dimension`.

|< 1s| |`query/planningTime`|Milliseconds taken to complete query planning at Broker before query is fanned out to the Data nodes.|

Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.

Aggregation Queries: `numMetrics`, `numComplexMetrics`.

GroupBy: `numDimensions`.

TopN: `threshold`, `dimension`.

|< 1s| |`query/bytes`|The total number of bytes returned to the requesting client in the query response from the broker. Other services report the total bytes for their portion of the query. |

Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.

Aggregation Queries: `numMetrics`, `numComplexMetrics`.

GroupBy: `numDimensions`.

TopN: `threshold`, `dimension`.

| | +|`query/brokerCpuTime`| The total amount of time (microseconds) spent at broker (excluding data-nodes), for query planning and merging intermediate results. |`query/node/time`|Milliseconds taken to query individual historical/realtime processes.|`id`, `status`, `server`|< 1s| |`query/node/bytes`|Number of bytes returned from querying individual historical/realtime processes.|`id`, `status`, `server`| | |`query/node/ttfb`|Time to first byte. Milliseconds elapsed until Broker starts receiving the response from individual historical/realtime processes.|`id`, `status`, `server`|< 1s| diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/resources/defaultWhiteListMap.json b/extensions-contrib/ambari-metrics-emitter/src/main/resources/defaultWhiteListMap.json index d492419bb37a..aeabfb1cf659 100644 --- a/extensions-contrib/ambari-metrics-emitter/src/main/resources/defaultWhiteListMap.json +++ b/extensions-contrib/ambari-metrics-emitter/src/main/resources/defaultWhiteListMap.json @@ -22,6 +22,10 @@ "dataSource", "type" ], + "query/brokerCpuTime": [ + "dataSource", + "type" + ], "query/node/time": [ "dataSource", "type" diff --git a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json index a0abf96541f9..607c3f673364 100644 --- a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json @@ -65,7 +65,15 @@ "type" ], "type": "timer", - "timeUnit": "NANOSECONDS" + "timeUnit": "MICROSECONDS" + }, + "query/brokerCpuTime": { + "dimensions": [ + "dataSource", + "type" + ], + "type": "timer", + "timeUnit": "MICROSECONDS" }, "query/cache/delta/numEntries": { "dimensions": [], diff --git a/extensions-contrib/graphite-emitter/src/main/resources/defaultWhiteListMap.json b/extensions-contrib/graphite-emitter/src/main/resources/defaultWhiteListMap.json index 1a6bfbe6346a..1a9cb462c034 100644 --- a/extensions-contrib/graphite-emitter/src/main/resources/defaultWhiteListMap.json +++ b/extensions-contrib/graphite-emitter/src/main/resources/defaultWhiteListMap.json @@ -9,6 +9,10 @@ "dataSource", "type" ], + "query/brokerCpuTime": [ + "dataSource", + "type" + ], "query/node/time": [ "dataSource", "type" diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json index cb646f056513..e32f6c06f947 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json +++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json @@ -27,6 +27,10 @@ "dataSource", "type" ], + "query/brokerCpuTime": [ + "dataSource", + "type" + ], "jetty/numOpenConnections": [], "jetty/threadPool/total": [], "jetty/threadPool/idle": [], diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json index c4fe820a6dfa..e69e08ed1797 100644 --- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json +++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json @@ -1,6 +1,6 @@ { "query/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to complete a query."}, -"query/planningTime" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to complete query planning at Broker before query is fanned out to the Data nodes."}, + "query/planningTime" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to complete query planning at Broker before query is fanned out to the Data nodes."}, "query/bytes" : { "dimensions" : ["dataSource", "type"], "type" : "count", "help": "Number of bytes returned in query response."}, "query/node/time" : { "dimensions" : ["server"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual historical/realtime processes."}, "query/node/ttfb" : { "dimensions" : ["server"], "type" : "timer", "conversionFactor": 1000.0, "help": "Time to first byte. Seconds elapsed until Broker starts receiving the response from individual historical/realtime processes."}, @@ -14,6 +14,7 @@ "segment/scan/active" : { "dimensions" : [], "type" : "gauge", "help": "Number of segments currently scanned."}, "query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment or hit the cache (if it is enabled on the Historical process)."}, "query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": "1000000", "help": "Seconds of CPU time taken to complete a query"}, + "query/brokerCpuTime" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": "1000000", "help": "Seconds of CPU time taken to complete a query"}, "query/count" : { "dimensions" : [], "type" : "count", "help": "Number of total queries" }, "query/success/count" : { "dimensions" : [], "type" : "count", "help": "Number of queries successfully processed"}, diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 3a68145970c9..a767787320f4 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -13,6 +13,7 @@ "segment/scan/active" : { "dimensions" : [], "type" : "gauge"}, "query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer" }, "query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer" }, + "query/brokerCpuTime" : { "dimensions" : ["dataSource", "type"], "type" : "timer" }, "query/count" : { "dimensions" : [], "type" : "count" }, "query/success/count" : { "dimensions" : [], "type" : "count" }, diff --git a/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java index 843c2c46671d..28bf9e574f29 100644 --- a/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java @@ -82,6 +82,7 @@ public RetType wrap(Supplier sequenceProcessing) long cpuTimeDelta = JvmUtils.getCurrentThreadCpuTime() - start; cpuTimeAccumulator.addAndGet(cpuTimeDelta); responseContext.addCpuNanos(cpuTimeDelta); + responseContext.addBrokerCpuNanos(cpuTimeAccumulator.get()); } } @@ -93,6 +94,10 @@ public void after(boolean isDone, Throwable thrown) if (cpuTimeNs > 0) { queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter); } + final long brokerCpuTimeNs = responseContext.getBrokerCpuNanos(); + if (brokerCpuTimeNs > 0) { + queryWithMetrics.getQueryMetrics().reportBrokerCpuTime(brokerCpuTimeNs).emit(emitter); + } } } } diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index b6c37fb326b1..d1b5b6728220 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -256,6 +256,12 @@ public QueryMetrics reportCpuTime(long timeNs) return reportMetric("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs)); } + @Override + public QueryMetrics reportBrokerCpuTime(long timeNs) + { + return reportMetric("query/brokerCpuTime", TimeUnit.NANOSECONDS.toMicros(timeNs)); + } + @Override public QueryMetrics reportRowsScannedCount(long rowsScannedCount) { diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java index c2a7e09aae87..9d6a583526ef 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java @@ -363,6 +363,15 @@ default void filterBundle(FilterBundle.BundleInfo bundleInfo) */ QueryMetrics reportCpuTime(long timeNs); + /** + * Registers "broker cpu time" metric. + * Measures the total time spent in broker for answering the query. This includes the following:- + * 1. Determining segments for the given time interval. + * 2. Determining the Data nodes responsible for the segments. + * 3. Merging the results from the Data nodes. + */ + QueryMetrics reportBrokerCpuTime(long timeNs); + /** * Registers "rows scanned count" metric. */ diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 5e037d6eb86f..8b8189de4da9 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -461,6 +461,15 @@ public Object mergeValues(Object oldValue, Object newValue) false ); + /** + * The total CPU time consumed by the broker for processing the query. + * This is equal to the time spent in planning the query, finding data-nodes and merging sub-results. + * BROKER_CPU_CONSUMED_NANOS = CPU_CONSUMED_NANOS - Time spent by Query in historical/ realtime nodes. + */ + public static final Key BROKER_CPU_CONSUMED_NANOS = new CounterKey( + "brokerCpuConsumed", + false); + /** * Indicates if a {@link ResponseContext} was truncated during serialization. */ @@ -496,6 +505,7 @@ public Object mergeValues(Object oldValue, Object newValue) TIMEOUT_AT, NUM_SCANNED_ROWS, CPU_CONSUMED_NANOS, + BROKER_CPU_CONSUMED_NANOS, TRUNCATED, QUERY_SEGMENT_COUNT, }); @@ -706,6 +716,11 @@ public Long getCpuNanos() return (Long) get(Keys.CPU_CONSUMED_NANOS); } + public Long getBrokerCpuNanos() + { + return (Long) get(Keys.BROKER_CPU_CONSUMED_NANOS); + } + public Long getQuerySegmentCount() { return (Long) get(Keys.QUERY_SEGMENT_COUNT); @@ -757,6 +772,11 @@ public void addCpuNanos(long ns) addValue(Keys.CPU_CONSUMED_NANOS, ns); } + public void addBrokerCpuNanos(long ns) + { + addValue(Keys.BROKER_CPU_CONSUMED_NANOS, ns); + } + private Object addValue(Key key, Object value) { return getDelegate().merge(key, value, key::mergeValues); diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java index 76bdc45103bc..c7e27801c1c1 100644 --- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java @@ -223,6 +223,12 @@ public QueryMetrics reportCpuTime(long timeNs) return delegateQueryMetrics.reportCpuTime(timeNs); } + @Override + public QueryMetrics reportBrokerCpuTime(long timeNs) + { + return delegateQueryMetrics.reportBrokerCpuTime(timeNs); + } + @Override public QueryMetrics reportRowsScannedCount(long rowsScannedCount) { diff --git a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java index 4647704fe279..b08c579a9d15 100644 --- a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java @@ -20,7 +20,6 @@ package org.apache.druid.query; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -33,6 +32,7 @@ import org.junit.Test; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -69,15 +69,17 @@ public void testCpuTimeMetric() ); Assert.assertEquals(expectedResults, results.toList()); - Assert.assertEquals(1, emitter.getEvents().size()); + Assert.assertEquals(2, emitter.getEvents().size()); + HashSet expectedMetrics = new HashSet<>(); + expectedMetrics.add("query/cpu/time"); + expectedMetrics.add("query/brokerCpuTime"); - final Event event = Iterables.getOnlyElement(emitter.getEvents()); - - Assert.assertEquals("metrics", event.toMap().get("feed")); - Assert.assertEquals("query/cpu/time", event.toMap().get("metric")); - - final Object value = event.toMap().get("value"); - Assert.assertThat(value, CoreMatchers.instanceOf(Long.class)); - Assert.assertTrue((long) value > 0); + for (Event event : emitter.getEvents()) { + Assert.assertEquals("metrics", event.toMap().get("feed")); + Assert.assertTrue(expectedMetrics.contains(event.toMap().get("metric"))); + final Object value = event.toMap().get("value"); + Assert.assertThat(value, CoreMatchers.instanceOf(Long.class)); + Assert.assertTrue((long) value > 0); + } } } diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java index 0ac406809573..c376fc783cd4 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java +++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java @@ -78,7 +78,7 @@ *
  • Initialization ({@link #initialize(Query)})
  • *
  • Authorization ({@link #authorize(HttpServletRequest)}
  • *
  • Execution ({@link #execute()}
  • - *
  • Logging ({@link #emitLogsAndMetrics(Throwable, String, long, long, long)}
  • + *
  • Logging ({@link #emitLogsAndMetrics(Throwable, String, long, long, long, long)}
  • *
  • Logging ({@link #emitLogsAndMetrics(Throwable, String)}
  • *
  • Logging ({@link #emitLogsAndMetrics(Throwable)}
  • * @@ -302,7 +302,7 @@ private Access doAuthorize(final AuthenticationResult authenticationResult, fina /** * Execute the query. Can only be called if the query has been authorized. Note that query logs and metrics will * not be emitted automatically when the Sequence is fully iterated. It is the caller's responsibility to call - * {@link #emitLogsAndMetrics(Throwable, String, long, long, long)} to emit logs and metrics. + * {@link #emitLogsAndMetrics(Throwable, String, long, long, long, long)} to emit logs and metrics. * * @return result sequence and response context */ @@ -332,7 +332,7 @@ public void emitLogsAndMetrics( @Nullable final String remoteAddress ) { - this.emitLogsAndMetrics(e, remoteAddress, -1, -1, -1); + this.emitLogsAndMetrics(e, remoteAddress, -1, -1, -1, -1); } /** @@ -349,7 +349,8 @@ public void emitLogsAndMetrics( @Nullable final String remoteAddress, final long bytesWritten, final long rowsScanned, - final long cpuConsumedMillis + final long cpuConsumedMillis, + final long brokerCpuConsumedMillis ) { if (baseQuery == null) { @@ -395,6 +396,7 @@ public void emitLogsAndMetrics( statsMap.put("query/bytes", bytesWritten); statsMap.put("query/rowsScanned", rowsScanned); statsMap.put("query/cpu/time", cpuConsumedMillis); + statsMap.put("query/brokerCpuTime", brokerCpuConsumedMillis); statsMap.put("success", success); if (authenticationResult != null) { @@ -460,7 +462,7 @@ private void emitLogsAndMetrics( @Nullable final Throwable e ) { - this.emitLogsAndMetrics(e, null, -1, -1, -1); + this.emitLogsAndMetrics(e, null, -1, -1, -1, -1); } private boolean isSerializeDateTimeAsLong() diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index e0ae918e00f9..27abd845d98f 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -521,7 +521,7 @@ public Response.ResponseBuilder start() final String prevEtag = getPreviousEtag(req); if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag())) { - queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1, -1, -1); + queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1, -1, -1, -1); counter.incrementSuccess(); return Response.status(Status.NOT_MODIFIED); } @@ -576,15 +576,15 @@ public void recordSuccess(long numBytes) } @Override - public void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis) + public void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis, long brokerCpuTimeMillis) { - queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), numBytes, numRowsScanned, cpuTimeInMillis); + queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), numBytes, numRowsScanned, cpuTimeInMillis, brokerCpuTimeMillis); } @Override public void recordFailure(Exception e) { - queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1, -1, -1); + queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1, -1, -1, -1); } @Override diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java index 5e9e680002e8..5fa0a26d968e 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java +++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java @@ -152,7 +152,7 @@ public Response push() counter.incrementSuccess(); accumulator.close(); - resultsWriter.recordSuccess(accumulator.getNumBytesSent(), accumulator.rowsScanned, accumulator.cpuConsumedMillis); + resultsWriter.recordSuccess(accumulator.getNumBytesSent(), accumulator.rowsScanned, accumulator.cpuConsumedMillis, accumulator.brokerCpuTime); } catch (DruidException e) { // Less than ideal. But, if we return the result as JSON, this is @@ -308,7 +308,7 @@ public interface ResultsWriter extends Closeable void recordSuccess(long numBytes); - void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis); + void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis, long brokerCpuTimeMillis); void recordFailure(Exception e); } @@ -347,6 +347,7 @@ public class StreamingHttpResponseAccumulator implements Accumulator