Skip to content

Commit

Permalink
Add Cassandra OTEL exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay committed Mar 25, 2020
1 parent e124801 commit b631669
Show file tree
Hide file tree
Showing 21 changed files with 487 additions and 141 deletions.
8 changes: 8 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector/defaults"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

Expand All @@ -30,8 +32,14 @@ func Components(v *viper.Viper) config.Factories {
opts.InitFromViper(v)
return opts
}}
cassandraExp := cassandra.Factory{OptionsFactory: func() *storageCassandra.Options {
opts := cassandra.DefaultOptions()
opts.InitFromViper(v)
return opts
}}

factories, _ := defaults.Components()
factories.Exporters[kafkaExp.Type()] = kafkaExp
factories.Exporters[cassandraExp.Type()] = cassandraExp
return factories
}
8 changes: 7 additions & 1 deletion cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ import (

"github.com/magiconair/properties/assert"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)

func TestComponents(t *testing.T) {
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags)
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags, cassandra.DefaultOptions().AddFlags)
factories := Components(v)
assert.Equal(t, "jaeger_kafka", factories.Exporters[kafka.TypeStr].Type())
assert.Equal(t, "jaeger_cassandra", factories.Exporters[cassandra.TypeStr].Type())

kafkaFactory := factories.Exporters[kafka.TypeStr]
kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config)
assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers)

cassandraFactory := factories.Exporters[cassandra.TypeStr]
cc := cassandraFactory.CreateDefaultConfig().(*cassandra.Config)
assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers)
}
27 changes: 27 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

// Config holds configuration of Jaeger Cassandra exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
cassandra.Options `mapstructure:",squash"`
}
88 changes: 88 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"path"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

func TestDefaultConfig(t *testing.T) {
factory := &Factory{OptionsFactory: func() *cassandra.Options {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
return opts
}}
defaultCfg := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.GetPrimary().Servers)
assert.Equal(t, []string{"127.0.0.1"}, defaultCfg.Options.Primary.Servers)
assert.Equal(t, 2, defaultCfg.Primary.ConnectionsPerHost)
assert.Equal(t, "jaeger_v1_test", defaultCfg.Primary.Keyspace)
assert.Equal(t, 3, defaultCfg.Primary.MaxRetryAttempts)
assert.Equal(t, 4, defaultCfg.Primary.ProtoVersion)
assert.Equal(t, time.Minute, defaultCfg.Primary.ReconnectInterval)
assert.Equal(t, time.Hour*12, defaultCfg.SpanStoreWriteCacheTTL)
assert.Equal(t, true, defaultCfg.Index.Tags)
assert.Equal(t, true, defaultCfg.Index.Logs)
assert.Equal(t, true, defaultCfg.Index.ProcessTags)
}

func TestLoadConfigAndFlags(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)

v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag)
err = c.ParseFlags([]string{"--cassandra.servers=bar", "--cassandra.port=9000", "--config-file=./testdata/jaeger-config.yaml"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
require.NoError(t, err)

factory := &Factory{OptionsFactory: func() *cassandra.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
require.Equal(t, []string{"bar"}, opts.GetPrimary().Servers)
return opts
}}

factories.Exporters[TypeStr] = factory
colConfig, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, colConfig)

cfg := colConfig.Exporters[TypeStr].(*Config)
assert.Equal(t, TypeStr, cfg.Name())
assert.Equal(t, []string{"first", "second"}, cfg.Primary.Servers)
assert.Equal(t, 9000, cfg.Primary.Port)
assert.Equal(t, false, cfg.Index.Tags)
assert.Equal(t, "my-keyspace", cfg.Primary.Keyspace)
assert.Equal(t, false, cfg.Index.Tags)
assert.Equal(t, true, cfg.Index.Logs)
assert.Equal(t, "user", cfg.Primary.Authenticator.Basic.Username)
assert.Equal(t, "pass", cfg.Primary.Authenticator.Basic.Password)
assert.Equal(t, time.Second*12, cfg.SpanStoreWriteCacheTTL)
assert.Equal(t, true, cfg.Primary.TLS.Enabled)
assert.Equal(t, "/foo/bar", cfg.Primary.TLS.CAPath)
}
16 changes: 16 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package cassandra implements Jaeger Cassandra storage as OpenTelemetry exporter.
package cassandra
36 changes: 36 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

// New creates Cassandra exporter/storage
func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) {
f := cassandra.NewFactory()
f.InitFromOptions(&config.Options)

err := f.Initialize(metrics.NullFactory, log)
if err != nil {
return nil, err
}
return storageOtelExporter.NewSpanWriterExporter(config, f)
}
75 changes: 75 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

const (
// TypeStr defines type of the Cassandra exporter.
TypeStr = "jaeger_cassandra"
)

// OptionsFactory returns initialized cassandra.OptionsFactory structure.
type OptionsFactory func() *cassandra.Options

// DefaultOptions creates Cassandra options supported by this exporter.
func DefaultOptions() *cassandra.Options {
return cassandra.NewOptions("cassandra")
}

// Factory is the factory for Jaeger Cassandra exporter.
type Factory struct {
OptionsFactory OptionsFactory
}

// Type gets the type of exporter.
func (Factory) Type() string {
return TypeStr
}

// CreateDefaultConfig returns default configuration of Factory.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
}
}

// CreateTraceExporter creates Jaeger Cassandra trace exporter.
func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) {
config, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
return New(config, log)
}

// CreateMetricsExporter is not implemented.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (exporter.MetricsExporter, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
66 changes: 66 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/cassandra/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

func TestCreateTraceExporter(t *testing.T) {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
factory := Factory{OptionsFactory: func() *cassandra.Options {
return opts
}}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), factory.CreateDefaultConfig())
require.Nil(t, exporter)
assert.EqualError(t, err, "gocql: unable to create session: control: unable to connect to initial hosts: dial tcp 127.0.0.1:9042: connect: connection refused")
}

func TestCreateTraceExporter_NilConfig(t *testing.T) {
factory := Factory{}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), nil)
require.Nil(t, exporter)
assert.EqualError(t, err, "could not cast configuration to jaeger_cassandra")
}

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{OptionsFactory: DefaultOptions}
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateMetricsExporter(t *testing.T) {
f := Factory{OptionsFactory: DefaultOptions}
mReceiver, err := f.CreateMetricsExporter(zap.NewNop(), f.CreateDefaultConfig())
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}

func TestType(t *testing.T) {
factory := Factory{}
assert.Equal(t, TypeStr, factory.Type())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
receivers:
examplereceiver:

processors:
exampleprocessor:

exporters:
jaeger_cassandra:
servers: "first,second"
index:
tags: false
username: user
password: pass
span_store_write_cache_ttl: 12s
tls:
enabled: true
ca: /foo/bar

service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [exampleprocessor]
exporters: [jaeger_cassandra]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cassandra:
keyspace: my-keyspace
Loading

0 comments on commit b631669

Please sign in to comment.