Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions batch_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package sentry

import (
"context"
"sync"
"time"
)

type BatchMeter struct {
client *Client
metricsCh chan Metric
flushCh chan chan struct{}
cancel context.CancelFunc
wg sync.WaitGroup
startOnce sync.Once
shutdownOnce sync.Once
}

func NewBatchMeter(client *Client) *BatchMeter {
return &BatchMeter{
client: client,
metricsCh: make(chan Metric, batchSize),
flushCh: make(chan chan struct{}),
}
}

func (m *BatchMeter) Start() {
m.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel
m.wg.Add(1)
go m.run(ctx)
})
}

func (m *BatchMeter) Flush(timeout <-chan struct{}) {
done := make(chan struct{})
select {
case m.flushCh <- done:
select {
case <-done:
case <-timeout:
}
case <-timeout:
}
}

func (m *BatchMeter) Shutdown() {
m.shutdownOnce.Do(func() {
if m.cancel != nil {
m.cancel()
m.wg.Wait()
}
})
}

func (m *BatchMeter) run(ctx context.Context) {
defer m.wg.Done()
var metrics []Metric
timer := time.NewTimer(batchTimeout)
defer timer.Stop()

for {
select {
case metric := <-m.metricsCh:
metrics = append(metrics, metric)
if len(metrics) >= batchSize {
m.processEvent(metrics)
metrics = nil
if !timer.Stop() {
<-timer.C
}
timer.Reset(batchTimeout)
}
case <-timer.C:
if len(metrics) > 0 {
m.processEvent(metrics)
metrics = nil
}
timer.Reset(batchTimeout)
case done := <-m.flushCh:
flushDrain:
for {
select {
case metric := <-m.metricsCh:
metrics = append(metrics, metric)
default:
break flushDrain
}
}

if len(metrics) > 0 {
m.processEvent(metrics)
metrics = nil
}
if !timer.Stop() {
<-timer.C
}
timer.Reset(batchTimeout)
close(done)
case <-ctx.Done():
drain:
for {
select {
case metric := <-m.metricsCh:
metrics = append(metrics, metric)
default:
break drain
}
}

if len(metrics) > 0 {
m.processEvent(metrics)
}
return
}
}
}

func (m *BatchMeter) processEvent(metrics []Metric) {
event := NewEvent()
event.Timestamp = time.Now()
event.EventID = EventID(uuid())
event.Type = traceMetricEvent.Type
event.Metrics = metrics
m.client.Transport.SendEvent(event)
}
14 changes: 13 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ type ClientOptions struct {
Tags map[string]string
// EnableLogs controls when logs should be emitted.
EnableLogs bool
// ExperimentalEnableTraceMetric controls when trace metrics should be emitted.
// This is an experimental feature that is subject to change.
ExperimentalEnableTraceMetric bool
// TraceIgnoreStatusCodes is a list of HTTP status codes that should not be traced.
// Each element can be either:
// - A single-element slice [code] for a specific status code
Expand Down Expand Up @@ -265,6 +268,7 @@ type Client struct {
// not supported, create a new client instead.
Transport Transport
batchLogger *BatchLogger
batchMeter *BatchMeter
}

// NewClient creates and returns an instance of Client configured using
Expand Down Expand Up @@ -369,6 +373,11 @@ func NewClient(options ClientOptions) (*Client, error) {
client.batchLogger.Start()
}

if options.ExperimentalEnableTraceMetric {
client.batchMeter = NewBatchMeter(&client)
client.batchMeter.Start()
}

client.setupTransport()
client.setupIntegrations()

Expand Down Expand Up @@ -531,7 +540,7 @@ func (client *Client) RecoverWithContext(
// the network synchronously, configure it to use the HTTPSyncTransport in the
// call to Init.
func (client *Client) Flush(timeout time.Duration) bool {
if client.batchLogger != nil {
if client.batchLogger != nil || client.batchMeter != nil {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return client.FlushWithContext(ctx)
Expand All @@ -555,6 +564,9 @@ func (client *Client) FlushWithContext(ctx context.Context) bool {
if client.batchLogger != nil {
client.batchLogger.Flush(ctx.Done())
}
if client.batchMeter != nil {
client.batchMeter.Flush(ctx.Done())
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Goroutine Leak in Client Close

The Client.Close() method doesn't call Shutdown() on batchMeter (or batchLogger), causing goroutine leaks. The BatchMeter.run() goroutine will continue running indefinitely even after the client is closed, as it only stops when the context is cancelled. The Shutdown() method should be called to properly cancel the context and wait for the goroutine to finish, similar to how Flush() calls both batchLogger.Flush() and batchMeter.Flush().

Fix in Cursor Fix in Web

return client.Transport.FlushWithContext(ctx)
}

Expand Down
100 changes: 99 additions & 1 deletion interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ var logEvent = struct {
"application/vnd.sentry.items.log+json",
}

var traceMetricEvent = struct {
Type string
ContentType string
}{
"trace_metric",
"application/vnd.sentry.items.trace-metric+json",
}

// Level marks the severity of the event.
type Level string

Expand Down Expand Up @@ -141,6 +149,26 @@ type LogEntry interface {
Emitf(format string, args ...interface{})
}

type MeterOptions struct {
// Attributes are key/value pairs that will be added to the metric.
// The attributes set here will take precedence over the attributes
// set from the Meter.
Attributes []attribute.Builder
// The unit of measurements, for "gauge" and "distribution" metrics.
Unit string
}

type Meter interface {
// GetCtx returns the [context.Context] set on the meter.
GetCtx() context.Context
// SetAttributes allows attaching parameters to the meter using the attribute API.
SetAttributes(...attribute.Builder)
Count(name string, count int64, options MeterOptions)
Gauge(name string, value int64, options MeterOptions)
FGauge(name string, value float64, options MeterOptions)
Distribution(name string, sample float64, options MeterOptions)
}

// Attachment allows associating files with your events to aid in investigation.
// An event may contain one or more attachments.
type Attachment struct {
Expand Down Expand Up @@ -395,14 +423,42 @@ type Event struct {
CheckIn *CheckIn `json:"check_in,omitempty"`
MonitorConfig *MonitorConfig `json:"monitor_config,omitempty"`

Items *sentryItems `json:"items,omitempty"`
// The fields below are only relevant for logs
Logs []Log `json:"items,omitempty"`
Logs []Log `json:"-"`

// The fields below are only relevant for metrics
Metrics []Metric `json:"-"`

// The fields below are not part of the final JSON payload.

sdkMetaData SDKMetaData
}

// sentryItems is a wrapper for the Items field of the Event struct.
// It is used to prevent the empty interface from being marshaled.
type sentryItems struct {
valid bool
t string
metrics []Metric
logs []Log
}

func (items *sentryItems) MarshalJSON() ([]byte, error) {
if !items.valid {
return nil, nil
}

switch items.t {
case traceMetricEvent.Type:
return json.Marshal(items.metrics)
case logEvent.Type:
return json.Marshal(items.logs)
default:
return nil, nil
}
}

// SetException appends the unwrapped errors to the event's exception list.
//
// maxErrorDepth is the maximum depth of the error chain we will look
Expand Down Expand Up @@ -480,6 +536,8 @@ func (e *Event) ToEnvelopeWithTime(dsn *protocol.Dsn, sentAt time.Time) (*protoc
mainItem = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeCheckIn, eventBody)
case logEvent.Type:
mainItem = protocol.NewLogItem(len(e.Logs), eventBody)
case traceMetricEvent.Type:
mainItem = protocol.NewTraceMetricsItem(len(e.Metrics), eventBody)
default:
mainItem = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeEvent, eventBody)
}
Expand Down Expand Up @@ -515,6 +573,25 @@ func (e *Event) MarshalJSON() ([]byte, error) {
if e.Type == checkInType {
return e.checkInMarshalJSON()
}

// HACK: Logs & metrics uses the same JSON key. This is not possible in Go.
// Since metrics is experimental, we'll try to prioritize logs.
if e.Logs != nil {
e.Items = &sentryItems{
valid: true,
t: logEvent.Type,
metrics: nil,
logs: e.Logs,
}
} else if e.Metrics != nil {
e.Items = &sentryItems{
valid: true,
t: traceMetricEvent.Type,
metrics: e.Metrics,
logs: nil,
}
}

return e.defaultMarshalJSON()
}

Expand Down Expand Up @@ -623,6 +700,8 @@ func (e *Event) toCategory() ratelimit.Category {
return ratelimit.CategoryLog
case checkInType:
return ratelimit.CategoryMonitor
case traceMetricEvent.Type:
return ratelimit.CategoryTraceMetric
default:
return ratelimit.CategoryUnknown
}
Expand Down Expand Up @@ -681,3 +760,22 @@ type Attribute struct {
Value any `json:"value"`
Type AttrType `json:"type"`
}

type MetricType string

const (
MetricTypeInvalid MetricType = ""
MetricTypeCounter MetricType = "counter"
MetricTypeGauge MetricType = "gauge"
MetricTypeDistribution MetricType = "distribution"
)

type Metric struct {
Timestamp time.Time `json:"timestamp"`
TraceID TraceID `json:"trace_id,omitempty"`
Type MetricType `json:"type"`
Name string `json:"name,omitempty"`
Value float64 `json:"value"`
Unit string `json:"unit,omitempty"`
Attributes map[string]Attribute `json:"attributes,omitempty"`
}
24 changes: 19 additions & 5 deletions internal/protocol/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ type EnvelopeItemType string

// Constants for envelope item types as defined in the Sentry documentation.
const (
EnvelopeItemTypeEvent EnvelopeItemType = "event"
EnvelopeItemTypeTransaction EnvelopeItemType = "transaction"
EnvelopeItemTypeCheckIn EnvelopeItemType = "check_in"
EnvelopeItemTypeAttachment EnvelopeItemType = "attachment"
EnvelopeItemTypeLog EnvelopeItemType = "log"
EnvelopeItemTypeEvent EnvelopeItemType = "event"
EnvelopeItemTypeTransaction EnvelopeItemType = "transaction"
EnvelopeItemTypeCheckIn EnvelopeItemType = "check_in"
EnvelopeItemTypeAttachment EnvelopeItemType = "attachment"
EnvelopeItemTypeLog EnvelopeItemType = "log"
EnvelopeItemTypeTraceMetrics EnvelopeItemType = "trace_metric"
)

// EnvelopeItemHeader represents the header of an envelope item.
Expand Down Expand Up @@ -211,3 +212,16 @@ func NewLogItem(itemCount int, payload []byte) *EnvelopeItem {
Payload: payload,
}
}

func NewTraceMetricsItem(itemCount int, payload []byte) *EnvelopeItem {
length := len(payload)
return &EnvelopeItem{
Header: &EnvelopeItemHeader{
Type: EnvelopeItemTypeTraceMetrics,
Length: &length,
ItemCount: &itemCount,
ContentType: "application/vnd.sentry.items.trace-metric+json",
},
Payload: payload,
}
}
Loading
Loading