Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/exporter-tinybird-logs-implementation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tinybird

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement logs propagation for Tinybird exporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [40475]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
37 changes: 25 additions & 12 deletions exporter/tinybirdexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
package tinybirdexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter"

import (
"errors"
"fmt"
"net/url"
"regexp"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

var datasourceRegex = regexp.MustCompile(`^[\w_]+$`)
Expand All @@ -21,6 +25,9 @@ type SignalConfig struct {
}

func (cfg SignalConfig) Validate() error {
if cfg.Datasource == "" {
return errors.New("datasource cannot be empty")
}
if !datasourceRegex.MatchString(cfg.Datasource) {
return fmt.Errorf("invalid datasource %q: only letters, numbers, and underscores are allowed", cfg.Datasource)
}
Expand All @@ -29,32 +36,38 @@ func (cfg SignalConfig) Validate() error {

// Config defines configuration for the Tinybird exporter.
type Config struct {
Endpoint string `mapstructure:"endpoint"`
Token configopaque.String `mapstructure:"token"`
Metrics SignalConfig `mapstructure:"metrics"`
Traces SignalConfig `mapstructure:"traces"`
Logs SignalConfig `mapstructure:"logs"`
ClientConfig confighttp.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`

// Tinybird API token.
Token configopaque.String `mapstructure:"token"`
Metrics SignalConfig `mapstructure:"metrics"`
Traces SignalConfig `mapstructure:"traces"`
Logs SignalConfig `mapstructure:"logs"`
// Wait for data to be ingested before returning a response.
Wait bool `mapstructure:"wait"`
}

var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if cfg.Token == "" {
return errMissingToken
}
if cfg.Endpoint == "" {
if cfg.ClientConfig.Endpoint == "" {
return errMissingEndpoint
}
u, err := url.Parse(cfg.Endpoint)
u, err := url.Parse(cfg.ClientConfig.Endpoint)
if err != nil {
return fmt.Errorf("endpoint must be a valid URL: %w", err)
}
if u.Scheme != "http" && u.Scheme != "https" {
return fmt.Errorf("endpoint must have http or https scheme: %q", cfg.Endpoint)
return fmt.Errorf("endpoint must have http or https scheme: %q", cfg.ClientConfig.Endpoint)
}
if u.Host == "" {
return fmt.Errorf("endpoint must have a host: %q", cfg.Endpoint)
return fmt.Errorf("endpoint must have a host: %q", cfg.ClientConfig.Endpoint)
}
if cfg.Token == "" {
return errMissingToken
}
return nil
}
46 changes: 36 additions & 10 deletions exporter/tinybirdexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/confmap/xconfmap"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -30,22 +34,44 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(component.MustNewType(typeStr), ""),
subName: "tinybird",
expected: &Config{
Endpoint: "https://api.tinybird.co",
Token: "test-token",
Metrics: SignalConfig{Datasource: "metrics"},
Traces: SignalConfig{Datasource: "traces"},
Logs: SignalConfig{Datasource: "logs"},
ClientConfig: func() confighttp.ClientConfig {
cfg := createDefaultConfig().(*Config).ClientConfig
cfg.Endpoint = "https://api.tinybird.co"
return cfg
}(),
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
Token: "test-token",
Metrics: SignalConfig{Datasource: "metrics"},
Traces: SignalConfig{Datasource: "traces"},
Logs: SignalConfig{Datasource: "logs"},
},
},
{
id: component.NewIDWithName(component.MustNewType(typeStr), "full"),
subName: "tinybird/full",
expected: &Config{
Endpoint: "https://api.tinybird.co",
Token: "test-token",
Metrics: SignalConfig{Datasource: "metrics"},
Traces: SignalConfig{Datasource: "traces"},
Logs: SignalConfig{Datasource: "logs"},
ClientConfig: func() confighttp.ClientConfig {
cfg := createDefaultConfig().(*Config).ClientConfig
cfg.Endpoint = "https://api.tinybird.co"
cfg.Compression = configcompression.TypeZstd
return cfg
}(),
RetryConfig: func() configretry.BackOffConfig {
cfg := createDefaultConfig().(*Config).RetryConfig
cfg.Enabled = false
return cfg
}(),
QueueConfig: func() exporterhelper.QueueBatchConfig {
cfg := createDefaultConfig().(*Config).QueueConfig
cfg.Enabled = false
return cfg
}(),
Token: "test-token",
Metrics: SignalConfig{Datasource: "metrics"},
Traces: SignalConfig{Datasource: "traces"},
Logs: SignalConfig{Datasource: "logs"},
Wait: true,
},
},
{
Expand Down
147 changes: 140 additions & 7 deletions exporter/tinybirdexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,60 @@
package tinybirdexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter"

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"runtime"
"strconv"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter/internal"
)

type tinybirdExporter struct{}
const (
headerRetryAfter = "Retry-After"
contentTypeNDJSON = "application/x-ndjson"
)

func newExporter(_ component.Config, _ exporter.Settings) (*tinybirdExporter, error) {
return &tinybirdExporter{}, nil
type tinybirdExporter struct {
config *Config
client *http.Client
logger *zap.Logger
settings component.TelemetrySettings
userAgent string
}

func (e *tinybirdExporter) start(_ context.Context, _ component.Host) error {
return nil
func newExporter(cfg component.Config, set exporter.Settings) *tinybirdExporter {
oCfg := cfg.(*Config)

userAgent := fmt.Sprintf("%s/%s (%s/%s)",
set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH)

return &tinybirdExporter{
config: oCfg,
logger: set.Logger,
userAgent: userAgent,
settings: set.TelemetrySettings,
}
}

func (e *tinybirdExporter) start(ctx context.Context, host component.Host) error {
var err error
e.client, err = e.config.ClientConfig.ToClient(ctx, host, e.settings)
return err
}

func (e *tinybirdExporter) pushTraces(_ context.Context, _ ptrace.Traces) error {
Expand All @@ -32,6 +68,103 @@ func (e *tinybirdExporter) pushMetrics(_ context.Context, _ pmetric.Metrics) err
return errors.New("this component is under development and metrics are not yet supported, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40475 to track development progress")
}

func (e *tinybirdExporter) pushLogs(_ context.Context, _ plog.Logs) error {
return errors.New("this component is under development and logs are not yet supported, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40475 to track development progress")
func (e *tinybirdExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
buffer := bytes.NewBuffer(nil)
encoder := json.NewEncoder(buffer)
err := internal.ConvertLogs(ld, encoder)
if err != nil {
return consumererror.NewPermanent(err)
}

if buffer.Len() > 0 {
Comment thread
mx-psi marked this conversation as resolved.
return e.export(ctx, e.config.Logs.Datasource, buffer)
}
return nil
}

func (e *tinybirdExporter) export(ctx context.Context, dataSource string, buffer *bytes.Buffer) error {
// Create request and add query parameters
url := e.config.ClientConfig.Endpoint + "/v0/events"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, buffer)
if err != nil {
return consumererror.NewPermanent(err)
}
q := req.URL.Query()
q.Set("name", dataSource)
if e.config.Wait {
q.Set("wait", "true")
}
req.URL.RawQuery = q.Encode()

// Set headers
req.Header.Set("Content-Type", contentTypeNDJSON)
req.Header.Set("Authorization", "Bearer "+string(e.config.Token))
req.Header.Set("User-Agent", e.userAgent)

// Send request
resp, err := e.client.Do(req)
if err != nil {
return err
}
defer func() {
// Drain the response body to avoid leaking resources.
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()

// Check if the request was successful.
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}

// Read error response
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}
formattedErr := fmt.Errorf("error exporting items, request to %s responded with HTTP Status Code %d, Message=%s",
url, resp.StatusCode, string(respBody))

// If the status code is not retryable, return a permanent error.
if !isRetryableStatusCode(resp.StatusCode) {
return consumererror.NewPermanent(formattedErr)
}

// Check if the server is overwhelmed.
isThrottleError := resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable
if isThrottleError {
values := resp.Header.Values(headerRetryAfter)
if len(values) == 0 {
return formattedErr
}
// The value of Retry-After field can be either an HTTP-date or a number of
// seconds to delay after the response is received. See https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.3
//
// Tinybird Events API returns the delay-seconds in the Retry-After header.
// https://www.tinybird.co/docs/forward/get-data-in/events-api#rate-limit-headers
if seconds, err := strconv.Atoi(values[0]); err == nil {
return exporterhelper.NewThrottleRetry(formattedErr, time.Duration(seconds)*time.Second)
}
}

return formattedErr
}

// Determine if the status code is retryable according to Tinybird Events API.
// See https://www.tinybird.co/docs/api-reference/events-api#return-http-status-codes
func isRetryableStatusCode(code int) bool {
switch code {
case http.StatusTooManyRequests:
return true
case http.StatusInternalServerError:
return true
case http.StatusBadGateway:
return true
case http.StatusServiceUnavailable:
return true
case http.StatusGatewayTimeout:
return true
default:
return false
}
}
Loading