[SPARK-15899] [SQL] Fix the construction of the file path with hadoop Path #13868
**SQLConf.scala**

```diff
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 
+import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.internal.Logging
@@ -55,7 +56,7 @@ object SQLConf {
   val WAREHOUSE_PATH = SQLConfigBuilder("spark.sql.warehouse.dir")
     .doc("The default location for managed databases and tables.")
     .stringConf
-    .createWithDefault("file:${system:user.dir}/spark-warehouse")
+    .createWithDefault("${system:user.dir}/spark-warehouse")
 
   val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
     .internal()
@@ -679,7 +680,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
 
-  def warehousePath: String = getConf(WAREHOUSE_PATH)
+  def warehousePath: String = {
+    new Path(getConf(WAREHOUSE_PATH).replace("${system:user.dir}",
+      System.getProperty("user.dir"))).toString
+  }
 
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
```
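The effect of the new `warehousePath` can be checked in isolation. A minimal sketch (the `WarehousePathDemo` object is illustrative, not part of the PR) that mirrors the substitution-plus-normalization above:

```scala
import org.apache.hadoop.fs.Path

// Mirrors the new SQLConf.warehousePath: substitute ${system:user.dir},
// then normalize the result through hadoop's Path.
object WarehousePathDemo extends App {
  val default = "${system:user.dir}/spark-warehouse"
  val resolved = new Path(default.replace("${system:user.dir}",
    System.getProperty("user.dir"))).toString
  // e.g. /home/alice/spark-warehouse on Linux; on Windows, hadoop's Path
  // normalizes backslashes to forward slashes.
  println(resolved)
}
```

Note that the result no longer carries a `file:` scheme, which is what the review discussion below turns on.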
**DDLSuite.scala**

```diff
@@ -111,36 +111,42 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
 
-  private def appendTrailingSlash(path: String): String = {
-    if (!path.endsWith(File.separator)) path + File.separator else path
-  }
-
   test("the qualified path of a database is stored in the catalog") {
     val catalog = spark.sessionState.catalog
 
     withTempDir { tmpDir =>
       val path = tmpDir.toString
       // The generated temp path is not qualified.
       assert(!path.startsWith("file:/"))
-      sql(s"CREATE DATABASE db1 LOCATION '$path'")
+      val pathWithForwardSlash = path.replace("\\", "/")
+      sql(s"CREATE DATABASE db1 LOCATION '$pathWithForwardSlash'")
       val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
       assert("file" === pathInCatalog.getScheme)
-      val expectedPath = if (path.endsWith(File.separator)) path.dropRight(1) else path
+      val expectedPath = new Path(path).toUri.toString
       assert(expectedPath === pathInCatalog.getPath)
 
       withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
         sql(s"CREATE DATABASE db2")
-        val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
-        assert("file" === pathInCatalog.getScheme)
-        val expectedPath = appendTrailingSlash(spark.sessionState.conf.warehousePath) + "db2.db"
-        assert(expectedPath === pathInCatalog.getPath)
+        val pathInCatalog2 = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+        assert("file" === pathInCatalog2.getScheme)
+        val expectedPath2 = new Path(spark.sessionState.conf.warehousePath + "/" + "db2.db")
+          .toUri
+          .toString
+        assert(expectedPath2 === pathInCatalog2.getPath)
       }
 
       sql("DROP DATABASE db1")
       sql("DROP DATABASE db2")
     }
   }
 
+  private def makeQualifiedPath(path: String): Path = {
+    // copy-paste from SessionCatalog
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
+    fs.makeQualified(hadoopPath)
+  }
+
   test("Create/Drop Database") {
     withTempDir { tmpDir =>
       val path = tmpDir.toString
```
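For context, here is what the new test helper produces when run standalone (a sketch assuming a default local Hadoop `Configuration` instead of the suite's `sparkContext.hadoopConfiguration`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object MakeQualifiedDemo extends App {
  val hadoopPath = new Path("/tmp/db1.db")
  // With no fs.defaultFS configured, the path resolves against the local
  // filesystem and gains a scheme: file:/tmp/db1.db. On a cluster where
  // fs.defaultFS points at HDFS, the same call would qualify it as hdfs://...
  val fs = hadoopPath.getFileSystem(new Configuration())
  println(fs.makeQualified(hadoopPath))
}
```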
```diff
@@ -154,8 +160,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
       sql(s"CREATE DATABASE $dbName")
       val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-      val expectedLocation =
-        "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+      val expectedLocation = makeQualifiedPath(path + "/" + s"$dbNameWithoutBackTicks.db")
+        .toString
       assert(db1 == CatalogDatabase(
         dbNameWithoutBackTicks,
         "",
@@ -181,8 +187,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       sql(s"CREATE DATABASE $dbName")
       val db1 = catalog.getDatabaseMetadata(dbName)
       val expectedLocation =
-        "file:" + appendTrailingSlash(System.getProperty("user.dir")) +
-          s"spark-warehouse/$dbName.db"
+        makeQualifiedPath(s"${System.getProperty("user.dir")}/spark-warehouse" +
+          "/" + s"$dbName.db").toString
       assert(db1 == CatalogDatabase(
         dbName,
         "",
@@ -200,17 +206,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val catalog = spark.sessionState.catalog
     val databaseNames = Seq("db1", "`database`")
     withTempDir { tmpDir =>
-      val path = tmpDir.toString
-      val dbPath = "file:" + path
+      val path = new Path(tmpDir.toString).toUri.toString
       databaseNames.foreach { dbName =>
         try {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
           sql(s"CREATE DATABASE $dbName Location '$path'")
           val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+          val expPath = makeQualifiedPath(tmpDir.toString).toString
           assert(db1 == CatalogDatabase(
             dbNameWithoutBackTicks,
             "",
-            if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
+            expPath,
             Map.empty))
           sql(s"DROP DATABASE $dbName CASCADE")
           assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -233,8 +239,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
           sql(s"CREATE DATABASE $dbName")
           val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-          val expectedLocation =
-            "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+          val expectedLocation = makeQualifiedPath(path + "/" + s"$dbNameWithoutBackTicks.db")
+            .toString
           assert(db1 == CatalogDatabase(
             dbNameWithoutBackTicks,
             "",
@@ -474,7 +480,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       databaseNames.foreach { dbName =>
         try {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-          val location = "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+          val location = makeQualifiedPath(path + "/" + s"$dbNameWithoutBackTicks.db").toString
 
           sql(s"CREATE DATABASE $dbName")
```
@avulanov can I call on your expertise here? @koertkuipers and I noticed that this causes a problem: this path is intended to be a local file system path under the local home dir, but it will now be interpreted as a path on HDFS for HDFS deployments.

If this is always intended to be a local path, and it seems like it is, then the usages of the new `makeQualifiedPath` are a bit wrong, in that they explicitly resolve the path against the Hadoop file system, which can be HDFS. Alternatively, just removing `user.dir` kind of works too, in that it will at least become a path relative to the HDFS user dir, I think. Do you know which is better?
Or use `FileSystem.getHomeDirectory`?
Yes, that would probably resolve it, if the intent is to let this become a directory on HDFS. I think it was supposed to be a local file, so maybe we have to find a Windows-friendly way to bring back the `file:` prefix.

Maybe make the default value just "spark-warehouse" and then, below in `def warehousePath`, add logic to resolve it explicitly against the local filesystem? I'll give that a shot soon if nobody has better ideas.
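A sketch of that idea (a hypothetical helper, not code from this PR): qualifying against `FileSystem.getLocal` pins the result to the local filesystem regardless of `fs.defaultFS`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object LocalWarehouse {
  // Hypothetical: qualify a path against the local filesystem only, so an
  // HDFS deployment cannot capture the warehouse directory.
  def makeLocalQualifiedPath(path: String, conf: Configuration): Path = {
    val localFs = FileSystem.getLocal(conf) // always the file:// filesystem
    localFs.makeQualified(new Path(path))
  }
}
```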
If the goal is to always have a local directory here, you could add something like the following to the config constant, which should be Windows-compatible, right?
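(A hypothetical sketch of such a change, not the exact snippet from the thread, assuming the `TypedConfigBuilder.transform` hook referenced in the reply below:)

```scala
// Hypothetical sketch (inside object SQLConf): route every value, default
// or user-supplied, through java.io.File.toURI so it is always a local
// file: URI.
val WAREHOUSE_PATH = SQLConfigBuilder("spark.sql.warehouse.dir")
  .doc("The default location for managed databases and tables.")
  .stringConf
  .transform(path => new java.io.File(path).toURI.toString)
  .createWithDefault("spark-warehouse")
```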
I've filed https://issues.apache.org/jira/browse/SPARK-17810 and am about to open a PR for the fix I proposed. I think you're right, though then I wonder: what if I set the value to "/my/local/path"? It will still get interpreted later as an HDFS path, when, as I understand it, it's always supposed to be treated as a local path.
`.transform` also applies to user-provided values, so "/my/local/path" would become "file:/my/local/path" or something.
Yes, we can use `File.toURI()` for `WAREHOUSE_PATH` if we are sure that it is always a local path. However, I remember someone in this thread mentioned that the path might be an Amazon `s3` path. Is that supposed to happen?
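For reference, plain `File.toURI()` always yields a `file:` scheme, so an `s3` location could not round-trip through it (standard JDK behavior, shown in a REPL):

```scala
scala> new java.io.File("/my/local/path").toURI.toString
res0: String = file:/my/local/path
```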