Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DEVX-1881: Update microservices code to work with secure services #346

Merged
merged 3 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,39 @@
import io.confluent.examples.streams.microservices.domain.Schemas;
import io.confluent.examples.streams.microservices.domain.Schemas.Topics;
import io.confluent.examples.streams.utils.MonitoringInterceptorUtils;
import org.apache.commons.cli.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import static io.confluent.examples.streams.avro.microservices.Product.JUMPERS;
import static io.confluent.examples.streams.avro.microservices.Product.UNDERPANTS;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.ProductTypeSerde;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.*;
import static java.util.Arrays.asList;

public class AddInventory {

private static void sendInventory(final List<KeyValue<Product, Integer>> inventory,
final Schemas.Topic<Product, Integer> topic,
final String bootstrapServers) {
final String bootstrapServers,
final Properties defaultConfig) {

final Properties producerConfig = new Properties();
producerConfig.putAll(defaultConfig);
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "inventory-generator");
MonitoringInterceptorUtils.maybeConfigureInterceptorsProducer(producerConfig);

final ProductTypeSerde productSerde = new ProductTypeSerde();

try (final KafkaProducer<Product, Integer> stockProducer = new KafkaProducer<>(
Expand All @@ -46,19 +52,50 @@ private static void sendInventory(final List<KeyValue<Product, Integer>> invento
}
}

public static void main(final String[] args) {
public static void main(final String[] args) throws Exception {

final Options opts = new Options();
opts.addOption(Option.builder("b")
.longOpt("bootstrap-servers").hasArg().desc("Kafka cluster bootstrap server string").build())
.addOption(Option.builder("c")
.longOpt("config-file").hasArg().desc("Java properties file with configurations for Kafka Clients").build())
.addOption(Option.builder("h")
.longOpt("help").hasArg(false).desc("Show usage information").build())
.addOption(Option.builder("u")
.longOpt("underpants").hasArg().desc("Quantity of underpants to add to inventory").build())
.addOption(Option.builder("j")
.longOpt("jumpers").hasArg().desc("Quantity of jumpers to add to inventory").build());

final CommandLine cl = new DefaultParser().parse(opts, args);

if (cl.hasOption("h")) {
final HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Add Inventory", opts);
return;
}

final int quantityUnderpants = Integer.parseInt(cl.getOptionValue("u", "20"));
final int quantityJumpers = Integer.parseInt(cl.getOptionValue("j", "10"));

final String bootstrapServers = cl.getOptionValue("b", DEFAULT_BOOTSTRAP_SERVERS);

final int quantityUnderpants = args.length > 0 ? Integer.parseInt(args[0]) : 20;
final int quantityJumpers = args.length > 1 ? Integer.parseInt(args[1]) : 10;
final String bootstrapServers = args.length > 2 ? args[2] : "localhost:9092";
final Properties defaultConfig = Optional.ofNullable(cl.getOptionValue("config-file", null))
.map(path -> {
try {
return buildPropertiesFromConfigFile(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
})
.orElse(new Properties());

// Send Inventory
final List<KeyValue<Product, Integer>> inventory = asList(
new KeyValue<>(UNDERPANTS, quantityUnderpants),
new KeyValue<>(JUMPERS, quantityJumpers)
);
System.out.printf("Send inventory to %s%n", Topics.WAREHOUSE_INVENTORY);
sendInventory(inventory, Topics.WAREHOUSE_INVENTORY, bootstrapServers);
sendInventory(inventory, Topics.WAREHOUSE_INVENTORY, bootstrapServers, defaultConfig);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.ORDERS;
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.ORDERS_ENRICHED;
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.PAYMENTS;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.addShutdownHookAndBlock;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.baseStreamsConfig;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.parseArgsAndConfigure;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.*;

import io.confluent.examples.streams.avro.microservices.Customer;
import io.confluent.examples.streams.avro.microservices.Order;
import io.confluent.examples.streams.avro.microservices.OrderEnriched;
import io.confluent.examples.streams.avro.microservices.Payment;

import io.confluent.examples.streams.microservices.domain.Schemas;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.commons.cli.*;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams.State;
Expand Down Expand Up @@ -48,8 +53,10 @@ public EmailService(final Emailer emailer) {
}

@Override
public void start(final String bootstrapServers, final String stateDir) {
streams = processStreams(bootstrapServers, stateDir);
public void start(final String bootstrapServers,
final String stateDir,
final Properties defaultConfig) {
streams = processStreams(bootstrapServers, stateDir, defaultConfig);
streams.cleanUp(); //don't do this in prod as it clears your state stores
final CountDownLatch startLatch = new CountDownLatch(1);
streams.setStateListener((newState, oldState) -> {
Expand All @@ -70,7 +77,9 @@ public void start(final String bootstrapServers, final String stateDir) {

}

private KafkaStreams processStreams(final String bootstrapServers, final String stateDir) {
private KafkaStreams processStreams(final String bootstrapServers,
final String stateDir,
final Properties defaultConfig) {

final StreamsBuilder builder = new StreamsBuilder();

Expand Down Expand Up @@ -106,12 +115,62 @@ private KafkaStreams processStreams(final String bootstrapServers, final String
//TopicNameExtractor to get the topic name (i.e., customerLevel) from the enriched order record being sent
.to((orderId, orderEnriched, record) -> orderEnriched.getCustomerLevel(), Produced.with(ORDERS_ENRICHED.keySerde(), ORDERS_ENRICHED.valueSerde()));

return new KafkaStreams(builder.build(), baseStreamsConfig(bootstrapServers, stateDir, SERVICE_APP_ID));
return new KafkaStreams(builder.build(),
baseStreamsConfig(bootstrapServers, stateDir, SERVICE_APP_ID, defaultConfig));
}

public static void main(final String[] args) throws Exception {
final Options opts = new Options();
opts.addOption(Option.builder("b")
.longOpt("bootstrap-servers")
.hasArg()
.desc("Kafka cluster bootstrap server string (ex: broker:9092)")
.build());
opts.addOption(Option.builder("s")
.longOpt("schema-registry")
.hasArg()
.desc("Schema Registry URL")
.build());
opts.addOption(Option.builder("c")
.longOpt("config-file")
.hasArg()
.desc("Java properties file with configurations for Kafka Clients")
.build());
opts.addOption(Option.builder("t")
.longOpt("state-dir")
.hasArg()
.desc("The directory for state storage")
.build());
opts.addOption(Option.builder("h").longOpt("help").hasArg(false).desc("Show usage information").build());

final CommandLine cl = new DefaultParser().parse(opts, args);

if (cl.hasOption("h")) {
final HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Email Service", opts);
return;
}
final EmailService service = new EmailService(new LoggingEmailer());
service.start(parseArgsAndConfigure(args), "/tmp/kafka-streams");

final Properties defaultConfig = Optional.ofNullable(cl.getOptionValue("config-file", null))
.map(path -> {
try {
return buildPropertiesFromConfigFile(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
})
.orElse(new Properties());


final String schemaRegistryUrl = cl.getOptionValue("schema-registry", DEFAULT_SCHEMA_REGISTRY_URL);
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
Schemas.configureSerdes(defaultConfig);

service.start(
cl.getOptionValue("bootstrap-servers", DEFAULT_BOOTSTRAP_SERVERS),
cl.getOptionValue("state-dir", "/tmp/kafka-streams-examples"),
defaultConfig);
addShutdownHookAndBlock(service);
}

Expand All @@ -137,8 +196,8 @@ interface Emailer {

public static class EmailTuple {

public Order order;
public Payment payment;
final public Order order;
final public Payment payment;
public Customer customer;

public EmailTuple(final Order order, final Payment payment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@
import io.confluent.examples.streams.avro.microservices.OrderValidation;
import io.confluent.examples.streams.avro.microservices.OrderValue;
import io.confluent.examples.streams.microservices.domain.Schemas;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.commons.cli.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
Expand All @@ -32,9 +38,7 @@
import static io.confluent.examples.streams.avro.microservices.OrderValidationType.FRAUD_CHECK;
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.ORDERS;
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.ORDER_VALIDATIONS;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.addShutdownHookAndBlock;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.baseStreamsConfig;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.parseArgsAndConfigure;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.*;


/**
Expand All @@ -52,8 +56,10 @@ public class FraudService implements Service {
private KafkaStreams streams;

@Override
public void start(final String bootstrapServers, final String stateDir) {
streams = processStreams(bootstrapServers, stateDir);
public void start(final String bootstrapServers,
final String stateDir,
final Properties defaultConfig) {
streams = processStreams(bootstrapServers, stateDir, defaultConfig);
streams.cleanUp(); //don't do this in prod as it clears your state stores
final CountDownLatch startLatch = new CountDownLatch(1);
streams.setStateListener((newState, oldState) -> {
Expand All @@ -75,7 +81,9 @@ public void start(final String bootstrapServers, final String stateDir) {
log.info("Started Service " + getClass().getSimpleName());
}

private KafkaStreams processStreams(final String bootstrapServers, final String stateDir) {
private KafkaStreams processStreams(final String bootstrapServers,
final String stateDir,
final Properties defaultConfig) {

//Latch onto instances of the orders and inventory topics
final StreamsBuilder builder = new StreamsBuilder();
Expand Down Expand Up @@ -121,7 +129,7 @@ private KafkaStreams processStreams(final String bootstrapServers, final String
//as caching in Kafka Streams will conflate subsequent updates for the same key. Disabling caching ensures
//we get a complete "changelog" from the aggregate(...) step above (i.e. every input event will have a
//corresponding output event.
final Properties props = baseStreamsConfig(bootstrapServers, stateDir, SERVICE_APP_ID);
final Properties props = baseStreamsConfig(bootstrapServers, stateDir, SERVICE_APP_ID, defaultConfig);
props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

return new KafkaStreams(builder.build(), props);
Expand All @@ -132,8 +140,56 @@ private OrderValue simpleMerge(final OrderValue a, final OrderValue b) {
}

public static void main(final String[] args) throws Exception {

final Options opts = new Options();
opts.addOption(Option.builder("b")
.longOpt("bootstrap-servers")
.hasArg()
.desc("Kafka cluster bootstrap server string (ex: broker:9092)")
.build());
opts.addOption(Option.builder("s")
.longOpt("schema-registry")
.hasArg()
.desc("Schema Registry URL")
.build());
opts.addOption(Option.builder("c")
.longOpt("config-file")
.hasArg()
.desc("Java properties file with configurations for Kafka Clients")
.build());
opts.addOption(Option.builder("t")
.longOpt("state-dir")
.hasArg()
.desc("The directory for state storage")
.build());
opts.addOption(Option.builder("h").longOpt("help").hasArg(false).desc("Show usage information").build());

final CommandLine cl = new DefaultParser().parse(opts, args);

if (cl.hasOption("h")) {
final HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Fraud Service", opts);
return;
}
final FraudService service = new FraudService();
service.start(parseArgsAndConfigure(args), "/tmp/kafka-streams");
final Properties defaultConfig = Optional.ofNullable(cl.getOptionValue("config-file", null))
.map(path -> {
try {
return buildPropertiesFromConfigFile(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
})
.orElse(new Properties());

final String schemaRegistryUrl = cl.getOptionValue("schema-registry", DEFAULT_SCHEMA_REGISTRY_URL);
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
Schemas.configureSerdes(defaultConfig);

service.start(
cl.getOptionValue("bootstrap-servers", DEFAULT_BOOTSTRAP_SERVERS),
cl.getOptionValue("state-dir", "/tmp/kafka-streams-examples"),
defaultConfig);
addShutdownHookAndBlock(service);
}

Expand Down
Loading