Commit b2ed1f3

Merge remote-tracking branch 'upstream/master' into dt-opt
2 parents: 0f676e2 + b7c89a7

10 files changed, +115 -26 lines

sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala

Lines changed: 3 additions & 1 deletion
@@ -109,7 +109,9 @@ private[sql] object JsonRDD extends Logging {
         val newType = dataType match {
           case NullType => StringType
           case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
-          case struct: StructType => nullTypeToStringType(struct)
+          case ArrayType(struct: StructType, containsNull) =>
+            ArrayType(nullTypeToStringType(struct), containsNull)
+          case struct: StructType => nullTypeToStringType(struct)
           case other: DataType => other
         }
         StructField(fieldName, newType, nullable)

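The new `ArrayType(struct: StructType, containsNull)` case lets the null-to-string conversion recurse into structs nested inside arrays, so a field that only ever held `null` in the sampled records (like the `field3` added to the test data below) still ends up with a concrete `StringType`. A minimal sketch of the same recursion, written against a simplified stand-in for the catalyst type hierarchy rather than Spark's real classes:

// Standalone sketch only: this DataType hierarchy exists just for the example.
sealed trait DataType
case object NullType extends DataType
case object StringType extends DataType
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case class StructField(name: String, dataType: DataType, nullable: Boolean)
case class StructType(fields: Seq[StructField]) extends DataType

def nullTypeToStringType(struct: StructType): StructType =
  StructType(struct.fields.map { case StructField(name, dataType, nullable) =>
    val newType = dataType match {
      case NullType => StringType
      case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
      // The new case: recurse into structs that appear as array elements.
      case ArrayType(s: StructType, containsNull) =>
        ArrayType(nullTypeToStringType(s), containsNull)
      case s: StructType => nullTypeToStringType(s)
      case other => other
    }
    StructField(name, newType, nullable)
  })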
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 3 additions & 2 deletions
@@ -373,8 +373,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
     ParquetRelation.enableLogForwarding()

-    val children = fs.listStatus(path).filterNot {
-      _.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME
+    val children = fs.listStatus(path).filterNot { status =>
+      val name = status.getPath.getName
+      name(0) == '.' || name == FileOutputCommitter.SUCCEEDED_FILE_NAME
     }

     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row

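Besides the `_SUCCESS` marker, the new predicate also skips hidden files whose names start with a dot (for example checksum files such as `.part-r-00001.parquet.crc`), so they are not mistaken for Parquet data files. A hedged sketch of the rule applied to plain file names, with "_SUCCESS" standing in for `FileOutputCommitter.SUCCEEDED_FILE_NAME`:

// Sketch only: the real code filters FileStatus entries from fs.listStatus(path).
val succeededFileName = "_SUCCESS"

def keep(name: String): Boolean =
  !(name.startsWith(".") || name == succeededFileName)

val listing = Seq("part-r-00001.parquet", "_metadata", "_SUCCESS", ".part-r-00001.parquet.crc")
val children = listing.filter(keep)
// children == Seq("part-r-00001.parquet", "_metadata")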
sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala

Lines changed: 8 additions & 3 deletions
@@ -213,7 +213,8 @@ class JsonSuite extends QueryTest {
         StructField("arrayOfStruct", ArrayType(
           StructType(
             StructField("field1", BooleanType, true) ::
-            StructField("field2", StringType, true) :: Nil)), true) ::
+            StructField("field2", StringType, true) ::
+            StructField("field3", StringType, true) :: Nil)), true) ::
         StructField("struct", StructType(
           StructField("field1", BooleanType, true) ::
           StructField("field2", DecimalType, true) :: Nil), true) ::
@@ -263,8 +264,12 @@ class JsonSuite extends QueryTest {

     // Access elements of an array of structs.
     checkAnswer(
-      sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2] from jsonTable"),
-      (true :: "str1" :: Nil, false :: null :: Nil, null) :: Nil
+      sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " +
+        "from jsonTable"),
+      (true :: "str1" :: null :: Nil,
+        false :: null :: null :: Nil,
+        null :: null :: null :: Nil,
+        null) :: Nil
     )

     // Access a struct and fields inside of it.

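Because every element of `arrayOfStruct` is read with the merged struct schema (`field1`, `field2`, `field3`), elements that lack some of those fields come back padded with nulls, which is what the widened expected answer above encodes. A small sketch of that padding behaviour, using plain Scala maps as stand-ins for the JSON structs:

// Sketch: pad every element to the merged field set; missing fields become null.
val mergedFields = Seq("field1", "field2", "field3")

def pad(element: Map[String, Any]): Seq[Any] =
  mergedFields.map(name => element.getOrElse(name, null))

val arrayOfStruct: Seq[Map[String, Any]] = Seq(
  Map("field1" -> true, "field2" -> "str1"),
  Map("field1" -> false),
  Map("field3" -> null))

// arrayOfStruct.map(pad) ==
//   Seq(Seq(true, "str1", null), Seq(false, null, null), Seq(null, null, null))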
sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ object TestJsonData {
       "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
       "arrayOfBoolean":[true, false, true],
       "arrayOfNull":[null, null, null, null],
-      "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}],
+      "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
       "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
       "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
     }""" :: Nil)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 5 additions & 3 deletions
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive

 import scala.util.parsing.combinator.RegexParsers

-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo}
 import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
@@ -39,6 +37,7 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.util.Utils

 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -288,7 +287,10 @@ private[hive] case class MetastoreRelation
     )

     val tableDesc = new TableDesc(
-      Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]],
+      Class.forName(
+        hiveQlTable.getSerializationLib,
+        true,
+        Utils.getContextOrSparkClassLoader).asInstanceOf[Class[Deserializer]],
       hiveQlTable.getInputFormatClass,
       // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
       // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to

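Loading the SerDe with the three-argument `Class.forName(name, true, Utils.getContextOrSparkClassLoader)` routes the lookup through the thread's context class loader (falling back to Spark's own loader) instead of the loader that defined this class, so SerDe jars supplied at runtime can be resolved. A hedged sketch of that idiom; `ClassLoading` and its members below are local stand-ins for this example, not Spark APIs:

object ClassLoading {
  // "Context class loader, else this object's own loader" fallback, roughly what
  // Utils.getContextOrSparkClassLoader provides.
  def contextOrLocalClassLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

  // Load and initialize a class by name through that loader.
  def loadClass(className: String): Class[_] =
    Class.forName(className, true, contextOrLocalClassLoader)
}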
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 18 additions & 3 deletions
@@ -46,6 +46,8 @@ private[hive] case class AddFile(filePath: String) extends Command

 private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command

+private[hive] case class AnalyzeTable(tableName: String) extends Command
+
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
 private[hive] object HiveQl {
   protected val nativeCommands = Seq(
@@ -74,7 +76,6 @@ private[hive] object HiveQl {
     "TOK_CREATEFUNCTION",
     "TOK_DROPFUNCTION",

-    "TOK_ANALYZE",
     "TOK_ALTERDATABASE_PROPERTIES",
     "TOK_ALTERINDEX_PROPERTIES",
     "TOK_ALTERINDEX_REBUILD",
@@ -92,7 +93,6 @@ private[hive] object HiveQl {
     "TOK_ALTERTABLE_SKEWED",
     "TOK_ALTERTABLE_TOUCH",
     "TOK_ALTERTABLE_UNARCHIVE",
-    "TOK_ANALYZE",
     "TOK_CREATEDATABASE",
     "TOK_CREATEFUNCTION",
     "TOK_CREATEINDEX",
@@ -239,7 +239,6 @@ private[hive] object HiveQl {
       ShellCommand(sql.drop(1))
     } else {
       val tree = getAst(sql)
-
       if (nativeCommands contains tree.getText) {
         NativeCommand(sql)
       } else {
@@ -387,6 +386,22 @@ private[hive] object HiveQl {
         ifExists) =>
       val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
       DropTable(tableName, ifExists.nonEmpty)
+    // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
+    case Token("TOK_ANALYZE",
+           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
+           isNoscan) =>
+      // Reference:
+      // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
+      if (partitionSpec.nonEmpty) {
+        // Analyzing partitions will be treated as a Hive native command.
+        NativePlaceholder
+      } else if (isNoscan.isEmpty) {
+        // If users do not specify "noscan", it will be treated as a Hive native command.
+        NativePlaceholder
+      } else {
+        val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
+        AnalyzeTable(tableName)
+      }
     // Just fake explain for any of the native commands.
     case Token("TOK_EXPLAIN", explainArgs)
       if noExplainCommands.contains(explainArgs.head.getText) =>

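The new `TOK_ANALYZE` branch only claims the statement for Spark SQL when there is no partition spec and `noscan` is present; every other ANALYZE variant still falls through to Hive as a native command (the parser test added to StatisticsSuite below exercises exactly these combinations). A simplified sketch of that decision, detached from the real `Token` AST types; the names here are stand-ins for this example only:

// Sketch: which ANALYZE TABLE forms Spark handles itself vs. hands to Hive.
sealed trait AnalyzePlan
case object HiveNative extends AnalyzePlan                         // run as a Hive native command
case class SparkAnalyzeTable(tableName: String) extends AnalyzePlan // handled by Spark SQL

def planAnalyze(tableName: String, hasPartitionSpec: Boolean, noscan: Boolean): AnalyzePlan =
  if (hasPartitionSpec) HiveNative   // ANALYZE ... PARTITION(...) goes to Hive
  else if (!noscan) HiveNative       // full-scan ANALYZE goes to Hive
  else SparkAnalyzeTable(tableName)  // "ANALYZE TABLE t COMPUTE STATISTICS noscan"

// planAnalyze("Table1", hasPartitionSpec = false, noscan = true) == SparkAnalyzeTable("Table1")
// planAnalyze("Table1", hasPartitionSpec = true,  noscan = true) == HiveNative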
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 2 additions & 0 deletions
@@ -83,6 +83,8 @@ private[hive] trait HiveStrategies {

       case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil

+      case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
+
       case describe: logical.DescribeCommand =>
         val resolvedTable = context.executePlan(describe.table).analyzed
         resolvedTable match {

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala

Lines changed: 6 additions & 11 deletions
@@ -72,17 +72,12 @@ case class HiveTableScan(
   }

   private def addColumnMetadataToConf(hiveConf: HiveConf) {
-    // Specifies IDs and internal names of columns to be scanned.
-    val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
-    val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
-
-    if (attributes.size == relation.output.size) {
-      // SQLContext#pruneFilterProject guarantees no duplicated value in `attributes`
-      ColumnProjectionUtils.setFullyReadColumns(hiveConf)
-    } else {
-      ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
-    }
+    // Specifies needed column IDs for those non-partitioning columns.
+    val neededColumnIDs =
+      attributes.map(a =>
+        relation.attributes.indexWhere(_.name == a.name): Integer).filter(index => index >= 0)

+    ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
     ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))

     // Specifies types and object inspectors of columns to be scanned.
@@ -99,7 +94,7 @@ case class HiveTableScan(
       .mkString(",")

     hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
-    hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(","))
   }

   addColumnMetadataToConf(context.hiveconf)

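The rewritten `addColumnMetadataToConf` indexes the requested attributes against `relation.attributes` (the non-partition columns) and drops anything not found there, so partition columns no longer contribute a bogus -1 column ID. A sketch of that index computation on plain strings; `relationColumns` and `requested` are stand-ins for `relation.attributes` and the scan's pruned attribute list:

// Sketch: map requested column names to their position among the relation's
// non-partition columns, discarding names (e.g. partition columns) that are absent.
def neededColumnIDs(relationColumns: Seq[String], requested: Seq[String]): Seq[Int] =
  requested
    .map(name => relationColumns.indexWhere(_ == name)) // -1 when not a data column
    .filter(_ >= 0)

// Example: partition column "ds" is requested but not stored in the row data.
// neededColumnIDs(Seq("key", "value"), Seq("value", "ds")) == Seq(1)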
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala renamed to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 26 additions & 0 deletions
@@ -23,6 +23,32 @@ import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{Command, LeafNode}
 import org.apache.spark.sql.hive.HiveContext

+/**
+ * :: DeveloperApi ::
+ * Analyzes the given table in the current database to generate statistics, which will be
+ * used in query optimizations.
+ *
+ * Right now, it only supports Hive tables and it only updates the size of a Hive table
+ * in the Hive metastore.
+ */
+@DeveloperApi
+case class AnalyzeTable(tableName: String) extends LeafNode with Command {
+
+  def hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+  def output = Seq.empty
+
+  override protected[sql] lazy val sideEffectResult = {
+    hiveContext.analyze(tableName)
+    Seq.empty[Any]
+  }
+
+  override def execute(): RDD[Row] = {
+    sideEffectResult
+    sparkContext.emptyRDD[Row]
+  }
+}
+
 /**
  * :: DeveloperApi ::
  * Drops a table from the metastore and removes it if it is cached.

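`AnalyzeTable` is a pure side-effect command: the metastore update happens once inside the lazy `sideEffectResult`, and `execute()` just forces it and returns an empty RDD. A stripped-down sketch of that pattern, with plain Scala types standing in for `RDD[Row]` and `HiveContext`:

// Sketch of the side-effecting command pattern used by AnalyzeTable; runAnalyze
// stands in for hiveContext.analyze(tableName) and Seq[Any] for RDD[Row].
class SideEffectCommand(tableName: String, runAnalyze: String => Unit) {
  protected lazy val sideEffectResult: Seq[Any] = {
    runAnalyze(tableName)   // performed at most once, thanks to the lazy val
    Seq.empty[Any]
  }

  def execute(): Seq[Any] = {
    sideEffectResult        // force the analysis
    Seq.empty               // the real command returns sparkContext.emptyRDD[Row]
  }
}

From the user's side the command is reached through SQL, as the updated StatisticsSuite does with sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan").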
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 43 additions & 2 deletions
@@ -19,13 +19,54 @@ package org.apache.spark.sql.hive

 import scala.reflect.ClassTag

+
 import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
 import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._

 class StatisticsSuite extends QueryTest {

+  test("parse analyze commands") {
+    def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
+      val parsed = HiveQl.parseSql(analyzeCommand)
+      val operators = parsed.collect {
+        case a: AnalyzeTable => a
+        case o => o
+      }
+
+      assert(operators.size === 1)
+      if (operators(0).getClass() != c) {
+        fail(
+          s"""$analyzeCommand expected command: $c, but got ${operators(0)}
+             |parsed command:
+             |$parsed
+           """.stripMargin)
+      }
+    }
+
+    assertAnalyzeCommand(
+      "ANALYZE TABLE Table1 COMPUTE STATISTICS",
+      classOf[NativeCommand])
+    assertAnalyzeCommand(
+      "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
+      classOf[NativeCommand])
+    assertAnalyzeCommand(
+      "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
+      classOf[NativeCommand])
+    assertAnalyzeCommand(
+      "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS",
+      classOf[NativeCommand])
+    assertAnalyzeCommand(
+      "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan",
+      classOf[NativeCommand])
+
+    assertAnalyzeCommand(
+      "ANALYZE TABLE Table1 COMPUTE STATISTICS nOscAn",
+      classOf[AnalyzeTable])
+  }
+
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
       catalog.lookupRelation(None, tableName).statistics.sizeInBytes
@@ -37,7 +78,7 @@ class StatisticsSuite extends QueryTest {

     assert(queryTotalSize("analyzeTable") === defaultSizeInBytes)

-    analyze("analyzeTable")
+    sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan")

     assert(queryTotalSize("analyzeTable") === BigInt(11624))

@@ -66,7 +107,7 @@ class StatisticsSuite extends QueryTest {

     assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes)

-    analyze("analyzeTable_part")
+    sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")

     assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
