Skip to content

Commit

Permalink
Migrate to kafka streams (#18)
Browse files Browse the repository at this point in the history
* Migrate span-normalizer to kafka streams

* use trace_id as key when sending RawSpans to the output topic. The raw-spans-grouper can then do a groupByKey on the trace_id

* Check if KStream for an input topic already exists and re-use it.

* Extract common streams config

* Add producer.max.request.size property

* Merge topic creation changes as part of this PR

* Schema registry configurable and decoupled

* 1. bring in changes from job-config branch
2. clean up flink dependencies
3. add a simple integration test

* revert config changes done for local testing

* remove dev docker registry used for testing

* Update span-normalizer/build.gradle.kts

Co-authored-by: Ronak <[email protected]>
Co-authored-by: Laxman Ch <[email protected]>
Co-authored-by: kotharironak <[email protected]>
  • Loading branch information
4 people authored Sep 15, 2020
1 parent 82dcfdc commit e4841cb
Show file tree
Hide file tree
Showing 15 changed files with 359 additions and 370 deletions.
34 changes: 11 additions & 23 deletions helm/templates/span-normalizer-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,17 @@ metadata:
release: {{ .Release.Name }}
data:
application.conf: |-
flink.job {
{{- if hasKey .Values.spanNormalizerConfig.flink "job" }}
metrics {
metrics {
reporters = {{- toJson .Values.spanNormalizerConfig.flink.job.metrics.reporters | trim | nindent 12 }}
}
}
{{- end }}
}
flink.source {
kafka {
bootstrap.servers = "{{ .Values.spanNormalizerConfig.flink.source.kafka.bootstrapServers }}"
}
}
flink.sink {
schema.registry {
schema.registry.url = "{{ .Values.spanNormalizerConfig.flink.sink.schema.registry.url }}"
value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
}
kafka {
bootstrap.servers = "{{ .Values.spanNormalizerConfig.flink.sink.kafka.bootstrapServers }}"
}
kafka.streams.config {
application.id = jaeger-spans-to-raw-spans-job
metrics.recording.level = "{{ .Values.spanNormalizerConfig.kafka.streams.config.metricsRecordingLevel }}"
num.stream.threads = "{{ .Values.spanNormalizerConfig.kafka.streams.config.numStreamThreads }}"
producer.compression.type = gzip
producer.max.request.size = 10485760
topology.optimization = all
bootstrap.servers = "{{ .Values.spanNormalizerConfig.kafka.streams.config.bootstrapServers }}"
auto.offset.reset = "latest"
auto.commit.interval.ms = 5000
schema.registry.url = "{{ .Values.spanNormalizerConfig.kafka.streams.config.schemaRegistryUrl }}"
}
{{- if hasKey .Values.spanNormalizerConfig "processor" }}
processor {
Expand Down
15 changes: 6 additions & 9 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,13 @@ deploymentSelectorMatchLabels:
###########
spanNormalizerConfig:
name: span-normalizer-config
flink:
source:
kafka:
bootstrapServers: "bootstrap:9092"
sink:
kafka:
kafka:
streams:
config:
metricsRecordingLevel: INFO
numStreamThreads: 2
bootstrapServers: "bootstrap:9092"
schema:
registry:
url: "http://schema-registry-service:8081"
schemaRegistryUrl: "http://schema-registry-service:8081"

logConfig:
name: span-normalizer-log-appender-config
Expand Down
15 changes: 8 additions & 7 deletions span-normalizer/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,17 @@ dependencies {
implementation(project(":raw-span-constants"))
implementation(project(":span-normalizer-api"))

implementation("org.hypertrace.core.datamodel:data-model:0.1.1")
implementation("org.hypertrace.core.flinkutils:flink-utils:0.1.6")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.8")
implementation("org.hypertrace.core.datamodel:data-model:0.1.7")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.9")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.8")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.5")

// Needed for flink metric exporter. Used for Hypertrace and debugging.
runtimeOnly("org.apache.flink:flink-metrics-slf4j:1.10.1")

// Required for the GRPC clients.
runtimeOnly("io.grpc:grpc-netty:1.31.1")
implementation("com.typesafe:config:1.4.0")
implementation("de.javakaffee:kryo-serializers:0.45")
implementation("org.apache.flink:flink-avro:1.7.0")
implementation("org.apache.flink:flink-streaming-java_2.11:1.7.0")

implementation("io.confluent:kafka-avro-serializer:5.5.1")
implementation("org.apache.commons:commons-lang3:3.10")

Expand All @@ -59,4 +58,6 @@ dependencies {

testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
testImplementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.8")
testImplementation("org.junit-pioneer:junit-pioneer:0.9.0")
testImplementation("org.apache.kafka:kafka-streams-test-utils:5.5.1-ccs")
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,90 @@
package org.hypertrace.core.spannormalizer;

import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.INPUT_TOPIC_CONFIG_KEY;
import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.KAFKA_STREAMS_CONFIG_KEY;
import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.OUTPUT_TOPIC_CONFIG_KEY;
import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG;

import com.typesafe.config.Config;
import org.hypertrace.core.serviceframework.background.PlatformBackgroundJob;
import org.hypertrace.core.serviceframework.background.PlatformBackgroundService;
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.hypertrace.core.serviceframework.config.ConfigUtils;
import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanSerde;
import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToAvroRawSpanTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpanNormalizer extends PlatformBackgroundService {
private static final Logger LOGGER = LoggerFactory.getLogger(SpanNormalizer.class);
public class SpanNormalizer extends KafkaStreamsApp {

private static final Logger logger = LoggerFactory.getLogger(SpanNormalizer.class);

/**
 * Creates the span-normalizer streams app; configuration is resolved by the
 * framework through the given {@link ConfigClient}.
 */
public SpanNormalizer(ConfigClient configClient) {
super(configClient);
}

@Override
protected PlatformBackgroundJob createBackgroundJob(Config config) {
return new SpanNormalizerJob(config);
/**
 * Wires the normalizer topology: serialized Jaeger spans read from the configured input
 * topic are transformed into Avro RawSpans and written, String-keyed, to the configured
 * output topic. An already-registered KStream for the input topic is re-used so the same
 * topic is never consumed twice within one topology.
 *
 * @param streamsProperties flattened streams properties; also carries the job Config
 * @param streamsBuilder    builder the topology is assembled on
 * @param inputStreams      registry of per-topic streams shared across jobs
 * @return the same builder, with this job's topology added
 */
public StreamsBuilder buildTopology(Map<String, Object> streamsProperties,
    StreamsBuilder streamsBuilder,
    Map<String, KStream<?, ?>> inputStreams) {

  Config jobConfig = getJobConfig(streamsProperties);
  String sourceTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
  String sinkTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);

  // Re-use the stream if another job already registered it for this topic.
  @SuppressWarnings("unchecked")
  KStream<byte[], Span> spans = (KStream<byte[], Span>) inputStreams.get(sourceTopic);
  if (spans == null) {
    spans = streamsBuilder.stream(
        sourceTopic, Consumed.with(Serdes.ByteArray(), new JaegerSpanSerde()));
    inputStreams.put(sourceTopic, spans);
  }

  spans.transform(JaegerSpanToAvroRawSpanTransformer::new)
      .to(sinkTopic, Produced.keySerde(Serdes.String()));

  return streamsBuilder;
}

/**
 * Flattens the job's {@code kafka.streams.config} subtree into the mutable properties
 * map handed to Kafka Streams.
 */
@Override
public Map<String, Object> getStreamsConfig(Config jobConfig) {
  // Copy into a HashMap so the framework may add/override entries later.
  return new HashMap<>(ConfigUtils.getFlatMapConfig(jobConfig, KAFKA_STREAMS_CONFIG_KEY));
}

/** Key under which the framework stores this job's Config inside the streams properties map. */
@Override
public String getJobConfigKey() {
return SPAN_NORMALIZER_JOB_CONFIG;
}

/** Logger handed to the kafka-streams framework for lifecycle/error reporting. */
@Override
public Logger getLogger() {
return logger;
}

@Override
protected Logger getLogger() {
return LOGGER;
public List<String> getInputTopics(Map<String, Object> properties) {
Config jobConfig = getJobConfig(properties);
return Collections.singletonList(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY));
}

/** The single topic this job produces to, read from the job config. */
@Override
public List<String> getOutputTopics(Map<String, Object> properties) {
  return Collections.singletonList(
      getJobConfig(properties).getString(OUTPUT_TOPIC_CONFIG_KEY));
}

/**
 * Retrieves the typesafe {@link Config} the framework stashed in the streams
 * properties under this job's config key.
 */
private Config getJobConfig(Map<String, Object> properties) {
return (Config) properties.get(getJobConfigKey());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.hypertrace.core.spannormalizer.constants;

/**
 * Configuration keys used by the span-normalizer job.
 *
 * <p>Constants-only holder: {@code final} with a private constructor so it can
 * neither be instantiated nor subclassed.
 */
public final class SpanNormalizerConstants {

  public static final String SCHEMA_REGISTRY_CONFIG_KEY = "schema.registry.config";
  public static final String KAFKA_CONFIG_KEY = "kafka.config";
  public static final String KAFKA_STREAMS_CONFIG_KEY = "kafka.streams.config";
  public static final String JOB_CONFIG = "jobconfig";
  public static final String SPAN_NORMALIZER_CONFIG_KEY = "span.normalizer.config";
  public static final String INPUT_TOPIC_CONFIG_KEY = "input.topic";
  public static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic";
  public static final String SPAN_TYPE_CONFIG_KEY = "span.type";
  public static final String SPAN_NORMALIZER_JOB_CONFIG = "span-normalizer-job-config";

  // Utility class — prevent instantiation.
  private SpanNormalizerConstants() {}
}

This file was deleted.

Loading

0 comments on commit e4841cb

Please sign in to comment.