Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion connector/servicegraphconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Config struct {
// MetricsFlushInterval is the interval at which metrics are flushed to the exporter.
// If set to 0, metrics are flushed on every received batch of traces.
// Default is 60s if unset.
MetricsFlushInterval *time.Duration `mapstructure:"metrics_flush_interval"`
MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`

// DatabaseNameAttributes is the attribute name list of attributes need to match used to identify the database name from span attributes, the higher the front, the higher the priority.
// The default value is {"db.name"}.
Expand Down Expand Up @@ -78,5 +78,13 @@ func (c *Config) Validate() error {
return errors.New("use either `latency_histogram_buckets` or `exponential_histogram_max_size`")
}

if c.StoreExpirationLoop <= 0 {
return errors.New("`store_expiration_loop` must be positive")
}

if c.CacheLoop <= 0 {
return errors.New("`cache_loop` must be positive")
}
Comment on lines +81 to +87

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not backwards compatible. Should we log a warning instead?


return nil
}
1 change: 0 additions & 1 deletion connector/servicegraphconnector/config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ properties:
format: duration
metrics_flush_interval:
description: MetricsFlushInterval is the interval at which metrics are flushed to the exporter. If set to 0, metrics are flushed on every received batch of traces. Default is 60s if unset.
x-pointer: true
type: string
format: duration
metrics_timestamp_offset:
Expand Down
8 changes: 5 additions & 3 deletions connector/servicegraphconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ func TestLoadConfig(t *testing.T) {
TTL: time.Second,
MaxItems: 10,
},
CacheLoop: time.Minute,
StoreExpirationLoop: 2 * time.Second,
DatabaseNameAttributes: []string{"db.name"},
CacheLoop: time.Minute,
StoreExpirationLoop: 2 * time.Second,
VirtualNodePeerAttributes: []string{"peer.service", "db.name", "db.system"},
MetricsFlushInterval: 60 * time.Second,
DatabaseNameAttributes: []string{"db.name"},
},
cfg.Connectors[component.NewID(metadata.Type)],
)
Expand Down
58 changes: 17 additions & 41 deletions connector/servicegraphconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
conventionsv125 "go.opentelemetry.io/otel/semconv/v1.25.0"
conventionsv128 "go.opentelemetry.io/otel/semconv/v1.28.0"
conventions "go.opentelemetry.io/otel/semconv/v1.38.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector/internal/metadata"
Expand All @@ -41,19 +38,12 @@ const (

var (
legacyDefaultLatencyHistogramBuckets = []float64{
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10000, 15000,
}

defaultLatencyHistogramBuckets = []float64{
0.002, 0.004, 0.006, 0.008, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1, 1.4, 2, 5, 10, 15,
}

defaultPeerAttributes = []string{
string(conventions.PeerServiceKey), string(conventionsv125.DBNameKey), string(conventionsv128.DBSystemKey),
}

defaultDatabaseNameAttributes = []string{string(conventionsv125.DBNameKey)}
Comment on lines -50 to -54

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI in factory.go this has been changed to only use semconv v1.25.0 and the keys remain the same, so this is backwards compatible.


defaultMetricsFlushInterval = 60 * time.Second // 1 DPM
)

type metricSeries struct {
Expand Down Expand Up @@ -96,6 +86,16 @@ type serviceGraphConnector struct {
func newConnector(set component.TelemetrySettings, config component.Config, next consumer.Metrics) (*serviceGraphConnector, error) {
pConfig := config.(*Config)

if pConfig.MetricsFlushInterval.Nanoseconds() <= 0 {
set.Logger.Warn("MetricsFlushInterval is set to 0, metrics will be flushed on every received batch of traces")
}

telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
if err != nil {
return nil, err
}

// Compute histogram bounds based on configuration
var bounds []float64
if pConfig.ExponentialHistogramMaxSize == 0 {
bounds = defaultLatencyHistogramBuckets
Expand All @@ -107,33 +107,6 @@ func newConnector(set component.TelemetrySettings, config component.Config, next
}
}

if pConfig.CacheLoop <= 0 {
pConfig.CacheLoop = time.Minute
}

if pConfig.StoreExpirationLoop <= 0 {
pConfig.StoreExpirationLoop = 2 * time.Second
}

if pConfig.VirtualNodePeerAttributes == nil {
pConfig.VirtualNodePeerAttributes = defaultPeerAttributes
}

if len(pConfig.DatabaseNameAttributes) == 0 {
pConfig.DatabaseNameAttributes = defaultDatabaseNameAttributes
}

if pConfig.MetricsFlushInterval == nil {
pConfig.MetricsFlushInterval = &defaultMetricsFlushInterval
} else if pConfig.MetricsFlushInterval.Nanoseconds() <= 0 {
set.Logger.Warn("MetricsFlushInterval is set to 0, metrics will be flushed on every received batch of traces")
}

telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
if err != nil {
return nil, err
}

return &serviceGraphConnector{
config: pConfig,
logger: set.Logger,
Expand All @@ -160,7 +133,7 @@ func newConnector(set component.TelemetrySettings, config component.Config, next
func (p *serviceGraphConnector) Start(context.Context, component.Host) error {
p.store = store.NewStore(p.config.Store.TTL, p.config.Store.MaxItems, p.onComplete, p.onExpire)

go p.metricFlushLoop(*p.config.MetricsFlushInterval)
go p.metricFlushLoop(p.config.MetricsFlushInterval)

go p.cacheLoop(p.config.CacheLoop)

Expand Down Expand Up @@ -221,7 +194,7 @@ func (p *serviceGraphConnector) ConsumeTraces(ctx context.Context, td ptrace.Tra
}

// If metricsFlushInterval is not set, flush metrics immediately.
if *p.config.MetricsFlushInterval <= 0 {
if p.config.MetricsFlushInterval <= 0 {
if err := p.flushMetrics(ctx); err != nil {
// Not return error here to avoid impacting traces.
p.logger.Error("failed to flush metrics", zap.Error(err))
Expand Down Expand Up @@ -792,6 +765,9 @@ func durationToFloat(d time.Duration) float64 {
}

func mapDurationsToFloat(vs []time.Duration) []float64 {
if vs == nil {
return nil
}
vsm := make([]float64, len(vs))
for i, v := range vs {
vsm[i] = durationToFloat(v)
Expand Down
Loading
Loading