
Conversation

@windpiger (Contributor) commented Feb 13, 2017

What changes were proposed in this pull request?

This PR is follow-up work for SPARK-19329 (PR #16672), which unified the behavior when reading from or writing to a datasource table whose location does not pre-exist; here we unify Hive serde tables in the same way.

That is:

  • reading from a Hive serde table whose location does not exist returns 0 rows
  • writing to a Hive serde table whose location does not exist writes the data successfully (see the sketch below)
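For illustration, a minimal sketch of the unified behavior in the style of this PR's tests, assuming a temp directory dir that backs the table (table name and data are illustrative):

spark.sql(
  s"""
    |CREATE TABLE t(a string, b int)
    |USING hive
    |LOCATION '$dir'
  """.stripMargin)
dir.delete()                                       // the location no longer exists
checkAnswer(spark.table("t"), Nil)                 // read: 0 rows, no exception
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")     // write: recreates the dir and succeeds
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)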

Currently, selecting from a Hive serde table whose location does not exist throws an exception:

Input path does not exist: file:/tmp/spark-37caa4e6-5a6a-4361-a905-06cc56afb274
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/tmp/spark-37caa4e6-5a6a-4361-a905-06cc56afb274
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
... (the MapPartitionsRDD.getPartitions / RDD.partitions frames above repeat 5 more times)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
at org.apache.spark.sql.QueryTest$.checkAnswer(QueryTest.scala:258)

How was this patch tested?

Unit tests added.

@SparkQA commented Feb 13, 2017

Test build #72808 has started for PR 16910 at commit cb98375.

@windpiger (Contributor Author):

retest this please

@SparkQA commented Feb 13, 2017

Test build #72812 has finished for PR 16910 at commit cb98375.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@windpiger (Contributor Author):

cc @gatorsmile @cloud-fan

@SparkQA commented Feb 13, 2017

Test build #72815 has finished for PR 16910 at commit 401e86d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@windpiger (Contributor Author) commented Feb 17, 2017

@gatorsmile could you help review this? thanks :)

dir.delete()
checkAnswer(spark.table("t"), Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
Member:

-> new File(dir, "x")

}
}

test("read data from a hive serde table which has a not existed location should succeed") {
Member:

This is the only test case that fails without this fix, right?

Contributor Author:

Yes, it is~

s"""
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getAbsolutePath}")
@gatorsmile (Member) commented Feb 20, 2017:

Indent issues.


// if the table location is not exists, return an empty RDD
if (!fs.exists(locationPath)) {
return new EmptyRDD[InternalRow](sparkSession.sparkContext)
Member:

Can we do it in makeRDDForTable?

Contributor Author:

I do it here so it covers both non-partitioned and partitioned tables. A partitioned table already works when the location does not exist if verifyPartitionPath is set to true; if we also want it to work when verifyPartitionPath is set to false, we would have to add the same check in makeRDDForPartitionTable as well, so doing it here handles both cases.

Contributor Author:

On second thought, it does not seem reasonable to do this for a partitioned table, because the actual partition paths may not be under the table's location. I moved this logic to makeRDDForTable.
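For context, a rough sketch of the guard as it reads after this change, assembled from the snippets quoted in this review (inputPathStr is the table's location string; this is a sketch, not the exact patch):

val locationPath = new Path(inputPathStr)
val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
// if the table location does not exist, return an empty RDD instead of
// letting FileInputFormat.getSplits throw InvalidInputException
if (!fs.exists(locationPath)) {
  return new EmptyRDD[InternalRow](sparkSession.sparkContext)
}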

@SparkQA commented Feb 21, 2017

Test build #73191 has finished for PR 16910 at commit 4493a8f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member):

retest this please

val locationPath = new Path(inputPathStr)
val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())

// if the table location is not exists, return an empty RDD
Member:

is not exists -> does not exist

@SparkQA commented Feb 21, 2017

Test build #73193 has finished for PR 16910 at commit 4493a8f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 21, 2017

Test build #73194 has finished for PR 16910 at commit 6fb2b57.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 21, 2017

Test build #73196 has finished for PR 16910 at commit b4caca7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@windpiger (Contributor Author):

@gatorsmile I have addressed the review comments above~

HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}
val locationPath = new Path(inputPathStr)
val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
Member:

How about replacing sparkSession.sessionState.newHadoopConf() with broadcastedHadoopConf.value.value?

Contributor Author:

ok~
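For reference, a sketch of the suggested replacement, assuming a broadcast variable of the kind HadoopTableReader already keeps (SerializableConfiguration is Spark's serializable wrapper around a Hadoop Configuration; the field name here is illustrative):

// created once on the driver
private val broadcastedHadoopConf =
  sparkSession.sparkContext.broadcast(
    new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()))

// reused at each call site instead of building a fresh conf every time
val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value)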

|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
Member:

useless?


val newDirFile = new File(dir, "x")
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION " +
s"'${newDirFile.getAbsolutePath}'")
Member:

shorten it to a single line?

Contributor Author:

101 characters...
let me modify some code.

Member:

Actually, 101 is still ok

Contributor Author:

e... isn't it 100? let me test it...
I have modified some code to make it clearer.



@gatorsmile (Member):

LGTM except a few minor comments.

@SparkQA commented Feb 22, 2017

Test build #73266 has started for PR 16910 at commit 119fa64.

@windpiger (Contributor Author):

retest this please

@windpiger (Contributor Author):

retest this please

@SparkQA commented Mar 1, 2017

Test build #73657 has finished for PR 16910 at commit f83d81d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 1, 2017

Test build #73661 has finished for PR 16910 at commit 2456a94.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 1, 2017

Test build #73663 has finished for PR 16910 at commit 2456a94.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 1, 2017

Test build #73666 has finished for PR 16910 at commit f4b4d29.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
assert(new Path(table.location) == fs.makeQualified(dirPath))

val tableLocFile = new File(table.location.stripPrefix("file:"))
Contributor:

Use new File(new URI(table.location))? Please avoid .stripPrefix("file:"), which looks very hacky.
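A minimal sketch of the suggested change (assuming table.location is a URI-formatted string, as in the test above):

import java.net.URI
// instead of new File(table.location.stripPrefix("file:")):
val tableLocFile = new File(new URI(table.location))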

checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

val newDirFile = new File(dir, "x")
val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/")
Contributor:

.stripSuffix("/") is it needed?

@SparkQA commented Mar 1, 2017

Test build #73677 has finished for PR 16910 at commit 3dcd6c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

can you resolve the conflict?

@windpiger (Contributor Author):

OK, doing it now ~ it was fine yesterday...

@SparkQA commented Mar 2, 2017

Test build #73735 has started for PR 16910 at commit a4f771a.

s"""
|CREATE TABLE t(a string, b int)
|USING hive
|LOCATION '$dir'
Contributor:

can we just call dir.delete before creating this table?

Contributor Author:

ok~

@windpiger (Contributor Author) commented Mar 2, 2017:

@cloud-fan I found that the dir will be created during CREATE TABLE, so we should keep the current logic.

Contributor:

does hive have the same behavior?

Contributor Author:

Yes, I tested it in Hive:

create table test(a string) location 'hdfs:/xx';

then hdfs:/xx will be created

Contributor:

It seems the InMemoryCatalog doesn't do this; you can send a new PR to fix it.

Contributor Author:

ok thanks~
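For that follow-up, a hypothetical sketch of what the fix could look like inside InMemoryCatalog.createTable (the surrounding method shape and the hadoopConfig field are assumptions; the point is to mirror Hive and create a missing location directory):

// inside InMemoryCatalog.createTable, when the table has an explicit location
tableDefinition.storage.locationUri.foreach { loc =>
  val path = new Path(loc.toString)
  val fs = path.getFileSystem(hadoopConfig)
  if (!fs.exists(path)) {
    fs.mkdirs(path)  // mirror Hive: CREATE TABLE creates the location dir if absent
  }
}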

@windpiger (Contributor Author):

retest this please

@SparkQA commented Mar 2, 2017

Test build #73747 has finished for PR 16910 at commit a4f771a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 3, 2017

Test build #73829 has started for PR 16910 at commit 15c0a77.

@windpiger (Contributor Author):

retest this please

@SparkQA commented Mar 3, 2017

Test build #73831 has finished for PR 16910 at commit 15c0a77.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'")
assert(!newDirFile.exists())
// select from a partition whose location has changed to a nonexistent location
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
Contributor:

Why set this conf?

Contributor Author:

If we don't set it, it throws an exception; if we set it, it checks whether the partition path exists and, even when the path does not exist, returns an empty RDD instead of throwing.

Contributor:

Is this expected? I think Hive will always return an empty result, right?

Contributor:

BTW this conf will be removed soon, as it has bugs.

Contributor Author:

OK~ thanks~ Then do we also need to modify something here?

Contributor Author:

Yes, Hive returns an empty result. If there is a bug here (could you describe what the bug is?), can we remove the conf and always return the result?

@gatorsmile (Member):

Should we just close it now?

@SparkQA commented May 31, 2018

Test build #91357 has finished for PR 16910 at commit 15c0a77.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 1a4fda8 Jul 19, 2018
The batch-close commit that referenced this pull request carried the following message:
Closes apache#17422
Closes apache#17619
Closes apache#18034
Closes apache#18229
Closes apache#18268
Closes apache#17973
Closes apache#18125
Closes apache#18918
Closes apache#19274
Closes apache#19456
Closes apache#19510
Closes apache#19420
Closes apache#20090
Closes apache#20177
Closes apache#20304
Closes apache#20319
Closes apache#20543
Closes apache#20437
Closes apache#21261
Closes apache#21726
Closes apache#14653
Closes apache#13143
Closes apache#17894
Closes apache#19758
Closes apache#12951
Closes apache#17092
Closes apache#21240
Closes apache#16910
Closes apache#12904
Closes apache#21731
Closes apache#21095

Added:
Closes apache#19233
Closes apache#20100
Closes apache#21453
Closes apache#21455
Closes apache#18477

Added:
Closes apache#21812
Closes apache#21787

Author: hyukjinkwon <[email protected]>

Closes apache#21781 from HyukjinKwon/closing-prs.