Add support for TIBCO's Avro serializer/deserializer #558

Merged: 3 commits, Jan 27, 2021
Changes from all commits
43 changes: 41 additions & 2 deletions README.md
@@ -215,6 +215,7 @@ If you do not override the `JVM_OPTS_FILE`, the docker container will take the d
* `properties`: all the configurations found on [Kafka consumer documentation](https://kafka.apache.org/documentation/#consumerconfigs). Most important is `bootstrap.servers` that is a list of host:port of your Kafka brokers.
* `schema-registry`: *(optional)*
* `url`: the schema registry url
* `type`: the type of schema registry used, either `confluent` (the default) or `tibco`
* `basic-auth-username`: schema registry basic auth username
* `basic-auth-password`: schema registry basic auth password
* `properties`: all the configurations for registry client, especially ssl configuration
@@ -254,6 +255,7 @@ akhq:
ssl.key.password: {{password}}
schema-registry:
url: "https://{{host}}.aivencloud.com:12838"
type: "confluent"
basic-auth-username: avnadmin
basic-auth-password: {{password}}
properties: {}
@@ -578,11 +580,14 @@ The username field can be any string field, the roles field has to be a JSON arr
> More information can be found on [Micronaut documentation](https://docs.micronaut.io/snapshot/guide/index.html#config)

### Docker
AKHQ docker image support 3 environment variables to handle configuration :

The AKHQ docker image supports 4 environment variables to handle configuration :
* `AKHQ_CONFIGURATION`: a string that contains the full configuration in yml that will be written to
/app/configuration.yml in the container.
* `MICRONAUT_APPLICATION_JSON`: a string that contains the full configuration in JSON format
* `MICRONAUT_CONFIG_FILES`: a path to a configuration file on the container. Default path is `/app/application.yml`
* `MICRONAUT_CONFIG_FILES`: a path to a configuration file in the container. Default path is `/app/application.yml`
* `CLASSPATH`: additional Java classpath entries. Must be used to specify the location of the TIBCO Avro client library
jar if the `tibco` schema registry type is used.

#### How to mount configuration file

@@ -600,6 +605,40 @@ volumeMounts:

```

#### Using the TIBCO schema registry

If you are using the TIBCO schema registry, you will also need to mount and use the TIBCO Avro client library and its
dependencies. The akhq service in a docker compose file might look something like:

```yaml
akhq:
# build:
# context: .
image: tchiotludo/akhq
volumes:
- /opt/tibco/akd/repo/1.2/lib/tibftl-kafka-avro-1.2.0-thin.jar:/app/tibftl-kafka-avro-1.2.0-thin.jar
- /opt/tibco/akd/repo/1.2/lib/deps:/app/deps
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "kafka:9092"
schema-registry:
type: "tibco"
url: "http://repo:8081"
connect:
- name: "connect"
url: "http://connect:8083"
CLASSPATH: "/app/tibftl-kafka-avro-1.2.0-thin.jar:/app/deps/*"
ports:
- 8080:8080
links:
- kafka
- repo
```

## Api
An **experimental** API is available that allows you to fetch everything exposed by AKHQ through the API.

1 change: 1 addition & 0 deletions application.example.yml
@@ -54,6 +54,7 @@ akhq:
bootstrap.servers: "kafka:9092"
schema-registry:
url: "http://schema-registry:8085" # schema registry url (optional)
type: "confluent" # schema registry type (optional). Supported types are "confluent" (default) or "tibco"
# Basic Auth user / pass
basic-auth-username: basic-auth-user
basic-auth-password: basic-auth-pass
2 changes: 1 addition & 1 deletion docker/app/akhq
@@ -7,4 +7,4 @@ do
JAVA_OPTS="${JAVA_OPTS} ${JVM_OPT}"
done

/usr/local/openjdk-11/bin/java ${JAVA_OPTS} -jar /app/akhq.jar
/usr/local/openjdk-11/bin/java ${JAVA_OPTS} -cp /app/akhq.jar:${CLASSPATH} org.akhq.App
3 changes: 3 additions & 0 deletions helm/akhq/values.yaml
@@ -21,6 +21,8 @@ extraEnv: []
# bootstrap.servers: "kafka:9092"
# - name: JAVA_OPTS
# value: "-Djavax.net.ssl.trustStore=/usr/local/openjdk-11/lib/security/cacerts -Djavax.net.ssl.trustStorePassword=password"
# - name: CLASSPATH
# value: "/any/additional/jars/desired.jar:/go/here.jar"

## Or you can also use configmap for the configuration...
configuration: |
@@ -39,6 +41,7 @@ secrets: |
bootstrap.servers: "kafka:9092"
schema-registry:
url: "http://schema-registry:8085"
type: "confluent"
basic-auth-username: basic-auth-user
basic-auth-password: basic-auth-pass
connect:
1 change: 1 addition & 0 deletions src/main/java/org/akhq/configs/Connection.java
@@ -31,6 +31,7 @@ public static class SchemaRegistry {
String url;
String basicAuthUsername;
String basicAuthPassword;
SchemaRegistryType type = SchemaRegistryType.CONFLUENT;

@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
6 changes: 6 additions & 0 deletions src/main/java/org/akhq/configs/SchemaRegistryType.java
@@ -0,0 +1,6 @@
package org.akhq.configs;

public enum SchemaRegistryType {
CONFLUENT,
TIBCO
}
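
The enum value drives the Avro wire-format magic byte used throughout the rest of the change: Confluent framing starts with `0x0`, while the TIBCO client writes `0x80`. The PR selects that byte inline in `Record`, `AvroSerializer`, and `AvroWireFormatConverter`; a small helper like the following (an illustrative sketch, not part of this PR) shows the mapping in one place:

```java
package org.akhq.configs;

// Illustrative sketch only: this helper does not exist in the PR, which
// repeats the if/else inline in each class that needs the magic byte.
public final class WireFormat {
    private WireFormat() {
    }

    // First byte of the Avro wire-format framing:
    // 0x0 for Confluent, 0x80 for TIBCO (values taken from this diff).
    public static byte magicByte(SchemaRegistryType type) {
        return type == SchemaRegistryType.TIBCO ? (byte) 0x80 : (byte) 0x0;
    }
}
```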
5 changes: 5 additions & 0 deletions src/main/java/org/akhq/controllers/TopicController.java
@@ -17,6 +17,7 @@
import io.swagger.v3.oas.annotations.Operation;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.Connection;
import org.akhq.configs.Role;
import org.akhq.models.*;
import org.akhq.modules.AbstractKafkaWrapper;
@@ -55,6 +56,8 @@ public class TopicController extends AbstractController {
private Environment environment;
@Inject
private AccessControlListRepository aclRepository;
@Inject
private Connection.SchemaRegistry schemaRegistry;

@Value("${akhq.topic.replication}")
private Short replicationFactor;
@@ -146,6 +149,7 @@ public Record produce(
keySchema,
valueSchema
),
schemaRegistry.getType(),
key.map(String::getBytes).orElse(null),
value.getBytes(),
headers
@@ -275,6 +279,7 @@ public Record deleteRecordApi(String cluster, String topicName, Integer partitio
partition,
Base64.getDecoder().decode(key)
),
schemaRegistry.getType(),
Base64.getDecoder().decode(key),
null,
new HashMap<>()
4 changes: 2 additions & 2 deletions src/main/java/org/akhq/models/ConnectPlugin.java
@@ -29,15 +29,15 @@ public ConnectPlugin(ConnectorPlugin connectorPlugin, ConnectorPluginConfigValid
results.getConfigs()
.stream()
.map(config -> new Definition(config.getDefinition())),
registryDefintion()
registryDefinition()
)
.sorted(Comparator.comparing(Definition::getGroup, (s1, s2) -> s1.equals("Others") ? 1 : s1.compareTo(s2))
.thenComparing(Definition::getOrder)
)
.collect(Collectors.toList());
}

public Stream<Definition> registryDefintion() {
public Stream<Definition> registryDefinition() {
return Stream.of(
Definition.builder()
.name("schema.registry.url")
25 changes: 19 additions & 6 deletions src/main/java/org/akhq/models/Record.java
@@ -1,15 +1,16 @@
package org.akhq.models;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lombok.*;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.ProtobufToJsonDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.time.Instant;
@@ -32,7 +33,7 @@ public class Record {
private Integer valueSchemaId;
private Map<String, String> headers = new HashMap<>();
@JsonIgnore
private KafkaAvroDeserializer kafkaAvroDeserializer;
private Deserializer kafkaAvroDeserializer;
private ProtobufToJsonDeserializer protobufToJsonDeserializer;

@Getter(AccessLevel.NONE)
@@ -49,7 +50,14 @@ public class Record {

private final List<String> exceptions = new ArrayList<>();

public Record(RecordMetadata record, byte[] bytesKey, byte[] bytesValue, Map<String, String> headers) {
private byte MAGIC_BYTE;

public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, Map<String, String> headers) {
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
this.MAGIC_BYTE = (byte) 0x80;
} else {
this.MAGIC_BYTE = 0x0;
}
this.topic = record.topic();
this.partition = record.partition();
this.offset = record.offset();
@@ -61,8 +69,13 @@ public Record(RecordMetadata record, byte[] bytesKey, byte[] bytesValue, Map<Str
this.headers = headers;
}

public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer,
public Record(ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue) {
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
this.MAGIC_BYTE = (byte) 0x80;
} else {
this.MAGIC_BYTE = 0x0;
}
this.topic = record.topic();
this.partition = record.partition();
this.offset = record.offset();
@@ -134,13 +147,13 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}
}

private static Integer getAvroSchemaId(byte[] payload) {
private Integer getAvroSchemaId(byte[] payload) {
try {
ByteBuffer buffer = ByteBuffer.wrap(payload);
byte magicBytes = buffer.get();
int schemaId = buffer.getInt();

if (magicBytes == 0 && schemaId >= 0) {
if (magicBytes == MAGIC_BYTE && schemaId >= 0) {
return schemaId;
}
} catch (Exception ignore) {
6 changes: 3 additions & 3 deletions src/main/java/org/akhq/models/TopicPartition.java
@@ -53,14 +53,14 @@ public ConsumerGroupOffset(TopicPartition topicPartition)
public ConsumerGroupOffset(
org.apache.kafka.common.TopicPartition topicPartition,
OffsetAndMetadata offsetAndMetadata,
Partition.Offsets partiionOffsets
Partition.Offsets partitionOffsets
) {
super(topicPartition);

this.offset = offsetAndMetadata != null ? Optional.of(offsetAndMetadata.offset()) : Optional.empty();
this.metadata = offsetAndMetadata != null ? Optional.of(offsetAndMetadata.metadata()) : Optional.empty();
this.firstOffset = partiionOffsets != null ? Optional.of(partiionOffsets.getFirstOffset()) : Optional.empty();
this.lastOffset = partiionOffsets != null ? Optional.of(partiionOffsets.getLastOffset()) : Optional.empty();
this.firstOffset = partitionOffsets != null ? Optional.of(partitionOffsets.getFirstOffset()) : Optional.empty();
this.lastOffset = partitionOffsets != null ? Optional.of(partitionOffsets.getLastOffset()) : Optional.empty();
}

public Optional<Long> getOffsetLag() {
11 changes: 9 additions & 2 deletions src/main/java/org/akhq/modules/AvroSerializer.java
@@ -3,6 +3,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.SchemaRegistryType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
@@ -26,12 +27,18 @@
@Slf4j
public class AvroSerializer {

public static final int MAGIC_BYTE = 0;
private final int MAGIC_BYTE;

public static final int SCHEMA_ID_SIZE = 4;
private SchemaRegistryClient registryClient;

public AvroSerializer(SchemaRegistryClient registryClient) {
public AvroSerializer(SchemaRegistryClient registryClient, SchemaRegistryType schemaRegistryType) {
this.registryClient = registryClient;
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
MAGIC_BYTE = (byte) 0x80;
} else {
MAGIC_BYTE = 0x0;
}
}

public byte[] toAvro(String json, int schemaId) {
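
With the new constructor parameter, a caller passes the configured registry type alongside the registry client. A hedged usage sketch follows; only the constructor and `toAvro(String, int)` signatures come from this diff, while the `CachedSchemaRegistryClient` wiring, the registry URL, and the schema id are assumptions for illustration:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.modules.AvroSerializer;

public class AvroSerializerUsageSketch {
    public static void main(String[] args) {
        // Hypothetical wiring, not code from the PR: URL and cache size are placeholders.
        SchemaRegistryClient registryClient =
            new CachedSchemaRegistryClient("http://repo:8081", 100);

        AvroSerializer serializer =
            new AvroSerializer(registryClient, SchemaRegistryType.TIBCO);

        // Serializes the JSON document against schema id 42; with the TIBCO type
        // the serializer's MAGIC_BYTE is 0x80 instead of Confluent's 0x0.
        byte[] payload = serializer.toAvro("{\"name\": \"value\"}", 42);
        System.out.println(payload.length + " bytes produced");
    }
}
```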
16 changes: 10 additions & 6 deletions src/main/java/org/akhq/repositories/AvroWireFormatConverter.java
@@ -4,6 +4,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.SchemaRegistryType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

@@ -28,31 +29,34 @@
@Slf4j
public class AvroWireFormatConverter {

private static final byte MAGIC_BYTE = 0;
private static final Pattern AVRO_CONTENT_TYPE_PATTERN = Pattern.compile("\"?application/vnd\\.(.+)\\.v(\\d+)\\+avro\"?");

public byte[] convertValueToWireFormat(ConsumerRecord<byte[], byte[]> record, SchemaRegistryClient registryClient) {
public byte[] convertValueToWireFormat(ConsumerRecord<byte[], byte[]> record, SchemaRegistryClient registryClient, SchemaRegistryType schemaRegistryType) {
byte magicByte = 0x0;
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
magicByte = (byte) 0x80;
}
Iterator<Header> contentTypeIter = record.headers().headers("contentType").iterator();
byte[] value = record.value();
if (contentTypeIter.hasNext() &&
value.length > 0 &&
ByteBuffer.wrap(value).get() != MAGIC_BYTE) {
ByteBuffer.wrap(value).get() != magicByte) {
String headerValue = new String(contentTypeIter.next().value());
Matcher matcher = AVRO_CONTENT_TYPE_PATTERN.matcher(headerValue);
if (matcher.matches()) {
String subject = matcher.group(1);
int version = Integer.parseInt(matcher.group(2));
value = prependWireFormatHeader(value, registryClient, subject, version);
value = prependWireFormatHeader(value, registryClient, subject, version, magicByte);
}
}
return value;
}

private byte[] prependWireFormatHeader(byte[] value, SchemaRegistryClient registryClient, String subject, int version) {
private byte[] prependWireFormatHeader(byte[] value, SchemaRegistryClient registryClient, String subject, int version, byte magicByte) {
try {
SchemaMetadata schemaMetadata = registryClient.getSchemaMetadata(subject, version);
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(MAGIC_BYTE);
out.write(magicByte);
out.write(ByteBuffer.allocate(4).putInt(schemaMetadata.getId()).array());
out.write(value);
value = out.toByteArray();
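
Taken together, the framing that `prependWireFormatHeader(...)` writes and that `Record.getAvroSchemaId(...)` reads back is `[magic byte][4-byte big-endian schema id][Avro payload]`, with the magic byte now depending on the registry type. A self-contained round-trip sketch of that layout (illustrative only, not code from the PR):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class WireFormatRoundTrip {
    public static void main(String[] args) throws IOException {
        byte magicByte = (byte) 0x80;              // TIBCO framing; Confluent uses 0x0
        int schemaId = 42;                         // placeholder schema id
        byte[] avroBody = new byte[]{1, 2, 3};     // stand-in for the encoded Avro record

        // Build the message the same way prependWireFormatHeader does:
        // magic byte, then the 4-byte schema id, then the payload.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(magicByte);
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        out.write(avroBody);
        byte[] wireFormat = out.toByteArray();

        // Parse it back the way Record.getAvroSchemaId does.
        ByteBuffer buffer = ByteBuffer.wrap(wireFormat);
        byte parsedMagic = buffer.get();
        int parsedSchemaId = buffer.getInt();
        System.out.printf("magic=0x%02X schemaId=%d%n", parsedMagic, parsedSchemaId);
    }
}
```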
12 changes: 10 additions & 2 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -9,6 +9,7 @@
import io.reactivex.Flowable;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.Connection;
import org.akhq.controllers.TopicController;
import org.akhq.models.Partition;
import org.akhq.models.Record;
@@ -56,6 +57,9 @@ public class RecordRepository extends AbstractRepository {
@Inject
private AvroWireFormatConverter avroWireFormatConverter;

@Inject
private Connection.SchemaRegistry schemaRegistry;

@Value("${akhq.topic-data.poll-timeout:1000}")
protected int pollTimeout;

@@ -420,18 +424,22 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consu
private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId) {
return new Record(
record,
this.schemaRegistry.getType(),
this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId))
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId),
this.kafkaModule.getConnection(clusterId).getSchemaRegistry().getType())
);
}

private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions options) {
return new Record(
record,
this.schemaRegistry.getType(),
this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId))
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId),
this.kafkaModule.getConnection(options.clusterId).getSchemaRegistry().getType())
);
}
