Exceptions when hdfs is down or hard disk is full #143

Open

skyahead opened this issue Oct 24, 2016 · 9 comments
Comments

@skyahead
Contributor

If a hard disk in the HDFS cluster is full, or, worse, if we shut down the HDFS cluster (e.g., run stop-dfs.sh) while Kafka Connect is writing to it, then depending on luck I keep getting one or more of the following exceptions.

After I restart the HDFS cluster (i.e., run start-dfs.sh; this is not something we would do in production, but our QA team is running extreme tests), the connectors keep throwing these exceptions forever.

Exception one:

org.apache.kafka.connect.errors.ConnectException: java.net.ConnectException: Call From tianjili/xxx.120.162.86 to allinone2:54310 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at io.confluent.connect.hdfs.wal.FSWAL.append(FSWAL.java:64) ~[main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.beginAppend(TopicPartitionWriter.java:593) ~[main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.appendToWAL(TopicPartitionWriter.java:584) ~[main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:310) ~[main/:na]
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234) [main/:na]
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143) [main/:na]
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) [connect-runtime-0.10.0.1-cp1.jar:na]
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) [connect-runtime-0.10.0.1-cp1.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.net.ConnectException: Call From tianjili/xxx.120.162.86 to allinone2:54310 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_101]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_101]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_101]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_101]
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791) ~[hadoop-common-2.6.0.jar:na]
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:731) ~[hadoop-common-2.6.0.jar:na]
at org.apache.hadoop.ipc.Client.call(Client.java:1472) ~[hadoop-common-2.6.0.jar:na]
at org.apache.hadoop.ipc.Client.call(Client.java:1399) ~[hadoop-common-2.6.0.jar:na]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) ~[hadoop-common-2.6.0.jar:na]
at com.sun.proxy.$Proxy89.getAdditionalDatanode(Unknown Source) ~[na:na]
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:424) ~[hadoop-hdfs-2.6.0.jar:na]

Exception two:

java.io.IOException: Stream closed
at java.io.BufferedWriter.ensureOpen(BufferedWriter.java:116) ~[na:1.8.0_101]
at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:126) ~[na:1.8.0_101]
at java.io.BufferedWriter.flush(BufferedWriter.java:253) ~[na:1.8.0_101]
at com.alu.bna.connect.common.CsvRecordWriter.close(CsvRecordWriter.java:86) ~[main/:na]
at com.alu.bna.connect.csv.CsvRecordWriterProvider$InnerRecordWriter.close(CsvRecordWriterProvider.java:99) ~[main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:550) ~[main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:561) ~[main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:307) ~[main/:na]
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234) [main/:na]
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143) [main/:na]
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) [connect-runtime-0.10.0.1-cp1.jar:na]
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) [connect-runtime-0.10.0.1-cp1.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]

Exception three:

org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://allinone2:54310/tianjil/logs/xxxx/3/log
at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91) ~[main/:na]
at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105) ~[main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:484) [main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:212) [main/:na]
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:256) [main/:na]
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234) [main/:na]
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172) [main/:na]
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143) [main/:na]
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) [connect-runtime-0.10.0.1-cp1.jar:na]
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) [connect-runtime-0.10.0.1-cp1.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]

Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-1032838698-xxx.120.9.80-1476364592496:blk_1073745753_6894; getBlockSize()=35; corrupt=false; offset=0; locs=[xxx.120.9.84:50010, xxx.120.9.80:50010]; storageIDs=[DS-e2d7231e-e690-4ae2-a9bd-e1257a065f2f, DS-4973e814-95c8-44f8-a0be-fc437f39e542]; storageTypes=[DISK, DISK]}
at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:359) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:301) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:238) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:231) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1498) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298) ~[hadoop-hdfs-2.6.0.jar:na]
at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:579) ~[kafka-connect-hdfs-3.0.1.jar:na]
at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:528) ~[kafka-connect-hdfs-3.0.1.jar:na]

@lakeofsand

@skyahead
We have been running into this a lot lately whenever the HDFS connector hits HDFS or network problems, such as the third exception you listed here. The exceptions can only be cleared by restarting Connect or by removing the HDFS files manually.

These issues fall into two categories:
1. Kafka Connect does not recover from some exceptions automatically, even though it could. This matters for common, temporary service outages; such failures are inevitable.
2. Connect and HDFS also run into coordination problems. The same problem may not appear in other contexts where the process is not long-running.

@skyahead
Contributor Author

@lakeofsand I have some code to prevent these exceptions from happening and make Kafka Connect survive network breakdowns, HDFS outages, etc. automatically. But our QA reported missing records last week, and I am in the process of resolving that. I will definitely report back when I have it fixed.

skyahead added a commit to skyahead/kafka-connect-hdfs that referenced this issue Nov 3, 2016
@cotedm

cotedm commented Jan 6, 2017

@skyahead it seems like these three exceptions all stem from an inability to reconnect to HDFS even though the tasks are retrying to establish a connection.

The first one looks like a problem with trying to append to the WAL repeatedly when the state machine gets stuck in the TEMP_FILE_CLOSED state. It could be related to our inability to properly re-establish a lease, but without more logs it's difficult to say for sure.

The second one looks similar except we're stuck in the SHOULD_ROTATE state.

The third one looks like we've never even established a lease, and again could be related to the same problem. I believe my comments on #142 are relevant here and could help if we properly null out the writer when we fail to close it. With that explanation, do you think we can close this as a dupe and reopen if the problem persists after pulling in an adapted version of #142?

For reference, this is the state machine I'm talking about.

https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L262
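
For illustration, here is a minimal, hypothetical Java sketch of the "drop the writer when close fails" idea referenced from #142. The class and field names are made up for the example; the real logic lives in TopicPartitionWriter.closeTempFile(), and this is only a sketch of the pattern, not the connector's code.

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only -- not the connector's code.
public class WriterRegistry {

  private final Map<String, Closeable> writers = new HashMap<>();

  public void closeTempFile(String encodedPartition) {
    Closeable writer = writers.get(encodedPartition);
    if (writer == null) {
      return; // nothing open for this partition
    }
    try {
      writer.close();
    } catch (IOException e) {
      // The underlying HDFS stream is already dead (connection refused,
      // ClosedChannelException, ...). Log and fall through so recovery can proceed.
      System.err.println("Failed to close temp file for " + encodedPartition + ": " + e);
    } finally {
      // Always drop the cached reference so the next write()/recover() creates a
      // fresh writer instead of retrying forever on a closed stream.
      writers.remove(encodedPartition);
    }
  }
}

The point is only that a failed close must not leave a dead writer cached; otherwise recovery keeps reusing the same broken handle and the task loops on the exception.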

@skyahead
Contributor Author

skyahead commented Feb 1, 2017

@cotedm Could you have another look at the current PR, when you get a chance?

@Perdjesk

Perdjesk commented May 18, 2017

Hitting a very similar issue when a rolling restart of HDFS datanodes is executed (one datanode every 120 seconds).

The following ERROR loops indefinitely. Which tasks are impacted appears to be random: some survive the datanode rolling restart and some do not.

[2017-05-18 14:32:56,507] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://hadoop//srv/prod/blu/connect-data/logs/topicname/6/log
        at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
        at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
        at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:486)
        at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:213)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:257)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:247)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-2074729095-ip-1486128728174:blk_1082143745_9382741; getBlockSize()=2640; corrupt=false; offset=0; locs=[DatanodeInfoWithStorage[ip:1019,DS-cce32714-xxxx-499b-9798-2803954eafe9,DISK], DatanodeInfoWithStorage[ip:1019,DS-ef0cdb7d-91ad-xxxx-ba50-fa3e2c124bc5,DISK], DatanodeInfoWithStorage[ip:1019,DS-6a920e7c-cd74-xxxx-ac27-02f07ceb9e13,DISK], DatanodeInfoWithStorage[ip:1019,DS-cfb5a118-b23f-xxxx-bb96-c2b85e5654b1,DISK], DatanodeInfoWithStorage[ip:1019,DS-6dccb500-2e19-xxxxx-a3d6-6976a6127f9c,DISK]]}
        at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:428)
        at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:336)
        at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
        at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
        at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
        at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
        at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
        at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:573)
        at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:522)
        at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:215)
        at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67)
        at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
        ... 17 more

@blootsvoets

Still experiencing a similar issue with Confluent 4.1.0:

[2018-07-19 09:39:57,033] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.DataException: Error creating writer for log file hdfs://hdfs-namenode-1:8020/logs/mytopic/2/log
	at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
	at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
	at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:601)
	at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:256)
	at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:321)
	at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
	at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-848146280-172.19.0.5-1531990635873:blk_1073741849_1025; getBlockSize()=35; corrupt=false; offset=0; locs=[DatanodeInfoWithStorage[172.19.0.6:9866,DS-44ed10ee-5e44-4189-b4c5-5f8579a21184,DISK], DatanodeInfoWithStorage[172.19.0.4:9866,DS-642d35fe-fe6b-43ae-82be-2c5817dbf478,DISK], DatanodeInfoWithStorage[172.19.0.3:9866,DS-b3765134-b9e3-4a99-bb2e-897ee45f3f76,DISK]]}
	at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:428)
	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:336)
	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
	at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:551)
	at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:436)
	at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:156)
	at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:75)
	at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
	... 17 more
[2018-07-19 09:39:57,037] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.DataException: Error creating writer for log file hdfs://hdfs-namenode-1:8020/logs/android_empatica_e4_electrodermal_activity/0/log
	at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
	at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
	at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:601)
	at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:256)
	at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:321)
	at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
	at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-848146280-172.19.0.5-1531990635873:blk_1073741854_1030; getBlockSize()=35; corrupt=false; offset=0; locs=[DatanodeInfoWithStorage[172.19.0.3:9866,DS-b3765134-b9e3-4a99-bb2e-897ee45f3f76,DISK], DatanodeInfoWithStorage[172.19.0.6:9866,DS-44ed10ee-5e44-4189-b4c5-5f8579a21184,DISK], DatanodeInfoWithStorage[172.19.0.4:9866,DS-642d35fe-fe6b-43ae-82be-2c5817dbf478,DISK]]}
	at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:428)
	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:336)
	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
	at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:551)
	at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:436)
	at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:156)
	at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:75)
	at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
	... 17 more

Our current resolution is to remove the entire /logs directory whenever this occurs, but that seems like a heavy-handed solution. This state is also hard to detect without parsing the logs. Or does it change the output of host:8083/connectors/hdfs-sink/status as well?

In general, I'd prefer shutting down a connector or a task to endlessly looping exceptions.
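
For reference, a lighter-weight manual workaround than deleting /logs may be to ask the NameNode to recover the lease on the stuck WAL file, so its last block gets finalized and the "Cannot obtain block length for LocatedBlock" error clears. A hedged sketch using the public DistributedFileSystem.recoverLease() API follows; the namenode URI and the WAL path are placeholders copied from the stack trace above, not a tested procedure.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Sketch: manually recover the lease on a stuck WAL file instead of deleting /logs.
public class RecoverWalLease {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("hdfs://hdfs-namenode-1:8020"), conf);
    if (!(fs instanceof DistributedFileSystem)) {
      throw new IllegalStateException("Expected an HDFS DistributedFileSystem");
    }
    Path wal = new Path("/logs/mytopic/2/log"); // placeholder path from the stack trace
    // recoverLease() asks the NameNode to close the under-construction file; it returns
    // true once the file is closed and its last block length is finalized. It may need
    // to be retried a few times while block recovery completes.
    boolean closed = ((DistributedFileSystem) fs).recoverLease(wal);
    System.out.println("WAL file closed after lease recovery: " + closed);
  }
}

If that works, it would suggest the stuck state is an unfinalized (still open-for-write) WAL log file rather than corruption of the committed data files.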

@skyahead
Contributor Author

skyahead commented Jul 19, 2018 via email

@blootsvoets

@skyahead I will try waiting next time. If this behaviour is expected and resolved automatically by the code, though, I would not expect the logs to fill up with stack traces for every (very frequent) retry. A message saying that it will be resolved automatically would also be useful.

@alexandrfox

alexandrfox commented Jul 30, 2018

Similarly, we still see TopicPartitionWriter stuck in a SHOULD_ROTATE loop:

[2018-07-27 08:16:23,712] ERROR Exception on topic partition topic-name-1:  (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.DataException: java.nio.channels.ClosedChannelException
        at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.close(AvroRecordWriterProvider.java:96)
        at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:655)
        at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:662)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:386)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1546)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:458)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
        at org.apache.avro.io.BufferedBinaryEncoder.flushBuffer(BufferedBinaryEncoder.java:93)
        at org.apache.avro.io.BufferedBinaryEncoder.ensureBounds(BufferedBinaryEncoder.java:108)
        at org.apache.avro.io.BufferedBinaryEncoder.writeLong(BufferedBinaryEncoder.java:129)
        at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:367)
        at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:395)
        at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:413)
        at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:422)
        at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:445)
        at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.close(AvroRecordWriterProvider.java:94)
        ... 16 more

@skyahead I wonder if we should bring back the change from your original commit that wrapped the writer.close() call in TopicPartitionWriter.closeTempFile() in a try/finally (or at least treat ClosedChannelException differently in this case). ~~I don't see how we can lose messages by doing that, since the writeRecord() call won't be affected, nor will the appendToWAL/commitFile state machine logic.~~ EDIT: after taking another hard look, there might be a situation where we have unflushed messages in the FSDataOutputStream after calling writeRecord(), so we count them in recordCounter, offsets, and startOffsets, but a network partition happens before the buffer is flushed. A subsequent commitFile() call would then commit an incomplete log file.
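
To make that trade-off concrete, here is a small, hypothetical sketch (generic Flushable/Closeable types, not the connector's RecordWriter API): the writer is always given up so the task cannot loop, but the temp file is only reported as committable when the flush and close actually succeeded.

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;

// Hypothetical sketch only -- illustrates the contract, not the connector's code.
public class SafeTempFileClose {

  public static <W extends Flushable & Closeable> boolean closeForCommit(W writer) {
    try {
      writer.flush();   // surfaces ClosedChannelException if the write pipeline died
      writer.close();
      return true;      // all counted records were flushed; commitFile() may proceed
    } catch (IOException e) {
      // Buffered records may be lost: the temp file must be discarded and offsets
      // rewound to the last committed file, never promoted as-is by commitFile().
      return false;
    }
  }
}

Under that contract, a ClosedChannelException would cause the temp file to be discarded and the partition rewound, rather than committing a partially flushed file.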
