Exceptions when network is broken #141

Open
skyahead opened this issue Oct 21, 2016 · 9 comments

@skyahead
Contributor

I have an HDFS connector running on my machine, which reads from a Kafka cluster and writes into an HDFS cluster. When I disable my machine's network connection and then re-enable it after a while, various exceptions are thrown.

The problem is that the HDFS connector keeps trying to acquire the lease on the WAL files forever, but never gets it.

The exceptions that the HDFS connector throws are:

org.apache.kafka.connect.errors.ConnectException: Cannot acquire lease after timeout, will retry.
at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:95) ~[main/:na]
at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105) ~[main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:484) [main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:212) [main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:256) [main/:na]
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234) [main/:na]
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143) [main/:na]
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) [connect-runtime-0.10.0.1-cp1.jar:na]
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) [connect-runtime-0.10.0.1-cp1.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]

From the HDFS namenode log, I can see the problem is that the HDFS connector is still holding the lease:

2016-10-21 13:38:57,427 INFO org.apache.hadoop.ipc.Server: IPC Server handler 4 on 54310, call org.apache.hadoop.hdfs.protocol.ClientProtocol.append from 1xx.120.162.xx:48872 Call#364999 Retry#0: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: Failed to APPEND_FILE /tmp/logs/topicname/6/log for DFSClient_NONMAPREDUCE_284522662_40 on 1xx.120.162.xx because DFSClient_NONMAPREDUCE_284522662_40 is already the current lease holder.

@cotedm

cotedm commented Jan 6, 2017

@skyahead when you see this, do you also see something like "Error closing " followed by your file name? I commented on #142 since I think there is something to be fixed here, but the issue should be framed up a bit more.

@cmccabe
Member

cmccabe commented Jan 25, 2017

Does the connector call recoverLease?

@cotedm

cotedm commented Jan 25, 2017

@cmccabe the only lease acquisition I'm aware of happens here: https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java#L73

I haven't seen an attempt at doing a lease recovery. Would that help avoid this, though? It seems like the task is mistakenly trying to get a new lease and is unable to do so because it already has one.
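For reference, an explicit lease recovery on the WAL path would look roughly like the sketch below. This is only an illustration of the DistributedFileSystem API, not something the connector does today; the WAL path and polling interval are made up.

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class LeaseRecoverySketch {
  // Hypothetical helper: ask the namenode to start lease recovery on a WAL file
  // and poll until the file is closed. Not part of FSWAL today.
  static void recoverLease(FileSystem fs, Path walFile) throws Exception {
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      boolean closed = dfs.recoverLease(walFile);   // true if the file is already closed
      while (!closed) {
        Thread.sleep(1000);                         // arbitrary back-off between polls
        closed = dfs.isFileClosed(walFile);
      }
    }
  }
}
```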

@skyahead
Contributor Author

@cotedm sorry for replying so late.

Yes, parsing error messages is a bad idea. And I figured out a way to resolve this issue without doing that :-)

It turns out that the HDFS FileSystem class has a static final CACHE, which is used by the HDFS connector in two places:

  1. in HdfsStorage, when a new HDFS connector cluster starts, and
  2. in WALFile, when data is actually being written to (or read from) HDFS.

Our concern is with the second place.

All reading and writing of data files in /+tmp and /logs shares a single FileSystem object, which is cached in this static CACHE.

If the network breaks and then resumes, this cache entry is not regenerated. With the same entry, the HDFS cluster sees the same DFSClient, which is already the lease holder, now trying to acquire the lease again.

To solve it, therefore, we have to clear this cache entry properly.

Please have a look to see if the new change does the trick.
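A rough sketch of the idea, assuming the lease timeout is detected elsewhere (this is not necessarily the exact code in the change): FileSystem.close() removes the instance from the static CACHE, so the next FileSystem.get() builds a fresh FileSystem with a new DFSClient, i.e. a new lease-holder identity.

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class CacheEvictionSketch {
  // Hypothetical recovery step after a lease-acquisition timeout.
  static FileSystem refreshFileSystem(URI hdfsUri, Configuration conf) throws Exception {
    FileSystem stale = FileSystem.get(hdfsUri, conf); // returns the cached instance
    stale.close();                                    // close() also evicts it from the CACHE
    return FileSystem.get(hdfsUri, conf);             // now creates a brand-new DFSClient
  }
}
```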

@skyahead
Contributor Author

@cmccabe I feel like recoverLease may break the exactly-once semantics in scenarios like the one below.

Say a Connect instance is rebalanced and new tasks are started on a new node, while the old instance is not really dead but its communication with the Kafka brokers is delayed.

In this case, if recoverLease() from the new instance hits HDFS first while the old instance is still writing to the same files, some records can be lost.

@sanxiago

sanxiago commented Apr 2, 2019

Hello @skyahead and @cmccabe,

Quoting from another thread, which I believe applies to the issue you are currently facing:

This looks to me like it is suffering from an unintended side effect of closing the FileSystem object. Hadoop internally caches instances of the FileSystem class, and the same instance can be returned to multiple call sites. Even after one call site closes it, it's possible that other call sites still hold a reference to that same FileSystem instance. Closing the FileSystem instance makes it unusable.

HdfsAdmin#getInotifyEventStream is likely using the same FileSystem instance that your own FileSystem.get call returns. By closing it (using try-with-resources), that FileSystem instance is made invalid for the subsequent calls to retrieve inotify events.

The FileSystem cache is a fairly common source of confusion. However, its current behavior is considered by design. For backwards-compatibility reasons, we can't easily change its behavior to help with confusing situations like this. (Sorry!)

A few suggestions to try:

  1. Just don't close the FileSystem. Even if you don't close it
    explicitly, it will be closed at process teardown via a shutdown hook.
    This definitely looks wrong from a resource management perspective, but a
    lot of applications work this way.

  2. Call FileSystem#newInstance instead of FileSystem#get. The newInstance
    method is guaranteed to return an instance unique to that call site, not a
    shared instance potentially in use by other call sites. If you use
    newInstance, then you must guarantee it gets closed to avoid a leak with a
    long-term impact.

  3. You can disable the FileSystem cache for specific file system types by
    editing core-site.xml and setting the property fs.<scheme>.impl.disable.cache
    to true, e.g. fs.hdfs.impl.disable.cache. In general, disabling the cache is
    not desirable, because the performance benefits of the cache are noticeable.
    Sometimes this is a helpful workaround for specific applications, though.

This is the link to the original thread:
http://mail-archives.apache.org/mod_mbox/hadoop-user/201605.mbox/%3CCAN9K8p41zBV_PMgzzgqceBUe5ta-ArPreQdrYc9QYnaStEde3g@mail.gmail.com%3E
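For what it's worth, here is a minimal sketch of suggestion 2. The namenode URI and the path are placeholders, and this is not code from the connector:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NewInstanceSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    URI hdfsUri = URI.create("hdfs://namenode:8020"); // placeholder address

    // newInstance() bypasses the static cache and returns a FileSystem private to
    // this call site, so closing it cannot invalidate anyone else's handle.
    try (FileSystem fs = FileSystem.newInstance(hdfsUri, conf)) {
      System.out.println(fs.exists(new Path("/logs"))); // any operation
    } // must be closed (here via try-with-resources) to avoid leaking the instance
  }
}
```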

@cyrusv
Contributor

cyrusv commented Apr 19, 2019

@sanxiago, can you give more insight into the potential for performance degradation with the fs.<scheme>.impl.disable.cache option? Should we expect degradation only in deployments with very high read volumes?

Our resistance to using FileSystem::newInstance is that we can't officially support a workaround like this in our codebase against older versions of HDFS, so turning off FileSystem caching is the ideal solution, since it can be contained to apply only to the affected HDFS versions.
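For reference, the cache can be disabled for the hdfs scheme only, either in core-site.xml or programmatically on the Configuration handed to FileSystem.get. A minimal sketch with a placeholder namenode URI:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class DisableCacheSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Equivalent to setting fs.hdfs.impl.disable.cache=true in core-site.xml:
    // FileSystem.get() for the hdfs scheme now returns a fresh, uncached instance.
    conf.setBoolean("fs.hdfs.impl.disable.cache", true);

    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf); // placeholder URI
    try {
      // ... use fs ...
    } finally {
      fs.close(); // each get() created a new instance, so it must be closed explicitly
    }
  }
}
```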

@sanxiago

You will need to evaluate this for your use case; there will be added latency when disabling the cache.
