Conversation

@dongjoon-hyun
Member

@dongjoon-hyun dongjoon-hyun commented Feb 15, 2018

What changes were proposed in this pull request?

In a kerberized cluster, when Spark reads a file path (e.g. `people.json`), it emits a misleading warning while looking up `people.json/_spark_metadata`. The root cause is the behavioral difference between `LocalFileSystem` and `DistributedFileSystem`: `LocalFileSystem.exists()` returns `false`, but `DistributedFileSystem.exists` raises `org.apache.hadoop.security.AccessControlException`.
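A minimal sketch of that difference (illustrative only, not code from this PR; it assumes `/tmp/people.json` exists as a plain file on the respective filesystem, matching the repro below):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()

// LocalFileSystem: probing a child path under a plain file simply returns false.
val localMeta = new Path("file:///tmp/people.json/_spark_metadata")
localMeta.getFileSystem(conf).exists(localMeta)  // false

// Kerberized DistributedFileSystem: the same probe has to traverse
// /tmp/people.json, a plain file (-rw-r--r--), and the NameNode denies the
// EXECUTE permission needed for traversal, so exists() throws
// org.apache.hadoop.security.AccessControlException instead of returning false.
val hdfsMeta = new Path("hdfs:///tmp/people.json/_spark_metadata")
hdfsMeta.getFileSystem(conf).exists(hdfsMeta)    // throws AccessControlException
```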

```scala
scala> spark.version
res0: String = 2.4.0-SNAPSHOT

scala> spark.read.json("file:///usr/hdp/current/spark-client/examples/src/main/resources/people.json").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> spark.read.json("hdfs:///tmp/people.json")
18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory.
18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory.
```

After this PR,

```scala
scala> spark.read.json("hdfs:///tmp/people.json").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
```

How was this patch tested?

Manual.

@SparkQA

SparkQA commented Feb 15, 2018

Test build #87469 has finished for PR 20616 at commit a14ff69.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Retest this please.

@SparkQA

SparkQA commented Feb 15, 2018

Test build #87476 has finished for PR 20616 at commit a14ff69.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Hi, @cloud-fan and @gatorsmile.
Could you review this PR?

@cloud-fan
Contributor

LGTM, cc @zsxwing

@dongjoon-hyun
Member Author

Thank you for the review, @cloud-fan.

```scala
if (fs.isDirectory(hdfsPath)) {
  val metadataPath = new Path(hdfsPath, metadataDir)
  val res = fs.exists(metadataPath)
  res
```
Contributor

nit: just `fs.exists(new Path(hdfsPath, metadataDir))`

Member Author

Yep.
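For context, a minimal sketch of the resulting `FileStreamSink.hasMetadata` check with the nit applied inline (a simplified, standalone rendering under stated assumptions, not the exact patch; the real method pattern-matches a `Seq` of paths and uses Spark's internal `logWarning`, for which `println` stands in here):

```scala
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def hasMetadata(singlePath: String, hadoopConf: Configuration): Boolean = {
  val metadataDir = "_spark_metadata"
  try {
    val hdfsPath = new Path(singlePath)
    val fs = hdfsPath.getFileSystem(hadoopConf)
    if (fs.isDirectory(hdfsPath)) {
      // Only a directory can contain a _spark_metadata child, so this
      // exists() probe never has to traverse a plain file on HDFS.
      fs.exists(new Path(hdfsPath, metadataDir))
    } else {
      // A plain file is never a streaming sink output directory.
      false
    }
  } catch {
    case NonFatal(_) =>
      // Matches the warning seen in the logs in this thread.
      println("WARN FileStreamSink: Error while looking for metadata directory.")
      false
  }
}
```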

@SparkQA

SparkQA commented Feb 20, 2018

Test build #87550 has finished for PR 20616 at commit 27188e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Could you review this, @zsxwing and @gatorsmile ?

@dongjoon-hyun
Member Author

With more manual tests, I observed that the original situation happens only in kerberized environments. I updated the PR/JIRA description.

@zsxwing
Member

zsxwing commented Feb 20, 2018

@dongjoon-hyun could you also post the error happening in kerberized environments?

@dongjoon-hyun
Member Author

The warning messages in kerberized environments are the ones in the PR/JIRA description.
For example, the Apache Spark 2.2.1 binary on a kerberized cluster shows the following.

```scala
scala> spark.read.json("hdfs:///tmp/people.json").show
18/02/20 23:24:05 WARN FileStreamSink: Error while looking for metadata directory.
18/02/20 23:24:05 WARN FileStreamSink: Error while looking for metadata directory.
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> spark.version
res1: String = 2.2.1
```

@zsxwing
Member

zsxwing commented Feb 20, 2018

@dongjoon-hyun I meant the stack trace thrown from `fs.exists`.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Feb 20, 2018

Here it is. It's an `AccessControlException`, @zsxwing. The following is the result from 2.4.0-SNAPSHOT.

```
18/02/20 23:46:53 WARN streaming.FileStreamSink: Error while looking for metadata directory.
org.apache.hadoop.security.AccessControlException: Permission denied: user=spark, access=EXECUTE, inode="/tmp/people.json/_spark_metadata":ambari-qa:hdfs:-rw-r--r--
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1955)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:109)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:4111)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1137)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:866)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)

	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2110)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.createBaseDataset(JsonDataSource.scala:114)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.infer(JsonDataSource.scala:95)
	at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:63)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:57)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:201)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:392)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:397)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:340)
	at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)
	at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)
	at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)
	at $line14.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)
	at $line14.$read$$iw$$iw$$iw$$iw.<init>(<console>:35)
	at $line14.$read$$iw$$iw$$iw.<init>(<console>:37)
	at $line14.$read$$iw$$iw.<init>(<console>:39)
	at $line14.$read$$iw.<init>(<console>:41)
	at $line14.$read.<init>(<console>:43)
	at $line14.$read$.<init>(<console>:47)
	at $line14.$read$.<clinit>(<console>)
	at $line14.$eval$.$print$lzycompute(<console>:7)
	at $line14.$eval$.$print(<console>:6)
	at $line14.$eval.$print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
	at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
	at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
	at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
	at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
	at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
	at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
	at org.apache.spark.repl.Main$.doMain(Main.scala:76)
	at org.apache.spark.repl.Main$.main(Main.scala:56)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=spark, access=EXECUTE, inode="/tmp/people.json/_spark_metadata":ambari-qa:hdfs:-rw-r--r--
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1955)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:109)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:4111)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1137)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:866)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)

	at org.apache.hadoop.ipc.Client.call(Client.java:1475)
	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
	at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy16.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
	... 69 more
```

@zsxwing
Member

zsxwing commented Feb 21, 2018

LGTM. Merging to master. Thanks!

@dongjoon-hyun
Member Author

Thank you, @zsxwing and @cloud-fan.

@asfgit asfgit closed this in 3e48f3b Feb 21, 2018
@dongjoon-hyun dongjoon-hyun deleted the SPARK-23434 branch February 21, 2018 00:06
@dongjoon-hyun
Member Author

Hi, @cloud-fan and @zsxwing .
Actually, this is reported against Apache Spark 2.0.2, 2.1.2, 2.2.1, and 2.3.0.
Since 2.3 has been announced officially, can we have this in the old branches?

@cloud-fan
Contributor

no objection from my side.

@dongjoon-hyun
Member Author

Thank you, @cloud-fan.
Then I'll make a backport PR for each branch to pass Jenkins once more.

peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
…DFS file path

In a kerberized cluster, when Spark reads a file path (e.g. `people.json`), it emits a misleading warning while looking up `people.json/_spark_metadata`. The root cause is the behavioral difference between `LocalFileSystem` and `DistributedFileSystem`: `LocalFileSystem.exists()` returns `false`, but `DistributedFileSystem.exists` raises `org.apache.hadoop.security.AccessControlException`.

```scala
scala> spark.version
res0: String = 2.4.0-SNAPSHOT

scala> spark.read.json("file:///usr/hdp/current/spark-client/examples/src/main/resources/people.json").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> spark.read.json("hdfs:///tmp/people.json")
18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory.
18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory.
```

After this PR,
```scala
scala> spark.read.json("hdfs:///tmp/people.json").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
```

Manual.

Author: Dongjoon Hyun <[email protected]>

Closes apache#20616 from dongjoon-hyun/SPARK-23434.

Change-Id: I45931d7132c5cb9acd6cf095b9af6cb87a3f0c33