@@ -1195,17 +1195,7 @@ private Collection<List<Function>> findMetricFunctions(Multimap<String, List<Fun
}

for (String metricNameEntry : metricFunctions.keySet()) {

String metricRegEx;
//Special case handling for metric name with * and __%.
//For example, dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count
// or dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%.count
if (metricNameEntry.contains("*") || metricNameEntry.contains("__%")) {
String metricNameWithEscSeq = metricNameEntry.replace("*", "\\*").replace("__%", "..%");
metricRegEx = metricNameWithEscSeq.replace("%", ".*");
} else {
metricRegEx = metricNameEntry.replace("%", ".*");
}
String metricRegEx = getJavaRegexFromSqlRegex(metricNameEntry);
if (metricName.matches(metricRegEx)) {
return metricFunctions.get(metricNameEntry);
}
@@ -1214,6 +1204,20 @@ private Collection<List<Function>> findMetricFunctions(Multimap<String, List<Fun
return null;
}

public String getJavaRegexFromSqlRegex(String sqlRegex) {
String javaRegEx;
if (sqlRegex.contains("*") || sqlRegex.contains("__%")) {
//Special case handling for metric name with * and __%.
//For example, dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count
// or dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%.count
String metricNameWithEscSeq = sqlRegex.replace("*", "\\*").replace("__%", "..%");
javaRegEx = metricNameWithEscSeq.replace("%", ".*");
} else {
javaRegEx = sqlRegex.replace("%", ".*");
}
return javaRegEx;
}
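
A quick sketch of what the extracted helper produces, using the wildcard metric name called out in its comments plus a plain '%' pattern (the second metric name is illustrative):

String r1 = getJavaRegexFromSqlRegex("dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count");
// r1 = "dfs.NNTopUserOpCounts.windowMs=300000.op=\*.user=.*.count"
//      the literal '*' is escaped; each SQL '%' becomes the Java '.*'

String r2 = getJavaRegexFromSqlRegex("cpu_user%");
// r2 = "cpu_user.*" - no '*' or '__%' present, so only the '%' -> '.*' rewrite applies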

public void saveHostAggregateRecords(Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
String phoenixTableName) throws SQLException {

@@ -296,6 +296,9 @@ public class TimelineMetricConfiguration {
public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
"timeline.metrics.support.multiple.clusters";

public static final String TIMELINE_METRICS_EVENT_METRIC_PATTERNS =
"timeline.metrics.downsampler.event.metric.patterns";

public static final String HOST_APP_ID = "HOST";

public static final String DEFAULT_INSTANCE_PORT = "12001";
@@ -429,11 +429,11 @@ private void runDownSamplerQuery(Connection conn, Condition condition) {
try {
stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);

LOG.debug("Downsampler Query issued...");
LOG.debug("Downsampler Query issued : " + condition.getStatement());
if (condition.doUpdate()) {
int rows = stmt.executeUpdate();
conn.commit();
LOG.info(rows + " row(s) updated in downsampling.");
LOG.debug(rows + " row(s) updated in downsampling.");
} else {
rs = stmt.executeQuery();
}
@@ -24,7 +24,6 @@
import org.apache.hadoop.conf.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,7 +37,8 @@ public class DownSamplerUtils {

public static final String downSamplerConfigPrefix = "timeline.metrics.downsampler.";
public static final String downSamplerMetricPatternsConfig = "metric.patterns";
public static final String topNDownSampler = "topn";
public static final String topNDownSamplerKey = "topn";
public static final String eventDownSamplerKey = "event";
private static final Log LOG = LogFactory.getLog(DownSamplerUtils.class);


@@ -108,10 +108,14 @@ public static CustomDownSampler getDownSamplerByType(String type, Map<String, St
return null;
}

if (StringUtils.isNotEmpty(type) && type.equalsIgnoreCase(topNDownSampler)) {
if (StringUtils.isNotEmpty(type) && type.equalsIgnoreCase(topNDownSamplerKey)) {
return TopNDownSampler.fromConfig(conf);
}

if (StringUtils.isNotEmpty(type) && type.equalsIgnoreCase(eventDownSamplerKey)) {
return EventMetricDownSampler.fromConfig(conf);
}

LOG.warn("Unknown downsampler requested : " + type);
return null;
}
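
For illustration, a hedged sketch of this dispatch as seen from caller code; the config key is built from the constants at the top of DownSamplerUtils, and the pattern values are illustrative:

Map<String, String> conf = new HashMap<>();
conf.put("timeline.metrics.downsampler.event.metric.patterns", "pattern1,pattern2");

CustomDownSampler sampler = DownSamplerUtils.getDownSamplerByType("event", conf);
// "event" -> EventMetricDownSampler.fromConfig(conf)
// "topn"  -> TopNDownSampler.fromConfig(conf)
// any other non-empty type logs "Unknown downsampler requested : <type>" and returns null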
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.EVENT_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.EVENT_DOWNSAMPLER_HOST_METRIC_SELECT_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;

public class EventMetricDownSampler implements CustomDownSampler {

private String metricPatterns = "";
private static final Log LOG = LogFactory.getLog(EventMetricDownSampler.class);

public static EventMetricDownSampler fromConfig(Map<String, String> conf) {
String metricPatterns = conf.get(DownSamplerUtils.downSamplerConfigPrefix + DownSamplerUtils.eventDownSamplerKey + "." +
DownSamplerUtils.downSamplerMetricPatternsConfig);

return new EventMetricDownSampler(metricPatterns);
}

public EventMetricDownSampler(String metricPatterns) {
this.metricPatterns = metricPatterns;
}

@Override
public boolean validateConfigs() {
return true;
}

@Override
public List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName) {
List<String> stmts = new ArrayList<>();
List<String> metricPatternList = Arrays.asList(metricPatterns.split(","));

String aggregateColumnName = "METRIC_COUNT";

if (tableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
aggregateColumnName = "HOSTS_COUNT";
}

for (String metricPattern : metricPatternList) {
String metricPatternClause = "'" + metricPattern + "'";
if (tableName.contains("RECORD")) {
stmts.add(String.format(EVENT_DOWNSAMPLER_HOST_METRIC_SELECT_SQL,
endTime, tableName, metricPatternClause,
startTime, endTime));
} else {
stmts.add(String.format(EVENT_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL,
endTime, aggregateColumnName, tableName, metricPatternClause,
startTime, endTime));
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Downsampling Stmt: " + stmts.toString());
}
return stmts;
}
}
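
A minimal usage sketch, mirroring the unit test added later in this diff (the pattern values are illustrative):

Map<String, String> conf = new HashMap<>();
conf.put("timeline.metrics.downsampler.event.metric.patterns", "pattern1,pattern2");

EventMetricDownSampler sampler = EventMetricDownSampler.fromConfig(conf);
List<String> stmts = sampler.prepareDownSamplingStatement(14000000L, 14100000L, "METRIC_RECORD");
// stmts.size() == 2: one host-level SELECT per configured pattern, each using
// SUM(METRIC_SUM) so event counts are totalled across the window rather than averaged.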
@@ -20,6 +20,7 @@

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_EVENT_METRIC_PATTERNS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
@@ -31,8 +32,12 @@
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
@@ -63,7 +68,8 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
private final boolean interpolationEnabled;
private TimelineMetricMetadataManager metadataManagerInstance;
private String skipAggrPatternStrings;

private String skipInterpolationMetricPatternStrings;
private Set<Pattern> skipInterpolationMetricPatterns = new HashSet<>();

public TimelineMetricClusterAggregatorSecond(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
TimelineMetricMetadataManager metadataManager,
@@ -88,6 +94,15 @@ public TimelineMetricClusterAggregatorSecond(AggregationTaskRunner.AGGREGATOR_NA
this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
this.skipInterpolationMetricPatternStrings = metricsConf.get(TIMELINE_METRICS_EVENT_METRIC_PATTERNS, "");

if (StringUtils.isNotEmpty(skipInterpolationMetricPatternStrings)) {
for (String patternString : skipInterpolationMetricPatternStrings.split(",")) {
String javaPatternString = hBaseAccessor.getJavaRegexFromSqlRegex(patternString);
LOG.info("SQL pattern " + patternString + " converted to Java pattern : " + javaPatternString);
skipInterpolationMetricPatterns.add(Pattern.compile(javaPatternString));
}
}
}

@Override
@@ -326,6 +341,14 @@ private void interpolateMissingPeriods(Map<TimelineClusterMetric, Double> timeli
Map<Long, Double> timeSliceValueMap) {


for (Pattern pattern : skipInterpolationMetricPatterns) {
Matcher m = pattern.matcher(timelineMetric.getMetricName());
if (m.matches()) {
LOG.debug("Skipping interpolation for " + timelineMetric.getMetricName());
return;
}
}

if (StringUtils.isNotEmpty(timelineMetric.getType()) && "COUNTER".equalsIgnoreCase(timelineMetric.getType())) {
//For Counter Based metrics, ok to do interpolation and extrapolation

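Tying the pieces together, a hedged sketch of how one configured event pattern suppresses interpolation (the pattern and metric name are illustrative):

// timeline.metrics.downsampler.event.metric.patterns = "dfs.NNTopUserOpCounts.%"
String javaRegex = hBaseAccessor.getJavaRegexFromSqlRegex("dfs.NNTopUserOpCounts.%");
Pattern pattern = Pattern.compile(javaRegex); // compiled once, in the constructor above

// Later, per metric, inside interpolateMissingPeriods:
boolean skip = pattern
    .matcher("dfs.NNTopUserOpCounts.windowMs=300000.op=delete.user=foo.count")
    .matches();
// skip == true -> the method returns early and no synthetic values are produced.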
@@ -72,7 +72,7 @@ public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(Resul
value = rs.getDouble("METRIC_MAX");
break;
case SUM:
value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
value = rs.getDouble("METRIC_SUM");
break;
default:
value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
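
The SUM case above is a one-line change with a behavioural shift worth spelling out; assuming a hypothetical result row, the returned value changes as follows:

// Suppose the row carries METRIC_SUM = 30.0 accumulated over METRIC_COUNT = 3 data points.
double before = 30.0 / 3; // 10.0 - the old SUM case averaged per data point
double after  = 30.0;     // 30.0 - the new SUM case keeps the total, matching event
                          //        semantics where counts must accumulate over time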
@@ -39,7 +39,7 @@ public class TopNDownSampler implements CustomDownSampler {
protected String metricPatterns;

public static TopNDownSampler fromConfig(Map<String, String> conf) {
String metricPatterns = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn." +
String metricPatterns = conf.get(DownSamplerUtils.downSamplerConfigPrefix + DownSamplerUtils.topNDownSamplerKey + "." +
DownSamplerUtils.downSamplerMetricPatternsConfig);

String topNString = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn.value");
@@ -329,7 +329,7 @@ public class PhoenixTransactSQL {
"INTO %s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
"METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
"SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, %s AS SERVER_TIME, UNITS, " +
"SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
"ROUND(SUM(METRIC_SUM)/SUM(METRIC_COUNT),2), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
"FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s " +
"GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS";

@@ -364,6 +364,18 @@ public class PhoenixTransactSQL {
" %s AS SERVER_TIME, UNITS, %s, 1, %s, %s FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " +
"GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS ORDER BY %s DESC LIMIT %s";

/**
 * Event-based downsampler SELECT queries.
 */
public static final String EVENT_DOWNSAMPLER_HOST_METRIC_SELECT_SQL = "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
"%s AS SERVER_TIME, UNITS, SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
"FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND SERVER_TIME <= %s GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS";

public static final String EVENT_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL = "SELECT METRIC_NAME, APP_ID, " +
"INSTANCE_ID, %s AS SERVER_TIME, UNITS, SUM(METRIC_SUM), SUM(%s), " +
"MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND " +
"SERVER_TIME <= %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";

public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";

public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS";
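
For reference, a sketch of how the host-level template is filled in; the argument order matches EventMetricDownSampler.prepareDownSamplingStatement above, and the values come from the unit test below:

String sql = String.format(EVENT_DOWNSAMPLER_HOST_METRIC_SELECT_SQL,
    14100000L,        // %s AS SERVER_TIME: stamped with the window end
    "METRIC_RECORD",  // source table
    "'pattern1'",     // quoted metric-name LIKE clause
    14000000L,        // window start (exclusive)
    14100000L);       // window end (inclusive)
// Yields the exact statement asserted in testPrepareEventDownSamplingStatement.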
@@ -92,4 +92,24 @@ public void testPrepareTopNDownSamplingStatement() throws Exception {
"METRIC_NAME LIKE 'pattern1' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " +
"GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS ORDER BY SUM(METRIC_SUM) DESC LIMIT 4"));
}

@Test
public void testPrepareEventDownSamplingStatement() throws Exception {
Configuration configuration = new Configuration();
configuration.setIfUnset("timeline.metrics.downsampler.event.metric.patterns", "pattern1,pattern2");

Map<String, String> conf = configuration.getValByRegex(DownSamplerUtils.downSamplerConfigPrefix);

EventMetricDownSampler eventMetricDownSampler = EventMetricDownSampler.fromConfig(conf);
List<String> stmts = eventMetricDownSampler.prepareDownSamplingStatement(14000000L, 14100000L, "METRIC_RECORD");
Assert.assertEquals(2, stmts.size());

Assert.assertTrue(stmts.get(0).equals("SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, " +
"UNITS, SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM METRIC_RECORD WHERE METRIC_NAME " +
"LIKE 'pattern1' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS"));

Assert.assertTrue(stmts.get(1).equals("SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, " +
"UNITS, SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM METRIC_RECORD WHERE METRIC_NAME " +
"LIKE 'pattern2' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS"));
}
}
@@ -720,4 +720,16 @@
<value>{{cluster_zookeeper_clientPort}}</value>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>timeline.metrics.downsampler.event.metric.patterns</name>
<value></value>
<description>
Comma-separated list of metric name regular expressions for metrics that behave like events. No interpolation is done for
such metrics, and the downsampling SUM aggregators sum their values across time instead of averaging them.
</description>
<value-attributes>
<empty-value-valid>true</empty-value-valid>
</value-attributes>
<on-ambari-upgrade add="true"/>
</property>
</configuration>