22 changes: 18 additions & 4 deletions R/pkg/R/mllib_regression.R
@@ -76,6 +76,8 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
#' "frequencyDesc", "frequencyAsc", "alphabetDesc", and "alphabetAsc".
#' The default value is "frequencyDesc". When the ordering is set to
#' "alphabetDesc", this drops the same category as R when encoding strings.
#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
#' as 0.0. The feature specified as offset has a constant coefficient of 1.0.
#' @param ... additional arguments passed to the method.
#' @aliases spark.glm,SparkDataFrame,formula-method
#' @return \code{spark.glm} returns a fitted generalized linear model.
@@ -127,7 +129,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, weightCol = NULL,
regParam = 0.0, var.power = 0.0, link.power = 1.0 - var.power,
stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
"alphabetDesc", "alphabetAsc")) {
"alphabetDesc", "alphabetAsc"),
offsetCol = NULL) {

stringIndexerOrderType <- match.arg(stringIndexerOrderType)
if (is.character(family)) {
@@ -159,12 +162,19 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
weightCol <- as.character(weightCol)
}

if (!is.null(offsetCol)) {
offsetCol <- as.character(offsetCol)
if (nchar(offsetCol) == 0) {
offsetCol <- NULL
}
}

# For known families, Gamma is upper-cased
jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
"fit", formula, data@sdf, tolower(family$family), family$link,
tol, as.integer(maxIter), weightCol, regParam,
as.double(var.power), as.double(link.power),
stringIndexerOrderType)
stringIndexerOrderType, offsetCol)
new("GeneralizedLinearRegressionModel", jobj = jobj)
})

@@ -192,6 +202,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
#' "frequencyDesc", "frequencyAsc", "alphabetDesc", and "alphabetAsc".
#' The default value is "frequencyDesc". When the ordering is set to
#' "alphabetDesc", this drops the same category as R when encoding strings.
#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
#' as 0.0. The feature specified as offset has a constant coefficient of 1.0.
#' @return \code{glm} returns a fitted generalized linear model.
#' @rdname glm
#' @export
@@ -209,10 +221,12 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "SparkDat
function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 25, weightCol = NULL,
var.power = 0.0, link.power = 1.0 - var.power,
stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
"alphabetDesc", "alphabetAsc")) {
"alphabetDesc", "alphabetAsc"),
offsetCol = NULL) {
spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, weightCol = weightCol,
var.power = var.power, link.power = link.power,
stringIndexerOrderType = stringIndexerOrderType)
stringIndexerOrderType = stringIndexerOrderType,
offsetCol = offsetCol)
})

# Returns the summary of a model produced by glm() or spark.glm(), similarly to R's summary().
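For context, the new `offsetCol` argument is forwarded to the underlying Scala estimator's `setOffsetCol` (see the `GeneralizedLinearRegressionWrapper` change further down), and the SparkR test added below checks it against R's `glm(offset = ...)`. A minimal Scala sketch of the equivalent call, assuming nothing beyond a local Spark session and a toy claims dataset where `logExposure` is a hypothetical pre-computed offset column:

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.GeneralizedLinearRegression
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("glm-offset-sketch").getOrCreate()
import spark.implicits._

// Toy data: (claim count, log(exposure) used as the offset, assembled feature vector).
val claims = Seq(
  (2.0, math.log(1.0), Vectors.dense(0.5, 1.0)),
  (0.0, math.log(0.5), Vectors.dense(1.5, 0.0)),
  (5.0, math.log(2.0), Vectors.dense(0.3, 1.0)),
  (1.0, math.log(1.5), Vectors.dense(0.8, 0.0))
).toDF("claims", "logExposure", "features")

val glr = new GeneralizedLinearRegression()
  .setFamily("poisson")
  .setLink("log")
  .setLabelCol("claims")
  .setOffsetCol("logExposure")  // the offset enters the linear predictor with a fixed coefficient of 1.0

val model = glr.fit(claims)
println(model.coefficients)
```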
8 changes: 8 additions & 0 deletions R/pkg/tests/fulltests/test_mllib_regression.R
@@ -173,6 +173,14 @@ test_that("spark.glm summary", {
expect_equal(stats$df.residual, rStats$df.residual)
expect_equal(stats$aic, rStats$aic)

# Test spark.glm works with offset
training <- suppressWarnings(createDataFrame(iris))
stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
family = poisson(), offsetCol = "Petal_Length"))
rStats <- suppressWarnings(summary(glm(Sepal.Width ~ Sepal.Length + Species,
data = iris, family = poisson(), offset = iris$Petal.Length)))
expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3))

# Test summary works on base GLM models
baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
baseSummary <- summary(baseModel)
33 changes: 26 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,8 +22,10 @@ import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}

import scala.collection.immutable.Map
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

import com.google.common.primitives.Longs
@@ -74,7 +76,6 @@ class SparkHadoopUtil extends Logging {
}
}


/**
* Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop
* configuration.
@@ -99,17 +100,35 @@ class SparkHadoopUtil extends Logging {
hadoopConf.set("fs.s3a.session.token", sessionToken)
}
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
appendSparkHadoopConfigs(conf, hadoopConf)
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
}
}

/**
* Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
* configuration without the spark.hadoop. prefix.
*/
def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
// Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}

/**
* Appends spark.hadoop.* configurations from a Map to another without the spark.hadoop. prefix.
*/
def appendSparkHadoopConfigs(
srcMap: Map[String, String],
destMap: HashMap[String, String]): Unit = {
// Copy any "spark.hadoop.foo=bar" system properties into destMap as "foo=bar"
for ((key, value) <- srcMap if key.startsWith("spark.hadoop.")) {
destMap.put(key.substring("spark.hadoop.".length), value)
}
}

/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
* subsystems.
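The refactoring above extracts the `spark.hadoop.*` prefix-stripping copy into two `appendSparkHadoopConfigs` overloads. A self-contained sketch of the same transformation, shown as an illustration of the behaviour rather than a call into the internal API:

```scala
import scala.collection.mutable

// Mirrors the Map-based overload: every "spark.hadoop.foo=bar" entry in the
// source map is re-added to the destination map as "foo=bar".
def copyWithoutSparkHadoopPrefix(
    srcMap: Map[String, String],
    destMap: mutable.HashMap[String, String]): Unit = {
  for ((key, value) <- srcMap if key.startsWith("spark.hadoop.")) {
    destMap.put(key.substring("spark.hadoop.".length), value)
  }
}

val src = Map(
  "spark.hadoop.fs.s3a.endpoint" -> "s3.example.com",  // hypothetical values
  "spark.app.name" -> "demo")
val dest = mutable.HashMap[String, String]()
copyWithoutSparkHadoopPrefix(src, dest)
// dest now holds only: "fs.s3a.endpoint" -> "s3.example.com"
```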
2 changes: 1 addition & 1 deletion dev/appveyor-install-dependencies.ps1
@@ -114,7 +114,7 @@ $env:Path += ";$env:HADOOP_HOME\bin"
Pop-Location

# ========================== R
$rVer = "3.3.1"
$rVer = "3.4.1"
$rToolsVer = "3.4.0"

InstallR
34 changes: 33 additions & 1 deletion docs/configuration.md
@@ -2357,5 +2357,37 @@ The location of these configuration files varies across Hadoop versions, but
a common location is inside of `/etc/hadoop/conf`. Some tools create
configurations on-the-fly, but offer a mechanism to download copies of them.

To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh`
To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh`
to a location containing the configuration files.

# Custom Hadoop/Hive Configuration

If your Spark application is interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive
configuration files in Spark's classpath.

Multiple running applications might require different Hadoop/Hive client-side configurations.
You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, and `hive-site.xml` in
Spark's classpath for each application. In a Spark cluster running on YARN, these configuration
files are set cluster-wide and cannot safely be changed by the application.

A better choice is to use Spark Hadoop properties of the form `spark.hadoop.*`.
They can be treated the same as normal Spark properties and set in `$SPARK_HOME/conf/spark-defaults.conf`.

In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
instance, Spark allows you to simply create an empty conf and set Spark and Spark Hadoop properties.

{% highlight scala %}
val conf = new SparkConf().set("spark.hadoop.abc.def","xyz")
val sc = new SparkContext(conf)
{% endhighlight %}

Also, you can modify or add configurations at runtime:
{% highlight bash %}
./bin/spark-submit \
--name "My app" \
--master local[4] \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
myApp.jar
{% endhighlight %}
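One way to confirm that such a property actually reached the Hadoop side is to read it back from the SparkContext's Hadoop configuration; the `spark.hadoop.` prefix is stripped during the copy, so the lookup uses the bare key. A short sketch reusing the `sc` and the placeholder key `abc.def` from the Scala snippet above:

```scala
// The value set as "spark.hadoop.abc.def" is visible to Hadoop client code as "abc.def".
assert(sc.hadoopConfiguration.get("abc.def") == "xyz")
```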
@@ -77,7 +77,8 @@ private[r] object GeneralizedLinearRegressionWrapper
regParam: Double,
variancePower: Double,
linkPower: Double,
stringIndexerOrderType: String): GeneralizedLinearRegressionWrapper = {
stringIndexerOrderType: String,
offsetCol: String): GeneralizedLinearRegressionWrapper = {
// scalastyle:on
val rFormula = new RFormula().setFormula(formula)
.setStringIndexerOrderType(stringIndexerOrderType)
@@ -99,6 +100,7 @@ private[r] object GeneralizedLinearRegressionWrapper
glr.setLink(link)
}
if (weightCol != null) glr.setWeightCol(weightCol)
if (offsetCol != null) glr.setOffsetCol(offsetCol)

val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, glr))
@@ -473,11 +473,11 @@ identifierComment
;

relationPrimary
: tableIdentifier sample? tableAlias #tableName
| '(' queryNoWith ')' sample? tableAlias #aliasedQuery
| '(' relation ')' sample? (AS? strictIdentifier)? #aliasedRelation
| inlineTable #inlineTableDefault2
| functionTable #tableValuedFunction
: tableIdentifier sample? tableAlias #tableName
| '(' queryNoWith ')' sample? tableAlias #aliasedQuery
| '(' relation ')' sample? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
| functionTable #tableValuedFunction
;

inlineTable
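The grammar change above lets a parenthesized relation carry a full `tableAlias`, i.e. a subquery alias plus an optional column-alias list. A hedged sketch of the syntax this enables, reusing the `src1`/`src2` example from the `AstBuilder` comment further down, with toy temp views standing in for real tables:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("aliased-relation-sketch").getOrCreate()
import spark.implicits._

Seq((1, "a")).toDF("id", "v1").createOrReplaceTempView("src1")
Seq((1, "b")).toDF("id", "v2").createOrReplaceTempView("src2")

// The parenthesized join gets both a subquery alias (dst) and column aliases (a, b, c, d).
spark.sql(
  """SELECT a, b, c, d
    |FROM (src1 s1 INNER JOIN src2 s2 ON s1.id = s2.id) dst(a, b, c, d)
    |""".stripMargin).show()
```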
@@ -592,25 +592,7 @@ class Analyzer(
case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
val defaultDatabase = AnalysisContext.get.defaultDatabase
val foundRelation = lookupTableFromCatalog(u, defaultDatabase)

// Add `Project` to rename output column names if a query has alias names:
// e.g., SELECT col1, col2 FROM testData AS t(col1, col2)
val relation = if (u.outputColumnNames.nonEmpty) {
val outputAttrs = foundRelation.output
// Checks if the number of the aliases equals to the number of columns in the table.
if (u.outputColumnNames.size != outputAttrs.size) {
u.failAnalysis(s"Number of column aliases does not match number of columns. " +
s"Table name: ${u.tableName}; number of column aliases: " +
s"${u.outputColumnNames.size}; number of columns: ${outputAttrs.size}.")
}
val aliases = outputAttrs.zip(u.outputColumnNames).map {
case (attr, name) => Alias(attr, name)()
}
Project(aliases, foundRelation)
} else {
foundRelation
}
resolveRelation(relation)
resolveRelation(foundRelation)
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
// `viewText` should be defined, or else we throw an error on the generation of the View
// operator.
@@ -37,19 +37,10 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str

/**
* Holds the name of a relation that has yet to be looked up in a catalog.
* We could add alias names for columns in a relation:
* {{{
* // Assign alias names
* SELECT col1, col2 FROM testData AS t(col1, col2);
* }}}
*
* @param tableIdentifier table name
* @param outputColumnNames alias names of columns. If these names given, an analyzer adds
* [[Project]] to rename the columns.
*/
case class UnresolvedRelation(
tableIdentifier: TableIdentifier,
outputColumnNames: Seq[String] = Seq.empty)
case class UnresolvedRelation(tableIdentifier: TableIdentifier)
extends LeafNode {

/** Returns a `.` separated name for this relation. */
@@ -681,17 +681,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
*/
override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
val tableId = visitTableIdentifier(ctx.tableIdentifier)
val table = if (ctx.tableAlias.identifierList != null) {
UnresolvedRelation(tableId, visitIdentifierList(ctx.tableAlias.identifierList))
} else {
UnresolvedRelation(tableId)
}
val tableWithAlias = if (ctx.tableAlias.strictIdentifier != null) {
SubqueryAlias(ctx.tableAlias.strictIdentifier.getText, table)
} else {
table
}
tableWithAlias.optionalMap(ctx.sample)(withSample)
val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))
table.optionalMap(ctx.sample)(withSample)
}

/**
@@ -739,12 +730,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
/**
* Create an alias (SubqueryAlias) for a join relation. This is practically the same as
* visitAliasedQuery and visitNamedExpression, ANTLR4 however requires us to use 3 different
* hooks.
* hooks. We could add alias names for output columns, for example:
* {{{
* SELECT a, b, c, d FROM (src1 s1 INNER JOIN src2 s2 ON s1.id = s2.id) dst(a, b, c, d)
* }}}
*/
override def visitAliasedRelation(ctx: AliasedRelationContext): LogicalPlan = withOrigin(ctx) {
plan(ctx.relation)
.optionalMap(ctx.sample)(withSample)
.optionalMap(ctx.strictIdentifier)(aliasPlan)
val relation = plan(ctx.relation).optionalMap(ctx.sample)(withSample)
mayApplyAliasPlan(ctx.tableAlias, relation)
}

/**
@@ -756,31 +749,43 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* }}}
*/
override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) {
val alias = if (ctx.tableAlias.strictIdentifier == null) {
val relation = plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample)
if (ctx.tableAlias.strictIdentifier == null) {
// For un-aliased subqueries, use a default alias name that is not likely to conflict with
// normal subquery names, so that parent operators can only access the columns in subquery by
// unqualified names. Users can still use this special qualifier to access columns if they
// know it, but that's not recommended.
"__auto_generated_subquery_name"
} else {
ctx.tableAlias.strictIdentifier.getText
}
val subquery = SubqueryAlias(alias, plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample))
if (ctx.tableAlias.identifierList != null) {
val columnAliases = visitIdentifierList(ctx.tableAlias.identifierList)
UnresolvedSubqueryColumnAliases(columnAliases, subquery)
SubqueryAlias("__auto_generated_subquery_name", relation)
} else {
subquery
mayApplyAliasPlan(ctx.tableAlias, relation)
}
}

/**
* Create an alias (SubqueryAlias) for a LogicalPlan.
* Create an alias ([[SubqueryAlias]]) for a [[LogicalPlan]].
*/
private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = {
SubqueryAlias(alias.getText, plan)
}

/**
* If aliases are specified in a FROM clause, create a subquery alias ([[SubqueryAlias]]) and
* column aliases for a [[LogicalPlan]].
*/
private def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = {
if (tableAlias.strictIdentifier != null) {
val subquery = SubqueryAlias(tableAlias.strictIdentifier.getText, plan)
if (tableAlias.identifierList != null) {
val columnNames = visitIdentifierList(tableAlias.identifierList)
UnresolvedSubqueryColumnAliases(columnNames, subquery)
} else {
subquery
}
} else {
plan
}
}

/**
* Create a Sequence of Strings for a parenthesis enclosed alias list.
*/
@@ -33,11 +33,11 @@ object JoinType {
case _ =>
val supported = Seq(
"inner",
"outer", "full", "fullouter",
"leftouter", "left",
"rightouter", "right",
"leftsemi",
"leftanti",
"outer", "full", "fullouter", "full_outer",
"leftouter", "left", "left_outer",
"rightouter", "right", "right_outer",
"leftsemi", "left_semi",
"leftanti", "left_anti",
"cross")

throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
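The `supported` list above only feeds the "Unsupported join type" error message; the underscore spellings are valid join-type strings in the Dataset API, so listing them makes the message match what users can actually pass. A small sketch with toy DataFrames:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-type-names").getOrCreate()
import spark.implicits._

val left = Seq((1, "a"), (2, "b")).toDF("id", "l")
val right = Seq((2, "x"), (3, "y")).toDF("id", "r")

// "left_outer" and "leftouter" are interchangeable spellings of the same join type.
left.join(right, Seq("id"), "left_outer").show()
left.join(right, Seq("id"), "full_outer").show()
```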
@@ -1228,10 +1228,12 @@ class SQLConf extends Serializable with Logging {
* not set yet, return `defaultValue`.
*/
def getConfString(key: String, defaultValue: String): String = {
val entry = sqlConfEntries.get(key)
if (entry != null && defaultValue != "<undefined>") {
// Only verify configs in the SQLConf object
entry.valueConverter(defaultValue)
if (defaultValue != null && defaultValue != "<undefined>") {
val entry = sqlConfEntries.get(key)
if (entry != null) {
// Only verify configs in the SQLConf object
entry.valueConverter(defaultValue)
}
}
Option(settings.get(key)).getOrElse(defaultValue)
}