2 changes: 1 addition & 1 deletion .idea/misc-for-inspection.xml

Some generated files are not rendered by default.

10 changes: 9 additions & 1 deletion docs/design/extensions-contrib/dropwizard.md
@@ -160,7 +160,15 @@ Latest default metrics mapping can be found [here] (https://github.com/apache/dr
"type"
],
"type": "timer",
"timeUnit": "NANOSECONDS"
"timeUnit": "MICROSECONDS"
},
"query/brokerCpuTime": {
"dimensions": [
"dataSource",
"type"
],
"type": "timer",
"timeUnit": "MICROSECONDS"
},
"query/cache/delta/numEntries": {
"dimensions": [],
1 change: 1 addition & 0 deletions docs/operations/metrics.md
@@ -54,6 +54,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/time`|Milliseconds taken to complete a query.|<p>Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.</p><p>Aggregation Queries: `numMetrics`, `numComplexMetrics`.</p><p>GroupBy: `numDimensions`.</p><p> TopN: `threshold`, `dimension`.</p>|< 1s|
|`query/planningTime`|Milliseconds taken to complete query planning at Broker before query is fanned out to the Data nodes.|<p>Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.</p><p>Aggregation Queries: `numMetrics`, `numComplexMetrics`.</p><p>GroupBy: `numDimensions`.</p><p> TopN: `threshold`, `dimension`.</p>|< 1s|
|`query/bytes`|The total number of bytes returned to the requesting client in the query response from the broker. Other services report the total bytes for their portion of the query. |<p>Common: `dataSource`, `type`, `interval`, `hasFilters`, `duration`, `context`, `remoteAddress`, `id`.</p><p> Aggregation Queries: `numMetrics`, `numComplexMetrics`.</p><p> GroupBy: `numDimensions`.</p><p> TopN: `threshold`, `dimension`.</p>| |
|`query/brokerCpuTime`|Microseconds of CPU time spent at the Broker (excluding data nodes) on query planning and merging of intermediate results.|`dataSource`, `type`| |
|`query/node/time`|Milliseconds taken to query individual historical/realtime processes.|`id`, `status`, `server`|< 1s|
|`query/node/bytes`|Number of bytes returned from querying individual historical/realtime processes.|`id`, `status`, `server`| |
|`query/node/ttfb`|Time to first byte. Milliseconds elapsed until Broker starts receiving the response from individual historical/realtime processes.|`id`, `status`, `server`|< 1s|
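As a rough, hypothetical illustration of how to read this metric (the numbers below are made up; the nanoseconds-to-microseconds conversion mirrors `DefaultQueryMetrics#reportBrokerCpuTime` in this change):

```java
import java.util.concurrent.TimeUnit;

public class BrokerCpuTimeExample
{
  public static void main(String[] args)
  {
    // Hypothetical CPU time accumulated at the Broker while planning the query
    // and merging intermediate results, measured in nanoseconds.
    long brokerCpuNanos = 7_500_000L;

    // The emitted query/brokerCpuTime value is expressed in microseconds.
    long reportedMicros = TimeUnit.NANOSECONDS.toMicros(brokerCpuNanos);

    System.out.println("query/brokerCpuTime = " + reportedMicros); // prints 7500
  }
}
```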
@@ -22,6 +22,10 @@
"dataSource",
"type"
],
"query/brokerCpuTime": [
"dataSource",
"type"
],
"query/node/time": [
"dataSource",
"type"
@@ -65,7 +65,15 @@
"type"
],
"type": "timer",
"timeUnit": "NANOSECONDS"
"timeUnit": "MICROSECONDS"
},
"query/brokerCpuTime": {
"dimensions": [
"dataSource",
"type"
],
"type": "timer",
"timeUnit": "MICROSECONDS"
},
"query/cache/delta/numEntries": {
"dimensions": [],
@@ -9,6 +9,10 @@
"dataSource",
"type"
],
"query/brokerCpuTime": [
"dataSource",
"type"
],
"query/node/time": [
"dataSource",
"type"
@@ -27,6 +27,10 @@
"dataSource",
"type"
],
"query/brokerCpuTime": [
"dataSource",
"type"
],
"jetty/numOpenConnections": [],
"jetty/threadPool/total": [],
"jetty/threadPool/idle": [],
@@ -1,6 +1,6 @@
{
"query/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to complete a query."},
"query/planningTime" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to complete query planning at Broker before query is fanned out to the Data nodes."},
"query/planningTime" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to complete query planning at Broker before query is fanned out to the Data nodes."},
"query/bytes" : { "dimensions" : ["dataSource", "type"], "type" : "count", "help": "Number of bytes returned in query response."},
"query/node/time" : { "dimensions" : ["server"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual historical/realtime processes."},
"query/node/ttfb" : { "dimensions" : ["server"], "type" : "timer", "conversionFactor": 1000.0, "help": "Time to first byte. Seconds elapsed until Broker starts receiving the response from individual historical/realtime processes."},
@@ -14,6 +14,7 @@
"segment/scan/active" : { "dimensions" : [], "type" : "gauge", "help": "Number of segments currently scanned."},
"query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds taken to query individual segment or hit the cache (if it is enabled on the Historical process)."},
"query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": "1000000", "help": "Seconds of CPU time taken to complete a query"},
"query/brokerCpuTime" : { "dimensions" : ["dataSource", "type"], "type" : "timer", "conversionFactor": "1000000", "help": "Seconds of CPU time taken to complete a query"},

"query/count" : { "dimensions" : [], "type" : "count", "help": "Number of total queries" },
"query/success/count" : { "dimensions" : [], "type" : "count", "help": "Number of queries successfully processed"},
@@ -13,6 +13,7 @@
"segment/scan/active" : { "dimensions" : [], "type" : "gauge"},
"query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer" },
"query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer" },
"query/brokerCpuTime" : { "dimensions" : ["dataSource", "type"], "type" : "timer" },

"query/count" : { "dimensions" : [], "type" : "count" },
"query/success/count" : { "dimensions" : [], "type" : "count" },
@@ -82,6 +82,7 @@ public <RetType> RetType wrap(Supplier<RetType> sequenceProcessing)
long cpuTimeDelta = JvmUtils.getCurrentThreadCpuTime() - start;
cpuTimeAccumulator.addAndGet(cpuTimeDelta);
responseContext.addCpuNanos(cpuTimeDelta);
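// Unlike addCpuNanos above (which adds just the delta), this records the running Broker CPU
// total in the response context so it can be read later (e.g. when emitting query stats).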
responseContext.addBrokerCpuNanos(cpuTimeAccumulator.get());
}
}

@@ -93,6 +94,10 @@ public void after(boolean isDone, Throwable thrown)
if (cpuTimeNs > 0) {
queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter);
}
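// Emit the Broker-side CPU time accumulated in the response context as query/brokerCpuTime.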
final long brokerCpuTimeNs = responseContext.getBrokerCpuNanos();
if (brokerCpuTimeNs > 0) {
queryWithMetrics.getQueryMetrics().reportBrokerCpuTime(brokerCpuTimeNs).emit(emitter);
}
}
}
}
@@ -256,6 +256,12 @@ public QueryMetrics<QueryType> reportCpuTime(long timeNs)
return reportMetric("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs));
}

@Override
public QueryMetrics<QueryType> reportBrokerCpuTime(long timeNs)
{
return reportMetric("query/brokerCpuTime", TimeUnit.NANOSECONDS.toMicros(timeNs));
}

@Override
public QueryMetrics<QueryType> reportRowsScannedCount(long rowsScannedCount)
{
@@ -363,6 +363,15 @@ default void filterBundle(FilterBundle.BundleInfo bundleInfo)
*/
QueryMetrics<QueryType> reportCpuTime(long timeNs);

/**
* Registers "broker cpu time" metric.
* Measures the total CPU time spent at the Broker to answer the query. This includes the following:
* 1. Determining segments for the given time interval.
* 2. Determining the Data nodes responsible for the segments.
* 3. Merging the results from the Data nodes.
*/
QueryMetrics<QueryType> reportBrokerCpuTime(long timeNs);

/**
* Registers "rows scanned count" metric.
*/
@@ -461,6 +461,15 @@ public Object mergeValues(Object oldValue, Object newValue)
false
);

/**
* The total CPU time consumed by the Broker while processing the query.
* This equals the time spent planning the query, finding data nodes, and merging sub-results:
* BROKER_CPU_CONSUMED_NANOS = CPU_CONSUMED_NANOS - time spent by the query on historical/realtime nodes.
*/
public static final Key BROKER_CPU_CONSUMED_NANOS = new CounterKey(
"brokerCpuConsumed",
false);
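// Illustrative arithmetic (hypothetical numbers): if a query consumed 40 ms of CPU in total
// and 25 ms of that was spent on historical/realtime nodes, the value recorded under this key
// would be 40_000_000 - 25_000_000 = 15_000_000 ns.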

/**
* Indicates if a {@link ResponseContext} was truncated during serialization.
*/
@@ -496,6 +505,7 @@ public Object mergeValues(Object oldValue, Object newValue)
TIMEOUT_AT,
NUM_SCANNED_ROWS,
CPU_CONSUMED_NANOS,
BROKER_CPU_CONSUMED_NANOS,
TRUNCATED,
QUERY_SEGMENT_COUNT,
});
@@ -706,6 +716,11 @@ public Long getCpuNanos()
return (Long) get(Keys.CPU_CONSUMED_NANOS);
}

public Long getBrokerCpuNanos()
{
return (Long) get(Keys.BROKER_CPU_CONSUMED_NANOS);
}

public Long getQuerySegmentCount()
{
return (Long) get(Keys.QUERY_SEGMENT_COUNT);
@@ -757,6 +772,11 @@ public void addCpuNanos(long ns)
addValue(Keys.CPU_CONSUMED_NANOS, ns);
}

public void addBrokerCpuNanos(long ns)
{
addValue(Keys.BROKER_CPU_CONSUMED_NANOS, ns);
}

private Object addValue(Key key, Object value)
{
return getDelegate().merge(key, value, key::mergeValues);
@@ -223,6 +223,12 @@ public QueryMetrics reportCpuTime(long timeNs)
return delegateQueryMetrics.reportCpuTime(timeNs);
}

@Override
public QueryMetrics reportBrokerCpuTime(long timeNs)
{
return delegateQueryMetrics.reportBrokerCpuTime(timeNs);
}

@Override
public QueryMetrics reportRowsScannedCount(long rowsScannedCount)
{
@@ -20,7 +20,6 @@
package org.apache.druid.query;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -33,6 +32,7 @@
import org.junit.Test;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

@@ -69,15 +69,17 @@ public void testCpuTimeMetric()
);

Assert.assertEquals(expectedResults, results.toList());
Assert.assertEquals(1, emitter.getEvents().size());
Assert.assertEquals(2, emitter.getEvents().size());
HashSet<String> expectedMetrics = new HashSet<>();
expectedMetrics.add("query/cpu/time");
expectedMetrics.add("query/brokerCpuTime");

final Event event = Iterables.getOnlyElement(emitter.getEvents());

Assert.assertEquals("metrics", event.toMap().get("feed"));
Assert.assertEquals("query/cpu/time", event.toMap().get("metric"));

final Object value = event.toMap().get("value");
Assert.assertThat(value, CoreMatchers.instanceOf(Long.class));
Assert.assertTrue((long) value > 0);
for (Event event : emitter.getEvents()) {
Assert.assertEquals("metrics", event.toMap().get("feed"));
Assert.assertTrue(expectedMetrics.contains(event.toMap().get("metric")));
final Object value = event.toMap().get("value");
Assert.assertThat(value, CoreMatchers.instanceOf(Long.class));
Assert.assertTrue((long) value > 0);
}
}
}
@@ -78,7 +78,7 @@
* <li>Initialization ({@link #initialize(Query)})</li>
* <li>Authorization ({@link #authorize(HttpServletRequest)}</li>
* <li>Execution ({@link #execute()}</li>
* <li>Logging ({@link #emitLogsAndMetrics(Throwable, String, long, long, long)}</li>
* <li>Logging ({@link #emitLogsAndMetrics(Throwable, String, long, long, long, long)}</li>
* <li>Logging ({@link #emitLogsAndMetrics(Throwable, String)}</li>
* <li>Logging ({@link #emitLogsAndMetrics(Throwable)}</li>
* </ol>
@@ -302,7 +302,7 @@ private Access doAuthorize(final AuthenticationResult authenticationResult, fina
/**
* Execute the query. Can only be called if the query has been authorized. Note that query logs and metrics will
* not be emitted automatically when the Sequence is fully iterated. It is the caller's responsibility to call
* {@link #emitLogsAndMetrics(Throwable, String, long, long, long)} to emit logs and metrics.
* {@link #emitLogsAndMetrics(Throwable, String, long, long, long, long)} to emit logs and metrics.
*
* @return result sequence and response context
*/
@@ -332,7 +332,7 @@ public void emitLogsAndMetrics(
@Nullable final String remoteAddress
)
{
this.emitLogsAndMetrics(e, remoteAddress, -1, -1, -1);
this.emitLogsAndMetrics(e, remoteAddress, -1, -1, -1, -1);
}

/**
@@ -349,7 +349,8 @@ public void emitLogsAndMetrics(
@Nullable final String remoteAddress,
final long bytesWritten,
final long rowsScanned,
final long cpuConsumedMillis
final long cpuConsumedMillis,
final long brokerCpuConsumedMillis
)
{
if (baseQuery == null) {
@@ -395,6 +396,7 @@ public void emitLogsAndMetrics(
statsMap.put("query/bytes", bytesWritten);
statsMap.put("query/rowsScanned", rowsScanned);
statsMap.put("query/cpu/time", cpuConsumedMillis);
statsMap.put("query/brokerCpuTime", brokerCpuConsumedMillis);
statsMap.put("success", success);

if (authenticationResult != null) {
@@ -460,7 +462,7 @@ private void emitLogsAndMetrics(
@Nullable final Throwable e
)
{
this.emitLogsAndMetrics(e, null, -1, -1, -1);
this.emitLogsAndMetrics(e, null, -1, -1, -1, -1);
}

private boolean isSerializeDateTimeAsLong()
@@ -521,7 +521,7 @@ public Response.ResponseBuilder start()
final String prevEtag = getPreviousEtag(req);

if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag())) {
queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1, -1, -1);
queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1, -1, -1, -1);
counter.incrementSuccess();
return Response.status(Status.NOT_MODIFIED);
}
@@ -576,15 +576,15 @@ public void recordSuccess(long numBytes)
}

@Override
public void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis)
public void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis, long brokerCpuTimeMillis)
{
queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), numBytes, numRowsScanned, cpuTimeInMillis);
queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), numBytes, numRowsScanned, cpuTimeInMillis, brokerCpuTimeMillis);
}

@Override
public void recordFailure(Exception e)
{
queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1, -1, -1);
queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1, -1, -1, -1);
}

@Override
@@ -152,7 +152,7 @@ public Response push()

counter.incrementSuccess();
accumulator.close();
resultsWriter.recordSuccess(accumulator.getNumBytesSent(), accumulator.rowsScanned, accumulator.cpuConsumedMillis);
resultsWriter.recordSuccess(accumulator.getNumBytesSent(), accumulator.rowsScanned, accumulator.cpuConsumedMillis, accumulator.brokerCpuTime);
}
catch (DruidException e) {
// Less than ideal. But, if we return the result as JSON, this is
@@ -308,7 +308,7 @@ public interface ResultsWriter extends Closeable

void recordSuccess(long numBytes);

void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis);
void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis, long brokerCpuTimeMillis);

void recordFailure(Exception e);
}
@@ -347,6 +347,7 @@ public class StreamingHttpResponseAccumulator implements Accumulator<Response, O
private Long cpuConsumedMillis;
private Long querySegmentCount;
private Long brokerQueryTime;
private Long brokerCpuTime;

public StreamingHttpResponseAccumulator(
ResponseContext responseContext,
@@ -430,6 +431,7 @@ public void initialize()
cpuConsumedMillis = TimeUnit.NANOSECONDS.toMillis(responseContext.getValueOrDefaultZero(ResponseContext::getCpuNanos));
querySegmentCount = responseContext.getValueOrDefaultZero(ResponseContext::getQuerySegmentCount);
brokerQueryTime = TimeUnit.NANOSECONDS.toMillis(Objects.nonNull(startTime) ? System.nanoTime() - (Long) startTime : -1L);
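// Broker CPU time arrives in the response context in nanoseconds; convert to milliseconds
// for the per-query stats emitted alongside the other log fields.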
brokerCpuTime = TimeUnit.NANOSECONDS.toMillis(responseContext.getValueOrDefaultZero(ResponseContext::getBrokerCpuNanos));

response.setHeader(QueryResource.NUM_SCANNED_ROWS, String.valueOf(rowsScanned));
// Emit Cpu time as a response header. Note that it doesn't include Cpu spent on serializing the response.
@@ -433,7 +433,7 @@ public QueryLifecycle factorize()
System.nanoTime())
{
@Override
public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAddress, long bytesWritten, long rowsScanned, long cpuConsumedMillis)
public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAddress, long bytesWritten, long rowsScanned, long cpuConsumedMillis, long brokerConsumedMillis)
{
Assert.assertTrue(Throwables.getStackTraceAsString(e).contains(embeddedExceptionMessage));
}
@@ -88,7 +88,7 @@ public void recordSuccess(long numBytes)
}

@Override
public void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis)
public void recordSuccess(long numBytes, long numRowsScanned, long cpuTimeInMillis, long brokerCpuTimeInMillis)
{

}