diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 12c78c347e43..fb595b02713f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -1880,14 +1880,9 @@ private List columnStatisticsObjForPartitions( return Batchable.runBatched(batchSize, colNames, new Batchable() { @Override public List run(final List inputColNames) throws MetaException { - return Batchable.runBatched(batchSize, partNames, new Batchable() { - @Override - public List run(List inputPartNames) throws MetaException { - return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName, inputPartNames, - inputColNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, - enableBitVector, enableKll); - } - }); + /** Should be called with the list short enough to not trip up Oracle/etc. */ + return aggrStatsUseJava(catName, dbName, tableName, partNames, inputColNames, engine, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); } }); } @@ -1926,21 +1921,6 @@ public List getColStatsForAllTablePartitions(String c return colStatsForDB; } - /** Should be called with the list short enough to not trip up Oracle/etc. */ - private List columnStatisticsObjForPartitionsBatch(String catName, String dbName, - String tableName, List partNames, List colNames, String engine, - boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner, - boolean enableBitVector, boolean enableKll) - throws MetaException { - if (enableBitVector || enableKll) { - return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, - useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); - } else { - return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, - useDensityFunctionForNDVEstimation, ndvTuner); - } - } - private List aggrStatsUseJava(String catName, String dbName, String tableName, List partNames, List colNames, String engine, boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector, @@ -1953,306 +1933,6 @@ private List aggrStatsUseJava(String catName, String dbName areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); } - private List aggrStatsUseDB(String catName, String dbName, - String tableName, List partNames, List colNames, String engine, - boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { - // TODO: all the extrapolation logic should be moved out of this class, - // only mechanical data retrieval should remain here. - String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", " - + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), " - + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), " - + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " - + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), " - // The following data is used to compute a partitioned table's NDV based - // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be - // accurately derived from partition NDVs, because the domain of column value two partitions - // can overlap. If there is no overlap then global NDV is just the sum - // of partition NDVs (UpperBound). But if there is some overlay then - // global NDV can be anywhere between sum of partition NDVs (no overlap) - // and same as one of the partition NDV (domain of column value in all other - // partitions is subset of the domain value in one of the partition) - // (LowerBound).But under uniform distribution, we can roughly estimate the global - // NDV by leveraging the min/max values. - // And, we also guarantee that the estimation makes sense by comparing it to the - // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") - // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")") - + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," - + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + "" - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "; - String queryText = null; - long start = 0; - long end = 0; - - boolean doTrace = LOG.isDebugEnabled(); - ForwardQueryResult fqr = null; - // Check if the status of all the columns of all the partitions exists - // Extrapolation is not needed. - if (areAllPartsFound) { - queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and \"ENGINE\" = ? " - + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; - start = doTrace ? System.nanoTime() : 0; - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, colNames, - engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - List list = MetastoreDirectSqlUtils.ensureList(qResult); - List colStats = - new ArrayList(list.size()); - for (Object[] row : list) { - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, - useDensityFunctionForNDVEstimation, ndvTuner)); - Deadline.checkTimeout(); - } - return colStats; - } - } else { - // Extrapolation is needed for some columns. - // In this case, at least a column status for a partition is missing. - // We need to extrapolate this partition based on the other partitions - List colStats = new ArrayList(colNames.size()); - queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PART_COL_STATS\".\"PART_ID\") " - + " from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + PART_COL_STATS + ".\"COLUMN_TYPE\""; - start = doTrace ? System.nanoTime() : 0; - List noExtraColumnNames = new ArrayList(); - Map extraColumnNameTypeParts = new HashMap(); - try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, colNames, - engine), queryText); - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - if (qResult == null) { - return Collections.emptyList(); - } - - List list = MetastoreDirectSqlUtils.ensureList(qResult); - for (Object[] row : list) { - String colName = (String) row[0]; - String colType = (String) row[1]; - // Extrapolation is not needed for this column if - // count(\"PARTITION_NAME\")==partNames.size() - // Or, extrapolation is not possible for this column if - // count(\"PARTITION_NAME\")<2 - Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]); - if (count == partNames.size() || count < 2) { - noExtraColumnNames.add(colName); - } else { - extraColumnNameTypeParts.put(colName, new String[] {colType, String.valueOf(count)}); - } - Deadline.checkTimeout(); - } - } - // Extrapolation is not needed for columns noExtraColumnNames - List list; - if (noExtraColumnNames.size() != 0) { - queryText = commonPrefix + " and \"COLUMN_NAME\" in (" - + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in (" - + makeParams(partNames.size()) + ")" - + " and \"ENGINE\" = ? " - + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; - start = doTrace ? System.nanoTime() : 0; - - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames, engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - list = MetastoreDirectSqlUtils.ensureList(qResult); - for (Object[] row : list) { - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, - useDensityFunctionForNDVEstimation, ndvTuner)); - Deadline.checkTimeout(); - } - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - } - } - // Extrapolation is needed for extraColumnNames. - // give a sequence number for all the partitions - if (extraColumnNameTypeParts.size() != 0) { - Map indexMap = new HashMap(); - for (int index = 0; index < partNames.size(); index++) { - indexMap.put(partNames.get(index), index); - } - // get sum for all columns to reduce the number of queries - Map> sumMap = new HashMap>(); - queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")" - + " from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\""; - start = doTrace ? System.nanoTime() : 0; - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - List extraColumnNames = new ArrayList(); - extraColumnNames.addAll(extraColumnNameTypeParts.keySet()); - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, - extraColumnNames, engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - list = MetastoreDirectSqlUtils.ensureList(qResult); - // see the indexes for colstats in IExtrapolatePartStatus - Integer[] sumIndex = new Integer[] {6, 10, 11, 15}; - for (Object[] row : list) { - Map indexToObject = new HashMap(); - for (int ind = 1; ind < row.length; ind++) { - indexToObject.put(sumIndex[ind - 1], row[ind]); - } - // row[0] is the column name - sumMap.put((String) row[0], indexToObject); - Deadline.checkTimeout(); - } - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - } - for (Map.Entry entry : extraColumnNameTypeParts.entrySet()) { - Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2]; - String colName = entry.getKey(); - String colType = entry.getValue()[0]; - Long sumVal = Long.parseLong(entry.getValue()[1]); - // fill in colname - row[0] = colName; - // fill in coltype - row[1] = colType; - // use linear extrapolation. more complicated one can be added in the - // future. - IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus(); - // fill in colstatus - Integer[] index = null; - boolean decimal = false; - if (colType.toLowerCase().startsWith("decimal")) { - index = IExtrapolatePartStatus.indexMaps.get("decimal"); - decimal = true; - } else { - index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase()); - } - // if the colType is not the known type, long, double, etc, then get - // all index. - if (index == null) { - index = IExtrapolatePartStatus.indexMaps.get("default"); - } - for (int colStatIndex : index) { - String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex]; - // if the aggregation type is sum, we do a scale-up - if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) { - Object o = sumMap.get(colName).get(colStatIndex); - if (o == null) { - row[2 + colStatIndex] = null; - } else { - Long val = MetastoreDirectSqlUtils.extractSqlLong(o); - row[2 + colStatIndex] = val / sumVal * (partNames.size()); - } - } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min - || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) { - // if the aggregation type is min/max, we extrapolate from the - // left/right borders - if (!decimal) { - queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " order by \"" + colStatName + "\""; - } else { - queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " order by cast(\"" + colStatName + "\" as decimal)"; - } - start = doTrace ? System.nanoTime() : 0; - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - fqr = (ForwardQueryResult) qResult; - Object[] min = (Object[]) (fqr.get(0)); - Object[] max = (Object[]) (fqr.get(fqr.size() - 1)); - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - if (min[0] == null || max[0] == null) { - row[2 + colStatIndex] = null; - } else { - row[2 + colStatIndex] = extrapolateMethod - .extrapolate(min, max, colStatIndex, indexMap); - } - } - } else { - // if the aggregation type is avg, we use the average on the existing ones. - queryText = "select " - + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")" - + " from " + PART_COL_STATS + "" - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " group by \"COLUMN_NAME\""; - start = doTrace ? System.nanoTime() : 0; - try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - fqr = (ForwardQueryResult) qResult; - Object[] avg = (Object[]) (fqr.get(0)); - // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE", - // "AVG_DECIMAL" - row[2 + colStatIndex] = avg[colStatIndex - 12]; - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - } - } - } - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner)); - Deadline.checkTimeout(); - } - } - return colStats; - } - } - private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaException { ColumnStatisticsData data = new ColumnStatisticsData(); ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++], (String)row[i++], data); @@ -2264,20 +1944,6 @@ private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaExcept return cso; } - private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i, - boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { - ColumnStatisticsData data = new ColumnStatisticsData(); - ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++], (String) row[i++], data); - Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++], declow = row[i++], - dechigh = row[i++], nulls = row[i++], dist = row[i++], avglen = row[i++], maxlen = row[i++], - trues = row[i++], falses = row[i++], avgLong = row[i++], avgDouble = row[i++], - avgDecimal = row[i++], sumDist = row[i++]; - StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh, - declow, dechigh, nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble, - avgDecimal, sumDist, useDensityFunctionForNDVEstimation, ndvTuner); - return cso; - } - private Object[] prepareParams(String catName, String dbName, String tableName, List partNames, List colNames, String engine) throws MetaException { Object[] params = new Object[colNames.size() + partNames.size() + 4];