From 9cc2a754ceeb969e5c074be5b98a07ca811df4b3 Mon Sep 17 00:00:00 2001 From: Sergey Avseyev Date: Sat, 5 Nov 2016 02:00:49 +0300 Subject: [PATCH] KAFKAC-54: Add example with Kafka Streams --- config/connect-distributed.properties | 56 +++++ config/connect-standalone.properties | 6 +- pom.xml | 24 +- src/test/java/examples/KafkaStreamsDemo.java | 208 ++++++++++++++++++ .../examples/serde/KeyAvroDeserializer.java | 58 +++++ .../java/examples/serde/KeyAvroSerde.java | 68 ++++++ .../examples/serde/KeyAvroSerializer.java | 54 +++++ .../examples/serde/ValueAvroDeserializer.java | 59 +++++ .../java/examples/serde/ValueAvroSerde.java | 69 ++++++ .../examples/serde/ValueAvroSerializer.java | 55 +++++ 10 files changed, 648 insertions(+), 9 deletions(-) create mode 100644 config/connect-distributed.properties create mode 100644 src/test/java/examples/KafkaStreamsDemo.java create mode 100644 src/test/java/examples/serde/KeyAvroDeserializer.java create mode 100644 src/test/java/examples/serde/KeyAvroSerde.java create mode 100644 src/test/java/examples/serde/KeyAvroSerializer.java create mode 100644 src/test/java/examples/serde/ValueAvroDeserializer.java create mode 100644 src/test/java/examples/serde/ValueAvroSerde.java create mode 100644 src/test/java/examples/serde/ValueAvroSerializer.java diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties new file mode 100644 index 0000000..d782614 --- /dev/null +++ b/config/connect-distributed.properties @@ -0,0 +1,56 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +# These are defaults. This file just demonstrates how to override some settings. +bootstrap.servers=localhost:9092 + +# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs +group.id=connect-cluster + +# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will +# need to configure these based on the format they want their data in when loaded from or stored into Kafka +key.converter=io.confluent.connect.avro.AvroConverter +key.converter.schema.registry.url=http://localhost:8081 +value.converter=io.confluent.connect.avro.AvroConverter +value.converter.schema.registry.url=http://localhost:8081 +# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply +# it to +key.converter.schemas.enable=true +value.converter.schemas.enable=true + +# The internal converter used for offsets and config data is configurable and must be specified, but most users will +# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format. +internal.key.converter=org.apache.kafka.connect.json.JsonConverter +internal.value.converter=org.apache.kafka.connect.json.JsonConverter +internal.key.converter.schemas.enable=false +internal.value.converter.schemas.enable=false + +# Topic to use for storing offsets. This topic should have many partitions and be replicated. +offset.storage.topic=connect-offsets + +# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. +# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions. +config.storage.topic=connect-configs + +# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated. +status.storage.topic=connect-status + +# Flush much faster than normal, which is useful for testing/debugging +offset.flush.interval.ms=10000 + +consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor +producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor diff --git a/config/connect-standalone.properties b/config/connect-standalone.properties index 90b6f60..b064f80 100644 --- a/config/connect-standalone.properties +++ b/config/connect-standalone.properties @@ -29,10 +29,8 @@ value.converter.schemas.enable=true # The internal converter used for offsets and config data is configurable and must be specified, but most users will # always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format. -internal.key.converter=io.confluent.connect.avro.AvroConverter -internal.key.converter.schema.registry.url=http://localhost:8081 -internal.value.converter=io.confluent.connect.avro.AvroConverter -internal.value.converter.schema.registry.url=http://localhost:8081 +internal.key.converter=org.apache.kafka.connect.json.JsonConverter +internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false diff --git a/pom.xml b/pom.xml index 4f2da5d..0abec4c 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,8 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. ~--> - 4.0.0 @@ -52,11 +52,13 @@ 1.6 3.0.1 - 0.10.0.1 - 4.12 + 0.10.0.1 0.6.0 + 0.10.0.1 UTF-8 http://packages.confluent.io/maven/ + 4.12 + 5.1.6 @@ -71,7 +73,7 @@ org.apache.kafka connect-api - ${kafka.version} + ${kafka-connect-api.version} io.confluent @@ -83,12 +85,24 @@ dcp-client ${dcp-client.version} + + org.apache.kafka + kafka-streams + ${kafka-streams.version} + test + junit junit ${junit.version} test + + mysql + mysql-connector-java + ${mysql.version} + test + diff --git a/src/test/java/examples/KafkaStreamsDemo.java b/src/test/java/examples/KafkaStreamsDemo.java new file mode 100644 index 0000000..9229287 --- /dev/null +++ b/src/test/java/examples/KafkaStreamsDemo.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2016 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples; + +import com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode; +import com.couchbase.client.deps.com.fasterxml.jackson.databind.ObjectMapper; +import examples.serde.KeyAvroSerde; +import examples.serde.ValueAvroSerde; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.ValueMapper; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Properties; + +public class KafkaStreamsDemo { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + public static void main(String[] args) throws InterruptedException, SQLException { + /** + * The example assumes the following SQL schema + * + * DROP DATABASE IF EXISTS beer_sample_sql; + * CREATE DATABASE beer_sample_sql CHARACTER SET utf8 COLLATE utf8_general_ci; + * USE beer_sample_sql; + * + * CREATE TABLE breweries ( + * id VARCHAR(256) NOT NULL, + * name VARCHAR(256), + * description TEXT, + * country VARCHAR(256), + * city VARCHAR(256), + * state VARCHAR(256), + * phone VARCHAR(40), + * updated_at DATETIME, + * PRIMARY KEY (id) + * ); + * + * + * CREATE TABLE beers ( + * id VARCHAR(256) NOT NULL, + * brewery_id VARCHAR(256) NOT NULL, + * name VARCHAR(256), + * category VARCHAR(256), + * style VARCHAR(256), + * description TEXT, + * abv DECIMAL(10,2), + * ibu DECIMAL(10,2), + * updated_at DATETIME, + * PRIMARY KEY (id) + * ); + */ + try { + Class.forName("com.mysql.jdbc.Driver"); + } catch (ClassNotFoundException e) { + System.err.println("Failed to load MySQL JDBC driver"); + } + Connection connection = DriverManager + .getConnection("jdbc:mysql://localhost:3306/beer_sample_sql", "root", "secret"); + final PreparedStatement insertBrewery = connection.prepareStatement( + "INSERT INTO breweries (id, name, description, country, city, state, phone, updated_at)" + + " VALUES (?, ?, ?, ?, ?, ?, ?, ?)"); + final PreparedStatement insertBeer = connection.prepareStatement( + "INSERT INTO beers (id, brewery_id, name, description, category, style, abv, ibu, updated_at)" + + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"); + + String schemaRegistryUrl = "http://localhost:8081"; + + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, KeyAvroSerde.class); + props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, ValueAvroSerde.class); + + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + KStream source = builder + .stream("streaming-topic-beer-sample"); + + KStream[] documents = source + .mapValues(new ValueMapper() { + @Override + public JsonNode apply(GenericRecord value) { + ByteBuffer buf = (ByteBuffer) value.get("content"); + try { + JsonNode doc = MAPPER.readTree(buf.array()); + return doc; + } catch (IOException e) { + return null; + } + } + }) + .branch( + new Predicate() { + @Override + public boolean test(String key, JsonNode value) { + return "beer".equals(value.get("type").asText()) && + value.has("brewery_id") && + value.has("name") && + value.has("description") && + value.has("category") && + value.has("style") && + value.has("abv") && + value.has("ibu") && + value.has("updated"); + } + }, + new Predicate() { + @Override + public boolean test(String key, JsonNode value) { + return "brewery".equals(value.get("type").asText()) && + value.has("name") && + value.has("description") && + value.has("country") && + value.has("city") && + value.has("state") && + value.has("phone") && + value.has("updated"); + } + } + ); + documents[0].foreach(new ForeachAction() { + @Override + public void apply(String key, JsonNode value) { + try { + insertBeer.setString(1, key); + insertBeer.setString(2, value.get("brewery_id").asText()); + insertBeer.setString(3, value.get("name").asText()); + insertBeer.setString(4, value.get("description").asText()); + insertBeer.setString(5, value.get("category").asText()); + insertBeer.setString(6, value.get("style").asText()); + insertBeer.setBigDecimal(7, new BigDecimal(value.get("abv").asText())); + insertBeer.setBigDecimal(8, new BigDecimal(value.get("ibu").asText())); + insertBeer.setDate(9, new Date(DATE_FORMAT.parse(value.get("updated").asText()).getTime())); + insertBeer.execute(); + } catch (SQLException e) { + System.err.println("Failed to insert record: " + key + ". " + e); + } catch (ParseException e) { + System.err.println("Failed to insert record: " + key + ". " + e); + } + } + }); + documents[1].foreach(new ForeachAction() { + @Override + public void apply(String key, JsonNode value) { + try { + insertBrewery.setString(1, key); + insertBrewery.setString(2, value.get("name").asText()); + insertBrewery.setString(3, value.get("description").asText()); + insertBrewery.setString(4, value.get("country").asText()); + insertBrewery.setString(5, value.get("city").asText()); + insertBrewery.setString(6, value.get("state").asText()); + insertBrewery.setString(7, value.get("phone").asText()); + insertBrewery.setDate(8, new Date(DATE_FORMAT.parse(value.get("updated").asText()).getTime())); + insertBrewery.execute(); + } catch (SQLException e) { + System.err.println("Failed to insert record: " + key + ". " + e); + } catch (ParseException e) { + System.err.println("Failed to insert record: " + key + ". " + e); + } + } + }); + + final KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + streams.close(); + } + })); + } +} \ No newline at end of file diff --git a/src/test/java/examples/serde/KeyAvroDeserializer.java b/src/test/java/examples/serde/KeyAvroDeserializer.java new file mode 100644 index 0000000..824b39f --- /dev/null +++ b/src/test/java/examples/serde/KeyAvroDeserializer.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2016 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples.serde; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; + +public class KeyAvroDeserializer implements Deserializer { + + KafkaAvroDeserializer inner; + + /** + * Constructor used by Kafka Streams. + */ + public KeyAvroDeserializer() { + inner = new KafkaAvroDeserializer(); + } + + public KeyAvroDeserializer(SchemaRegistryClient client) { + inner = new KafkaAvroDeserializer(client); + } + + public KeyAvroDeserializer(SchemaRegistryClient client, Map props) { + inner = new KafkaAvroDeserializer(client, props); + } + + @Override + public void configure(Map configs, boolean isKey) { + inner.configure(configs, isKey); + } + + @Override + public String deserialize(String s, byte[] bytes) { + return (String) inner.deserialize(s, bytes); + } + + @Override + public void close() { + inner.close(); + } +} diff --git a/src/test/java/examples/serde/KeyAvroSerde.java b/src/test/java/examples/serde/KeyAvroSerde.java new file mode 100644 index 0000000..7188687 --- /dev/null +++ b/src/test/java/examples/serde/KeyAvroSerde.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2016 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples.serde; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Collections; +import java.util.Map; + +public class KeyAvroSerde implements Serde { + + private final Serde inner; + + /** + * Constructor used by Kafka Streams. + */ + public KeyAvroSerde() { + inner = Serdes.serdeFrom(new KeyAvroSerializer(), new KeyAvroDeserializer()); + } + + public KeyAvroSerde(SchemaRegistryClient client) { + this(client, Collections.emptyMap()); + } + + public KeyAvroSerde(SchemaRegistryClient client, Map props) { + inner = Serdes.serdeFrom(new KeyAvroSerializer(client), new KeyAvroDeserializer(client, props)); + } + + @Override + public Serializer serializer() { + return inner.serializer(); + } + + @Override + public Deserializer deserializer() { + return inner.deserializer(); + } + + @Override + public void configure(Map configs, boolean isKey) { + inner.serializer().configure(configs, isKey); + inner.deserializer().configure(configs, isKey); + } + + @Override + public void close() { + inner.serializer().close(); + inner.deserializer().close(); + } +} diff --git a/src/test/java/examples/serde/KeyAvroSerializer.java b/src/test/java/examples/serde/KeyAvroSerializer.java new file mode 100644 index 0000000..575a5b8 --- /dev/null +++ b/src/test/java/examples/serde/KeyAvroSerializer.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2016 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples.serde; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; + +public class KeyAvroSerializer implements Serializer { + + KafkaAvroSerializer inner; + + /** + * Constructor used by Kafka Streams. + */ + public KeyAvroSerializer() { + inner = new KafkaAvroSerializer(); + } + + public KeyAvroSerializer(SchemaRegistryClient client) { + inner = new KafkaAvroSerializer(client); + } + + @Override + public void configure(Map configs, boolean isKey) { + inner.configure(configs, isKey); + } + + @Override + public byte[] serialize(String topic, String key) { + return inner.serialize(topic, key); + } + + @Override + public void close() { + inner.close(); + } +} diff --git a/src/test/java/examples/serde/ValueAvroDeserializer.java b/src/test/java/examples/serde/ValueAvroDeserializer.java new file mode 100644 index 0000000..5b8873f --- /dev/null +++ b/src/test/java/examples/serde/ValueAvroDeserializer.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2016 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples.serde; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; + +public class ValueAvroDeserializer implements Deserializer { + + KafkaAvroDeserializer inner; + + /** + * Constructor used by Kafka Streams. + */ + public ValueAvroDeserializer() { + inner = new KafkaAvroDeserializer(); + } + + public ValueAvroDeserializer(SchemaRegistryClient client) { + inner = new KafkaAvroDeserializer(client); + } + + public ValueAvroDeserializer(SchemaRegistryClient client, Map props) { + inner = new KafkaAvroDeserializer(client, props); + } + + @Override + public void configure(Map configs, boolean isKey) { + inner.configure(configs, isKey); + } + + @Override + public GenericRecord deserialize(String s, byte[] bytes) { + return (GenericRecord) inner.deserialize(s, bytes); + } + + @Override + public void close() { + inner.close(); + } +} diff --git a/src/test/java/examples/serde/ValueAvroSerde.java b/src/test/java/examples/serde/ValueAvroSerde.java new file mode 100644 index 0000000..a5d2c47 --- /dev/null +++ b/src/test/java/examples/serde/ValueAvroSerde.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2016 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples.serde; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Collections; +import java.util.Map; + +public class ValueAvroSerde implements Serde { + + private final Serde inner; + + /** + * Constructor used by Kafka Streams. + */ + public ValueAvroSerde() { + inner = Serdes.serdeFrom(new ValueAvroSerializer(), new ValueAvroDeserializer()); + } + + public ValueAvroSerde(SchemaRegistryClient client) { + this(client, Collections.emptyMap()); + } + + public ValueAvroSerde(SchemaRegistryClient client, Map props) { + inner = Serdes.serdeFrom(new ValueAvroSerializer(client), new ValueAvroDeserializer(client, props)); + } + + @Override + public Serializer serializer() { + return inner.serializer(); + } + + @Override + public Deserializer deserializer() { + return inner.deserializer(); + } + + @Override + public void configure(Map configs, boolean isKey) { + inner.serializer().configure(configs, isKey); + inner.deserializer().configure(configs, isKey); + } + + @Override + public void close() { + inner.serializer().close(); + inner.deserializer().close(); + } +} diff --git a/src/test/java/examples/serde/ValueAvroSerializer.java b/src/test/java/examples/serde/ValueAvroSerializer.java new file mode 100644 index 0000000..9cd52b7 --- /dev/null +++ b/src/test/java/examples/serde/ValueAvroSerializer.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2016 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples.serde; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; + +public class ValueAvroSerializer implements Serializer { + + KafkaAvroSerializer inner; + + /** + * Constructor used by Kafka Streams. + */ + public ValueAvroSerializer() { + inner = new KafkaAvroSerializer(); + } + + public ValueAvroSerializer(SchemaRegistryClient client) { + inner = new KafkaAvroSerializer(client); + } + + @Override + public void configure(Map configs, boolean isKey) { + inner.configure(configs, isKey); + } + + @Override + public byte[] serialize(String topic, GenericRecord value) { + return inner.serialize(topic, value); + } + + @Override + public void close() { + inner.close(); + } +}