Skip to content

Commit

Permalink
Merge pull request #294 from yianni/kotel-plugin
Browse files Browse the repository at this point in the history
Add Kotel plugin
  • Loading branch information
twmb authored Feb 8, 2023
2 parents 7581420 + baf7eac commit 27f24f2
Show file tree
Hide file tree
Showing 17 changed files with 1,539 additions and 1 deletion.
28 changes: 28 additions & 0 deletions examples/hooks_and_logging/plugin_kotel/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
kotel hooks
===

This example demonstrates how to use the kotel package to easily export
OpenTelemetry traces and metrics.

The example includes end-to-end tracing, where spans are injected into produce
records and extracted from consume records. The following spans will be linked
with a trace ID:

1) request
2) topic send
3) topic receive
4) topic process

The example also includes how metrics are exported, they will be displayed
every 60 seconds.

If your broker is running on `localhost:9092`, run `go run .` in this directory
to see traces/metrics printed in the console.

## Flags

`-brokers` can be specified to override the default localhost:9092 broker to any
comma delimited set of brokers.

`-topic` can be specified to producer/consume from an existing topic on your
local broker.
27 changes: 27 additions & 0 deletions examples/hooks_and_logging/plugin_kotel/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module kotel_hooks

go 1.19

require (
github.com/google/uuid v1.3.0
github.com/twmb/franz-go v1.11.2
github.com/twmb/franz-go/plugin/kotel v0.0.0-00010101000000-000000000000
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.34.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.2
go.opentelemetry.io/otel/sdk v1.11.2
go.opentelemetry.io/otel/sdk/metric v0.34.0
go.opentelemetry.io/otel/trace v1.11.2
)

require (
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.2.0 // indirect
go.opentelemetry.io/otel/metric v0.34.0 // indirect
golang.org/x/sys v0.4.0 // indirect
)

replace github.com/twmb/franz-go => ../../..
44 changes: 44 additions & 0 deletions examples/hooks_and_logging/plugin_kotel/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/twmb/franz-go/pkg/kmsg v1.2.0 h1:jYWh2qFw5lDbNv5Gvu/sMKagzICxuA5L6m1W2Oe7XUo=
github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0=
go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.34.0 h1:O1E9/qhspQSz3O6/dSGLNBND2TO9mUaSvlhcKJMv278=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.34.0/go.mod h1:Id0oYi2ARij/um3gFV+t5rH1MTFdJpfTimsFsqKS7pE=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.2 h1:BhEVgvuE1NWLLuMLvC6sif791F45KFHi5GhOs1KunZU=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.2/go.mod h1:bx//lU66dPzNT+Y0hHA12ciKoMOH9iixEwCqC1OeQWQ=
go.opentelemetry.io/otel/metric v0.34.0 h1:MCPoQxcg/26EuuJwpYN1mZTeCYAUGx8ABxfW07YkjP8=
go.opentelemetry.io/otel/metric v0.34.0/go.mod h1:ZFuI4yQGNCupurTXCwkeD/zHBt+C2bR7bw5JqUm/AP8=
go.opentelemetry.io/otel/sdk v1.11.2 h1:GF4JoaEx7iihdMFu30sOyRx52HDHOkl9xQ8SMqNXUiU=
go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU=
go.opentelemetry.io/otel/sdk/metric v0.34.0 h1:7ElxfQpXCFZlRTvVRTkcUvK8Gt5DC8QzmzsLsO2gdzo=
go.opentelemetry.io/otel/sdk/metric v0.34.0/go.mod h1:l4r16BIqiqPy5rd14kkxllPy/fOI4tWo1jkpD9Z3ffQ=
go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0=
go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA=
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
260 changes: 260 additions & 0 deletions examples/hooks_and_logging/plugin_kotel/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"strconv"
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kotel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
)

var (
seedBrokers = flag.String("brokers", "localhost:9092", "comma delimited list of seed brokers")
topic = flag.String("topic", "topic", "topic to produce and consume for trace/metric exporting")
)

func initTracerProvider() (*sdktrace.TracerProvider, error) {
// Create a new trace exporter
traceExporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}
// Create a new batch span processor
bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
// Create a new resource with default attributes and additional attributes
res, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("test-service"),
semconv.ServiceNamespaceKey.String("test-namespace"),
semconv.ServiceVersionKey.String("0.0.1"),
semconv.ServiceInstanceIDKey.String(uuid.New().String()),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create tracer resource: %w", err)
}
// Create a new tracer provider with the batch span processor, always
// sample, and the created resource.
tp := sdktrace.NewTracerProvider(
sdktrace.WithSpanProcessor(bsp),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
)
return tp, nil
}

func initMeterProvider() (*metric.MeterProvider, error) {
// Create a new meter exporter
exp, err := stdoutmetric.New()
if err != nil {
return nil, fmt.Errorf("failed to create meter exporter: %w", err)
}
// Create a new resource with default attributes and additional attributes
res, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("test-service"),
semconv.ServiceNamespaceKey.String("test-namespace"),
semconv.ServiceVersionKey.String("0.0.1"),
semconv.ServiceInstanceIDKey.String(uuid.New().String()),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create meter resource: %w", err)
}
// Create a new meter provider with the created exporter and the created
// resource.
mp := metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(exp)),
metric.WithResource(res),
)
return mp, nil
}

func newKotelTracer(tracerProvider *sdktrace.TracerProvider) *kotel.Tracer {
// Create a new kotel tracer with the provided tracer provider and
// propagator.
tracerOpts := []kotel.TracerOpt{
kotel.TracerProvider(tracerProvider),
kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})),
}
return kotel.NewTracer(tracerOpts...)
}

func newKotelMeter(meterProvider *metric.MeterProvider) *kotel.Meter {
// Create a new kotel meter using a provided meter provider.
meterOpts := []kotel.MeterOpt{kotel.MeterProvider(meterProvider)}
return kotel.NewMeter(meterOpts...)
}

func newKotel(tracer *kotel.Tracer, meter *kotel.Meter) *kotel.Kotel {
kotelOps := []kotel.Opt{
kotel.WithTracer(tracer),
kotel.WithMeter(meter),
}
return kotel.NewKotel(kotelOps...)
}

func newProducerClient(kotelService *kotel.Kotel) (*kgo.Client, error) {
opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
kgo.WithHooks(kotelService.Hooks()...),
}
return kgo.NewClient(opts...)
}

func produceMessage(client *kgo.Client, tracer trace.Tracer) error {
// Start a new span with options.
opts := []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes([]attribute.KeyValue{attribute.String("some-key", "foo")}...),
}
ctx, span := tracer.Start(context.Background(), "request", opts...)
// End the span when function exits.
defer span.End()

// Simulate some work.
time.Sleep(1 * time.Second)

var wg sync.WaitGroup
wg.Add(1)
record := &kgo.Record{Topic: *topic, Key: []byte("some-key"), Value: []byte("some-value")}
// Pass in the context from the tracer.Start() call to ensure that the span
// created is linked to the parent span.
client.Produce(ctx, record, func(_ *kgo.Record, err error) {
defer wg.Done()
if err != nil {
fmt.Printf("record had a produce error: %v\n", err)
// Set the status and record error on the span.
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}
})
wg.Wait()
return nil
}

func newConsumerClient(kotelService *kotel.Kotel) (*kgo.Client, error) {
// Create options for the new client.
opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
// Add hooks from kotel service.
kgo.WithHooks(kotelService.Hooks()...),
kgo.ConsumeTopics(*topic),
}
return kgo.NewClient(opts...)
}

func consumeMessages(client *kgo.Client, tracer *kotel.Tracer) error {
fetches := client.PollFetches(context.Background())
if errs := fetches.Errors(); len(errs) > 0 {
return fmt.Errorf("%v", errs)
}

iter := fetches.RecordIter()
for !iter.Done() {
record := iter.Next()
processRecord(record, tracer)
}
return nil
}

func processRecord(record *kgo.Record, tracer *kotel.Tracer) {
_, span := tracer.WithProcessSpan(record)
// Simulate some work.
time.Sleep(1 * time.Second)
// End the span when function exits.
defer span.End()
fmt.Printf(
"processed offset '%s' with key '%s' and value '%s'\n",
strconv.FormatInt(record.Offset, 10),
string(record.Key),
string(record.Value),
)
}

func do() error {
// Initialize tracer provider and handle shutdown.
tracerProvider, err := initTracerProvider()
if err != nil {
return fmt.Errorf("failed to initialize tracer provider: %w", err)
}
defer func() {
if err := tracerProvider.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()

// Initialize meter provider and handle shutdown.
meterProvider, err := initMeterProvider()
if err != nil {
return fmt.Errorf("failed to initialize meter provider: %w", err)
}
defer func() {
if err := meterProvider.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down meter provider: %v", err)
}
}()

// Create a new kotel tracer and meter.
kotelTracer := newKotelTracer(tracerProvider)
kotelMeter := newKotelMeter(meterProvider)

// Create a new kotel service.
kotelService := newKotel(kotelTracer, kotelMeter)

// Initialize producer client and handle close.
producerClient, err := newProducerClient(kotelService)
if err != nil {
return fmt.Errorf("unable to create producer client: %w", err)
}
defer producerClient.Close()

// Create request tracer and produce message.
requestTracer := tracerProvider.Tracer("request-service")
if err := produceMessage(producerClient, requestTracer); err != nil {
return fmt.Errorf("failed to produce message: %w", err)
}

// Initialize consumer client and handle close.
consumerClient, err := newConsumerClient(kotelService)
if err != nil {
return fmt.Errorf("unable to create consumer client: %w", err)
}
defer consumerClient.Close()

// Pass in the kotel tracer and consume messages in a loop.
for {
if err := consumeMessages(consumerClient, kotelTracer); err != nil {
return fmt.Errorf("failed to consume messages: %w", err)
}
}
}

func main() {
flag.Parse()

if err := do(); err != nil {
log.Fatal(err)
}
}
2 changes: 1 addition & 1 deletion pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type Record struct {
// Context is an optional field that is used for enriching records.
//
// If this field is nil when producing, it is set to the Produce ctx
// arg. This field can be used to propagate record encrichment across
// arg. This field can be used to propagate record enrichment across
// producer hooks. It can also be set in a consumer hook to propagate
// enrichment to consumer clients.
Context context.Context
Expand Down
Loading

0 comments on commit 27f24f2

Please sign in to comment.