
Add support for TIBCO's Avro serializer/deserializer #558

Merged: 3 commits, Jan 27, 2021

Conversation

@ebowden-tibco (Contributor)
This PR works as-is for me for deserializing and displaying messages that were serialized by the TIBCO Avro serialization library. Since the TIBCO client library isn't currently available from a public Maven repository, it must be downloaded manually (as part of the "TIBCO® Messaging - Schema Repository for Apache Kafka - Community Edition" package at https://www.tibco.com/products/tibco-messaging/downloads). AKHQ must then be started with a classpath that includes the TIBCO jar and its dependencies alongside AKHQ's jar, rather than with just the -jar option. In other words, with this PR, instead of starting AKHQ like:

java -Dmicronaut.config.files=/path/to/config.yml -jar /path/to/akhq-0.1-all.jar

You'll need to start it by calling:

java -Dmicronaut.config.files=/path/to/config.yml -cp /opt/tibco/akd/repo/1.2/lib/tibftl-kafka-avro-1.2.0-thin.jar:/path/to/akhq-0.1-all.jar:/opt/tibco/akd/repo/1.2/lib/deps/retrofit-2.9.0.jar:/opt/tibco/akd/repo/1.2/lib/deps/converter-gson-2.9.0.jar:/opt/tibco/akd/repo/1.2/lib/deps/okhttp-3.14.9.jar:/opt/tibco/akd/repo/1.2/lib/deps/okio-jvm-2.9.0.jar:/opt/tibco/akd/repo/1.2/lib/deps/kotlin-stdlib-1.4.10.jar org.akhq.App

Additionally, in your AKHQ config file's 'schema-registry' section, you'll need to add a new 'type' option set to 'tibco' if you're using a TIBCO schema repository with the TIBCO serializer/deserializer. E.g.:

akhq:
  connections:
    dev:
      properties:
        bootstrap.servers: "localhost:9092"
        security.protocol: plaintext
      schema-registry:
        url: "http://localhost:8585/schema/v1"
        type: "tibco"
        properties: {}
      connect:
        - name: connect-1

(Also note the URL: it should be set to the URL of the TIBCO FTL Server used by the TIBCO schema repository, with "/schema/v1" appended at the end, as shown above.)

This PR contains a minimal set of changes to allow full compatibility with the TIBCO schema repository and Avro serializer client library. In the future, it may be worth discussing more substantial changes to AKHQ that would allow specifying a SerDe class and its options on a per-topic-regex basis (similar to what's currently done for the JSON deserializer). That would let AKHQ support any serialization format with an existing Kafka SerDe, without needing explicit support for each format added to AKHQ.
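The per-topic-regex SerDe idea above could be sketched roughly as follows. This is purely hypothetical: none of these class or method names exist in AKHQ today, and the SerDe class names used are placeholders. The first matching pattern wins, with a configured default as fallback.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of a per-topic-regex SerDe lookup; illustrative only.
class TopicSerdeResolver {
    // Insertion order matters: the first matching regex wins.
    private final Map<Pattern, String> serdeByTopicRegex = new LinkedHashMap<>();
    private final String defaultSerdeClass;

    TopicSerdeResolver(String defaultSerdeClass) {
        this.defaultSerdeClass = defaultSerdeClass;
    }

    void register(String topicRegex, String serdeClass) {
        serdeByTopicRegex.put(Pattern.compile(topicRegex), serdeClass);
    }

    // Returns the SerDe class name configured for this topic, or the default.
    String resolve(String topic) {
        for (Map.Entry<Pattern, String> entry : serdeByTopicRegex.entrySet()) {
            if (entry.getKey().matcher(topic).matches()) {
                return entry.getValue();
            }
        }
        return defaultSerdeClass;
    }
}
```

A config-driven version of this would let each regex-to-SerDe mapping live under a connection's config block, mirroring how the JSON deserializer's topic filtering works today.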

…and the TIBCO Avro client jar is on the classpath, then the TIBCO Avro wire format will be used for serialization/deserialization of Avro messages. Also cleaned up a few small unrelated code typos.
@ebowden-tibco (Contributor, Author)

Hi there - just checking in to see if anyone's had a chance to take a look at this PR. Let me know what you think - what you'd like changed if anything, etc. Thanks!

@@ -31,6 +31,7 @@ public Connection(@Parameter String name) {
String url;
String basicAuthUsername;
String basicAuthPassword;
String type;
@tchiotludo (Owner)

Please change this one to an Enum, with a default of Confluent.
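The requested change could look roughly like this. It is a minimal sketch with illustrative names, not AKHQ's actual code: a SchemaRegistryType enum whose field defaults to CONFLUENT so that existing configs keep working unchanged.

```java
// Hypothetical sketch: the schema-registry 'type' option as an enum with a
// Confluent default. Names are illustrative, not AKHQ's actual classes.
class SchemaRegistryConfig {
    enum SchemaRegistryType {
        CONFLUENT,
        TIBCO
    }

    // Defaulting here means a config without an explicit 'type' behaves
    // exactly as before the PR.
    private SchemaRegistryType type = SchemaRegistryType.CONFLUENT;

    SchemaRegistryType getType() {
        return type;
    }

    void setType(SchemaRegistryType type) {
        this.type = type;
    }
}
```

With an enum, an invalid `type` value in the YAML fails fast at config-binding time instead of surfacing later as an unrecognized string.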

config.putAll(this.kafkaModule.getConnection(clusterId).getProperties());
deserializer.configure(config, false);
} else {
deserializer = (Deserializer) Class.forName("io.confluent.kafka.serializers.KafkaAvroDeserializer").getDeclaredConstructor(SchemaRegistryClient.class).newInstance(this.kafkaModule.getRegistryClient(clusterId));
@tchiotludo (Owner)

Why use reflection here?
The class is on the classpath.
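For context, the trade-off in question can be illustrated like this (with a stand-in class, since the Confluent and TIBCO jars aren't available here): reflection is only needed when a class may be absent from the classpath at compile time, as with the manually-downloaded TIBCO jar; for a dependency bundled with the application, a direct call is simpler and compiler-checked.

```java
import java.lang.reflect.Constructor;

// Illustrative contrast between direct instantiation and reflection.
class WireFormatDemo {
    // Stand-in for a deserializer class known at compile time.
    static class KnownDeserializer {
        KnownDeserializer() { }
    }

    // Compile-time checked: preferred when the jar ships with the application.
    static Object createDirect() {
        return new KnownDeserializer();
    }

    // Resolved only at runtime: useful for an optional jar that users add to
    // the classpath themselves, at the cost of losing compile-time checks.
    static Object createViaReflection(String className) throws Exception {
        Constructor<?> ctor = Class.forName(className).getDeclaredConstructor();
        return ctor.newInstance();
    }
}
```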

@tchiotludo (Owner)

Sorry @ebowden-tibco, I completely forgot.
I've commented with some minor changes.

You'll also need to provide:

  • some documentation in README.md (you may also need instructions for the Helm charts on how to mount the jar).
  • an updated application example with the new configuration.
  • a new configuration env variable for the classpath (-cp) for the Docker version here.

And then it will be fine 👍
Thanks

@ebowden-tibco (Contributor, Author)

Thanks for the feedback! I'll update the PR soon with the requested changes.

…ng how to configure and use the TIBCO schema registry. Changed schema registry 'type' option to an Enum and simplified the code a bit. Added support for the CLASSPATH environment variable to the Docker image. Fixed a few nearby doc typos.
@tchiotludo tchiotludo merged commit aada01e into tchiotludo:dev Jan 27, 2021
@tchiotludo (Owner)

Perfect !!
Thanks @ebowden-tibco

@AntAreS24

@tchiotludo, out of curiosity, could I modify the build.gradle to add a new Maven repo (a company Nexus) with the TIBCO jar in it, and add a reference to the TIBCO jar?

I would still need to make the changes for the schema repository type in the config file.

@tchiotludo (Owner)

tchiotludo commented Jan 28, 2021

I don't think so; the license wouldn't allow embedding a proprietary jar in this open-source project.
(Though admittedly I don't know TIBCO's license.)

@AntAreS24

Of course not here, but I could if I clone it locally and make the changes.

@xpr3sso (Contributor)

xpr3sso commented Jan 30, 2021

Hi @ebowden-tibco,
do I need some config adjustments to run your changes without having a registry? :)
I can't access the topic list for my local test cluster with these changes.

@ebowden-tibco (Contributor, Author)

@xpr3sso - You shouldn't need any config changes. Could you give an example config snippet that fails for you? Do you not have a "schema-registry" section in your config at all? Does everything work immediately before commit aada01e? That is, if you check out commit 477c704, do things work again? I'm not sure how these changes would have affected AKHQ use without a registry, but I'm new to the AKHQ code base, so anything's possible.

@xpr3sso (Contributor)

xpr3sso commented Jan 30, 2021

@ebowden-tibco I just checked 477c704 and it's working fine for me :)
The injection of Connection.SchemaRegistry (https://github.com/tchiotludo/akhq/pull/558/files#diff-3901f50bbc040388cf793601f793a114ab4bb6d3fe0dd15f06c33dec41e3d12bR61) seems to fail, since I don't have one specified:

 Internal Server Error: Failed to inject value for field [schemaRegistry] of class: org.akhq.repositories.RecordRepository Path Taken: TopicController.recordRepository --> RecordRepository.schemaRegistry
io.micronaut.context.exceptions.DependencyInjectionException: Failed to inject value for field [schemaRegistry] of class: org.akhq.repositories.RecordRepository
Path Taken: TopicController.recordRepository --> RecordRepository.schemaRegistry
	at io.micronaut.context.AbstractBeanDefinition.getBeanForField(AbstractBeanDefinition.java:1462)
	at io.micronaut.context.AbstractBeanDefinition.injectBeanField(AbstractBeanDefinition.java:731)
	at org.akhq.repositories.$RecordRepositoryDefinition.injectBean(Unknown Source)
	at org.akhq.repositories.$RecordRepositoryDefinition.build(Unknown Source)
	at io.micronaut.context.DefaultBeanContext.doCreateBean(DefaultBeanContext.java:1898)
	at io.micronaut.context.DefaultBeanContext.createAndRegisterSingletonInternal(DefaultBeanContext.java:2647)
	at io.micronaut.context.DefaultBeanContext.createAndRegisterSingleton(DefaultBeanContext.java:2633)
	at io.micronaut.context.DefaultBeanContext.getBeanForDefinition(DefaultBeanContext.java:2305)
	at io.micronaut.context.DefaultBeanContext.getBeanInternal(DefaultBeanContext.java:2279)
	at io.micronaut.context.DefaultBeanContext.getBean(DefaultBeanContext.java:1245)
	at io.micronaut.context.AbstractBeanDefinition.getBeanForField(AbstractBeanDefinition.java:1441)
	at io.micronaut.context.AbstractBeanDefinition.injectBeanField(AbstractBeanDefinition.java:731)
	at org.akhq.controllers.$TopicControllerDefinition.injectBean(Unknown Source)
	at org.akhq.controllers.$TopicControllerDefinition.build(Unknown Source)
	at io.micronaut.context.DefaultBeanContext.doCreateBean(DefaultBeanContext.java:1898)
	at io.micronaut.context.DefaultBeanContext.createAndRegisterSingletonInternal(DefaultBeanContext.java:2647)
	at io.micronaut.context.DefaultBeanContext.createAndRegisterSingleton(DefaultBeanContext.java:2633)
	at io.micronaut.context.DefaultBeanContext.getBeanForDefinition(DefaultBeanContext.java:2305)
	at io.micronaut.context.DefaultBeanContext.access$100(DefaultBeanContext.java:78)
	at io.micronaut.context.DefaultBeanContext$4.getTarget(DefaultBeanContext.java:431)
	at io.micronaut.context.DefaultBeanContext$4.invoke(DefaultBeanContext.java:474)
	at io.micronaut.web.router.AbstractRouteMatch.execute(AbstractRouteMatch.java:312)
	at io.micronaut.web.router.RouteMatch.execute(RouteMatch.java:118)
	at io.micronaut.http.server.netty.RoutingInBoundHandler.lambda$buildResultEmitter$10(RoutingInBoundHandler.java:1363)
	at io.reactivex.internal.operators.flowable.FlowableDefer.subscribeActual(FlowableDefer.java:35)
	at io.reactivex.Flowable.subscribe(Flowable.java:14918)
	at io.reactivex.Flowable.subscribe(Flowable.java:14865)
	at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:288)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:253)
	at io.micronaut.scheduling.instrument.InvocationInstrumenterWrappedRunnable.run(InvocationInstrumenterWrappedRunnable.java:47)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:79)
	at io.micrometer.core.instrument.Timer.lambda$wrap$0(Timer.java:148)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.micronaut.context.exceptions.NoSuchBeanException: No bean of type [org.akhq.configs.Connection$SchemaRegistry] exists. Make sure the bean is not disabled by bean requirements (enable trace logging for 'io.micronaut.context.condition' to check) and if the bean is enabled then ensure the class is declared a bean and annotation processing is enabled (for Java and Kotlin the 'micronaut-inject-java' dependency should be configured as an annotation processor).
	at io.micronaut.context.DefaultBeanContext.getBeanInternal(DefaultBeanContext.java:2290)
	at io.micronaut.context.DefaultBeanContext.getBean(DefaultBeanContext.java:1245)
	at io.micronaut.context.AbstractBeanDefinition.getBeanForField(AbstractBeanDefinition.java:1441)
	... 35 more

Are you able to reproduce this? My connections section looks like:

connections:
  localhost:
    properties:
      bootstrap.servers: "localhost:9092"

@ebowden-tibco (Contributor, Author)

Doh! Yes, I can reproduce it here. I did all my testing with a schema-registry section in the config file, and tested with and without the new schema registry "type" option, but didn't think to test with and without the entire schema-registry config block itself. I'll try to get this fixed today.
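The likely shape of such a fix can be sketched as follows. This is an assumed sketch, not AKHQ's actual code: when no schema-registry block is configured, the injected config is null (in Micronaut, the injection point would be annotated @Nullable so a missing bean doesn't abort injection of the enclosing repository), and the code falls back to the Confluent default instead of throwing.

```java
// Hypothetical sketch of a null-safe fallback for a missing schema-registry
// config block; names are illustrative.
class RegistryTypeResolver {
    enum RegistryType { CONFLUENT, TIBCO }

    // Stand-in for Connection.SchemaRegistry; null when the config file has
    // no schema-registry section at all.
    static class SchemaRegistry {
        final RegistryType type;
        SchemaRegistry(RegistryType type) { this.type = type; }
    }

    // Tolerating null here avoids the NoSuchBeanException / failed field
    // injection seen in the stack trace above.
    static RegistryType effectiveType(SchemaRegistry registry) {
        return registry == null ? RegistryType.CONFLUENT : registry.type;
    }
}
```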
