From 626b4cafce7d2dca186144336939d4d993b6f878 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 6 Apr 2017 19:24:03 -0700 Subject: [PATCH 1/8] [SPARK-19495][SQL] Make SQLConf slightly more extensible - addendum ## What changes were proposed in this pull request? This is a tiny addendum to SPARK-19495 to remove the private visibility for copy, which is the only package private method in the entire file. ## How was this patch tested? N/A - no semantic change. Author: Reynold Xin Closes #17555 from rxin/SPARK-19495-2. --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e685c2bed50ae..640c0f189c237 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1153,7 +1153,7 @@ class SQLConf extends Serializable with Logging { } // For test only - private[spark] def copy(entries: (ConfigEntry[_], Any)*): SQLConf = { + def copy(entries: (ConfigEntry[_], Any)*): SQLConf = { val cloned = clone() entries.foreach { case (entry, value) => cloned.setConfString(entry.key, value.toString) From ad3cc1312db3b5667cea134940a09896a4609b74 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 7 Apr 2017 15:58:50 +0800 Subject: [PATCH 2/8] [SPARK-20245][SQL][MINOR] pass output to LogicalRelation directly ## What changes were proposed in this pull request? Currently `LogicalRelation` has a `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should also take `output` as a parameter, to simplify the logic ## How was this patch tested? existing tests Author: Wenchen Fan Closes #17552 from cloud-fan/minor. 
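Illustrative sketch (not part of the patch, using hypothetical stand-in types rather than Spark's real `Attribute`/`BaseRelation` classes): with `output` passed in explicitly, a copy of the leaf node keeps its attribute ids unless the caller remaps them, which is the property the partition-pruning and metastore-cache code paths rely on.

```
// Hypothetical, simplified stand-ins for Spark's Attribute and BaseRelation.
case class Attr(name: String, exprId: Long)
case class Relation(schema: Seq[String])

// After this change the leaf node takes its output explicitly, like other leaf nodes.
case class SimpleLogicalRelation(relation: Relation, output: Seq[Attr])

object SimpleLogicalRelation {
  private var nextId = 0L
  private def freshId(): Long = { nextId += 1; nextId }

  // Mirrors the new companion apply: derive fresh output attributes from the schema.
  def apply(relation: Relation): SimpleLogicalRelation =
    new SimpleLogicalRelation(relation, relation.schema.map(name => Attr(name, freshId())))
}

object Demo extends App {
  val original = SimpleLogicalRelation(Relation(Seq("i", "p")))
  // Swapping the relation while keeping `output` preserves expression ids,
  // so expressions referencing the old node still resolve against the copy.
  val pruned = original.copy(relation = Relation(Seq("i", "p")))
  assert(pruned.output == original.output)
}
```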
--- .../sql/catalyst/catalog/interface.scala | 8 ++-- .../datasources/DataSourceStrategy.scala | 15 +++---- .../datasources/LogicalRelation.scala | 39 +++++++------------ .../PruneFileSourcePartitions.scala | 4 +- .../spark/sql/sources/PathOptionSuite.scala | 19 ++++----- .../spark/sql/hive/HiveMetastoreCatalog.scala | 13 +++++-- .../spark/sql/hive/CachedTableSuite.scala | 4 +- .../PruneFileSourcePartitionsSuite.scala | 2 +- 8 files changed, 49 insertions(+), 55 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index dc2e40424fd5f..360e55d922821 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -27,7 +27,7 @@ import com.google.common.base.Objects import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.util.quoteIdentifier @@ -403,14 +403,14 @@ object CatalogTypes { */ case class CatalogRelation( tableMeta: CatalogTable, - dataCols: Seq[Attribute], - partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation { + dataCols: Seq[AttributeReference], + partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation { assert(tableMeta.identifier.database.isDefined) assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType)) assert(tableMeta.dataSchema.sameType(dataCols.toStructType)) // The partition column should always appear after data columns. - override def output: Seq[Attribute] = dataCols ++ partitionCols + override def output: Seq[AttributeReference] = dataCols ++ partitionCols def isPartitioned: Boolean = partitionCols.nonEmpty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index e5c7c383d708c..2d83d512e702d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -231,16 +231,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] options = table.storage.properties ++ pathOption, catalogTable = Some(table)) - LogicalRelation( - dataSource.resolveRelation(checkFilesExist = false), - catalogTable = Some(table)) + LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table) } }).asInstanceOf[LogicalRelation] - // It's possible that the table schema is empty and need to be inferred at runtime. We should - // not specify expected outputs for this case. - val expectedOutputs = if (r.output.isEmpty) None else Some(r.output) - plan.copy(expectedOutputAttributes = expectedOutputs) + if (r.output.isEmpty) { + // It's possible that the table schema is empty and need to be inferred at runtime. 
For this + // case, we don't need to change the output of the cached plan. + plan + } else { + plan.copy(output = r.output) + } } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 3b14b794fd08c..4215203960075 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.BaseRelation @@ -26,31 +26,13 @@ import org.apache.spark.util.Utils /** * Used to link a [[BaseRelation]] in to a logical query plan. - * - * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without - * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for - * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details. */ case class LogicalRelation( relation: BaseRelation, - expectedOutputAttributes: Option[Seq[Attribute]] = None, - catalogTable: Option[CatalogTable] = None) + output: Seq[AttributeReference], + catalogTable: Option[CatalogTable]) extends LeafNode with MultiInstanceRelation { - override val output: Seq[AttributeReference] = { - val attrs = relation.schema.toAttributes - expectedOutputAttributes.map { expectedAttrs => - assert(expectedAttrs.length == attrs.length) - attrs.zip(expectedAttrs).map { - // We should respect the attribute names provided by base relation and only use the - // exprId in `expectedOutputAttributes`. - // The reason is that, some relations(like parquet) will reconcile attribute names to - // workaround case insensitivity issue. - case (attr, expected) => attr.withExprId(expected.exprId) - } - }.getOrElse(attrs) - } - // Logical Relations are distinct if they have different output for the sake of transformations. override def equals(other: Any): Boolean = other match { case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output @@ -87,11 +69,8 @@ case class LogicalRelation( * unique expression ids. We respect the `expectedOutputAttributes` and create * new instances of attributes in it. 
*/ - override def newInstance(): this.type = { - LogicalRelation( - relation, - expectedOutputAttributes.map(_.map(_.newInstance())), - catalogTable).asInstanceOf[this.type] + override def newInstance(): LogicalRelation = { + this.copy(output = output.map(_.newInstance())) } override def refresh(): Unit = relation match { @@ -101,3 +80,11 @@ case class LogicalRelation( override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation" } + +object LogicalRelation { + def apply(relation: BaseRelation): LogicalRelation = + LogicalRelation(relation, relation.schema.toAttributes, None) + + def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation = + LogicalRelation(relation, relation.schema.toAttributes, Some(table)) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 8566a8061034b..905b8683e10bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -59,9 +59,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(sparkSession) - val prunedLogicalRelation = logicalRelation.copy( - relation = prunedFsRelation, - expectedOutputAttributes = Some(logicalRelation.output)) + val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation) // Keep partition-pruning predicates so that they are visible in physical planning val filterExpression = filters.reduceLeft(And) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala index 60adee4599b0b..6dd4847ead738 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala @@ -75,13 +75,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { |USING ${classOf[TestOptionsSource].getCanonicalName} |OPTIONS (PATH '/tmp/path') """.stripMargin) - assert(getPathOption("src") == Some("file:/tmp/path")) + assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path"))) } // should exist even path option is not specified when creating table withTable("src") { sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}") - assert(getPathOption("src") == Some(CatalogUtils.URIToString(defaultTablePath("src")))) + assert(getPathOption("src").map(makeQualifiedPath) == Some(defaultTablePath("src"))) } } @@ -95,9 +95,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { |OPTIONS (PATH '$p') |AS SELECT 1 """.stripMargin) - assert(CatalogUtils.stringToURI( - spark.table("src").schema.head.metadata.getString("path")) == - makeQualifiedPath(p.getAbsolutePath)) + assert( + spark.table("src").schema.head.metadata.getString("path") == + p.getAbsolutePath) } } @@ -109,8 +109,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { |USING ${classOf[TestOptionsSource].getCanonicalName} |AS SELECT 1 """.stripMargin) - assert(spark.table("src").schema.head.metadata.getString("path") == - 
CatalogUtils.URIToString(defaultTablePath("src"))) + assert( + makeQualifiedPath(spark.table("src").schema.head.metadata.getString("path")) == + defaultTablePath("src")) } } @@ -122,13 +123,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { |USING ${classOf[TestOptionsSource].getCanonicalName} |OPTIONS (PATH '/tmp/path')""".stripMargin) sql("ALTER TABLE src SET LOCATION '/tmp/path2'") - assert(getPathOption("src") == Some("/tmp/path2")) + assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path2"))) } withTable("src", "src2") { sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}") sql("ALTER TABLE src RENAME TO src2") - assert(getPathOption("src2") == Some(CatalogUtils.URIToString(defaultTablePath("src2")))) + assert(getPathOption("src2").map(makeQualifiedPath) == Some(defaultTablePath("src2"))) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 10f432570e94b..6b98066cb76c8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -175,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log bucketSpec = None, fileFormat = fileFormat, options = options)(sparkSession = sparkSession) - val created = LogicalRelation(fsRelation, catalogTable = Some(updatedTable)) + val created = LogicalRelation(fsRelation, updatedTable) tableRelationCache.put(tableIdentifier, created) created } @@ -203,7 +203,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log bucketSpec = None, options = options, className = fileType).resolveRelation(), - catalogTable = Some(updatedTable)) + table = updatedTable) tableRelationCache.put(tableIdentifier, created) created @@ -212,7 +212,14 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } - result.copy(expectedOutputAttributes = Some(relation.output)) + // The inferred schema may have different filed names as the table schema, we should respect + // it, but also respect the exprId in table relation output. 
+ assert(result.output.length == relation.output.length && + result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType }) + val newOutput = result.output.zip(relation.output).map { + case (a1, a2) => a1.withExprId(a2.exprId) + } + result.copy(output = newOutput) } private def inferIfNeeded( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 2b3f36064c1f8..d3cbf898e2439 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -329,7 +329,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto fileFormat = new ParquetFileFormat(), options = Map.empty)(sparkSession = spark) - val plan = LogicalRelation(relation, catalogTable = Some(tableMeta)) + val plan = LogicalRelation(relation, tableMeta) spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan)) assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined) @@ -342,7 +342,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto bucketSpec = None, fileFormat = new ParquetFileFormat(), options = Map.empty)(sparkSession = spark) - val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta)) + val samePlan = LogicalRelation(sameRelation, tableMeta) assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index cd8f94b1cc4f0..f818e29555468 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -58,7 +58,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te fileFormat = new ParquetFileFormat(), options = Map.empty)(sparkSession = spark) - val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta)) + val logicalRelation = LogicalRelation(relation, tableMeta) val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze val optimized = Optimize.execute(query) From 1a52a62377a87cec493c8c6711bfd44e779c7973 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 7 Apr 2017 11:00:10 +0200 Subject: [PATCH 3/8] [SPARK-20076][ML][PYSPARK] Add Python interface for ml.stats.Correlation ## What changes were proposed in this pull request? The Dataframes-based support for the correlation statistics is added in #17108. This patch adds the Python interface for it. ## How was this patch tested? Python unit test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh Closes #17494 from viirya/correlation-python-api. 
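For context, a minimal Scala usage sketch of the Dataset-based `Correlation.corr` API that the new Python wrapper delegates to (example data chosen arbitrarily; assumes a Spark 2.2+ build with spark-mllib on the classpath):

```
import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.{Row, SparkSession}

object CorrelationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CorrelationExample").getOrCreate()
    import spark.implicits._

    // A single vector column named "features", mirroring the Python doctest data.
    val data = Seq(
      Vectors.dense(1.0, 0.0, 0.0, -2.0),
      Vectors.dense(4.0, 5.0, 0.0, 3.0),
      Vectors.dense(6.0, 7.0, 0.0, 8.0),
      Vectors.dense(9.0, 0.0, 0.0, 1.0))
    val df = data.map(Tuple1.apply).toDF("features")

    // Each call returns a one-row, one-column DataFrame holding the correlation Matrix.
    val Row(pearson: Matrix) = Correlation.corr(df, "features", "pearson").head()
    val Row(spearman: Matrix) = Correlation.corr(df, "features", "spearman").head()
    println(s"Pearson:\n$pearson")
    println(s"Spearman:\n$spearman")

    spark.stop()
  }
}
```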
--- .../apache/spark/ml/stat/Correlation.scala | 8 +-- python/pyspark/ml/stat.py | 61 +++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala index d3c84b77d26ac..e185bc8a6faaa 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -38,7 +38,7 @@ object Correlation { /** * :: Experimental :: - * Compute the correlation matrix for the input RDD of Vectors using the specified method. + * Compute the correlation matrix for the input Dataset of Vectors using the specified method. * Methods currently supported: `pearson` (default), `spearman`. * * @param dataset A dataset or a dataframe @@ -56,14 +56,14 @@ object Correlation { * Here is how to access the correlation coefficient: * {{{ * val data: Dataset[Vector] = ... - * val Row(coeff: Matrix) = Statistics.corr(data, "value").head + * val Row(coeff: Matrix) = Correlation.corr(data, "value").head * // coeff now contains the Pearson correlation matrix. * }}} * * @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector], - * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to - * avoid recomputing the common lineage. + * which is fairly costly. Cache the input Dataset before calling corr with `method = "spearman"` + * to avoid recomputing the common lineage. */ @Since("2.2.0") def corr(dataset: Dataset[_], column: String, method: String): DataFrame = { diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py index db043ff68feca..079b0833e1c6d 100644 --- a/python/pyspark/ml/stat.py +++ b/python/pyspark/ml/stat.py @@ -71,6 +71,67 @@ def test(dataset, featuresCol, labelCol): return _java2py(sc, javaTestObj.test(*args)) +class Correlation(object): + """ + .. note:: Experimental + + Compute the correlation matrix for the input dataset of Vectors using the specified method. + Methods currently supported: `pearson` (default), `spearman`. + + .. note:: For Spearman, a rank correlation, we need to create an RDD[Double] for each column + and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector], + which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'` + to avoid recomputing the common lineage. + + :param dataset: + A dataset or a dataframe. + :param column: + The name of the column of vectors for which the correlation coefficient needs + to be computed. This must be a column of the dataset, and it must contain + Vector objects. + :param method: + String specifying the method to use for computing correlation. + Supported: `pearson` (default), `spearman`. + :return: + A dataframe that contains the correlation matrix of the column of vectors. This + dataframe contains a single row and a single column of name + '$METHODNAME($COLUMN)'. + + >>> from pyspark.ml.linalg import Vectors + >>> from pyspark.ml.stat import Correlation + >>> dataset = [[Vectors.dense([1, 0, 0, -2])], + ... [Vectors.dense([4, 5, 0, 3])], + ... [Vectors.dense([6, 7, 0, 8])], + ... 
[Vectors.dense([9, 0, 0, 1])]] + >>> dataset = spark.createDataFrame(dataset, ['features']) + >>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0] + >>> print(str(pearsonCorr).replace('nan', 'NaN')) + DenseMatrix([[ 1. , 0.0556..., NaN, 0.4004...], + [ 0.0556..., 1. , NaN, 0.9135...], + [ NaN, NaN, 1. , NaN], + [ 0.4004..., 0.9135..., NaN, 1. ]]) + >>> spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0] + >>> print(str(spearmanCorr).replace('nan', 'NaN')) + DenseMatrix([[ 1. , 0.1054..., NaN, 0.4 ], + [ 0.1054..., 1. , NaN, 0.9486... ], + [ NaN, NaN, 1. , NaN], + [ 0.4 , 0.9486... , NaN, 1. ]]) + + .. versionadded:: 2.2.0 + + """ + @staticmethod + @since("2.2.0") + def corr(dataset, column, method="pearson"): + """ + Compute the correlation matrix with specified method using dataset. + """ + sc = SparkContext._active_spark_context + javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation + args = [_py2java(sc, arg) for arg in (dataset, column, method)] + return _java2py(sc, javaCorrObj.corr(*args)) + + if __name__ == "__main__": import doctest import pyspark.ml.stat From 9e0893b53d68f777c1f3fb0a67820424a9c253ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=83=AD=E5=B0=8F=E9=BE=99=2010207633?= Date: Fri, 7 Apr 2017 13:03:07 +0100 Subject: [PATCH 4/8] [SPARK-20218][DOC][APP-ID] applications//stages' in REST API,add description. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changes were proposed in this pull request? 1. '/applications/[app-id]/stages' in rest api.status should add description '?status=[active|complete|pending|failed] list only stages in the state.' Now the lack of this description, resulting in the use of this api do not know the use of the status through the brush stage list. 2.'/applications/[app-id]/stages/[stage-id]' in REST API,remove redundant description ‘?status=[active|complete|pending|failed] list only stages in the state.’. Because only one stage is determined based on stage-id. code: GET def stageList(QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = { val listener = ui.jobProgressListener val stageAndStatus = AllStagesResource.stagesAndStatus(ui) val adjStatuses = { if (statuses.isEmpty()) { Arrays.asList(StageStatus.values(): _*) } else { statuses } }; ## How was this patch tested? manual tests Please review http://spark.apache.org/contributing.html before opening a pull request. Author: 郭小龙 10207633 Closes #17534 from guoxiaolongzte/SPARK-20218. --- docs/monitoring.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/monitoring.md b/docs/monitoring.md index 4d0617d253b80..da954385dc452 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -299,12 +299,12 @@ can be identified by their `[attempt-id]`. In the API listed below, when running /applications/[app-id]/stages A list of all stages for a given application. +
?status=[active|complete|pending|failed] list only stages in the state. /applications/[app-id]/stages/[stage-id] A list of all attempts for the given stage. -
?status=[active|complete|pending|failed] list only stages in the state. From 870b9d9aa00c260b532c78088e4a0384f7f1fa8a Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Fri, 7 Apr 2017 10:57:12 -0700 Subject: [PATCH 5/8] [SPARK-20026][DOC][SPARKR] Add Tweedie example for SparkR in programming guide ## What changes were proposed in this pull request? Add Tweedie example for SparkR in programming guide. The doc was already updated in #17103. Author: actuaryzhang Closes #17553 from actuaryzhang/programGuide. --- examples/src/main/r/ml/glm.R | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/examples/src/main/r/ml/glm.R b/examples/src/main/r/ml/glm.R index ee13910382c58..23141b57df143 100644 --- a/examples/src/main/r/ml/glm.R +++ b/examples/src/main/r/ml/glm.R @@ -56,6 +56,15 @@ summary(binomialGLM) # Prediction binomialPredictions <- predict(binomialGLM, binomialTestDF) head(binomialPredictions) + +# Fit a generalized linear model of family "tweedie" with spark.glm +training3 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm") +tweedieDF <- transform(training3, label = training3$label * exp(randn(10))) +tweedieGLM <- spark.glm(tweedieDF, label ~ features, family = "tweedie", + var.power = 1.2, link.power = 0) + +# Model summary +summary(tweedieGLM) # $example off$ sparkR.session.stop() From 8feb799af0bb67618310947342e3e4d2a77aae13 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Fri, 7 Apr 2017 11:17:49 -0700 Subject: [PATCH 6/8] [SPARK-20197][SPARKR] CRAN check fail with package installation ## What changes were proposed in this pull request? Test failed because SPARK_HOME is not set before Spark is installed. Author: Felix Cheung Closes #17516 from felixcheung/rdircheckincran. --- R/pkg/tests/run-all.R | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/R/pkg/tests/run-all.R b/R/pkg/tests/run-all.R index cefaadda6e215..29812f872c784 100644 --- a/R/pkg/tests/run-all.R +++ b/R/pkg/tests/run-all.R @@ -22,12 +22,13 @@ library(SparkR) options("warn" = 2) # Setup global test environment +# Install Spark first to set SPARK_HOME +install.spark() + sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R") sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE) sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db") invisible(lapply(sparkRWhitelistSQLDirs, function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)})) -install.spark() - test_package("SparkR") From 1ad73f0a21d8007d8466ef8756f751c0ab6a9d1f Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Fri, 7 Apr 2017 12:29:45 -0700 Subject: [PATCH 7/8] [SPARK-20258][DOC][SPARKR] Fix SparkR logistic regression example in programming guide (did not converge) ## What changes were proposed in this pull request? SparkR logistic regression example did not converge in programming guide (for IRWLS). All estimates are essentially zero: ``` training2 <- read.df("data/mllib/sample_binary_classification_data.txt", source = "libsvm") df_list2 <- randomSplit(training2, c(7,3), 2) binomialDF <- df_list2[[1]] binomialTestDF <- df_list2[[2]] binomialGLM <- spark.glm(binomialDF, label ~ features, family = "binomial") 17/04/07 11:42:03 WARN WeightedLeastSquares: Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver. 
> summary(binomialGLM) Coefficients: Estimate (Intercept) 9.0255e+00 features_0 0.0000e+00 features_1 0.0000e+00 features_2 0.0000e+00 features_3 0.0000e+00 features_4 0.0000e+00 features_5 0.0000e+00 features_6 0.0000e+00 features_7 0.0000e+00 ``` Author: actuaryzhang Closes #17571 from actuaryzhang/programGuide2. --- examples/src/main/r/ml/glm.R | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/src/main/r/ml/glm.R b/examples/src/main/r/ml/glm.R index 23141b57df143..68787f9aa9dca 100644 --- a/examples/src/main/r/ml/glm.R +++ b/examples/src/main/r/ml/glm.R @@ -27,7 +27,7 @@ sparkR.session(appName = "SparkR-ML-glm-example") # $example on$ training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm") # Fit a generalized linear model of family "gaussian" with spark.glm -df_list <- randomSplit(training, c(7,3), 2) +df_list <- randomSplit(training, c(7, 3), 2) gaussianDF <- df_list[[1]] gaussianTestDF <- df_list[[2]] gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian") @@ -44,8 +44,9 @@ gaussianGLM2 <- glm(label ~ features, gaussianDF, family = "gaussian") summary(gaussianGLM2) # Fit a generalized linear model of family "binomial" with spark.glm -training2 <- read.df("data/mllib/sample_binary_classification_data.txt", source = "libsvm") -df_list2 <- randomSplit(training2, c(7,3), 2) +training2 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm") +training2 <- transform(training2, label = cast(training2$label > 1, "integer")) +df_list2 <- randomSplit(training2, c(7, 3), 2) binomialDF <- df_list2[[1]] binomialTestDF <- df_list2[[2]] binomialGLM <- spark.glm(binomialDF, label ~ features, family = "binomial") From 589f3edb82e970b6df9121861ed0c6b4a6d02cb6 Mon Sep 17 00:00:00 2001 From: Adrian Ionescu Date: Fri, 7 Apr 2017 14:00:23 -0700 Subject: [PATCH 8/8] [SPARK-20255] Move listLeafFiles() to InMemoryFileIndex ## What changes were proposed in this pull request Trying to get a grip on the `FileIndex` hierarchy, I was confused by the following inconsistency: On the one hand, `PartitioningAwareFileIndex` defines `leafFiles` and `leafDirToChildrenFiles` as abstract, but on the other it fully implements `listLeafFiles` which does all the listing of files. However, the latter is only used by `InMemoryFileIndex`. I'm hereby proposing to move this method (and all its dependencies) to the implementation class that actually uses it, and thus unclutter the `PartitioningAwareFileIndex` interface. ## How was this patch tested? `./build/sbt sql/test` Author: Adrian Ionescu Closes #17570 from adrian-ionescu/list-leaf-files. 
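For orientation only, a self-contained sketch of the listing strategy that moves along with this refactoring: list each root serially when the number of roots is at or below a threshold, otherwise fan the work out (Spark submits a job for this; a Scala 2.11/2.12 parallel collection stands in here). The types and the threshold value are hypothetical stand-ins, not Spark's real FileIndex classes; in Spark the threshold comes from `SQLConf` (`parallelPartitionDiscoveryThreshold`).

```
// Hypothetical stand-ins; the real logic lives in InMemoryFileIndex after this patch.
case class Entry(path: String, isDir: Boolean, children: Seq[Entry] = Nil)

object ListingSketch {
  // Assumed threshold; Spark reads it from SQLConf at runtime.
  val parallelDiscoveryThreshold = 32

  // Recursively collect leaf files under a single root, as listLeafFiles does per path.
  def listLeafFiles(root: Entry): Seq[Entry] = {
    val (dirs, files) = root.children.partition(_.isDir)
    files ++ dirs.flatMap(listLeafFiles)
  }

  // Mirrors bulkListLeafFiles: serial listing when it is likely faster, otherwise
  // parallel (the real code runs a Spark job; .par approximates that here).
  def bulkListLeafFiles(roots: Seq[Entry]): Seq[(Entry, Seq[Entry])] =
    if (roots.size <= parallelDiscoveryThreshold) {
      roots.map(root => root -> listLeafFiles(root))
    } else {
      roots.par.map(root => root -> listLeafFiles(root)).seq
    }
}
```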
--- .../datasources/InMemoryFileIndex.scala | 226 ++++++++++++++++++ .../PartitioningAwareFileIndex.scala | 223 +---------------- .../datasources/FileIndexSuite.scala | 18 +- 3 files changed, 236 insertions(+), 231 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index ee4d0863d9771..11605dd280569 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -17,12 +17,19 @@ package org.apache.spark.sql.execution.datasources +import java.io.FileNotFoundException + import scala.collection.mutable +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration /** @@ -84,4 +91,223 @@ class InMemoryFileIndex( } override def hashCode(): Int = rootPaths.toSet.hashCode() + + /** + * List leaf files of given paths. This method will submit a Spark job to do parallel + * listing whenever there is a path having more files than the parallel partition discovery + * discovery threshold. + * + * This is publicly visible for testing. + */ + def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + val output = mutable.LinkedHashSet[FileStatus]() + val pathsToFetch = mutable.ArrayBuffer[Path]() + for (path <- paths) { + fileStatusCache.getLeafFiles(path) match { + case Some(files) => + HiveCatalogMetrics.incrementFileCacheHits(files.length) + output ++= files + case None => + pathsToFetch += path + } + } + val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) + val discovered = InMemoryFileIndex.bulkListLeafFiles( + pathsToFetch, hadoopConf, filter, sparkSession) + discovered.foreach { case (path, leafFiles) => + HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) + fileStatusCache.putLeafFiles(path, leafFiles.toArray) + output ++= leafFiles + } + output + } +} + +object InMemoryFileIndex extends Logging { + + /** A serializable variant of HDFS's BlockLocation. */ + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + /** A serializable variant of HDFS's FileStatus. */ + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * Lists a collection of paths recursively. Picks the listing strategy adaptively depending + * on the number of paths to list. + * + * This may only be called on the driver. + * + * @return for each input path, the set of discovered files for the path + */ + private def bulkListLeafFiles( + paths: Seq[Path], + hadoopConf: Configuration, + filter: PathFilter, + sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = { + + // Short-circuits parallel listing when serial listing is likely to be faster. 
+ if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + return paths.map { path => + (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession))) + } + } + + logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") + HiveCatalogMetrics.incrementParallelListingJobCount(1) + + val sparkContext = sparkSession.sparkContext + val serializableConfiguration = new SerializableConfiguration(hadoopConf) + val serializedPaths = paths.map(_.toString) + val parallelPartitionDiscoveryParallelism = + sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism + + // Set the number of parallelism to prevent following file listing from generating many tasks + // in case of large #defaultParallelism. + val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism) + + val statusMap = sparkContext + .parallelize(serializedPaths, numParallelism) + .mapPartitions { pathStrings => + val hadoopConf = serializableConfiguration.value + pathStrings.map(new Path(_)).toSeq.map { path => + (path, listLeafFiles(path, hadoopConf, filter, None)) + }.iterator + }.map { case (path, statuses) => + val serializableStatuses = statuses.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] + } + + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + status.getAccessTime, + blockLocations) + } + (path.toString, serializableStatuses) + }.collect() + + // turn SerializableFileStatus back to Status + statusMap.map { case (path, serializableStatuses) => + val statuses = serializableStatuses.map { f => + val blockLocations = f.blockLocations.map { loc => + new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + } + new LocatedFileStatus( + new FileStatus( + f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, + new Path(f.path)), + blockLocations) + } + (new Path(path), statuses) + } + } + + /** + * Lists a single filesystem path recursively. If a SparkSession object is specified, this + * function may launch Spark jobs to parallelize listing. + * + * If sessionOpt is None, this may be called on executors. + * + * @return all children of path that match the specified filter. + */ + private def listLeafFiles( + path: Path, + hadoopConf: Configuration, + filter: PathFilter, + sessionOpt: Option[SparkSession]): Seq[FileStatus] = { + logTrace(s"Listing $path") + val fs = path.getFileSystem(hadoopConf) + val name = path.getName.toLowerCase + + // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist + // Note that statuses only include FileStatus for the files and dirs directly under path, + // and does not include anything else recursively. + val statuses = try fs.listStatus(path) catch { + case _: FileNotFoundException => + logWarning(s"The directory $path was not found. 
Was it deleted very recently?") + Array.empty[FileStatus] + } + + val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) + + val allLeafStatuses = { + val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) + val nestedFiles: Seq[FileStatus] = sessionOpt match { + case Some(session) => + bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2) + case _ => + dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt)) + } + val allFiles = topLevelFiles ++ nestedFiles + if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles + } + + allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { + case f: LocatedFileStatus => + f + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not + // be a big deal since we always use to `listLeafFilesInParallel` when the number of + // paths exceeds threshold. + case f => + // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), + // which is very slow on some file system (RawLocalFileSystem, which is launch a + // subprocess and parse the stdout). + val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + lfs + } + } + + /** Checks if we should filter out this path name. */ + def shouldFilterOut(pathName: String): Boolean = { + // We filter follow paths: + // 1. everything that starts with _ and ., except _common_metadata and _metadata + // because Parquet needs to find those metadata files from leaf files returned by this method. + // We should refactor this logic to not mix metadata files with data files. + // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we + // should skip this file in case of double reading. 
+ val exclude = (pathName.startsWith("_") && !pathName.contains("=")) || + pathName.startsWith(".") || pathName.endsWith("._COPYING_") + val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata") + exclude && !include + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 71500a010581e..ffd7f6c750f85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -17,22 +17,17 @@ package org.apache.spark.sql.execution.datasources -import java.io.FileNotFoundException - import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ -import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.internal.Logging -import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.util.SerializableConfiguration /** * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables. @@ -241,224 +236,8 @@ abstract class PartitioningAwareFileIndex( val name = path.getName !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) } - - /** - * List leaf files of given paths. This method will submit a Spark job to do parallel - * listing whenever there is a path having more files than the parallel partition discovery - * discovery threshold. - * - * This is publicly visible for testing. - */ - def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - val output = mutable.LinkedHashSet[FileStatus]() - val pathsToFetch = mutable.ArrayBuffer[Path]() - for (path <- paths) { - fileStatusCache.getLeafFiles(path) match { - case Some(files) => - HiveCatalogMetrics.incrementFileCacheHits(files.length) - output ++= files - case None => - pathsToFetch += path - } - } - val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) - val discovered = PartitioningAwareFileIndex.bulkListLeafFiles( - pathsToFetch, hadoopConf, filter, sparkSession) - discovered.foreach { case (path, leafFiles) => - HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) - fileStatusCache.putLeafFiles(path, leafFiles.toArray) - output ++= leafFiles - } - output - } } -object PartitioningAwareFileIndex extends Logging { +object PartitioningAwareFileIndex { val BASE_PATH_PARAM = "basePath" - - /** A serializable variant of HDFS's BlockLocation. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. */ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) - - /** - * Lists a collection of paths recursively. Picks the listing strategy adaptively depending - * on the number of paths to list. 
- * - * This may only be called on the driver. - * - * @return for each input path, the set of discovered files for the path - */ - private def bulkListLeafFiles( - paths: Seq[Path], - hadoopConf: Configuration, - filter: PathFilter, - sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = { - - // Short-circuits parallel listing when serial listing is likely to be faster. - if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - return paths.map { path => - (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession))) - } - } - - logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") - HiveCatalogMetrics.incrementParallelListingJobCount(1) - - val sparkContext = sparkSession.sparkContext - val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = paths.map(_.toString) - val parallelPartitionDiscoveryParallelism = - sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism - - // Set the number of parallelism to prevent following file listing from generating many tasks - // in case of large #defaultParallelism. - val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism) - - val statusMap = sparkContext - .parallelize(serializedPaths, numParallelism) - .mapPartitions { pathStrings => - val hadoopConf = serializableConfiguration.value - pathStrings.map(new Path(_)).toSeq.map { path => - (path, listLeafFiles(path, hadoopConf, filter, None)) - }.iterator - }.map { case (path, statuses) => - val serializableStatuses = statuses.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - } - (path.toString, serializableStatuses) - }.collect() - - // turn SerializableFileStatus back to Status - statusMap.map { case (path, serializableStatuses) => - val statuses = serializableStatuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, - new Path(f.path)), - blockLocations) - } - (new Path(path), statuses) - } - } - - /** - * Lists a single filesystem path recursively. If a SparkSession object is specified, this - * function may launch Spark jobs to parallelize listing. - * - * If sessionOpt is None, this may be called on executors. - * - * @return all children of path that match the specified filter. - */ - private def listLeafFiles( - path: Path, - hadoopConf: Configuration, - filter: PathFilter, - sessionOpt: Option[SparkSession]): Seq[FileStatus] = { - logTrace(s"Listing $path") - val fs = path.getFileSystem(hadoopConf) - val name = path.getName.toLowerCase - - // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist - // Note that statuses only include FileStatus for the files and dirs directly under path, - // and does not include anything else recursively. 
- val statuses = try fs.listStatus(path) catch { - case _: FileNotFoundException => - logWarning(s"The directory $path was not found. Was it deleted very recently?") - Array.empty[FileStatus] - } - - val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) - - val allLeafStatuses = { - val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) - val nestedFiles: Seq[FileStatus] = sessionOpt match { - case Some(session) => - bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2) - case _ => - dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt)) - } - val allFiles = topLevelFiles ++ nestedFiles - if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles - } - - allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { - case f: LocatedFileStatus => - f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). - val locations = fs.getFileBlockLocations(f, 0, f.getLen) - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) - } - lfs - } - } - - /** Checks if we should filter out this path name. */ - def shouldFilterOut(pathName: String): Boolean = { - // We filter follow paths: - // 1. everything that starts with _ and ., except _common_metadata and _metadata - // because Parquet needs to find those metadata files from leaf files returned by this method. - // We should refactor this logic to not mix metadata files with data files. - // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we - // should skip this file in case of double reading. 
- val exclude = (pathName.startsWith("_") && !pathName.contains("=")) || - pathName.startsWith(".") || pathName.endsWith("._COPYING_") - val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata") - exclude && !include - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 7ea4064927576..00f5d5db8f5f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -135,15 +135,15 @@ class FileIndexSuite extends SharedSQLContext { } } - test("PartitioningAwareFileIndex - file filtering") { - assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd")) - assert(PartitioningAwareFileIndex.shouldFilterOut(".ab")) - assert(PartitioningAwareFileIndex.shouldFilterOut("_cd")) - assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata")) - assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata")) - assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata")) - assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata")) - assert(PartitioningAwareFileIndex.shouldFilterOut("a._COPYING_")) + test("InMemoryFileIndex - file filtering") { + assert(!InMemoryFileIndex.shouldFilterOut("abcd")) + assert(InMemoryFileIndex.shouldFilterOut(".ab")) + assert(InMemoryFileIndex.shouldFilterOut("_cd")) + assert(!InMemoryFileIndex.shouldFilterOut("_metadata")) + assert(!InMemoryFileIndex.shouldFilterOut("_common_metadata")) + assert(InMemoryFileIndex.shouldFilterOut("_ab_metadata")) + assert(InMemoryFileIndex.shouldFilterOut("_cd_common_metadata")) + assert(InMemoryFileIndex.shouldFilterOut("a._COPYING_")) } test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {