exception when using StringConverter for key and value converter #130

Closed
blbradley opened this issue Sep 29, 2016 · 11 comments

@blbradley

I'm working to see if this is fixable.

[2016-09-28 19:16:22,211] ERROR Task coinsmith-raw-hdfs-sink-test-8-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.AvroRuntimeException: Unknown datum type io.confluent.kafka.serializers.NonRecordContainer: io.confluent.kafka.serializers.NonRecordContainer@31d89133
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
        at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:64)
        at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:59)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:487)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:264)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type io.confluent.kafka.serializers.NonRecordContainer: io.confluent.kafka.serializers.NonRecordContainer@31d89133
        at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:636)
        at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:601)
        at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
        ... 17 more
@cotedm commented Jan 6, 2017

@blbradley I'm a little late to the party here, but can you clarify a bit what happened? Mainly, I'm trying to understand how the data was brought into Kafka. Was it brought in via a connector that was using the AvroConverter, and are you then trying to use the StringConverter on the other side with the HDFS Sink Connector?

@blbradley (Author)

I can! I used kafka-console-producer to write string data to Kafka, then set up Kafka Connect in standalone mode with both the key and value converters set to StringConverter. I expected that to convert the written strings to the output format, AvroFormat. Instead, it yields the exception above.
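Concretely, my setup was along these lines (the topic name, paths, and flush size here are illustrative placeholders, not my exact values):

    # worker.properties (standalone mode)
    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    offset.storage.file.filename=/tmp/connect.offsets

    # hdfs-sink.properties
    name=hdfs-sink-test
    connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
    topics=test-strings
    hdfs.url=hdfs://localhost:9000
    flush.size=3
    format.class=io.confluent.connect.hdfs.avro.AvroFormat

and the strings were produced with something like:

    kafka-console-producer --broker-list localhost:9092 --topic test-strings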

@cotedm commented Jan 9, 2017

@blbradley I think I understand what you're looking for, and we can definitely do better in this scenario. There's a greater philosophical question to answer here, though: the strings are inherently unstructured data, and we have to decide what it means to turn them into Avro data, which is structured. This is discussed a little bit on the mailing list:
https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/confluent-platform/7_3a916s1YM/bUbc7gp_DgAJ

So I guess my question is, what's the best way to handle this for your use case? I see two scenarios:

  1. The structured output format isn't a requirement, and AvroFormat is just being used because we don't currently have an out-of-the-box unstructured format (one would break things like Hive integration, should you choose to use it). Here we could look at the options for plugging in a format; there's at least one that comes to mind if you want to go that route.
  2. The structured output format is a requirement, and there's an expectation that we should be able to handle the string data with a generic Avro schema. This is a totally fair thing to expect (at the very least we shouldn't break), but the proper fix is probably in the AvroConverter itself rather than the connector. I think this is your goal with do not use container when AvroSchema is already available #131 combined with your PR to the schema registry repo (see the sketch just below this list).
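As a minimal, standalone illustration of where Avro trips over the wrapper (plain Avro plus the NonRecordContainer class from the serializers artifact; a sketch, not the connector's actual code path):

    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;
    import io.confluent.kafka.serializers.NonRecordContainer;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;

    public class UnknownDatumSketch {
      public static void main(String[] args) throws Exception {
        Schema stringSchema = Schema.create(Schema.Type.STRING);
        // A union schema, matching the resolveUnion frames in the trace above.
        Schema schema = Schema.createUnion(Arrays.asList(
            Schema.create(Schema.Type.NULL), stringSchema));

        DataFileWriter<Object> writer =
            new DataFileWriter<>(new GenericDatumWriter<>(schema));
        writer.create(schema, new ByteArrayOutputStream());

        writer.append("hello");  // fine: a raw String resolves to the string branch

        // The sink hands Avro this wrapper instead of the raw value;
        // GenericData.getSchemaName has no case for it, so this throws the
        // "Unknown datum type io.confluent.kafka.serializers.NonRecordContainer"
        // error from the stack trace above.
        writer.append(new NonRecordContainer(stringSchema, "hello"));

        writer.close();
      }
    }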

What do you think accomplishes your goal best here? Both options probably require documenting the behavior, and both probably need to be implemented eventually, but it'd be good to get your input on which one hits the mark best for you at this point.

@blbradley (Author)

I don't plan on using #131. It was mostly an exercise for myself and for the community to see if unstructured data could be used with Confluent Platform. I appreciate your detailed response and am very happy that my work got the conversation started.

I'm very willing to discuss the issue. I think the first option you presented sounds best even if it is more work up front. Sounds like you have a good idea of what the 'out-of-the-box unstructured format' should be. How will you proceed?

@cotedm commented Jan 17, 2017

@blbradley I was thinking something like the SourceFormat that was included in a connector built off of this one would be a good place to start. However, if we start doing unstructured data, we break the Hive integration piece, so we have to be clear about the two different modes of the connector. I guess let me ask you this: do you have a need for Hive integration in your use case? Maybe it makes sense for the next step to be a bit of a product survey to see if most people are using that.
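Just to illustrate what I mean by an unstructured mode, the write side could be as simple as the sketch below; the class and method names here are hypothetical, not this connector's actual Format/RecordWriter interfaces:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    // Illustrative only: write each record's value as one line of UTF-8 text,
    // with no schema involved (and therefore no Hive integration).
    public class StringRecordWriterSketch {
      private final OutputStream out;

      public StringRecordWriterSketch(OutputStream out) {
        this.out = out;
      }

      public void write(Object value) throws IOException {
        out.write(String.valueOf(value).getBytes(StandardCharsets.UTF_8));
        out.write('\n');
      }

      public void close() throws IOException {
        out.close();
      }
    }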

@blbradley (Author)

I don't require Hive integration at the moment.

> Maybe it makes sense for the next step to be a bit of a product survey to see if most people are using that.

That sounds great.

@ewencp (Contributor) commented Feb 27, 2017

This was an error in how AvroRecordWriterProvider was handling the NonRecordContainer type that AvroConverter emits for primitive types in order to be able to include schema information. #176 has fixed this.
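Roughly, the change amounts to unwrapping the container before handing the value to Avro, along these lines (a paraphrase of the idea behind #176, not the exact patch):

    import java.io.IOException;
    import io.confluent.connect.avro.AvroData;
    import io.confluent.kafka.serializers.NonRecordContainer;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.kafka.connect.sink.SinkRecord;

    public class UnwrapSketch {
      // Convert the Connect value to an Avro datum, then unwrap the
      // schema-carrying container so Avro sees the raw primitive.
      static void writeRecord(DataFileWriter<Object> writer, AvroData avroData,
                              SinkRecord record) throws IOException {
        Object value = avroData.fromConnectData(record.valueSchema(), record.value());
        if (value instanceof NonRecordContainer) {
          value = ((NonRecordContainer) value).getValue();
        }
        writer.append(value);
      }
    }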

ewencp closed this as completed on Feb 27, 2017
@blbradley (Author)

@ewencp I think this means that string data will now work. Is that correct?

@ewencp (Contributor) commented Feb 28, 2017

@blbradley Yes, that's correct. This was a general problem for all primitive types (e.g. ints, strings, byte[], etc.). The test in the patch is for ints, but the fix covers strings as well.

@blbradley (Author)

OK, great. Then this could clear up #74 for JSON with no defined schema.

@cotedm commented Feb 28, 2017

I think #74 was originally a feature request for a JSON output format, so while this fix will let you take JSON input as a string and write it out, it doesn't do anything with the JSON; it's just a 1-to-1 copy. So while I agree StringConverter will work for some, I don't think we can close #74 just yet, because we need to confirm it's a broadly applicable solution for JSON. I'll update there, though.
