KAFKAC-54: Add example with Kafka Streams
avsej committed Nov 4, 2016
1 parent 312de86 commit 9cc2a75
Showing 10 changed files with 648 additions and 9 deletions.
56 changes: 56 additions & 0 deletions config/connect-distributed.properties
@@ -0,0 +1,56 @@
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=connect-offsets

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
config.storage.topic=connect-configs

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=connect-status

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
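
With the worker configured this way, whatever connectors on this worker write to Kafka is Avro, with schemas stored in the registry at localhost:8081. As a quick sanity check, records on a connector topic can be read back with Confluent's KafkaAvroDeserializer pointed at the same registry. The class below is a minimal sketch, not part of this commit; it assumes the local broker and schema registry configured above and the streaming-topic-beer-sample topic consumed by the Kafka Streams example further down.

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class AvroConsoleSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same endpoints as connect-distributed.properties above.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "avro-console-sketch");
        props.put("schema.registry.url", "http://localhost:8081");
        // The worker's AvroConverter wrote Avro, so read it back with KafkaAvroDeserializer.
        props.put("key.deserializer", KafkaAvroDeserializer.class.getName());
        props.put("value.deserializer", KafkaAvroDeserializer.class.getName());

        KafkaConsumer<Object, Object> consumer = new KafkaConsumer<Object, Object>(props);
        consumer.subscribe(Collections.singletonList("streaming-topic-beer-sample"));
        try {
            while (true) {
                ConsumerRecords<Object, Object> records = consumer.poll(1000);
                for (ConsumerRecord<Object, Object> record : records) {
                    // Keys are document IDs; values are Avro GenericRecord instances.
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}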
6 changes: 2 additions & 4 deletions config/connect-standalone.properties
@@ -29,10 +29,8 @@ value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.key.converter.schema.registry.url=http://localhost:8081
internal.value.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

24 changes: 19 additions & 5 deletions pom.xml
@@ -14,8 +14,8 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
~-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>
@@ -52,11 +52,13 @@
<properties>
<java-compat.version>1.6</java-compat.version>
<confluent.version>3.0.1</confluent.version>
<kafka.version>0.10.0.1</kafka.version>
<junit.version>4.12</junit.version>
<kafka-connect-api.version>0.10.0.1</kafka-connect-api.version>
<dcp-client.version>0.6.0</dcp-client.version>
<kafka-streams.version>0.10.0.1</kafka-streams.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
<junit.version>4.12</junit.version>
<mysql.version>5.1.6</mysql.version>
</properties>

<repositories>
@@ -71,7 +73,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
<version>${kafka-connect-api.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
@@ -83,12 +85,24 @@
<artifactId>dcp-client</artifactId>
<version>${dcp-client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka-streams.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
208 changes: 208 additions & 0 deletions src/test/java/examples/KafkaStreamsDemo.java
@@ -0,0 +1,208 @@
/*
* Copyright (c) 2016 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package examples;

import com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.ObjectMapper;
import examples.serde.KeyAvroSerde;
import examples.serde.ValueAvroSerde;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;

public class KafkaStreamsDemo {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) throws InterruptedException, SQLException {
/**
* The example assumes the following SQL schema
*
* DROP DATABASE IF EXISTS beer_sample_sql;
* CREATE DATABASE beer_sample_sql CHARACTER SET utf8 COLLATE utf8_general_ci;
* USE beer_sample_sql;
*
* CREATE TABLE breweries (
* id VARCHAR(256) NOT NULL,
* name VARCHAR(256),
* description TEXT,
* country VARCHAR(256),
* city VARCHAR(256),
* state VARCHAR(256),
* phone VARCHAR(40),
* updated_at DATETIME,
* PRIMARY KEY (id)
* );
*
*
* CREATE TABLE beers (
* id VARCHAR(256) NOT NULL,
* brewery_id VARCHAR(256) NOT NULL,
* name VARCHAR(256),
* category VARCHAR(256),
* style VARCHAR(256),
* description TEXT,
* abv DECIMAL(10,2),
* ibu DECIMAL(10,2),
* updated_at DATETIME,
* PRIMARY KEY (id)
* );
*/
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
System.err.println("Failed to load MySQL JDBC driver");
}
Connection connection = DriverManager
.getConnection("jdbc:mysql://localhost:3306/beer_sample_sql", "root", "secret");
final PreparedStatement insertBrewery = connection.prepareStatement(
"INSERT INTO breweries (id, name, description, country, city, state, phone, updated_at)" +
" VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
final PreparedStatement insertBeer = connection.prepareStatement(
"INSERT INTO beers (id, brewery_id, name, description, category, style, abv, ibu, updated_at)" +
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)");

String schemaRegistryUrl = "http://localhost:8081";

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, KeyAvroSerde.class);
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, ValueAvroSerde.class);

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStreamBuilder builder = new KStreamBuilder();

KStream<String, GenericRecord> source = builder
.stream("streaming-topic-beer-sample");

KStream<String, JsonNode>[] documents = source
.mapValues(new ValueMapper<GenericRecord, JsonNode>() {
@Override
public JsonNode apply(GenericRecord value) {
ByteBuffer buf = (ByteBuffer) value.get("content");
try {
JsonNode doc = MAPPER.readTree(buf.array());
return doc;
} catch (IOException e) {
return null;
}
}
})
.branch(
new Predicate<String, JsonNode>() {
@Override
public boolean test(String key, JsonNode value) {
return "beer".equals(value.get("type").asText()) &&
value.has("brewery_id") &&
value.has("name") &&
value.has("description") &&
value.has("category") &&
value.has("style") &&
value.has("abv") &&
value.has("ibu") &&
value.has("updated");
}
},
new Predicate<String, JsonNode>() {
@Override
public boolean test(String key, JsonNode value) {
return "brewery".equals(value.get("type").asText()) &&
value.has("name") &&
value.has("description") &&
value.has("country") &&
value.has("city") &&
value.has("state") &&
value.has("phone") &&
value.has("updated");
}
}
);
documents[0].foreach(new ForeachAction<String, JsonNode>() {
@Override
public void apply(String key, JsonNode value) {
try {
insertBeer.setString(1, key);
insertBeer.setString(2, value.get("brewery_id").asText());
insertBeer.setString(3, value.get("name").asText());
insertBeer.setString(4, value.get("description").asText());
insertBeer.setString(5, value.get("category").asText());
insertBeer.setString(6, value.get("style").asText());
insertBeer.setBigDecimal(7, new BigDecimal(value.get("abv").asText()));
insertBeer.setBigDecimal(8, new BigDecimal(value.get("ibu").asText()));
insertBeer.setDate(9, new Date(DATE_FORMAT.parse(value.get("updated").asText()).getTime()));
insertBeer.execute();
} catch (SQLException e) {
System.err.println("Failed to insert record: " + key + ". " + e);
} catch (ParseException e) {
System.err.println("Failed to insert record: " + key + ". " + e);
}
}
});
documents[1].foreach(new ForeachAction<String, JsonNode>() {
@Override
public void apply(String key, JsonNode value) {
try {
insertBrewery.setString(1, key);
insertBrewery.setString(2, value.get("name").asText());
insertBrewery.setString(3, value.get("description").asText());
insertBrewery.setString(4, value.get("country").asText());
insertBrewery.setString(5, value.get("city").asText());
insertBrewery.setString(6, value.get("state").asText());
insertBrewery.setString(7, value.get("phone").asText());
insertBrewery.setDate(8, new Date(DATE_FORMAT.parse(value.get("updated").asText()).getTime()));
insertBrewery.execute();
} catch (SQLException e) {
System.err.println("Failed to insert record: " + key + ". " + e);
} catch (ParseException e) {
System.err.println("Failed to insert record: " + key + ". " + e);
}
}
});

final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
streams.close();
}
}));
}
}
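
One detail worth noting in the topology above: the ValueMapper returns null for any event whose content cannot be parsed as JSON, and the branch predicates immediately call value.get("type"), so a malformed document (or one without a type field) would fail with a NullPointerException. A small hardening step, not part of the commit, is to filter those records out before branching. The fragment below is a sketch that reuses MAPPER, the imports, and the source stream from the example:

        // Hypothetical hardening (not in the commit): drop values that failed to parse or
        // have no "type" field, so the branch predicates never dereference null.
        KStream<String, JsonNode> wellFormed = source
                .mapValues(new ValueMapper<GenericRecord, JsonNode>() {
                    @Override
                    public JsonNode apply(GenericRecord value) {
                        ByteBuffer buf = (ByteBuffer) value.get("content");
                        try {
                            return MAPPER.readTree(buf.array());
                        } catch (IOException e) {
                            return null;
                        }
                    }
                })
                .filter(new Predicate<String, JsonNode>() {
                    @Override
                    public boolean test(String key, JsonNode value) {
                        return value != null && value.has("type");
                    }
                });
        // wellFormed.branch(...) then takes the same beer/brewery predicates as above.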
58 changes: 58 additions & 0 deletions src/test/java/examples/serde/KeyAvroDeserializer.java
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2016 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package examples.serde;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class KeyAvroDeserializer implements Deserializer<String> {

KafkaAvroDeserializer inner;

/**
* Constructor used by Kafka Streams.
*/
public KeyAvroDeserializer() {
inner = new KafkaAvroDeserializer();
}

public KeyAvroDeserializer(SchemaRegistryClient client) {
inner = new KafkaAvroDeserializer(client);
}

public KeyAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
inner = new KafkaAvroDeserializer(client, props);
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.configure(configs, isKey);
}

@Override
public String deserialize(String s, byte[] bytes) {
return (String) inner.deserialize(s, bytes);
}

@Override
public void close() {
inner.close();
}
}
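
KeyAvroSerde and ValueAvroSerde, which KafkaStreamsDemo plugs into StreamsConfig, are among the changed files not shown on this page. In Kafka Streams a Serde simply pairs a Serializer with a Deserializer, so a key Serde built around the deserializer above could look roughly like the sketch below. This is not the commit's actual KeyAvroSerde; it also assumes a KeyAvroSerializer that wraps io.confluent.kafka.serializers.KafkaAvroSerializer the same way KeyAvroDeserializer wraps KafkaAvroDeserializer.

package examples.serde;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class KeyAvroSerdeSketch implements Serde<String> {

    // Sketch only: KeyAvroSerializer is assumed to exist alongside KeyAvroDeserializer.
    private final Serializer<String> serializer = new KeyAvroSerializer();
    private final Deserializer<String> deserializer = new KeyAvroDeserializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<String> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<String> deserializer() {
        return deserializer;
    }
}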
(The remaining 5 changed files in this commit are not shown.)
