Skip to content

Commit

Permalink
Dogstatsd exporter resource support (for 0.4.3 release) (#25)
Browse files Browse the repository at this point in the history
* Update to 0.4.3 resources support

* Add more test naming
  • Loading branch information
jmacd authored Apr 28, 2020
1 parent 83131c9 commit a65fe91
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 106 deletions.
43 changes: 27 additions & 16 deletions exporters/metric/dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,8 @@ var (

// NewRawExporter returns a new Dogstatsd-syntax exporter for use in a pipeline.
func NewRawExporter(config Config) (*Exporter, error) {
// TODO: Remove the resource value set from the Config here when
// https://github.com/open-telemetry/opentelemetry-go/pull/640
// and 641 are released. The resources will be received on
// the first call to Export().
res := config.Resource
if res == nil {
res = resource.New()
}

exp := &Exporter{
labelEncoder: NewLabelEncoder(res),
labelEncoder: NewLabelEncoder(),
}

var err error
Expand Down Expand Up @@ -92,17 +83,17 @@ func InstallNewPipeline(config Config) (*push.Controller, error) {

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and batchers.
func NewExportPipeline(config Config, period time.Duration) (*push.Controller, error) {
func NewExportPipeline(config Config, period time.Duration, opts ...push.Option) (*push.Controller, error) {
exporter, err := NewRawExporter(config)
if err != nil {
return nil, err
}

// The ungrouped batcher ensures that the export sees the full
// set of labels as dogstatsd tags.
batcher := ungrouped.New(exporter, exporter.labelEncoder, false)
batcher := ungrouped.New(exporter, false)

pusher := push.New(batcher, exporter, period)
pusher := push.New(batcher, exporter, period, opts...)
pusher.Start()

return pusher, nil
Expand All @@ -127,7 +118,27 @@ func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) {
}

// AppendTags is part of the stats-internal adapter interface.
func (e *Exporter) AppendTags(rec export.Record, buf *bytes.Buffer) {
encoded := rec.Labels().Encoded(e.labelEncoder)
_, _ = buf.WriteString(encoded)
func (e *Exporter) AppendTags(rec export.Record, res *resource.Resource, buf *bytes.Buffer) {
rencoded := res.Encoded(e.labelEncoder)
lencoded := rec.Labels().Encoded(e.labelEncoder)

// Note: We do not de-duplicate tag-keys between resources and
// event labels here. Instead, include resources first so
// that the receiver can apply OTel's last-value-wins
// semantcis, if desired.
rlen := len(rencoded)
llen := len(lencoded)
if rlen == 0 && llen == 0 {
return
}

buf.WriteString("|#")

_, _ = buf.WriteString(rencoded)

if rlen != 0 && llen != 0 {
buf.WriteRune(',')
}

_, _ = buf.WriteString(lencoded)
}
79 changes: 61 additions & 18 deletions exporters/metric/dogstatsd/dogstatsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,71 @@ import (
)

// TestDogstatsLabels that labels are formatted in the correct style,
// whether or not the provided labels were encoded by a statsd label
// encoder.
// including Resources.
func TestDogstatsLabels(t *testing.T) {
encoder := dogstatsd.NewLabelEncoder(resource.New(key.String("R", "S")))
ctx := context.Background()
checkpointSet := test.NewCheckpointSet(encoder)
type testCase struct {
name string
resources []core.KeyValue
labels []core.KeyValue
expected string
}

desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind)
cagg := sum.New()
_ = cagg.Update(ctx, core.NewInt64Number(123), &desc)
cagg.Checkpoint(ctx, &desc)
kvs := func(kvs ...core.KeyValue) []core.KeyValue { return kvs }

checkpointSet.Add(&desc, cagg, key.New("A").String("B"))
cases := []testCase{
{
name: "no labels",
resources: nil,
labels: nil,
expected: "test.name:123|c\n",
},
{
name: "only resources",
resources: kvs(key.String("R", "S")),
labels: nil,
expected: "test.name:123|c|#R:S\n",
},
{
name: "only labels",
resources: nil,
labels: kvs(key.String("A", "B")),
expected: "test.name:123|c|#A:B\n",
},
{
name: "both resources and labels",
resources: kvs(key.String("R", "S")),
labels: kvs(key.String("A", "B")),
expected: "test.name:123|c|#R:S,A:B\n",
},
{
resources: kvs(key.String("A", "R")),
labels: kvs(key.String("A", "B")),
expected: "test.name:123|c|#A:R,A:B\n",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
res := resource.New(tc.resources...)
ctx := context.Background()
checkpointSet := test.NewCheckpointSet()

var buf bytes.Buffer
exp, err := dogstatsd.NewRawExporter(dogstatsd.Config{
Writer: &buf,
})
require.Nil(t, err)
desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind)
cagg := sum.New()
_ = cagg.Update(ctx, core.NewInt64Number(123), &desc)
cagg.Checkpoint(ctx, &desc)

err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
checkpointSet.Add(&desc, cagg, tc.labels...)

require.Equal(t, "test.name:123|c|#R:S,A:B\n", buf.String())
var buf bytes.Buffer
exp, err := dogstatsd.NewRawExporter(dogstatsd.Config{
Writer: &buf,
})
require.Nil(t, err)

err = exp.Export(ctx, res, checkpointSet)
require.Nil(t, err)

require.Equal(t, tc.expected, buf.String())
})
}
}
8 changes: 5 additions & 3 deletions exporters/metric/dogstatsd/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"go.opentelemetry.io/contrib/exporters/metric/dogstatsd"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/resource"
)

func ExampleNew() {
Expand Down Expand Up @@ -60,7 +62,7 @@ func ExampleNew() {
// In real code, use the URL field:
//
// URL: fmt.Sprint("unix://", path),
}, time.Minute)
}, time.Minute, push.WithResource(resource.New(key.String("host", "name"))))
if err != nil {
log.Fatal("Could not initialize dogstatsd exporter:", err)
}
Expand All @@ -73,7 +75,7 @@ func ExampleNew() {
meter := pusher.Meter("example")

// Create and update a single counter:
counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key))
counter := metric.Must(meter).NewInt64Counter("a.counter")

counter.Add(ctx, 100, key.String("value"))

Expand All @@ -83,5 +85,5 @@ func ExampleNew() {
wg.Wait()

// Output:
// a.counter:100|c|#key:value
// a.counter:100|c|#host:name,key:value
}
3 changes: 2 additions & 1 deletion exporters/metric/dogstatsd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ go 1.14

require (
github.com/stretchr/testify v1.5.1
go.opentelemetry.io/otel v0.4.2
go.opentelemetry.io/otel v0.4.3
)

3 changes: 3 additions & 0 deletions exporters/metric/dogstatsd/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
go.opentelemetry.io v0.1.0 h1:EANZoRCOP+A3faIlw/iN6YEWoYb1vleZRKm1EvH8T48=
go.opentelemetry.io/otel v0.4.2 h1:nT+GOqqRR1cIY92xmo1DeiXLHtIlXH1KLRgnsnhuNrs=
go.opentelemetry.io/otel v0.4.2/go.mod h1:OgNpQOjrlt33Ew6Ds0mGjmcTQg/rhUctsbkRdk/g1fw=
go.opentelemetry.io/otel v0.4.3 h1:CroUX/0O1ZDcF0iWOO8gwYFWb5EbdSF0/C1yosO+Vhs=
go.opentelemetry.io/otel v0.4.3/go.mod h1:jzBIgIzK43Iu1BpDAXwqOd6UPsSAk+ewVZ5ofSXw4Ek=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
24 changes: 9 additions & 15 deletions exporters/metric/dogstatsd/internal/statsd/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@ type (
// MaxPacketSize this limits the packet size for packet-oriented transports.
MaxPacketSize int

// Resource WILL BE REMOVED AFTER RESOURCES ARE ADDED
// TO THE EXPORT APIs. TODO: Remove this when Export()
// passes the Resource. See:
// https://github.com/open-telemetry/opentelemetry-go/pull/640
Resource *resource.Resource

// TODO support Dial and Write timeouts
}

Expand All @@ -74,7 +68,7 @@ type (
// statsd vs. dogstatsd.
Adapter interface {
AppendName(export.Record, *bytes.Buffer)
AppendTags(export.Record, *bytes.Buffer)
AppendTags(export.Record, *resource.Resource, *bytes.Buffer)
}
)

Expand Down Expand Up @@ -166,7 +160,7 @@ func dial(endpoint string) (net.Conn, error) {
}

// Export is common code for any statsd-based metric.Exporter implementation.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
func (e *Exporter) Export(_ context.Context, resource *resource.Resource, checkpointSet export.CheckpointSet) error {
buf := &e.buffer
buf.Reset()

Expand All @@ -181,7 +175,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
for pt := 0; pt < pts; pt++ {
before := buf.Len()

if err := e.formatMetric(rec, pt, buf); err != nil {
if err := e.formatMetric(rec, resource, pt, buf); err != nil {
return err
}

Expand Down Expand Up @@ -249,7 +243,7 @@ func (e *Exporter) countPoints(rec export.Record) (int, error) {
// formatMetric formats an individual export record. For some records
// this will emit a single statistic, for some it will emit more than
// one.
func (e *Exporter) formatMetric(rec export.Record, pos int, buf *bytes.Buffer) error {
func (e *Exporter) formatMetric(rec export.Record, res *resource.Resource, pos int, buf *bytes.Buffer) error {
desc := rec.Descriptor()
agg := rec.Aggregator()

Expand All @@ -270,34 +264,34 @@ func (e *Exporter) formatMetric(rec export.Record, pos int, buf *bytes.Buffer) e
if err != nil {
return err
}
e.formatSingleStat(rec, points[pos], format, buf)
e.formatSingleStat(rec, res, points[pos], format, buf)

} else if sum, ok := agg.(aggregator.Sum); ok {
sum, err := sum.Sum()
if err != nil {
return err
}
e.formatSingleStat(rec, sum, formatCounter, buf)
e.formatSingleStat(rec, res, sum, formatCounter, buf)

} else if lv, ok := agg.(aggregator.LastValue); ok {
lv, _, err := lv.LastValue()
if err != nil {
return err
}
e.formatSingleStat(rec, lv, formatGauge, buf)
e.formatSingleStat(rec, res, lv, formatGauge, buf)
}
return nil
}

// formatSingleStat encodes a single item of statsd data followed by a
// newline.
func (e *Exporter) formatSingleStat(rec export.Record, val core.Number, fmtStr string, buf *bytes.Buffer) {
func (e *Exporter) formatSingleStat(rec export.Record, res *resource.Resource, val core.Number, fmtStr string, buf *bytes.Buffer) {
e.adapter.AppendName(rec, buf)
_, _ = buf.WriteRune(':')
writeNumber(buf, val, rec.Descriptor().NumberKind())
_, _ = buf.WriteRune('|')
_, _ = buf.WriteString(fmtStr)
e.adapter.AppendTags(rec, buf)
e.adapter.AppendTags(rec, res, buf)
_, _ = buf.WriteRune('\n')
}

Expand Down
Loading

0 comments on commit a65fe91

Please sign in to comment.