@@ -17,24 +17,31 @@

package org.apache.spark.sql.hive.execution

import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
import java.util
import java.util.{Date, Random}

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils

import scala.collection.JavaConverters._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.DataType
@@ -54,6 +61,61 @@ case class InsertIntoHiveTable(
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@transient private lazy val catalog = sc.catalog

val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)

private def executionId: String = {
val rand: Random = new Random
val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
Member: Why all this -- just use a UUID? You also have a redundant return and types here.

Author: Yes, it is. I am doing it this way because I want the code to be exactly the same as the Spark 2.0.x version.

return executionId
Contributor (@fidato13, Nov 11, 2016): Can the return statement in the Scala code be removed, please?

Author: Hi @fidato13, this is OK, since this part of the code is reused from Spark 2.0.2.

Contributor: @merlintang Can we take this opportunity to rectify this in other places as well? Adding a return statement at the end of a simple method that introduces no complex control flow makes it read like Java-style coding. See the Scala style guide for reference:
https://github.com/databricks/scala-style-guide#return-statements

Author: Thanks, I will fix it.

Contributor: Cheers.

Member: Please remove the return?

Author: Done.

}
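
As the reviewers suggest, a return-free variant of this helper could simply use a UUID; the following is a minimal illustrative sketch only, since the PR deliberately keeps the code identical to the Spark 2.0.x version:

import java.util.UUID

// Illustrative alternative: a UUID already provides uniqueness, and the last
// expression of a Scala method is returned implicitly, so no explicit
// `return` or type ascriptions are needed.
private def executionId: String = "hive_" + UUID.randomUUID().toString.replace("-", "_")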

private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
val stagingPathName: String =
if (inputPathName.indexOf(stagingDir) == -1) {
new Path(inputPathName, stagingDir).toString
} else {
inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
}
val dir: Path =
fs.makeQualified(
new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
logDebug("Created staging dir = " + dir + " for path = " + inputPath)
try {
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
fs.deleteOnExit(dir)
}
catch {
case e: IOException =>
throw new RuntimeException(
Member: Don't use RuntimeException; why even handle this?

Author: The reason we use this code is: (1) the old version relied on the Hive package to create the staging directory; in the Hive code, that directory is stored in a hash map, and staging directories are only removed when the session is closed. However, our Spark code does not trigger the Hive session close, so those directories are never removed. (2) The pushed code simulates the Hive way of creating the staging directory inside Spark rather than relying on Hive, so the staging directory does get removed. (3) I will fix the return type issue, thanks for your comments @srowen.

Member: Almost all the code in this PR is copied from the existing master. This PR is just for branch 1.6.

"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)

}
return dir
Contributor (@fidato13, Nov 11, 2016): Can the return statement in the Scala code be removed, please?

Author: Thanks for your comment, I will update this and push it again.

Contributor: Thanks.

}
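
To illustrate the cleanup behavior described in the discussion above, here is a minimal standalone sketch of the deleteOnExit approach (names and paths are illustrative, not from the PR):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only: create a scratch directory and register it with
// deleteOnExit, so Hadoop removes it when the FileSystem is closed
// (typically at JVM shutdown) instead of relying on a Hive session
// close that Spark never triggers.
def registerScratchDir(dir: Path, conf: Configuration): Unit = {
  val fs = dir.getFileSystem(conf)
  fs.mkdirs(dir)
  fs.deleteOnExit(dir)
}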

private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
}

def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
getExtTmpPathRelTo(path.getParent, hadoopConf)
} else {
new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
}
}

def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
}
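
For context, a hypothetical usage illustration of how these helpers resolve a temporary output location (the paths and the `conf` value are made up for the example):

// Hypothetical usage, assuming a Hadoop Configuration `conf` is in scope.
// A plain HDFS table location gets a staging directory ending in "-ext-10000",
// while a viewfs:// location is resolved relative to its parent directory.
val hdfsTmp = getExternalTmpPath(new Path("hdfs://namenode/warehouse/t"), conf)
val viewfsTmp = getExternalTmpPath(new Path("viewfs://cluster/warehouse/t"), conf)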

private def newSerializer(tableDesc: TableDesc): Serializer = {
val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
serializer.initialize(null, tableDesc.getProperties)
@@ -129,7 +191,9 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
val jobConf = new JobConf(sc.hiveconf)
val tmpLocation = getExternalTmpPath(tableLocation, jobConf)

val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed = sc.hiveconf.getBoolean(
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
@@ -175,7 +239,6 @@ case class InsertIntoHiveTable(
}
}

val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableJobConf(jobConf)

// When speculation is on and output committer class name contains "Direct", we should warn
@@ -76,6 +76,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
sql("SELECT * FROM createAndInsertTest"),
testData.collect().toSeq
)


Member: Nit: please remove these two lines.

}

test("Double create fails when allowExisting = false") {