Creation of log directory fails because of permission issues when using keytab #81

Open
zako opened this issue Jun 22, 2016 · 6 comments

Comments

@zako

zako commented Jun 22, 2016

We are experiencing an issue where the HDFS connector uses incorrect permissions when creating log directories for configurations with a heterogeneous set of keytabs. It happens when we run a script that creates several HDFS connectors, each with a different keytab, on an empty single-node Kafka Connect cluster.

It seems the problem is within DataWriter, which uses the keytab to call UserGroupInformation.loginUserFromKeytab, a static call that sets the login user for the entire JVM. Since each DataWriter runs in a separate task/thread, this static call may interfere with other running tasks. As a result, it is possible for the log directory to be created as myuser2 while the FS WAL file is created as a different user, i.e. myuser1 in our example.
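To make the failure mode concrete, here is a minimal, hypothetical sketch (not the connector's code; the principals and keytab paths are placeholders and it assumes a Kerberos-enabled Hadoop client) showing that loginUserFromKeytab replaces a single JVM-wide login user rather than creating a per-caller credential:

```java
import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;

public class StaticLoginDemo {
    public static void main(String[] args) throws IOException {
        // First "connector" logs in with its keytab (placeholder principal/path)...
        UserGroupInformation.loginUserFromKeytab("myuser1@EXAMPLE.COM", "/keytabs/myuser1.keytab");

        // ...then a second "connector" logs in with a different keytab.
        // This overwrites the same static, process-wide login user.
        UserGroupInformation.loginUserFromKeytab("myuser2@EXAMPLE.COM", "/keytabs/myuser2.keytab");

        // Any HDFS call that relies on the login user from here on runs as myuser2,
        // even if it is issued by the task that logged in as myuser1.
        System.out.println(UserGroupInformation.getLoginUser().getUserName());
    }
}
```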

Here are the permissions of the log directories in HDFS (scrubbed some names):
```
$ hdfs dfs -ls /logs/mytopic/
Found 1 items
drwxrwxrwx   - myuser2 supergroup          0 2016-06-21 18:01 /logs/mytopic/0
$ hdfs dfs -ls /logs/mytopic/0
Found 1 items
-rw-r--r--   2 myuser1 supergroup        417 2016-06-21 18:25 /logs/mytopic/0/log
```

Here is the full stack trace (scrubbed some names):
```
[2016-06-21 18:51:21,479] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://mynamenode:8020/logs/mytopic/0/log
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:441)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:197)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:227)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=myuser2, access=WRITE, inode="/logs/mytopic/0/log":myuser1:supergroup:-rw-r--r--
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:175)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6590)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6572)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6497)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2887)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    at sun.reflect.GeneratedConstructorAccessor92.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1769)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1803)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1796)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:323)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:319)
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1173)
    at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:221)
    at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67)
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
    ... 17 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=myuser2, access=WRITE, inode="/logs/mytopic/0/log":myuser1:supergroup:-rw-r--r--
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:175)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6590)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6572)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6497)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2887)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    at org.apache.hadoop.ipc.Client.call(Client.java:1468)
    at org.apache.hadoop.ipc.Client.call(Client.java:1399)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
    at com.sun.proxy.$Proxy52.append(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy53.append(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1767)
    ... 27 more
```

@Ishiihara
Contributor

I think one easy workaround here is to use a different topic and log directory for each connector. This is also needed to make sure that the WALs written by different connectors are isolated, which is crucial for the correct behavior of each connector, since a connector relies on the data in HDFS to set the correct offset in case of a rebalance or restart.

@zako
Author

zako commented Jun 23, 2016

We do create a connector per topic in our setup. Here is an example of the configuration for two connectors:
{ "name": "TopicA", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "topics.dir": "/data/", "topics": "TopicA", "connect.hdfs.principal": "KeytabA@domain", "connect.hdfs.keytab": "./keytabs/KeytabA.keytab", <truncated> }, "tasks": [ { "connector": "TopicA", "task": 0 } ] }
{ "name": "TopicB", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "topics.dir": "/data/", "topics": "TopicB", "connect.hdfs.principal": "KeytabB@domain", "connect.hdfs.keytab": "./keytabs/KeytabB.keytab", <truncated> }, "tasks": [ { "connector": "TopicB", "task": 0 } ] }

The problem is that both of these are initialized and created on the same Kafka Connect node. Kafka Connect creates a task per configuration and submits the tasks to the cached thread pool executor in org.apache.kafka.connect.runtime.Worker, so they run concurrently. The log, tmp and data directories are all created when a new DataWriter instance is constructed, which happens when the HdfsSinkTask runs. Since the keytab login is static and applies to all threads, some directories end up being created with the incorrect keytab.
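A rough, hypothetical sketch of that interleaving is below (placeholder principals, keytab paths and directories; it assumes an HDFS client configuration on the classpath and is not the connector's actual code). Because the login is process-wide, whichever task logs in last decides the identity used for both mkdirs calls:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ConcurrentKeytabRace {
    public static void main(String[] args) {
        // Mimics the Connect Worker: tasks run concurrently on a shared cached thread pool.
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.submit(() -> createLogDir("KeytabA@domain", "./keytabs/KeytabA.keytab", "/logs/TopicA/0"));
        pool.submit(() -> createLogDir("KeytabB@domain", "./keytabs/KeytabB.keytab", "/logs/TopicB/0"));
        pool.shutdown();
    }

    private static void createLogDir(String principal, String keytab, String dir) {
        try {
            // Static, JVM-wide login: the last task to reach this line "wins"...
            UserGroupInformation.loginUserFromKeytab(principal, keytab);

            // ...so by the time mkdirs() runs, the effective user may belong to the other task.
            FileSystem fs = FileSystem.get(new Configuration());
            fs.mkdirs(new Path(dir));
            System.out.println(dir + " created as "
                + UserGroupInformation.getCurrentUser().getShortUserName());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```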

@Ishiihara
Contributor

@zako That makes sense, thanks for the analysis. I think that to allow multiple users for different connector jobs, we need to use secure impersonation and use doAs to perform the creation of topics.dir and logs.dir as well as the writes to HDFS. However, this involves a relatively large change to the connector.
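For reference, a hypothetical sketch of what the doAs approach could look like (placeholder principal, keytab and paths; this is not the connector's implementation, and impersonation additionally requires the NameNode to allow it via the hadoop.proxyuser.* settings). The worker logs in once with a single service keytab, and each connector's HDFS operations run inside doAs with a proxy user:

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class PerConnectorDoAs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        UserGroupInformation.setConfiguration(conf);

        // One service login for the whole worker instead of one global login per connector.
        UserGroupInformation.loginUserFromKeytab("connect@EXAMPLE.COM", "/keytabs/connect.keytab");

        // Each connector impersonates its own HDFS user for directory creation and writes.
        UserGroupInformation proxyUser =
            UserGroupInformation.createProxyUser("myuser1", UserGroupInformation.getLoginUser());

        proxyUser.doAs((PrivilegedExceptionAction<Void>) () -> {
            FileSystem fs = FileSystem.get(conf);
            fs.mkdirs(new Path("/logs/mytopic/0")); // owned by myuser1, not the service principal
            return null;
        });
    }
}
```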

Another option is to use different Classloaders to load different connectors.

@zako
Author

zako commented Jun 23, 2016

I think doAs sounds reasonable; however, my familiarity with Java security is very limited.

We will forgo the alternative of using different Classloaders. Our temporary workaround will be to use a single keytab so we can continue testing Kafka Connect and other functionality.

@cotedm

cotedm commented Jan 9, 2017

Marking this as an enhancement for later evaluation. If there are other users who require multiple users for different connector jobs in a secure environment, it would be good to know about it.

@dbolshak

@cotedm I've faced the same problem.

serssp pushed a commit to serssp/kafka-connect-hdfs that referenced this issue Dec 29, 2018