diff --git a/R/pkg/R/mllib_regression.R b/R/pkg/R/mllib_regression.R index 9ecd887f2c127..ebaeae970218a 100644 --- a/R/pkg/R/mllib_regression.R +++ b/R/pkg/R/mllib_regression.R @@ -76,6 +76,8 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj")) #' "frequencyDesc", "frequencyAsc", "alphabetDesc", and "alphabetAsc". #' The default value is "frequencyDesc". When the ordering is set to #' "alphabetDesc", this drops the same category as R when encoding strings. +#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets +#' as 0.0. The feature specified as offset has a constant coefficient of 1.0. #' @param ... additional arguments passed to the method. #' @aliases spark.glm,SparkDataFrame,formula-method #' @return \code{spark.glm} returns a fitted generalized linear model. @@ -127,7 +129,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, weightCol = NULL, regParam = 0.0, var.power = 0.0, link.power = 1.0 - var.power, stringIndexerOrderType = c("frequencyDesc", "frequencyAsc", - "alphabetDesc", "alphabetAsc")) { + "alphabetDesc", "alphabetAsc"), + offsetCol = NULL) { stringIndexerOrderType <- match.arg(stringIndexerOrderType) if (is.character(family)) { @@ -159,12 +162,19 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"), weightCol <- as.character(weightCol) } + if (!is.null(offsetCol)) { + offsetCol <- as.character(offsetCol) + if (nchar(offsetCol) == 0) { + offsetCol <- NULL + } + } + # For known families, Gamma is upper-cased jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper", "fit", formula, data@sdf, tolower(family$family), family$link, tol, as.integer(maxIter), weightCol, regParam, as.double(var.power), as.double(link.power), - stringIndexerOrderType) + stringIndexerOrderType, offsetCol) new("GeneralizedLinearRegressionModel", jobj = jobj) }) @@ -192,6 +202,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"), #' "frequencyDesc", "frequencyAsc", "alphabetDesc", and "alphabetAsc". #' The default value is "frequencyDesc". When the ordering is set to #' "alphabetDesc", this drops the same category as R when encoding strings. +#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets +#' as 0.0. The feature specified as offset has a constant coefficient of 1.0. #' @return \code{glm} returns a fitted generalized linear model. #' @rdname glm #' @export @@ -209,10 +221,12 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "SparkDat function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 25, weightCol = NULL, var.power = 0.0, link.power = 1.0 - var.power, stringIndexerOrderType = c("frequencyDesc", "frequencyAsc", - "alphabetDesc", "alphabetAsc")) { + "alphabetDesc", "alphabetAsc"), + offsetCol = NULL) { spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, weightCol = weightCol, var.power = var.power, link.power = link.power, - stringIndexerOrderType = stringIndexerOrderType) + stringIndexerOrderType = stringIndexerOrderType, + offsetCol = offsetCol) }) # Returns the summary of a model produced by glm() or spark.glm(), similarly to R's summary(). 
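On the Scala side, the new `offsetCol` argument is simply forwarded to `GeneralizedLinearRegression.setOffsetCol` (see the wrapper change further down). For reference, a minimal Scala sketch of the equivalent direct usage follows; the toy data, column names, and parameter choices are illustrative only and not part of this patch.

{% highlight scala %}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.GeneralizedLinearRegression
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("glm-offset-sketch").getOrCreate()

// Toy data: label, features, and an offset column (e.g. log-exposure in a Poisson rate model).
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.5), 0.1),
  (2.0, Vectors.dense(1.5), 0.4),
  (4.0, Vectors.dense(2.5), 0.9)
)).toDF("label", "features", "offset")

// The offset column enters the linear predictor with a fixed coefficient of 1.0,
// matching the semantics of R's glm(..., offset = ...) described in the docs above.
val glr = new GeneralizedLinearRegression()
  .setFamily("poisson")
  .setLink("log")
  .setOffsetCol("offset")

val model = glr.fit(training)
println(s"coefficients: ${model.coefficients}, intercept: ${model.intercept}")

spark.stop()
{% endhighlight %}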
diff --git a/R/pkg/tests/fulltests/test_mllib_regression.R b/R/pkg/tests/fulltests/test_mllib_regression.R index 6b72a09b200d6..23daca75fcc22 100644 --- a/R/pkg/tests/fulltests/test_mllib_regression.R +++ b/R/pkg/tests/fulltests/test_mllib_regression.R @@ -173,6 +173,14 @@ test_that("spark.glm summary", { expect_equal(stats$df.residual, rStats$df.residual) expect_equal(stats$aic, rStats$aic) + # Test spark.glm works with offset + training <- suppressWarnings(createDataFrame(iris)) + stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species, + family = poisson(), offsetCol = "Petal_Length")) + rStats <- suppressWarnings(summary(glm(Sepal.Width ~ Sepal.Length + Species, + data = iris, family = poisson(), offset = iris$Petal.Length))) + expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3)) + # Test summary works on base GLM models baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris) baseSummary <- summary(baseModel) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index ce916b43bf4d3..eeb6d10d3e741 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -22,8 +22,10 @@ import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} +import scala.collection.immutable.Map import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.HashMap import scala.util.control.NonFatal import com.google.common.primitives.Longs @@ -74,7 +76,6 @@ class SparkHadoopUtil extends Logging { } } - /** * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop * configuration. @@ -99,17 +100,35 @@ class SparkHadoopUtil extends Logging { hadoopConf.set("fs.s3a.session.token", sessionToken) } } - // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - conf.getAll.foreach { case (key, value) => - if (key.startsWith("spark.hadoop.")) { - hadoopConf.set(key.substring("spark.hadoop.".length), value) - } - } + appendSparkHadoopConfigs(conf, hadoopConf) val bufferSize = conf.get("spark.buffer.size", "65536") hadoopConf.set("io.file.buffer.size", bufferSize) } } + /** + * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop + * configuration without the spark.hadoop. prefix. + */ + def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { + // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar" + for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) { + hadoopConf.set(key.substring("spark.hadoop.".length), value) + } + } + + /** + * Appends spark.hadoop.* configurations from a Map to another without the spark.hadoop. prefix. + */ + def appendSparkHadoopConfigs( + srcMap: Map[String, String], + destMap: HashMap[String, String]): Unit = { + // Copy any "spark.hadoop.foo=bar" system properties into destMap as "foo=bar" + for ((key, value) <- srcMap if key.startsWith("spark.hadoop.")) { + destMap.put(key.substring("spark.hadoop.".length), value) + } + } + /** * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop * subsystems. 
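To make the behavior of the two new `appendSparkHadoopConfigs` helpers concrete, here is a small self-contained Scala sketch of the same prefix-stripping copy; the property names and values are made up for illustration and are not part of this patch.

{% highlight scala %}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Standalone sketch of the copy performed by appendSparkHadoopConfigs:
// every "spark.hadoop.foo=bar" entry in the SparkConf becomes "foo=bar" in the Hadoop conf.
def copySparkHadoopProps(conf: SparkConf, hadoopConf: Configuration): Unit = {
  for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
    hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
  }
}

val conf = new SparkConf()
  .set("spark.hadoop.fs.defaultFS", "hdfs://namenode.example.com:8020") // illustrative value
  .set("spark.buffer.size", "65536")                                    // no prefix, not copied

val hadoopConf = new Configuration(false)
copySparkHadoopProps(conf, hadoopConf)

assert(hadoopConf.get("fs.defaultFS") == "hdfs://namenode.example.com:8020")
assert(hadoopConf.get("spark.buffer.size") == null)
{% endhighlight %}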
diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1 index 1c34f1bbc1aa3..cf82389173048 100644 --- a/dev/appveyor-install-dependencies.ps1 +++ b/dev/appveyor-install-dependencies.ps1 @@ -114,7 +114,7 @@ $env:Path += ";$env:HADOOP_HOME\bin" Pop-Location # ========================== R -$rVer = "3.3.1" +$rVer = "3.4.1" $rToolsVer = "3.4.0" InstallR diff --git a/docs/configuration.md b/docs/configuration.md index 011d583d6ea70..e7c0306920e08 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2357,5 +2357,37 @@ The location of these configuration files varies across Hadoop versions, but a common location is inside of `/etc/hadoop/conf`. Some tools create configurations on-the-fly, but offer a mechanism to download copies of them. -To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh` +To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh` to a location containing the configuration files. + +# Custom Hadoop/Hive Configuration + +If your Spark application is interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive +configuration files in Spark's classpath. + +Multiple running applications might require different Hadoop/Hive client-side configurations. +You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in +Spark's classpath for each application. In a Spark cluster running on YARN, these configuration +files are set cluster-wide, and cannot safely be changed by the application. + +A better choice is to use Spark Hadoop properties in the form of `spark.hadoop.*`. +They can be treated the same as normal Spark properties, which can be set in `$SPARK_HOME/conf/spark-defaults.conf`. + +In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For +instance, Spark allows you to simply create an empty conf and set Spark/Spark Hadoop properties.
+ +{% highlight scala %} +val conf = new SparkConf().set("spark.hadoop.abc.def","xyz") +val sc = new SparkContext(conf) +{% endhighlight %} + +Also, you can modify or add configurations at runtime: +{% highlight bash %} +./bin/spark-submit \ + --name "My app" \ + --master local[4] \ + --conf spark.eventLog.enabled=false \ + --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ + --conf spark.hadoop.abc.def=xyz \ + myApp.jar +{% endhighlight %} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala index 176a6cf852914..64575b0cb0cb5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala @@ -77,7 +77,8 @@ private[r] object GeneralizedLinearRegressionWrapper regParam: Double, variancePower: Double, linkPower: Double, - stringIndexerOrderType: String): GeneralizedLinearRegressionWrapper = { + stringIndexerOrderType: String, + offsetCol: String): GeneralizedLinearRegressionWrapper = { // scalastyle:on val rFormula = new RFormula().setFormula(formula) .setStringIndexerOrderType(stringIndexerOrderType) @@ -99,6 +100,7 @@ private[r] object GeneralizedLinearRegressionWrapper glr.setLink(link) } if (weightCol != null) glr.setWeightCol(weightCol) + if (offsetCol != null) glr.setOffsetCol(offsetCol) val pipeline = new Pipeline() .setStages(Array(rFormulaModel, glr)) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 4534b7dcf6399..954955b6b1293 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -473,11 +473,11 @@ identifierComment ; relationPrimary - : tableIdentifier sample? tableAlias #tableName - | '(' queryNoWith ')' sample? tableAlias #aliasedQuery - | '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation - | inlineTable #inlineTableDefault2 - | functionTable #tableValuedFunction + : tableIdentifier sample? tableAlias #tableName + | '(' queryNoWith ')' sample? tableAlias #aliasedQuery + | '(' relation ')' sample? tableAlias #aliasedRelation + | inlineTable #inlineTableDefault2 + | functionTable #tableValuedFunction ; inlineTable diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a6d297cfd6538..8628a89aa949a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -592,25 +592,7 @@ class Analyzer( case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) => val defaultDatabase = AnalysisContext.get.defaultDatabase val foundRelation = lookupTableFromCatalog(u, defaultDatabase) - - // Add `Project` to rename output column names if a query has alias names: - // e.g., SELECT col1, col2 FROM testData AS t(col1, col2) - val relation = if (u.outputColumnNames.nonEmpty) { - val outputAttrs = foundRelation.output - // Checks if the number of the aliases equals to the number of columns in the table. 
- if (u.outputColumnNames.size != outputAttrs.size) { - u.failAnalysis(s"Number of column aliases does not match number of columns. " + - s"Table name: ${u.tableName}; number of column aliases: " + - s"${u.outputColumnNames.size}; number of columns: ${outputAttrs.size}.") - } - val aliases = outputAttrs.zip(u.outputColumnNames).map { - case (attr, name) => Alias(attr, name)() - } - Project(aliases, foundRelation) - } else { - foundRelation - } - resolveRelation(relation) + resolveRelation(foundRelation) // The view's child should be a logical plan parsed from the `desc.viewText`, the variable // `viewText` should be defined, or else we throw an error on the generation of the View // operator. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index b7a704dc8453a..d336f801d0770 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -37,19 +37,10 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str /** * Holds the name of a relation that has yet to be looked up in a catalog. - * We could add alias names for columns in a relation: - * {{{ - * // Assign alias names - * SELECT col1, col2 FROM testData AS t(col1, col2); - * }}} * * @param tableIdentifier table name - * @param outputColumnNames alias names of columns. If these names given, an analyzer adds - * [[Project]] to rename the columns. */ -case class UnresolvedRelation( - tableIdentifier: TableIdentifier, - outputColumnNames: Seq[String] = Seq.empty) +case class UnresolvedRelation(tableIdentifier: TableIdentifier) extends LeafNode { /** Returns a `.` separated name for this relation. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 5935017704eda..22c5484b76638 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -681,17 +681,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging */ override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) { val tableId = visitTableIdentifier(ctx.tableIdentifier) - val table = if (ctx.tableAlias.identifierList != null) { - UnresolvedRelation(tableId, visitIdentifierList(ctx.tableAlias.identifierList)) - } else { - UnresolvedRelation(tableId) - } - val tableWithAlias = if (ctx.tableAlias.strictIdentifier != null) { - SubqueryAlias(ctx.tableAlias.strictIdentifier.getText, table) - } else { - table - } - tableWithAlias.optionalMap(ctx.sample)(withSample) + val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId)) + table.optionalMap(ctx.sample)(withSample) } /** @@ -739,12 +730,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging /** * Create an alias (SubqueryAlias) for a join relation. This is practically the same as * visitAliasedQuery and visitNamedExpression, ANTLR4 however requires us to use 3 different - * hooks. + * hooks. 
We could add alias names for output columns, for example: + * {{{ + * SELECT a, b, c, d FROM (src1 s1 INNER JOIN src2 s2 ON s1.id = s2.id) dst(a, b, c, d) + * }}} */ override def visitAliasedRelation(ctx: AliasedRelationContext): LogicalPlan = withOrigin(ctx) { - plan(ctx.relation) - .optionalMap(ctx.sample)(withSample) - .optionalMap(ctx.strictIdentifier)(aliasPlan) + val relation = plan(ctx.relation).optionalMap(ctx.sample)(withSample) + mayApplyAliasPlan(ctx.tableAlias, relation) } /** @@ -756,31 +749,43 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) { - val alias = if (ctx.tableAlias.strictIdentifier == null) { + val relation = plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample) + if (ctx.tableAlias.strictIdentifier == null) { // For un-aliased subqueries, use a default alias name that is not likely to conflict with // normal subquery names, so that parent operators can only access the columns in subquery by // unqualified names. Users can still use this special qualifier to access columns if they // know it, but that's not recommended. - "__auto_generated_subquery_name" - } else { - ctx.tableAlias.strictIdentifier.getText - } - val subquery = SubqueryAlias(alias, plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample)) - if (ctx.tableAlias.identifierList != null) { - val columnAliases = visitIdentifierList(ctx.tableAlias.identifierList) - UnresolvedSubqueryColumnAliases(columnAliases, subquery) + SubqueryAlias("__auto_generated_subquery_name", relation) } else { - subquery + mayApplyAliasPlan(ctx.tableAlias, relation) } } /** - * Create an alias (SubqueryAlias) for a LogicalPlan. + * Create an alias ([[SubqueryAlias]]) for a [[LogicalPlan]]. */ private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = { SubqueryAlias(alias.getText, plan) } + /** + * If aliases specified in a FROM clause, create a subquery alias ([[SubqueryAlias]]) and + * column aliases for a [[LogicalPlan]]. + */ + private def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = { + if (tableAlias.strictIdentifier != null) { + val subquery = SubqueryAlias(tableAlias.strictIdentifier.getText, plan) + if (tableAlias.identifierList != null) { + val columnNames = visitIdentifierList(tableAlias.identifierList) + UnresolvedSubqueryColumnAliases(columnNames, subquery) + } else { + subquery + } + } else { + plan + } + } + /** * Create a Sequence of Strings for a parenthesis enclosed alias list. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala index 90d11d6d91512..c77849035a975 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala @@ -33,11 +33,11 @@ object JoinType { case _ => val supported = Seq( "inner", - "outer", "full", "fullouter", - "leftouter", "left", - "rightouter", "right", - "leftsemi", - "leftanti", + "outer", "full", "fullouter", "full_outer", + "leftouter", "left", "left_outer", + "rightouter", "right", "right_outer", + "leftsemi", "left_semi", + "leftanti", "left_anti", "cross") throw new IllegalArgumentException(s"Unsupported join type '$typ'. 
" + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a819cddcae988..ecb941c5fa9e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1228,10 +1228,12 @@ class SQLConf extends Serializable with Logging { * not set yet, return `defaultValue`. */ def getConfString(key: String, defaultValue: String): String = { - val entry = sqlConfEntries.get(key) - if (entry != null && defaultValue != "") { - // Only verify configs in the SQLConf object - entry.valueConverter(defaultValue) + if (defaultValue != null && defaultValue != "") { + val entry = sqlConfEntries.get(key) + if (entry != null) { + // Only verify configs in the SQLConf object + entry.valueConverter(defaultValue) + } } Option(settings.get(key)).getOrElse(defaultValue) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 847713a0455b9..e5fcd60b2d3da 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.Cross +import org.apache.spark.sql.catalyst.plans.{Cross, Inner} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ @@ -457,18 +457,20 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers { test("SPARK-20841 Support table column aliases in FROM clause") { def tableColumnsWithAliases(outputNames: Seq[String]): LogicalPlan = { - SubqueryAlias("t", UnresolvedRelation(TableIdentifier("TaBlE3"), outputNames)) - .select(star()) + UnresolvedSubqueryColumnAliases( + outputNames, + SubqueryAlias("t", UnresolvedRelation(TableIdentifier("TaBlE3"))) + ).select(star()) } assertAnalysisSuccess(tableColumnsWithAliases("col1" :: "col2" :: "col3" :: "col4" :: Nil)) assertAnalysisError( tableColumnsWithAliases("col1" :: Nil), - Seq("Number of column aliases does not match number of columns. Table name: TaBlE3; " + - "number of column aliases: 1; number of columns: 4.")) + Seq("Number of column aliases does not match number of columns. " + + "Number of column aliases: 1; number of columns: 4.")) assertAnalysisError( tableColumnsWithAliases("col1" :: "col2" :: "col3" :: "col4" :: "col5" :: Nil), - Seq("Number of column aliases does not match number of columns. Table name: TaBlE3; " + - "number of column aliases: 5; number of columns: 4.")) + Seq("Number of column aliases does not match number of columns. " + + "Number of column aliases: 5; number of columns: 4.")) } test("SPARK-20962 Support subquery column aliases in FROM clause") { @@ -490,4 +492,26 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers { Seq("Number of column aliases does not match number of columns. 
" + "Number of column aliases: 5; number of columns: 4.")) } + + test("SPARK-20963 Support aliases for join relations in FROM clause") { + def joinRelationWithAliases(outputNames: Seq[String]): LogicalPlan = { + val src1 = LocalRelation('id.int, 'v1.string).as("s1") + val src2 = LocalRelation('id.int, 'v2.string).as("s2") + UnresolvedSubqueryColumnAliases( + outputNames, + SubqueryAlias( + "dst", + src1.join(src2, Inner, Option(Symbol("s1.id") === Symbol("s2.id")))) + ).select(star()) + } + assertAnalysisSuccess(joinRelationWithAliases("col1" :: "col2" :: "col3" :: "col4" :: Nil)) + assertAnalysisError( + joinRelationWithAliases("col1" :: Nil), + Seq("Number of column aliases does not match number of columns. " + + "Number of column aliases: 1; number of columns: 4.")) + assertAnalysisError( + joinRelationWithAliases("col1" :: "col2" :: "col3" :: "col4" :: "col5" :: Nil), + Seq("Number of column aliases does not match number of columns. " + + "Number of column aliases: 5; number of columns: 4.")) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 5fa72e1e92660..b0d2fb26a6006 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -491,8 +491,10 @@ class PlanParserSuite extends AnalysisTest { test("SPARK-20841 Support table column aliases in FROM clause") { assertEqual( "SELECT * FROM testData AS t(col1, col2)", - SubqueryAlias("t", UnresolvedRelation(TableIdentifier("testData"), Seq("col1", "col2"))) - .select(star())) + UnresolvedSubqueryColumnAliases( + Seq("col1", "col2"), + SubqueryAlias("t", UnresolvedRelation(TableIdentifier("testData"))) + ).select(star())) } test("SPARK-20962 Support subquery column aliases in FROM clause") { @@ -506,6 +508,19 @@ class PlanParserSuite extends AnalysisTest { ).select(star())) } + test("SPARK-20963 Support aliases for join relations in FROM clause") { + val src1 = UnresolvedRelation(TableIdentifier("src1")).as("s1") + val src2 = UnresolvedRelation(TableIdentifier("src2")).as("s2") + assertEqual( + "SELECT * FROM (src1 s1 INNER JOIN src2 s2 ON s1.id = s2.id) dst(a, b, c, d)", + UnresolvedSubqueryColumnAliases( + Seq("a", "b", "c", "d"), + SubqueryAlias( + "dst", + src1.join(src2, Inner, Option(Symbol("s1.id") === Symbol("s2.id")))) + ).select(star())) + } + test("inline table") { assertEqual("values 1, 2, 3, 4", UnresolvedInlineTable(Seq("col1"), Seq(1, 2, 3, 4).map(x => Seq(Literal(x))))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/JoinTypesTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/JoinTypesTest.scala new file mode 100644 index 0000000000000..d56f97970122c --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/JoinTypesTest.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.catalyst.plans + +import org.apache.spark.SparkFunSuite + +class JoinTypesTest extends SparkFunSuite { + + test("construct an Inner type") { + assert(JoinType("inner") === Inner) + } + + test("construct a FullOuter type") { + assert(JoinType("fullouter") === FullOuter) + assert(JoinType("full_outer") === FullOuter) + assert(JoinType("outer") === FullOuter) + assert(JoinType("full") === FullOuter) + } + + test("construct a LeftOuter type") { + assert(JoinType("leftouter") === LeftOuter) + assert(JoinType("left_outer") === LeftOuter) + assert(JoinType("left") === LeftOuter) + } + + test("construct a RightOuter type") { + assert(JoinType("rightouter") === RightOuter) + assert(JoinType("right_outer") === RightOuter) + assert(JoinType("right") === RightOuter) + } + + test("construct a LeftSemi type") { + assert(JoinType("leftsemi") === LeftSemi) + assert(JoinType("left_semi") === LeftSemi) + } + + test("construct a LeftAnti type") { + assert(JoinType("leftanti") === LeftAnti) + assert(JoinType("left_anti") === LeftAnti) + } + + test("construct a Cross type") { + assert(JoinType("cross") === Cross) + } + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index 5f12830ee621f..7477d025dfe89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -87,6 +88,14 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm // Configures a single property. case Some((key, Some(value))) => val runFunc = (sparkSession: SparkSession) => { + if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive") && + key.startsWith("hive.")) { + logWarning(s"'SET $key=$value' might not work, since Spark doesn't support changing " + + "the Hive config dynamically. Please pass the Hive-specific config by adding the " + + s"prefix spark.hadoop (e.g., spark.hadoop.$key) when starting a Spark application. 
" + + "For details, see the link: https://spark.apache.org/docs/latest/configuration.html#" + + "dynamically-loading-spark-properties.") + } sparkSession.conf.set(key, value) Seq(Row(key, value)) } diff --git a/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql b/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql index 85481cbbf9377..4cfd5f28afdaa 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql @@ -18,3 +18,10 @@ SELECT a AS col1, b AS col2 FROM testData AS t(c, d); -- Subquery aliases in FROM clause SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2); + +-- Aliases for join relations in FROM clause +CREATE OR REPLACE TEMPORARY VIEW src1 AS SELECT * FROM VALUES (1, "a"), (2, "b"), (3, "c") AS src1(id, v1); + +CREATE OR REPLACE TEMPORARY VIEW src2 AS SELECT * FROM VALUES (2, 1.0), (3, 3.2), (1, 8.5) AS src2(id, v2); + +SELECT * FROM (src1 s1 INNER JOIN src2 s2 ON s1.id = s2.id) dst(a, b, c, d); diff --git a/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out index 4459f3186c77b..1a2bd5ea91cde 100644 --- a/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 8 +-- Number of queries: 11 -- !query 0 @@ -42,7 +42,7 @@ SELECT * FROM testData AS t(col1, col2, col3) struct<> -- !query 4 output org.apache.spark.sql.AnalysisException -Number of column aliases does not match number of columns. Table name: testData; number of column aliases: 3; number of columns: 2.; line 1 pos 14 +Number of column aliases does not match number of columns. Number of column aliases: 3; number of columns: 2.; line 1 pos 14 -- !query 5 @@ -51,7 +51,7 @@ SELECT * FROM testData AS t(col1) struct<> -- !query 5 output org.apache.spark.sql.AnalysisException -Number of column aliases does not match number of columns. Table name: testData; number of column aliases: 1; number of columns: 2.; line 1 pos 14 +Number of column aliases does not match number of columns. 
Number of column aliases: 1; number of columns: 2.; line 1 pos 14 -- !query 6 @@ -60,7 +60,7 @@ SELECT a AS col1, b AS col2 FROM testData AS t(c, d) struct<> -- !query 6 output org.apache.spark.sql.AnalysisException -cannot resolve '`a`' given input columns: [t.c, t.d]; line 1 pos 7 +cannot resolve '`a`' given input columns: [c, d]; line 1 pos 7 -- !query 7 @@ -69,3 +69,29 @@ SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2) struct -- !query 7 output 1 1 + + +-- !query 8 +CREATE OR REPLACE TEMPORARY VIEW src1 AS SELECT * FROM VALUES (1, "a"), (2, "b"), (3, "c") AS src1(id, v1) +-- !query 8 schema +struct<> +-- !query 8 output + + + +-- !query 9 +CREATE OR REPLACE TEMPORARY VIEW src2 AS SELECT * FROM VALUES (2, 1.0), (3, 3.2), (1, 8.5) AS src2(id, v2) +-- !query 9 schema +struct<> +-- !query 9 output + + + +-- !query 10 +SELECT * FROM (src1 s1 INNER JOIN src2 s2 ON s1.id = s2.id) dst(a, b, c, d) +-- !query 10 schema +struct +-- !query 10 output +1 a 1 8.5 +2 b 2 1 +3 c 3 3.2 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index 6a5b74b01df80..d2d013682cd2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -74,13 +74,13 @@ object TPCDSQueryBenchmark { // per-row processing time for those cases. val queryRelations = scala.collection.mutable.HashSet[String]() spark.sql(queryString).queryExecution.logical.map { - case UnresolvedRelation(t: TableIdentifier, _) => + case UnresolvedRelation(t: TableIdentifier) => queryRelations.add(t.table) case lp: LogicalPlan => lp.expressions.foreach { _ foreach { case subquery: SubqueryExpression => subquery.plan.foreach { - case UnresolvedRelation(t: TableIdentifier, _) => + case UnresolvedRelation(t: TableIdentifier) => queryRelations.add(t.table) case _ => } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index a283ff971adcd..948f179f5e8f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -270,4 +270,15 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key)) assert(e2.message.contains("Cannot modify the value of a static config")) } + + test("SPARK-21588 SQLContext.getConf(key, null) should return null") { + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { + assert("1" == spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key, null)) + assert("1" == spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key, "")) + } + + assert(spark.conf.getOption("spark.sql.nonexistent").isEmpty) + assert(null == spark.conf.get("spark.sql.nonexistent", null)) + assert("" == spark.conf.get("spark.sql.nonexistent", "")) + } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 38c458948c90e..761e832ed14b8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -50,6 +50,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { private val prompt = "spark-sql" private val continuedPrompt = "".padTo(prompt.length, ' ') private var transport: TSocket = _ + private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop." installSignalHandler() @@ -134,6 +135,16 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Hive 1.2 + not supported in CLI throw new RuntimeException("Remote operations not supported") } + // Respect the configurations set by --hiveconf from the command line + // (based on Hive's CliDriver). + val hiveConfFromCmd = sessionState.getOverriddenConfigurations.entrySet().asScala + val newHiveConf = hiveConfFromCmd.map { kv => + // If the same property is configured by spark.hadoop.xxx, we ignore it and + // obey settings from spark properties + val k = kv.getKey + val v = sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + k, kv.getValue) + (k, v) + } val cli = new SparkSQLCLIDriver cli.setHiveVariables(oproc.getHiveVariables) @@ -157,12 +168,8 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Execute -i init files (always in silent mode) cli.processInitFiles(sessionState) - // Respect the configurations set by --hiveconf from the command line - // (based on Hive's CliDriver). - val it = sessionState.getOverriddenConfigurations.entrySet().iterator() - while (it.hasNext) { - val kv = it.next() - SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue) + newHiveConf.foreach { kv => + SparkSQLEnv.sqlContext.setConf(kv._1, kv._2) } if (sessionState.execString != null) { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index d3cec11bd7567..933fd7369380a 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -283,4 +283,17 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { "SET conf3;" -> "conftest" ) } + + test("SPARK-21451: spark.sql.warehouse.dir should respect options in --hiveconf") { + runCliWithin(1.minute)("set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath) + } + + test("SPARK-21451: Apply spark.hadoop.* configurations") { + val tmpDir = Utils.createTempDir(namePrefix = "SPARK-21451") + runCliWithin( + 1.minute, + Seq(s"--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))( + "set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath) + tmpDir.delete() + } } diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index f9462e79a69f3..5508a00259e09 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -64,6 +64,13 @@ test-jar test + + org.apache.spark + spark-catalyst_${scala.binary.version} + test-jar + ${project.version} + test + org.apache.spark spark-tags_${scala.binary.version} @@ -174,20 +181,6 @@ scalacheck_${scala.binary.version} test - - org.apache.spark - spark-sql_${scala.binary.version} - test-jar - ${project.version} - test - - - org.apache.spark - spark-catalyst_${scala.binary.version} - test-jar - ${project.version} - test - diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index be6339f7ddec3..b32b6fb82663f 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogTable @@ -404,6 +405,13 @@ private[spark] object HiveUtils extends Logging { propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "") propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "") + // SPARK-21451: Spark gathers all `spark.hadoop.*` properties from a `SparkConf` into a + // Hadoop Configuration internally, as long as this happens after the SparkContext is initialized. + // Some instances, such as the `CliSessionState` used in `SparkSQLCLIDriver`, may also rely on this + // Configuration but are created before the SparkContext is initialized, so we need to take these + // settings from system properties in the form of regular Hadoop configurations. + SparkHadoopUtil.get.appendSparkHadoopConfigs(sys.props.toMap, propMap) + propMap.toMap } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 3a5c0c397b15e..9e15baa4b2b74 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -554,7 +554,7 @@ private[hive] class TestHiveQueryExecution( // Make sure any test tables referenced are loaded. val referencedTables = describedTables ++ - logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table } + logical.collect { case UnresolvedRelation(tableIdent) => tableIdent.table } val resolver = sparkSession.sessionState.conf.resolver val referencedTestTables = sparkSession.testTables.keys.filter { testTable => referencedTables.exists(resolver(_, testTable)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index 667a7ddd8bb61..2ebb1de428fd1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -33,4 +33,13 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton assert(conf(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname) === "") } } + + test("newTemporaryConfiguration respects spark.hadoop.foo=bar in SparkConf") { + sys.props.put("spark.hadoop.foo", "bar") + Seq(true, false) foreach { useInMemoryDerby => + val hiveConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby) + assert(!hiveConf.contains("spark.hadoop.foo")) + assert(hiveConf("foo") === "bar") + } + } }
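As an end-to-end illustration of the `spark.hadoop.*` pass-through that the documentation and the tests above describe, the following Scala sketch sets such a property on a local session and reads it back from the session's Hadoop configuration; `abc.def` is just the placeholder property name used in the docs example, not a real setting.

{% highlight scala %}
import org.apache.spark.sql.SparkSession

// "abc.def" is the placeholder property from the docs example above.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("spark-hadoop-conf-check")
  .config("spark.hadoop.abc.def", "xyz")
  .getOrCreate()

// The spark.hadoop. prefix is stripped when the value is propagated to the
// Hadoop Configuration backing the session.
println(spark.sparkContext.hadoopConfiguration.get("abc.def")) // expected: xyz

spark.stop()
{% endhighlight %}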