diff --git a/.chloggen/exporter-tinybird-logs-implementation.yaml b/.chloggen/exporter-tinybird-logs-implementation.yaml new file mode 100644 index 0000000000000..d461187d55092 --- /dev/null +++ b/.chloggen/exporter-tinybird-logs-implementation.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: tinybird + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement logs propagation for Tinybird exporter + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [40475] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/tinybirdexporter/config.go b/exporter/tinybirdexporter/config.go index 98a3616178b41..1d89060b8cd02 100644 --- a/exporter/tinybirdexporter/config.go +++ b/exporter/tinybirdexporter/config.go @@ -4,12 +4,16 @@ package tinybirdexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter" import ( + "errors" "fmt" "net/url" "regexp" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) var datasourceRegex = regexp.MustCompile(`^[\w_]+$`) @@ -21,6 +25,9 @@ type SignalConfig struct { } func (cfg SignalConfig) Validate() error { + if cfg.Datasource == "" { + return errors.New("datasource cannot be empty") + } if !datasourceRegex.MatchString(cfg.Datasource) { return fmt.Errorf("invalid datasource %q: only letters, numbers, and underscores are allowed", cfg.Datasource) } @@ -29,32 +36,38 @@ func (cfg SignalConfig) Validate() error { // Config defines configuration for the Tinybird exporter. type Config struct { - Endpoint string `mapstructure:"endpoint"` - Token configopaque.String `mapstructure:"token"` - Metrics SignalConfig `mapstructure:"metrics"` - Traces SignalConfig `mapstructure:"traces"` - Logs SignalConfig `mapstructure:"logs"` + ClientConfig confighttp.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"` + QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"` + + // Tinybird API token. + Token configopaque.String `mapstructure:"token"` + Metrics SignalConfig `mapstructure:"metrics"` + Traces SignalConfig `mapstructure:"traces"` + Logs SignalConfig `mapstructure:"logs"` + // Wait for data to be ingested before returning a response. + Wait bool `mapstructure:"wait"` } var _ component.Config = (*Config)(nil) // Validate checks if the exporter configuration is valid func (cfg *Config) Validate() error { - if cfg.Token == "" { - return errMissingToken - } - if cfg.Endpoint == "" { + if cfg.ClientConfig.Endpoint == "" { return errMissingEndpoint } - u, err := url.Parse(cfg.Endpoint) + u, err := url.Parse(cfg.ClientConfig.Endpoint) if err != nil { return fmt.Errorf("endpoint must be a valid URL: %w", err) } if u.Scheme != "http" && u.Scheme != "https" { - return fmt.Errorf("endpoint must have http or https scheme: %q", cfg.Endpoint) + return fmt.Errorf("endpoint must have http or https scheme: %q", cfg.ClientConfig.Endpoint) } if u.Host == "" { - return fmt.Errorf("endpoint must have a host: %q", cfg.Endpoint) + return fmt.Errorf("endpoint must have a host: %q", cfg.ClientConfig.Endpoint) + } + if cfg.Token == "" { + return errMissingToken } return nil } diff --git a/exporter/tinybirdexporter/config_test.go b/exporter/tinybirdexporter/config_test.go index e6d1e1267a743..99a0db6b5233c 100644 --- a/exporter/tinybirdexporter/config_test.go +++ b/exporter/tinybirdexporter/config_test.go @@ -10,8 +10,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/confmap/xconfmap" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) func TestLoadConfig(t *testing.T) { @@ -30,22 +34,44 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(component.MustNewType(typeStr), ""), subName: "tinybird", expected: &Config{ - Endpoint: "https://api.tinybird.co", - Token: "test-token", - Metrics: SignalConfig{Datasource: "metrics"}, - Traces: SignalConfig{Datasource: "traces"}, - Logs: SignalConfig{Datasource: "logs"}, + ClientConfig: func() confighttp.ClientConfig { + cfg := createDefaultConfig().(*Config).ClientConfig + cfg.Endpoint = "https://api.tinybird.co" + return cfg + }(), + RetryConfig: configretry.NewDefaultBackOffConfig(), + QueueConfig: exporterhelper.NewDefaultQueueConfig(), + Token: "test-token", + Metrics: SignalConfig{Datasource: "metrics"}, + Traces: SignalConfig{Datasource: "traces"}, + Logs: SignalConfig{Datasource: "logs"}, }, }, { id: component.NewIDWithName(component.MustNewType(typeStr), "full"), subName: "tinybird/full", expected: &Config{ - Endpoint: "https://api.tinybird.co", - Token: "test-token", - Metrics: SignalConfig{Datasource: "metrics"}, - Traces: SignalConfig{Datasource: "traces"}, - Logs: SignalConfig{Datasource: "logs"}, + ClientConfig: func() confighttp.ClientConfig { + cfg := createDefaultConfig().(*Config).ClientConfig + cfg.Endpoint = "https://api.tinybird.co" + cfg.Compression = configcompression.TypeZstd + return cfg + }(), + RetryConfig: func() configretry.BackOffConfig { + cfg := createDefaultConfig().(*Config).RetryConfig + cfg.Enabled = false + return cfg + }(), + QueueConfig: func() exporterhelper.QueueBatchConfig { + cfg := createDefaultConfig().(*Config).QueueConfig + cfg.Enabled = false + return cfg + }(), + Token: "test-token", + Metrics: SignalConfig{Datasource: "metrics"}, + Traces: SignalConfig{Datasource: "traces"}, + Logs: SignalConfig{Datasource: "logs"}, + Wait: true, }, }, { diff --git a/exporter/tinybirdexporter/exporter.go b/exporter/tinybirdexporter/exporter.go index 638a7fbc30e61..9195801152015 100644 --- a/exporter/tinybirdexporter/exporter.go +++ b/exporter/tinybirdexporter/exporter.go @@ -4,24 +4,60 @@ package tinybirdexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter" import ( + "bytes" "context" + "encoding/json" "errors" + "fmt" + "io" + "net/http" + "runtime" + "strconv" + "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter/internal" ) -type tinybirdExporter struct{} +const ( + headerRetryAfter = "Retry-After" + contentTypeNDJSON = "application/x-ndjson" +) -func newExporter(_ component.Config, _ exporter.Settings) (*tinybirdExporter, error) { - return &tinybirdExporter{}, nil +type tinybirdExporter struct { + config *Config + client *http.Client + logger *zap.Logger + settings component.TelemetrySettings + userAgent string } -func (e *tinybirdExporter) start(_ context.Context, _ component.Host) error { - return nil +func newExporter(cfg component.Config, set exporter.Settings) *tinybirdExporter { + oCfg := cfg.(*Config) + + userAgent := fmt.Sprintf("%s/%s (%s/%s)", + set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) + + return &tinybirdExporter{ + config: oCfg, + logger: set.Logger, + userAgent: userAgent, + settings: set.TelemetrySettings, + } +} + +func (e *tinybirdExporter) start(ctx context.Context, host component.Host) error { + var err error + e.client, err = e.config.ClientConfig.ToClient(ctx, host, e.settings) + return err } func (e *tinybirdExporter) pushTraces(_ context.Context, _ ptrace.Traces) error { @@ -32,6 +68,103 @@ func (e *tinybirdExporter) pushMetrics(_ context.Context, _ pmetric.Metrics) err return errors.New("this component is under development and metrics are not yet supported, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40475 to track development progress") } -func (e *tinybirdExporter) pushLogs(_ context.Context, _ plog.Logs) error { - return errors.New("this component is under development and logs are not yet supported, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40475 to track development progress") +func (e *tinybirdExporter) pushLogs(ctx context.Context, ld plog.Logs) error { + buffer := bytes.NewBuffer(nil) + encoder := json.NewEncoder(buffer) + err := internal.ConvertLogs(ld, encoder) + if err != nil { + return consumererror.NewPermanent(err) + } + + if buffer.Len() > 0 { + return e.export(ctx, e.config.Logs.Datasource, buffer) + } + return nil +} + +func (e *tinybirdExporter) export(ctx context.Context, dataSource string, buffer *bytes.Buffer) error { + // Create request and add query parameters + url := e.config.ClientConfig.Endpoint + "/v0/events" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, buffer) + if err != nil { + return consumererror.NewPermanent(err) + } + q := req.URL.Query() + q.Set("name", dataSource) + if e.config.Wait { + q.Set("wait", "true") + } + req.URL.RawQuery = q.Encode() + + // Set headers + req.Header.Set("Content-Type", contentTypeNDJSON) + req.Header.Set("Authorization", "Bearer "+string(e.config.Token)) + req.Header.Set("User-Agent", e.userAgent) + + // Send request + resp, err := e.client.Do(req) + if err != nil { + return err + } + defer func() { + // Drain the response body to avoid leaking resources. + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() + + // Check if the request was successful. + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + + // Read error response + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + formattedErr := fmt.Errorf("error exporting items, request to %s responded with HTTP Status Code %d, Message=%s", + url, resp.StatusCode, string(respBody)) + + // If the status code is not retryable, return a permanent error. + if !isRetryableStatusCode(resp.StatusCode) { + return consumererror.NewPermanent(formattedErr) + } + + // Check if the server is overwhelmed. + isThrottleError := resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable + if isThrottleError { + values := resp.Header.Values(headerRetryAfter) + if len(values) == 0 { + return formattedErr + } + // The value of Retry-After field can be either an HTTP-date or a number of + // seconds to delay after the response is received. See https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.3 + // + // Tinybird Events API returns the delay-seconds in the Retry-After header. + // https://www.tinybird.co/docs/forward/get-data-in/events-api#rate-limit-headers + if seconds, err := strconv.Atoi(values[0]); err == nil { + return exporterhelper.NewThrottleRetry(formattedErr, time.Duration(seconds)*time.Second) + } + } + + return formattedErr +} + +// Determine if the status code is retryable according to Tinybird Events API. +// See https://www.tinybird.co/docs/api-reference/events-api#return-http-status-codes +func isRetryableStatusCode(code int) bool { + switch code { + case http.StatusTooManyRequests: + return true + case http.StatusInternalServerError: + return true + case http.StatusBadGateway: + return true + case http.StatusServiceUnavailable: + return true + case http.StatusGatewayTimeout: + return true + default: + return false + } } diff --git a/exporter/tinybirdexporter/exporter_test.go b/exporter/tinybirdexporter/exporter_test.go new file mode 100644 index 0000000000000..44d88d9a7790f --- /dev/null +++ b/exporter/tinybirdexporter/exporter_test.go @@ -0,0 +1,241 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tinybirdexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter" + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter/internal/metadata" +) + +func TestNewExporter(t *testing.T) { + tests := []struct { + name string + config *Config + }{ + { + name: "build exporter", + config: &Config{ + ClientConfig: confighttp.ClientConfig{ + Endpoint: "http://localhost:8080", + }, + Token: "test-token", + Metrics: SignalConfig{Datasource: "metrics_test"}, + Traces: SignalConfig{Datasource: "traces_test"}, + Logs: SignalConfig{Datasource: "logs_test"}, + Wait: true, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + exp := newExporter(tt.config, exportertest.NewNopSettings(metadata.Type)) + assert.NotNil(t, exp) + }) + } +} + +func TestExportLogs(t *testing.T) { + type args struct { + logs plog.Logs + config Config + } + type want struct { + requestQuery string + requestBody string + responseStatus int + err error + } + tests := []struct { + name string + args args + want want + }{ + { + name: "export without logs", + args: args{ + logs: func() plog.Logs { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + rl.ScopeLogs().AppendEmpty() + return logs + }(), + config: Config{ + ClientConfig: confighttp.ClientConfig{}, + Token: "test-token", + Logs: SignalConfig{Datasource: "logs_test"}, + Wait: false, + }, + }, + want: want{ + requestQuery: "name=logs_test", + requestBody: "", + responseStatus: http.StatusOK, + err: nil, + }, + }, + { + name: "export with full log", + args: args{ + logs: func() plog.Logs { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + rl.SetSchemaUrl("https://opentelemetry.io/schemas/1.20.0") + resource := rl.Resource() + resource.Attributes().PutStr("service.name", "test-service") + resource.Attributes().PutStr("environment", "production") + + sl := rl.ScopeLogs().AppendEmpty() + sl.SetSchemaUrl("https://opentelemetry.io/schemas/1.20.0") + scope := sl.Scope() + scope.SetName("test-scope") + scope.SetVersion("1.0.0") + scope.Attributes().PutStr("telemetry.sdk.name", "opentelemetry") + + log := sl.LogRecords().AppendEmpty() + log.Body().SetStr("User login attempt") + log.Attributes().PutStr("http.method", "POST") + log.Attributes().PutStr("http.url", "/api/login") + log.Attributes().PutStr("user.id", "12345") + log.SetTimestamp(pcommon.Timestamp(1719158401000000000)) // 2024-06-23T16:00:01Z + log.SetSeverityText("INFO") + log.SetSeverityNumber(plog.SeverityNumberInfo) + log.SetTraceID(pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})) + log.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) + log.SetFlags(plog.LogRecordFlags(1)) + return logs + }(), + config: Config{ + ClientConfig: confighttp.ClientConfig{}, + Token: "test-token", + Logs: SignalConfig{Datasource: "logs_test"}, + Wait: false, + }, + }, + want: want{ + requestQuery: "name=logs_test", + requestBody: `{"resource_schema_url":"https://opentelemetry.io/schemas/1.20.0","resource_attributes":{"service.name":"test-service","environment":"production"},"service_name":"test-service","scope_name":"test-scope","scope_version":"1.0.0","scope_schema_url":"https://opentelemetry.io/schemas/1.20.0","scope_attributes":{"telemetry.sdk.name":"opentelemetry"},"body":"User login attempt","log_attributes":{"http.method":"POST","http.url":"/api/login","user.id":"12345"},"timestamp":"2024-06-23T16:00:01Z","severity_text":"INFO","severity_number":9,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","flags":1}`, + responseStatus: http.StatusOK, + err: nil, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "POST", r.Method) + assert.Equal(t, "/v0/events", r.URL.Path) + assert.Equal(t, tt.want.requestQuery, r.URL.RawQuery) + assert.Equal(t, "application/x-ndjson", r.Header.Get("Content-Type")) + assert.Equal(t, "Bearer "+string(tt.args.config.Token), r.Header.Get("Authorization")) + gotBody, err := io.ReadAll(r.Body) + assert.NoError(t, err) + assert.JSONEq(t, tt.want.requestBody, string(gotBody)) + + w.WriteHeader(tt.want.responseStatus) + })) + defer server.Close() + + tt.args.config.ClientConfig.Endpoint = server.URL + + exp := newExporter(&tt.args.config, exportertest.NewNopSettings(metadata.Type)) + require.NoError(t, exp.start(context.Background(), componenttest.NewNopHost())) + + err := exp.pushLogs(context.Background(), tt.args.logs) + if tt.want.err != nil { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestExportErrorHandling(t *testing.T) { + tests := []struct { + name string + responseStatus int + responseBody string + headers map[string]string + wantErr bool + }{ + { + name: "success", + responseStatus: http.StatusOK, + wantErr: false, + }, + { + name: "throttled", + responseStatus: http.StatusTooManyRequests, + headers: map[string]string{"Retry-After": "30"}, + wantErr: true, + }, + { + name: "service unavailable", + responseStatus: http.StatusServiceUnavailable, + wantErr: true, + }, + { + name: "permanent error", + responseStatus: http.StatusBadRequest, + responseBody: "invalid request", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + for k, v := range tt.headers { + w.Header().Set(k, v) + } + w.WriteHeader(tt.responseStatus) + if tt.responseBody != "" { + _, err := w.Write([]byte(tt.responseBody)) + assert.NoError(t, err) + } + })) + defer server.Close() + + config := &Config{ + ClientConfig: confighttp.ClientConfig{ + Endpoint: server.URL, + }, + Token: "test-token", + Metrics: SignalConfig{Datasource: "metrics_test"}, + Traces: SignalConfig{Datasource: "traces_test"}, + Logs: SignalConfig{Datasource: "logs_test"}, + } + + exp := newExporter(config, exportertest.NewNopSettings(metadata.Type)) + require.NoError(t, exp.start(context.Background(), componenttest.NewNopHost())) + + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + sl := rl.ScopeLogs().AppendEmpty() + lr := sl.LogRecords().AppendEmpty() + lr.Body().SetStr("test-log") + err := exp.pushLogs(context.Background(), logs) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/exporter/tinybirdexporter/factory.go b/exporter/tinybirdexporter/factory.go index 847e995d9e576..83cf2c488c6eb 100644 --- a/exporter/tinybirdexporter/factory.go +++ b/exporter/tinybirdexporter/factory.go @@ -5,8 +5,12 @@ package tinybirdexporter // import "github.com/open-telemetry/opentelemetry-coll import ( "context" + "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -31,12 +35,22 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { + clientConfig := confighttp.NewDefaultClientConfig() + clientConfig.Timeout = 30 * time.Second + // Default to zstd compression + clientConfig.Compression = configcompression.TypeZstd + // We almost read 0 bytes, so no need to tune ReadBufferSize. + clientConfig.WriteBufferSize = 512 * 1024 + return &Config{ - Endpoint: "", - Token: "", - Metrics: SignalConfig{Datasource: "metrics"}, - Traces: SignalConfig{Datasource: "traces"}, - Logs: SignalConfig{Datasource: "logs"}, + ClientConfig: clientConfig, + RetryConfig: configretry.NewDefaultBackOffConfig(), + QueueConfig: exporterhelper.NewDefaultQueueConfig(), + Token: "", + Metrics: SignalConfig{Datasource: "metrics"}, + Traces: SignalConfig{Datasource: "traces"}, + Logs: SignalConfig{Datasource: "logs"}, + Wait: false, } } @@ -45,10 +59,9 @@ func createTracesExporter( set exporter.Settings, cfg component.Config, ) (exporter.Traces, error) { - exp, err := newExporter(cfg, set) - if err != nil { - return nil, err - } + oCfg := cfg.(*Config) + + exp := newExporter(oCfg, set) return exporterhelper.NewTraces( ctx, set, @@ -56,6 +69,10 @@ func createTracesExporter( exp.pushTraces, exporterhelper.WithStart(exp.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + // explicitly disable since we rely on http.Client timeout logic. + exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), + exporterhelper.WithRetry(oCfg.RetryConfig), + exporterhelper.WithQueue(oCfg.QueueConfig), ) } @@ -64,10 +81,9 @@ func createMetricsExporter( set exporter.Settings, cfg component.Config, ) (exporter.Metrics, error) { - exp, err := newExporter(cfg, set) - if err != nil { - return nil, err - } + oCfg := cfg.(*Config) + + exp := newExporter(oCfg, set) return exporterhelper.NewMetrics( ctx, set, @@ -75,6 +91,10 @@ func createMetricsExporter( exp.pushMetrics, exporterhelper.WithStart(exp.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + // explicitly disable since we rely on http.Client timeout logic. + exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), + exporterhelper.WithRetry(oCfg.RetryConfig), + exporterhelper.WithQueue(oCfg.QueueConfig), ) } @@ -83,10 +103,9 @@ func createLogsExporter( set exporter.Settings, cfg component.Config, ) (exporter.Logs, error) { - exp, err := newExporter(cfg, set) - if err != nil { - return nil, err - } + oCfg := cfg.(*Config) + + exp := newExporter(oCfg, set) return exporterhelper.NewLogs( ctx, set, @@ -94,5 +113,9 @@ func createLogsExporter( exp.pushLogs, exporterhelper.WithStart(exp.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + // explicitly disable since we rely on http.Client timeout logic. + exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), + exporterhelper.WithRetry(oCfg.RetryConfig), + exporterhelper.WithQueue(oCfg.QueueConfig), ) } diff --git a/exporter/tinybirdexporter/go.mod b/exporter/tinybirdexporter/go.mod index 9830f14a72eaf..bf69f7522d543 100644 --- a/exporter/tinybirdexporter/go.mod +++ b/exporter/tinybirdexporter/go.mod @@ -3,30 +3,43 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybi go 1.23.0 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.128.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v1.35.1-0.20250707130321-ac9adbf016bf go.opentelemetry.io/collector/component/componenttest v0.129.1-0.20250707130321-ac9adbf016bf + go.opentelemetry.io/collector/config/configcompression v1.35.1-0.20250707130321-ac9adbf016bf + go.opentelemetry.io/collector/config/confighttp v0.129.1-0.20250707130321-ac9adbf016bf go.opentelemetry.io/collector/config/configopaque v1.35.1-0.20250707130321-ac9adbf016bf + go.opentelemetry.io/collector/config/configretry v1.35.1-0.20250707130321-ac9adbf016bf go.opentelemetry.io/collector/confmap v1.35.1-0.20250707130321-ac9adbf016bf go.opentelemetry.io/collector/confmap/xconfmap v0.129.1-0.20250707130321-ac9adbf016bf go.opentelemetry.io/collector/consumer v1.35.1-0.20250707130321-ac9adbf016bf + go.opentelemetry.io/collector/consumer/consumererror v0.129.1-0.20250707130321-ac9adbf016bf go.opentelemetry.io/collector/exporter v0.129.1-0.20250707130321-ac9adbf016bf go.opentelemetry.io/collector/exporter/exportertest v0.129.1-0.20250707130321-ac9adbf016bf go.opentelemetry.io/collector/pdata v1.35.1-0.20250707130321-ac9adbf016bf + go.opentelemetry.io/otel v1.37.0 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 ) require ( github.com/cenkalti/backoff/v5 v5.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/foxboron/go-tpm-keyfiles v0.0.0-20250323135004-b31fac66206e // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.3.0 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/google/go-tpm v0.9.5 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect github.com/knadh/koanf/providers/confmap v1.0.0 // indirect github.com/knadh/koanf/v2 v2.2.1 // indirect @@ -34,15 +47,21 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rs/cors v1.11.1 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/client v1.35.1-0.20250707130321-ac9adbf016bf // indirect - go.opentelemetry.io/collector/config/configretry v1.35.1-0.20250707130321-ac9adbf016bf // indirect - go.opentelemetry.io/collector/consumer/consumererror v0.129.1-0.20250707130321-ac9adbf016bf // indirect + go.opentelemetry.io/collector/config/configauth v0.129.1-0.20250707130321-ac9adbf016bf // indirect + go.opentelemetry.io/collector/config/configmiddleware v0.129.1-0.20250707130321-ac9adbf016bf // indirect + go.opentelemetry.io/collector/config/configoptional v0.129.1-0.20250707130321-ac9adbf016bf // indirect + go.opentelemetry.io/collector/config/configtls v1.35.1-0.20250707130321-ac9adbf016bf // indirect go.opentelemetry.io/collector/consumer/consumertest v0.129.1-0.20250707130321-ac9adbf016bf // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.129.1-0.20250707130321-ac9adbf016bf // indirect go.opentelemetry.io/collector/exporter/xexporter v0.129.1-0.20250707130321-ac9adbf016bf // indirect go.opentelemetry.io/collector/extension v1.35.1-0.20250707130321-ac9adbf016bf // indirect + go.opentelemetry.io/collector/extension/extensionauth v1.35.1-0.20250707130321-ac9adbf016bf // indirect + go.opentelemetry.io/collector/extension/extensionmiddleware v0.129.1-0.20250707130321-ac9adbf016bf // indirect go.opentelemetry.io/collector/extension/xextension v0.129.1-0.20250707130321-ac9adbf016bf // indirect go.opentelemetry.io/collector/featuregate v1.35.1-0.20250707130321-ac9adbf016bf // indirect go.opentelemetry.io/collector/internal/telemetry v0.129.1-0.20250707130321-ac9adbf016bf // indirect @@ -53,20 +72,28 @@ require ( go.opentelemetry.io/collector/receiver/receivertest v0.129.1-0.20250707130321-ac9adbf016bf // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.129.1-0.20250707130321-ac9adbf016bf // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect - go.opentelemetry.io/otel v1.37.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect go.opentelemetry.io/otel/log v0.13.0 // indirect go.opentelemetry.io/otel/metric v1.37.0 // indirect go.opentelemetry.io/otel/sdk v1.37.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.37.0 // indirect go.opentelemetry.io/otel/trace v1.37.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/net v0.39.0 // indirect + golang.org/x/crypto v0.39.0 // indirect + golang.org/x/net v0.41.0 // indirect golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.24.0 // indirect + golang.org/x/text v0.26.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect google.golang.org/grpc v1.73.0 // indirect google.golang.org/protobuf v1.36.6 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/exporter/tinybirdexporter/go.sum b/exporter/tinybirdexporter/go.sum index 3cc98f1ffeaab..2d72b31f95fa3 100644 --- a/exporter/tinybirdexporter/go.sum +++ b/exporter/tinybirdexporter/go.sum @@ -3,6 +3,14 @@ github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F9 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/foxboron/go-tpm-keyfiles v0.0.0-20250323135004-b31fac66206e h1:2jjYsGgM13xId2Ku+UGDQTO5It50LhT6lljiVJvBj1Y= +github.com/foxboron/go-tpm-keyfiles v0.0.0-20250323135004-b31fac66206e/go.mod h1:uAyTlAUxchYuiFjTHmuIEJ4nGSm7iOPaGcAyA81fJ80= +github.com/foxboron/swtpm_test v0.0.0-20230726224112-46aaafdf7006 h1:50sW4r0PcvlpG4PV8tYh2RVCapszJgaOLRCS2subvV4= +github.com/foxboron/swtpm_test v0.0.0-20230726224112-46aaafdf7006/go.mod h1:eIXCMsMYCaqq9m1KSSxXwQG11krpuNPGP3k0uaWrbas= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -16,8 +24,14 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU= +github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/google/go-tpm-tools v0.4.4 h1:oiQfAIkc6xTy9Fl5NKTeTJkBTlXdHsxAofmQyxBKY98= +github.com/google/go-tpm-tools v0.4.4/go.mod h1:T8jXkp2s+eltnCDIsXR84/MTcVU9Ja7bh3Mit0pa4AY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -27,6 +41,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo= github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/providers/confmap v1.0.0 h1:mHKLJTE7iXEys6deO5p6olAiZdG5zwp8Aebir+/EaRE= @@ -46,10 +62,14 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= +github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= @@ -64,10 +84,22 @@ go.opentelemetry.io/collector/component v1.35.1-0.20250707130321-ac9adbf016bf h1 go.opentelemetry.io/collector/component v1.35.1-0.20250707130321-ac9adbf016bf/go.mod h1:REK1LenAljD2qjKfdGOuUscv50dtTI0JuBIZO6IGUD0= go.opentelemetry.io/collector/component/componenttest v0.129.1-0.20250707130321-ac9adbf016bf h1:4UctMQ5b3PE/Ci9WMatuZ/qF8mo/kaJEENHWXwBuavE= go.opentelemetry.io/collector/component/componenttest v0.129.1-0.20250707130321-ac9adbf016bf/go.mod h1:ZTXhTLQjwTA5h+O7ka/RoKdhGhtnMW1JXcTl3iZjV7k= +go.opentelemetry.io/collector/config/configauth v0.129.1-0.20250707130321-ac9adbf016bf h1:GYSCJd7Sqfkkwhla5TAWvQLymp3PxOFmTQCXOuZOzoY= +go.opentelemetry.io/collector/config/configauth v0.129.1-0.20250707130321-ac9adbf016bf/go.mod h1:hsLvECIZIxschpUmFN+gkbwu9ftfiPg327sGsIic0gc= +go.opentelemetry.io/collector/config/configcompression v1.35.1-0.20250707130321-ac9adbf016bf h1:m0vuY/3a2ByuE3mDf0ZsGihMhj1k3QYwWj/nkSGtZ3o= +go.opentelemetry.io/collector/config/configcompression v1.35.1-0.20250707130321-ac9adbf016bf/go.mod h1:QwbNpaOl6Me+wd0EdFuEJg0Cc+WR42HNjJtdq4TwE6w= +go.opentelemetry.io/collector/config/confighttp v0.129.1-0.20250707130321-ac9adbf016bf h1:b/LlBXsAwqfdmHSKwo+DdpUuObJsQmCMjaAA4v5ZHIA= +go.opentelemetry.io/collector/config/confighttp v0.129.1-0.20250707130321-ac9adbf016bf/go.mod h1:uN6iZns/AaUGHJFOucpzsgu2ztJG1HAZcfRNF7596PI= +go.opentelemetry.io/collector/config/configmiddleware v0.129.1-0.20250707130321-ac9adbf016bf h1:J+h3MaUq0rKxnPPOK/No0NmGvPMPg9N4yQAmZl0JfJM= +go.opentelemetry.io/collector/config/configmiddleware v0.129.1-0.20250707130321-ac9adbf016bf/go.mod h1:p+KgXUw5JsTwvY3wh2elfdgQiE8wfKsaXBJVVnrPnMo= go.opentelemetry.io/collector/config/configopaque v1.35.1-0.20250707130321-ac9adbf016bf h1:n+/0UhYyhC262JOHxJjMiiWJABJE+xyMGOgsrfhbkHs= go.opentelemetry.io/collector/config/configopaque v1.35.1-0.20250707130321-ac9adbf016bf/go.mod h1:aAOmM/mSWE2F3A58x4MUw1bYW8TIjVxn5/WfgxRgMu0= +go.opentelemetry.io/collector/config/configoptional v0.129.1-0.20250707130321-ac9adbf016bf h1:MND2M+o4RL1uS09uTkTsM0KpVOUZmxlMoh9v6XuqsdE= +go.opentelemetry.io/collector/config/configoptional v0.129.1-0.20250707130321-ac9adbf016bf/go.mod h1:QAUKdkMwnbI/o6TyY10UZC+KQvkr8enrect4H9RE8qk= go.opentelemetry.io/collector/config/configretry v1.35.1-0.20250707130321-ac9adbf016bf h1:J8jnd8qbfcB7rAb6L0DfGWrqP2VQWvDz/8a0fN5z9+o= go.opentelemetry.io/collector/config/configretry v1.35.1-0.20250707130321-ac9adbf016bf/go.mod h1:QNnb+MCk7aS1k2EuGJMtlNCltzD7b8uC7Xel0Dxm1wQ= +go.opentelemetry.io/collector/config/configtls v1.35.1-0.20250707130321-ac9adbf016bf h1:8zUKv0BL5j7n3D2vasp+UMXc8CEAn1wOXmD2oW4oAvY= +go.opentelemetry.io/collector/config/configtls v1.35.1-0.20250707130321-ac9adbf016bf/go.mod h1:twLYBQkeB4r1EpGoDGiyOj6CVpxyTX9qCji/hRs75EE= go.opentelemetry.io/collector/confmap v1.35.1-0.20250707130321-ac9adbf016bf h1:x3Tuo3uZ7U9RoQX/wLHmpymnTjMymPTGTA669QUGms8= go.opentelemetry.io/collector/confmap v1.35.1-0.20250707130321-ac9adbf016bf/go.mod h1:taTeLqkfP3tzhZFv4Et036kqiWaKAteJ88f15RiEmOU= go.opentelemetry.io/collector/confmap/xconfmap v0.129.1-0.20250707130321-ac9adbf016bf h1:/eHKvwkOCoilNj5H7wxWBNDZrbtTteTFSrLFKjsQziE= @@ -88,6 +120,14 @@ go.opentelemetry.io/collector/exporter/xexporter v0.129.1-0.20250707130321-ac9ad go.opentelemetry.io/collector/exporter/xexporter v0.129.1-0.20250707130321-ac9adbf016bf/go.mod h1:LwyShsn2BrBRRHubDJz4U2WkXa04Ag2gHoei9uCB/wU= go.opentelemetry.io/collector/extension v1.35.1-0.20250707130321-ac9adbf016bf h1:bThYEAelVicHxMLSiUHNTevb1ICNhVtdtAqKvN+yWZU= go.opentelemetry.io/collector/extension v1.35.1-0.20250707130321-ac9adbf016bf/go.mod h1:OCSMbOJQlBF+I5APJy2HCoP2xuzJahGJN5S2beq9uK8= +go.opentelemetry.io/collector/extension/extensionauth v1.35.1-0.20250707130321-ac9adbf016bf h1:cc3txJpw8UCt0Lm+yli8S6oj5UQEBJyXetEde6Napok= +go.opentelemetry.io/collector/extension/extensionauth v1.35.1-0.20250707130321-ac9adbf016bf/go.mod h1:bjGAFwd0pjtPbevALtgazGWfHAoOzGr+e/oP5NjAGv4= +go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest v0.129.0 h1:JFm1T3rxtSmWwG3oltSaZpDrS7KF8AU1efvW2g/0dy8= +go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest v0.129.0/go.mod h1:So7bI+k8rtVVTosMHoRMKq0+amTg9D6TY/i73sIhhrk= +go.opentelemetry.io/collector/extension/extensionmiddleware v0.129.1-0.20250707130321-ac9adbf016bf h1:5kPonPk8CQS2UmaXaqS7xgjMmI60i5qAuShYAdu/qrk= +go.opentelemetry.io/collector/extension/extensionmiddleware v0.129.1-0.20250707130321-ac9adbf016bf/go.mod h1:xc1VLLUebuxPAdKCDopohorTZifokuwFfdvPINmx/GQ= +go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest v0.129.0 h1:V85S9H4UnhPWEmSewFx0L25+XKXZbNUnQHdjT0YAMRY= +go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest v0.129.0/go.mod h1:1sWR6V3xQt+9wsc4vW/lM9zn0YmpJH4o/tLBWQFnAxg= go.opentelemetry.io/collector/extension/extensiontest v0.129.0 h1:YYXwF3rE9/4py+BD/GPUs2k/7e9WwJSDh47L2ljyxMk= go.opentelemetry.io/collector/extension/extensiontest v0.129.0/go.mod h1:r1aMvxZLlHub1/28ABW/EM88YFP0AW0B+KrB/yxXlHc= go.opentelemetry.io/collector/extension/xextension v0.129.1-0.20250707130321-ac9adbf016bf h1:IXlIRUA5Pr2sAJnN571NWh3RZAe6lhlyLWZoc1/biXQ= @@ -114,6 +154,8 @@ go.opentelemetry.io/collector/receiver/xreceiver v0.129.1-0.20250707130321-ac9ad go.opentelemetry.io/collector/receiver/xreceiver v0.129.1-0.20250707130321-ac9adbf016bf/go.mod h1:iwqRLtWo9LGIM8o3J4ir57WLjkevpKPEDayqTNGY3QU= go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 h1:FGre0nZh5BSw7G73VpT3xs38HchsfPsa2aZtMp0NPOs= go.opentelemetry.io/contrib/bridges/otelzap v0.12.0/go.mod h1:X2PYPViI2wTPIMIOBjG17KNybTzsrATnvPJ02kkz7LM= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 h1:Hf9xI/XLML9ElpiHVDNwvqI0hIFlzV8dgIr35kV1kRU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0/go.mod h1:NfchwuyNoMcZ5MLHwPrODwUF1HWCXWrL31s8gSAdIKY= go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= go.opentelemetry.io/otel/log v0.13.0 h1:yoxRoIZcohB6Xf0lNv9QIyCzQvrtGZklVbdCoyb7dls= @@ -139,14 +181,16 @@ go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= +golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= -golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -157,8 +201,8 @@ golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= -golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/exporter/tinybirdexporter/internal/encoder.go b/exporter/tinybirdexporter/internal/encoder.go new file mode 100644 index 0000000000000..dd8533623247d --- /dev/null +++ b/exporter/tinybirdexporter/internal/encoder.go @@ -0,0 +1,8 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter/internal" + +type Encoder interface { + Encode(v any) error +} diff --git a/exporter/tinybirdexporter/internal/logs.go b/exporter/tinybirdexporter/internal/logs.go new file mode 100644 index 0000000000000..347ad6f5f6b19 --- /dev/null +++ b/exporter/tinybirdexporter/internal/logs.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter/internal" + +import ( + "time" + + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" +) + +type logSignal struct { + ResourceSchemaURL string `json:"resource_schema_url"` + ResourceAttributes map[string]string `json:"resource_attributes"` + ServiceName string `json:"service_name"` + ScopeSchemaURL string `json:"scope_schema_url"` + ScopeAttributes map[string]string `json:"scope_attributes"` + ScopeName string `json:"scope_name"` + ScopeVersion string `json:"scope_version"` + Timestamp string `json:"timestamp"` + TraceID string `json:"trace_id"` + SpanID string `json:"span_id"` + Flags uint32 `json:"flags"` + SeverityText string `json:"severity_text"` + SeverityNumber int32 `json:"severity_number"` + LogAttributes map[string]string `json:"log_attributes"` + Body string `json:"body"` +} + +func ConvertLogs(ld plog.Logs, encoder Encoder) error { + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rl := ld.ResourceLogs().At(i) + resourceSchemaURL := rl.SchemaUrl() + resource := rl.Resource() + resourceAttributesMap := resource.Attributes() + resourceAttributes := convertAttributes(resourceAttributesMap) + serviceName := getServiceName(resource.Attributes()) + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + scopeSchemaURL := sl.SchemaUrl() + scope := sl.Scope() + scopeName := scope.Name() + scopeVersion := scope.Version() + scopeAttributes := convertAttributes(scope.Attributes()) + for k := 0; k < sl.LogRecords().Len(); k++ { + log := sl.LogRecords().At(k) + logEntry := logSignal{ + ResourceSchemaURL: resourceSchemaURL, + ResourceAttributes: resourceAttributes, + ServiceName: serviceName, + ScopeName: scopeName, + ScopeVersion: scopeVersion, + ScopeSchemaURL: scopeSchemaURL, + ScopeAttributes: scopeAttributes, + Timestamp: log.Timestamp().AsTime().Format(time.RFC3339Nano), + SeverityText: log.SeverityText(), + SeverityNumber: int32(log.SeverityNumber()), + LogAttributes: convertAttributes(log.Attributes()), + Body: log.Body().AsString(), + TraceID: traceutil.TraceIDToHexOrEmptyString(log.TraceID()), + SpanID: traceutil.SpanIDToHexOrEmptyString(log.SpanID()), + Flags: uint32(log.Flags()), + } + + err := encoder.Encode(logEntry) + if err != nil { + return err + } + } + } + } + return nil +} diff --git a/exporter/tinybirdexporter/internal/logs_test.go b/exporter/tinybirdexporter/internal/logs_test.go new file mode 100644 index 0000000000000..d9c30cf9e9e42 --- /dev/null +++ b/exporter/tinybirdexporter/internal/logs_test.go @@ -0,0 +1,153 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "reflect" + "testing" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +type logEncoderMock struct { + logs []logSignal +} + +func (e *logEncoderMock) Encode(v any) error { + e.logs = append(e.logs, v.(logSignal)) + return nil +} + +func TestConvertLogs(t *testing.T) { + type args struct { + ld plog.Logs + encoder Encoder + } + tests := []struct { + name string + args args + want []logSignal + wantErr bool + }{ + { + name: "empty logs", + args: args{ + ld: plog.NewLogs(), + encoder: &logEncoderMock{logs: []logSignal{}}, + }, + want: []logSignal{}, + }, + { + name: "basic log", + args: args{ + ld: func() plog.Logs { + ld := plog.NewLogs() + resourceLogs := ld.ResourceLogs().AppendEmpty() + resourceLogs.SetSchemaUrl("https://opentelemetry.io/schemas/1.20.0") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + scopeLogs.SetSchemaUrl("https://opentelemetry.io/schemas/1.20.0") + scopeLogs.Scope().SetName("test-scope") + scopeLogs.Scope().SetVersion("1.0.0") + log := scopeLogs.LogRecords().AppendEmpty() + log.SetTimestamp(pcommon.Timestamp(1719158400000000000)) + log.SetTraceID(pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})) + log.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) + log.SetFlags(1) + log.SetSeverityText("INFO") + log.SetSeverityNumber(plog.SeverityNumberInfo) + log.Body().SetStr("log without attributes") + return ld + }(), + encoder: &logEncoderMock{logs: []logSignal{}}, + }, + want: []logSignal{ + { + ResourceSchemaURL: "https://opentelemetry.io/schemas/1.20.0", + ResourceAttributes: map[string]string{}, + ServiceName: "", + ScopeSchemaURL: "https://opentelemetry.io/schemas/1.20.0", + ScopeAttributes: map[string]string{}, + ScopeName: "test-scope", + ScopeVersion: "1.0.0", + Timestamp: "2024-06-23T16:00:00Z", + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + Flags: 1, + SeverityText: "INFO", + SeverityNumber: 9, + LogAttributes: map[string]string{}, + Body: "log without attributes", + }, + }, + }, + { + name: "log with attributes", + args: args{ + ld: func() plog.Logs { + ld := plog.NewLogs() + resourceLogs := ld.ResourceLogs().AppendEmpty() + resourceLogs.SetSchemaUrl("https://opentelemetry.io/schemas/1.20.0") + resource := resourceLogs.Resource() + resource.Attributes().PutStr("service.name", "test-service") + resource.Attributes().PutStr("environment", "production") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + scopeLogs.SetSchemaUrl("https://opentelemetry.io/schemas/1.20.0") + scopeLogs.Scope().SetName("test-scope") + scopeLogs.Scope().SetVersion("1.0.0") + scopeLogs.Scope().Attributes().PutStr("telemetry.sdk.name", "opentelemetry") + log := scopeLogs.LogRecords().AppendEmpty() + log.SetTimestamp(pcommon.Timestamp(1719158400000000000)) + log.SetTraceID(pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})) + log.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) + log.SetFlags(1) + log.SetSeverityText("INFO") + log.SetSeverityNumber(plog.SeverityNumberInfo) + log.Body().SetStr("log with attributes") + log.Attributes().PutStr("http.method", "GET") + log.Attributes().PutStr("http.url", "http://example.com") + return ld + }(), + encoder: &logEncoderMock{logs: []logSignal{}}, + }, + want: []logSignal{ + { + ResourceSchemaURL: "https://opentelemetry.io/schemas/1.20.0", + ResourceAttributes: map[string]string{ + "service.name": "test-service", + "environment": "production", + }, + ServiceName: "test-service", + ScopeSchemaURL: "https://opentelemetry.io/schemas/1.20.0", + ScopeAttributes: map[string]string{ + "telemetry.sdk.name": "opentelemetry", + }, + ScopeName: "test-scope", + ScopeVersion: "1.0.0", + Timestamp: "2024-06-23T16:00:00Z", + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + Flags: 1, + SeverityText: "INFO", + SeverityNumber: 9, + LogAttributes: map[string]string{ + "http.method": "GET", + "http.url": "http://example.com", + }, + Body: "log with attributes", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := ConvertLogs(tt.args.ld, tt.args.encoder); (err != nil) != tt.wantErr { + t.Errorf("ConvertLogs() error = %v, wantErr %v", err, tt.wantErr) + } + if !reflect.DeepEqual(tt.args.encoder.(*logEncoderMock).logs, tt.want) { + t.Errorf("ConvertLogs() logs = %v, want %v", tt.args.encoder.(*logEncoderMock).logs, tt.want) + } + }) + } +} diff --git a/exporter/tinybirdexporter/internal/utils.go b/exporter/tinybirdexporter/internal/utils.go new file mode 100644 index 0000000000000..5b65380b4e894 --- /dev/null +++ b/exporter/tinybirdexporter/internal/utils.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter/internal" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + conventions "go.opentelemetry.io/otel/semconv/v1.27.0" +) + +func getServiceName(resAttr pcommon.Map) string { + if v, ok := resAttr.Get(string(conventions.ServiceNameKey)); ok { + return v.AsString() + } + + return "" +} + +func convertAttributes(attributes pcommon.Map) map[string]string { + attrs := make(map[string]string, attributes.Len()) + attributes.Range(func(k string, v pcommon.Value) bool { + attrs[k] = v.AsString() + return true + }) + return attrs +} diff --git a/exporter/tinybirdexporter/testdata/config.yaml b/exporter/tinybirdexporter/testdata/config.yaml index 802a0ec69e924..7a5bc98a3e7f7 100644 --- a/exporter/tinybirdexporter/testdata/config.yaml +++ b/exporter/tinybirdexporter/testdata/config.yaml @@ -4,10 +4,16 @@ tinybird: tinybird/full: endpoint: "https://api.tinybird.co" + compression: "zstd" + retry_on_failure: + enabled: false + sending_queue: + enabled: false token: "test-token" metrics::datasource: "metrics" traces::datasource: "traces" logs::datasource: "logs" + wait: true tinybird/invalid_datasource: endpoint: "https://api.tinybird.co" @@ -18,12 +24,6 @@ tinybird/invalid_datasource: tinybird/missing_token: endpoint: "https://api.tinybird.co" - metrics::datasource: "metrics" - traces::datasource: "traces" - logs::datasource: "logs" tinybird/missing_endpoint: token: "test-token" - metrics::datasource: "metrics" - traces::datasource: "traces" - logs::datasource: "logs" \ No newline at end of file