hdfs connector not consuming kafka topic #162

Open
heifrank opened this issue Jan 10, 2017 · 13 comments

@heifrank commented Jan 10, 2017

Hi all,
I ran into a problem where the HDFS connector does not consume my Kafka topic, although connect-file-sink does. After starting the job I waited about three minutes, but no commit log message ever showed up, so I killed the job with Ctrl+C. At that point an error appeared in the log. The details are shown below.

The runtime log is as follows:

hdfs.url = hdfs://10.103.18.9:9000
hdfs.authentication.kerberos = false
hive.metastore.uris =
partition.field.name = date
kerberos.ticket.renew.period.ms = 3600000
shutdown.timeout.ms = 3000
partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
path.format =
 (io.confluent.connect.hdfs.HdfsSinkConnectorConfig:135)
[2017-01-10 12:37:10,018] INFO Hadoop configuration directory (io.confluent.connect.hdfs.DataWriter:94)
[2017-01-10 12:37:10,295] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2017-01-10 12:37:10,901] INFO Started recovery for topic partition test_kafka_docs2-2 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,912] INFO Finished recovery for topic partition test_kafka_docs2-2 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,912] INFO Started recovery for topic partition test_kafka_docs2-1 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,916] INFO Finished recovery for topic partition test_kafka_docs2-1 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,917] INFO Started recovery for topic partition test_kafka_docs2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,921] INFO Finished recovery for topic partition test_kafka_docs2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,921] INFO Started recovery for topic partition test_kafka_docs2-5 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,925] INFO Finished recovery for topic partition test_kafka_docs2-5 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,925] INFO Started recovery for topic partition test_kafka_docs2-4 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,929] INFO Finished recovery for topic partition test_kafka_docs2-4 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,929] INFO Started recovery for topic partition test_kafka_docs2-3 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,932] INFO Finished recovery for topic partition test_kafka_docs2-3 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,932] INFO Sink task org.apache.kafka.connect.runtime.WorkerSinkTask@47c548fc finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:155)
^C[2017-01-10 12:40:13,518] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2017-01-10 12:40:13,527] INFO Stopped ServerConnector@3382f8ae{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2017-01-10 12:40:13,539] INFO Stopped o.e.j.s.ServletContextHandler@2974f221{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2017-01-10 12:40:13,541] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:62)
[2017-01-10 12:40:13,541] INFO Stopping task hdfs-sink3-0 (org.apache.kafka.connect.runtime.Worker:305)
[2017-01-10 12:40:13,543] INFO Starting graceful shutdown of thread WorkerSinkTask-hdfs-sink3-0 (org.apache.kafka.connect.util.ShutdownableThread:119)
[2017-01-10 12:40:18,544] INFO Forcing shutdown of thread WorkerSinkTask-hdfs-sink3-0 (org.apache.kafka.connect.util.ShutdownableThread:141)
[2017-01-10 12:40:18,546] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@47c548fc failed. (org.apache.kafka.connect.runtime.Worker:312)
[2017-01-10 12:40:18,564] INFO Stopping connector hdfs-sink3 (org.apache.kafka.connect.runtime.Worker:226)
[2017-01-10 12:40:18,565] INFO Stopped connector hdfs-sink3 (org.apache.kafka.connect.runtime.Worker:240)
[2017-01-10 12:40:18,565] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:77)
[2017-01-10 12:40:18,565] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:115)
[2017-01-10 12:40:18,566] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:61)
[2017-01-10 12:40:18,566] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:155)
[2017-01-10 12:40:18,566] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:74)

The command I use to start the job is ./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties

The connect-standalone.properties is as follows:
bootstrap.servers=10.103.17.106:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

The quickstart-hdfs.properties is as follows:
name=hdfs-sink3
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_kafka_docs2
#topics=test_hdfs_kafka
hdfs.url=hdfs://10.103.18.9:9000
flush.size=1000

Could anyone give some suggestions? Thanks.

@cotedm commented Jan 10, 2017

@heifrank I see two things I would change here to troubleshoot:

  1. flush.size is set to 1000. I would set this to something small like 3 and manually produce records to the topic to see if you can get it to trigger a file roll (a sketch of this follows the list).
  2. This is a bit weird but maybe a copy/paste issue. I would make sure these are on separate lines:
    offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000
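
To make the file-roll condition easy to hit, something along these lines should work (the flush.size value, broker address, and topic name simply follow the configs quoted in this thread, so treat this as an illustrative sketch):

# in quickstart-hdfs.properties: roll a file after every 3 records
flush.size=3

# manually produce a few JSON records to the sink's topic
./bin/kafka-console-producer --broker-list 10.103.17.106:9092 --topic test_kafka_docs2
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}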

@heifrank (Author)

@cotedm Thanks for your reply.

  1. My Kafka topic contains 160000 records, so I set flush.size to 1000. I also tried setting it to 3, but that didn't work either.
  2. Yes, that is a copy/paste issue; they are on separate lines.

It's weird that offset.flush.interval.ms is set to 10000, yet no commit message appears within that interval. I waited three minutes and checked Kafka Manager but didn't find any consumer consuming my topic. However, using the connect-file-sink properties works fine.
It behaves the same in both version 2.0.0 and 3.0.0.

@cotedm commented Jan 11, 2017

@heifrank do you see any directories created in HDFS? If the connection to HDFS is working properly, you should see a /topics and a /logs directory created in HDFS by the connector. If you don't see those directories created on HDFS, that could be your problem. If those directories are created, you should see something being written to /logs; if not, please verify the permissions there.
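
A quick way to verify this from the command line (these paths are the connector defaults; adjust them if topics.dir or logs.dir is overridden):

hdfs dfs -ls /topics
hdfs dfs -ls -R /logs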

@heifrank (Author)

@cotedm The /topics and /logs directories are created, but /logs is empty. /topics only has a +tmp directory, which is empty too, so it can't be a permissions issue. Two things I noticed:

  1. If I use the Schema Registry worker properties (etc/schema-registry/connect-avro-standalone.properties) and consume another topic whose data is in Avro format, it works fine.
  2. If I use etc/kafka/connect-standalone.properties and consume a topic whose data is JSON, it doesn't consume any messages at all, but if I change hdfs-sink to file-sink it works fine.

@cotedm commented Jan 17, 2017

@heifrank I think the problem, then, is that the connector doesn't currently have a good built-in way to write JSON data. It's future work (see #74), but there are a lot of decisions to be made for it to really work for everyone. In the meantime, you can dump your JSON data directly into HDFS via the connector's pluggable format. There is a connector built off of this one that has a SourceFormat that could work for you. Note that Hive integration won't work if you do this, but you can get your JSON data over this way.

If you are wondering, the easiest way to plug this in is to build that connector's jar file and add the jar to the classpath. Then update the format.class config to point to the class. Please give that a go and let us know how it works out for future reference.
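
In properties form that plug-in step would look roughly like the sketch below; the format class name is only a placeholder for whatever Format implementation that other connector's jar actually provides:

# quickstart-hdfs.properties (sketch; the format class name is a placeholder, not a real class)
name=hdfs-sink3
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_kafka_docs2
hdfs.url=hdfs://10.103.18.9:9000
flush.size=3
# requires that connector's jar on the Connect worker classpath
format.class=<fully.qualified.name.of.the.json.Format.class>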

@xiabai84 commented Apr 4, 2017

I have the same problem with JSON-format records. As heifrank said, it works fine using Avro format and Schema Registry, but once I changed to JSON, data could be written to the topic but could not be consumed by the kafka-hdfs-sink connector. See the error logs below:

[2017-04-04 13:49:49,835] ERROR Failed to convert config data to Kafka Connect format: (org.apache.kafka.connect.storage.KafkaConfigBackingStore:440)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null')
at [Source: [B@64136ffe; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null')
at [Source: [B@64136ffe; line: 1, column: 8]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-04-04 13:49:49,842] ERROR Failed to convert config data to Kafka Connect format: (org.apache.kafka.connect.storage.KafkaConfigBackingStore:440)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null')
at [Source: [B@6e628bd1; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null')
at [Source: [B@6e628bd1; line: 1, column: 8]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-04-04 13:49:49,843] INFO Removed connector json-elasticsearch-sink due to null configuration. This is usually intentional and does not indicate an issue. (org.apache.kafka.connect.storage.KafkaConfigBackingStore:495)

@ewencp (Contributor) commented Apr 6, 2017

@xiabai84 Your problem seems unrelated. It looks like you have data in the Kafka Connect config topic that is in a different format than expected. This could happen if you had multiple Kafka Connect clusters trying to write to the same topic (with different internal converter configs), or a misconfigured worker. I'd suggest opening a new issue as this is unrelated to the original issue in this thread.
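
For reference, the internal storage settings of a distributed worker look roughly like the sketch below; each Connect cluster should point at its own set of internal topics, and the internal converters must match whatever was used to write those topics originally (the topic names here are purely illustrative):

# connect-distributed.properties (sketch; topic names are illustrative)
group.id=connect-cluster-a
config.storage.topic=connect-cluster-a-configs
offset.storage.topic=connect-cluster-a-offsets
status.storage.topic=connect-cluster-a-status
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false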

@ewencp (Contributor) commented Apr 6, 2017

@heifrank Note that flush.size is per topic partition, not per topic. This can matter if, say, you had 16k messages in your topic but more than 16 partitions, since you may never commit files in that case. If you were hitting the point of committing files, we would expect to see messages like

Starting commit and rotation for topic partition ...

in the log. The lack of these messages suggests you're not hitting any of the conditions that trigger rotating files.

If you can't get any of these to trigger, you could also try the rotate.interval.ms setting, which will force rotation of files after a certain interval of wall-clock time. This gives you an easy way to at least ensure data is committed every N seconds, and should be useful for diagnosing whether the tasks are stuck or just not satisfying the other conditions for rotating files.
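
As a concrete sketch of that diagnostic setup (values are illustrative, not recommendations):

# sink connector properties
# keep the size-based threshold small while diagnosing
flush.size=3
# and force rotation after roughly a minute of wall-clock time, even if flush.size is never reached
rotate.interval.ms=60000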

@baluchicken commented May 25, 2017

I have the same problem as @heifrank. I am using the latest confluentinc/cp-kafka-connect Docker container. The connector job seems to be stuck after [2017-05-25 10:53:50,533] INFO Reflections took 37787 ms to scan 562 urls, producing 13659 keys and 89522 values (org.reflections.Reflections:229).
My Configs:
name=kafka-hdfs
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
task.max=1
topics=topic4
hdfs.url=hdfs://localhost:9000
#format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
flush.size=50
rotate.interval.ms=10000

bootstrap.servers=localhost2:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

As you can see, I also tried ParquetFormat; the result is the same.
@ewencp I also set rotate.interval.ms, but nothing happens. The directories in HDFS were created, but they are empty.

@sudhirsrepo commented Oct 27, 2017

Still facing the same problem as @heifrank and @baluchicken. @ewencp, any workaround or suggestion would be appreciated.

hdfs config

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=KAFKA_SPARK_FANOUT_SRP_TEST_JSON
hdfs.url=hdfs://localhost:9000
topics.dir=/home/hadoop/kafka/data
logs.dir=/home/hadoop/kafka/wal
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.FieldPartitioner
partition.field.name=PRT_ID
flush.size=100
rotate.interval.ms=10000

connector config

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081

key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

schemas.enable=false

My schema with payload

{
	"schema":{
		  "type": "struct",
		  "version":1,
		  "name" : "KAFKA_SPARK_FANOUT_SRP_TEST_JSON",
		  "fields" : [
		    {"type":"int32","optional":true,"field":"PRT_ID"},
		    {"type":"string","optional":true,"field":"CRTD_BY"},
		    {"type":"int64","optional":true,"field":"CRTD_DT","name": "org.apache.kafka.connect.data.Timestamp", "version": 1},
		    {"type":"bytes","optional":true,"field":"RESIDUAL","name": "org.apache.kafka.connect.data.Decimal", "version": 1}
		    ]
		},
	"payload":{
		"PRT_ID":-999,
		"LINE_ID":2568731,
		"CRTD_BY":"SUDHIR",
		"CRTD_DT":1498039957000,
		"RESIDUAL": -1234.567
		}
}

Problem statement

1. If I enable schemas (value.converter.schemas.enable=true), it throws a NullPointerException:

[2017-10-26 12:44:10,521] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
java.lang.NullPointerException
	at org.apache.kafka.connect.json.JsonConverter$13.convert(JsonConverter.java:191)
	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:716)
	at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:52)
	at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:176)
	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:712)
	at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:331)
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:320)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:407)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

2. If I disable schemas (value.converter.schemas.enable=false), it creates a +tmp directory:

[screenshot: +tmp directory created in HDFS]

But no records are being consumed, and the connector appears to hang with the following log:

[2017-10-26 13:00:17,133] INFO Started ServerConnector@144beac5{HTTP/1.1}{0.0.0.0:8084} (org.eclipse.jetty.server.ServerConnector:266)
[2017-10-26 13:00:17,135] INFO Started @22564ms (org.eclipse.jetty.server.Server:379)
[2017-10-26 13:00:17,137] INFO REST server listening at http://192.168.20.236:8084/, advertising URL http://192.168.20.236:8084/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-10-26 13:00:17,138] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2017-10-26 13:00:17,174] INFO ConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-26 13:00:17,175] INFO EnrichedConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-26 13:00:17,175] INFO Creating connector hdfs-sink of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:204)
[2017-10-26 13:00:17,177] INFO Instantiated connector hdfs-sink with version 3.3.0 of type class io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-10-26 13:00:17,178] INFO HdfsSinkConnectorConfig values:
	connect.hdfs.keytab =
	connect.hdfs.principal =
	filename.offset.zero.pad.width = 10
	flush.size = 100
	format.class = io.confluent.connect.hdfs.parquet.ParquetFormat
	hadoop.conf.dir =
	hadoop.home =
	hdfs.authentication.kerberos = false
	hdfs.namenode.principal =
	hdfs.url = hdfs://localhost:9000
	hive.conf.dir =
	hive.database = default
	hive.home =
	hive.integration = false
	hive.metastore.uris =
	kerberos.ticket.renew.period.ms = 3600000
	locale =
	logs.dir = /home/hadoop/kafka/wal
	partition.duration.ms = -1
	partition.field.name = PRT_ID
	partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
	path.format =
	retry.backoff.ms = 5000
	rotate.interval.ms = 10000
	rotate.schedule.interval.ms = -1
	schema.cache.size = 1000
	schema.compatibility = NONE
	shutdown.timeout.ms = 3000
	storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
	timezone =
	topics.dir = /home/hadoop/kafka/data
 (io.confluent.connect.hdfs.HdfsSinkConnectorConfig:223)
[2017-10-26 13:00:17,184] INFO Finished creating connector hdfs-sink (org.apache.kafka.connect.runtime.Worker:225)
[2017-10-26 13:00:17,189] INFO SinkConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	topics = [KAFKA_SPARK_FANOUT_SRP_TEST_JSON]
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.SinkConnectorConfig:223)
[2017-10-26 13:00:17,189] INFO EnrichedConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	topics = [KAFKA_SPARK_FANOUT_SRP_TEST_JSON]
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-26 13:00:17,192] INFO Creating task hdfs-sink-0 (org.apache.kafka.connect.runtime.Worker:358)
[2017-10-26 13:00:17,192] INFO ConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-26 13:00:17,193] INFO EnrichedConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-26 13:00:17,194] INFO TaskConfig values:
	task.class = class io.confluent.connect.hdfs.HdfsSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-10-26 13:00:17,194] INFO Instantiated task hdfs-sink-0 with version 3.3.0 of type io.confluent.connect.hdfs.HdfsSinkTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-10-26 13:00:17,214] INFO ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = connect-hdfs-sink
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
[2017-10-26 13:00:17,341] INFO Kafka version : 0.11.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-10-26 13:00:17,341] INFO Kafka commitId : 5cadaa94d0a69e0d (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-10-26 13:00:17,351] INFO HdfsSinkConnectorConfig values:
	connect.hdfs.keytab =
	connect.hdfs.principal =
	filename.offset.zero.pad.width = 10
	flush.size = 100
	format.class = io.confluent.connect.hdfs.parquet.ParquetFormat
	hadoop.conf.dir =
	hadoop.home =
	hdfs.authentication.kerberos = false
	hdfs.namenode.principal =
	hdfs.url = hdfs://localhost:9000
	hive.conf.dir =
	hive.database = default
	hive.home =
	hive.integration = false
	hive.metastore.uris =
	kerberos.ticket.renew.period.ms = 3600000
	locale =
	logs.dir = /home/hadoop/kafka/wal
	partition.duration.ms = -1
	partition.field.name = PRT_ID
	partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
	path.format =
	retry.backoff.ms = 5000
	rotate.interval.ms = 10000
	rotate.schedule.interval.ms = -1
	schema.cache.size = 1000
	schema.compatibility = NONE
	shutdown.timeout.ms = 3000
	storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
	timezone =
	topics.dir = /home/hadoop/kafka/data
 (io.confluent.connect.hdfs.HdfsSinkConnectorConfig:223)
[2017-10-26 13:00:17,371] INFO Created connector hdfs-sink (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-10-26 13:00:17,702] INFO AvroDataConfig values:
	schemas.cache.config = 1000
	enhanced.avro.schema.support = false
	connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig:170)
[2017-10-26 13:00:17,707] INFO Hadoop configuration directory  (io.confluent.connect.hdfs.DataWriter:93)
[2017-10-26 13:00:18,512] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2017-10-26 13:00:19,685] INFO Sink task WorkerSinkTask{id=hdfs-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-10-26 13:00:19,857] INFO Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group connect-hdfs-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-10-26 13:00:19,861] INFO Revoking previously assigned partitions [] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-10-26 13:00:19,861] INFO (Re-)joining group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-10-26 13:00:19,886] INFO Successfully joined group connect-hdfs-sink with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-10-26 13:00:19,892] INFO Setting newly assigned partitions [KAFKA_SPARK_FANOUT_SRP_TEST_JSON-0] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-10-26 13:00:19,958] INFO Started recovery for topic partition KAFKA_SPARK_FANOUT_SRP_TEST_JSON-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-10-26 13:00:19,970] INFO Finished recovery for topic partition KAFKA_SPARK_FANOUT_SRP_TEST_JSON-0 (io.confluent.connect.hdfs.TopicPartitionWriter:223)

@madhumaran

The same problem exists for me. Error log:
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
	at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
	at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:152)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:157)
	at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)

@bkotesh commented Dec 20, 2018

@gwenshap, @ewencp I am also having the same issue. I am unable to figure out the exact error and fix it. Kindly help in resolving it.
My hdfs-sink configuration:
{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "mysql-prod-registrations",
    "hadoop.conf.dir": "/usr/hdp/current/hadoop-client/conf",
    "hadoop.home": "/usr/hdp/current/hadoop-client/",
    "hdfs.url": "hdfs://HACluster:8020",
    "topics.dir": "/topics",
    "logs.dir": "/logs",
    "flush.size": "100",
    "rotate.interval.ms": "60000",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "value.converter.schemas.enable": "false",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "1800000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
    "locale": "en",
    "timezone": "Asia/Kolkata"
  }
}

ERROR: {
"name": "hdfs-sink",
"connector": {
"state": "RUNNING",
"worker_id": "xxxxxxx:8083"
},
"tasks": [
{
"state": "FAILED",
"trace": "java.lang.NullPointerException\n\tat io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:133)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
"id": 0,
"worker_id": "xxxxxxx:8083"
}
],
"type": "sink"
}

@mcapitanio

Same problem here: all schemas are disabled in the Kafka HDFS connector, but I am receiving the error message:

JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
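
For anyone else hitting this: the flag that matters is the one attached to whichever converter actually deserializes the topic data, typically the worker's value converter. A minimal sketch, assuming the JsonConverter setup used earlier in this thread:

# worker properties (connect-standalone.properties or connect-distributed.properties)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false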
