diff --git a/exporters/metric/dogstatsd/dogstatsd.go b/exporters/metric/dogstatsd/dogstatsd.go index 9c69d95409c..e3fac20a174 100644 --- a/exporters/metric/dogstatsd/dogstatsd.go +++ b/exporters/metric/dogstatsd/dogstatsd.go @@ -54,17 +54,8 @@ var ( // NewRawExporter returns a new Dogstatsd-syntax exporter for use in a pipeline. func NewRawExporter(config Config) (*Exporter, error) { - // TODO: Remove the resource value set from the Config here when - // https://github.com/open-telemetry/opentelemetry-go/pull/640 - // and 641 are released. The resources will be received on - // the first call to Export(). - res := config.Resource - if res == nil { - res = resource.New() - } - exp := &Exporter{ - labelEncoder: NewLabelEncoder(res), + labelEncoder: NewLabelEncoder(), } var err error @@ -92,7 +83,7 @@ func InstallNewPipeline(config Config) (*push.Controller, error) { // NewExportPipeline sets up a complete export pipeline with the recommended setup, // chaining a NewRawExporter into the recommended selectors and batchers. -func NewExportPipeline(config Config, period time.Duration) (*push.Controller, error) { +func NewExportPipeline(config Config, period time.Duration, opts ...push.Option) (*push.Controller, error) { exporter, err := NewRawExporter(config) if err != nil { return nil, err @@ -100,9 +91,9 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, e // The ungrouped batcher ensures that the export sees the full // set of labels as dogstatsd tags. - batcher := ungrouped.New(exporter, exporter.labelEncoder, false) + batcher := ungrouped.New(exporter, false) - pusher := push.New(batcher, exporter, period) + pusher := push.New(batcher, exporter, period, opts...) pusher.Start() return pusher, nil @@ -127,7 +118,27 @@ func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) { } // AppendTags is part of the stats-internal adapter interface. -func (e *Exporter) AppendTags(rec export.Record, buf *bytes.Buffer) { - encoded := rec.Labels().Encoded(e.labelEncoder) - _, _ = buf.WriteString(encoded) +func (e *Exporter) AppendTags(rec export.Record, res *resource.Resource, buf *bytes.Buffer) { + rencoded := res.Encoded(e.labelEncoder) + lencoded := rec.Labels().Encoded(e.labelEncoder) + + // Note: We do not de-duplicate tag-keys between resources and + // event labels here. Instead, include resources first so + // that the receiver can apply OTel's last-value-wins + // semantcis, if desired. + rlen := len(rencoded) + llen := len(lencoded) + if rlen == 0 && llen == 0 { + return + } + + buf.WriteString("|#") + + _, _ = buf.WriteString(rencoded) + + if rlen != 0 && llen != 0 { + buf.WriteRune(',') + } + + _, _ = buf.WriteString(lencoded) } diff --git a/exporters/metric/dogstatsd/dogstatsd_test.go b/exporters/metric/dogstatsd/dogstatsd_test.go index 5ec9e49cb9c..acfb346fcd1 100644 --- a/exporters/metric/dogstatsd/dogstatsd_test.go +++ b/exporters/metric/dogstatsd/dogstatsd_test.go @@ -31,28 +31,71 @@ import ( ) // TestDogstatsLabels that labels are formatted in the correct style, -// whether or not the provided labels were encoded by a statsd label -// encoder. +// including Resources. func TestDogstatsLabels(t *testing.T) { - encoder := dogstatsd.NewLabelEncoder(resource.New(key.String("R", "S"))) - ctx := context.Background() - checkpointSet := test.NewCheckpointSet(encoder) + type testCase struct { + name string + resources []core.KeyValue + labels []core.KeyValue + expected string + } - desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind) - cagg := sum.New() - _ = cagg.Update(ctx, core.NewInt64Number(123), &desc) - cagg.Checkpoint(ctx, &desc) + kvs := func(kvs ...core.KeyValue) []core.KeyValue { return kvs } - checkpointSet.Add(&desc, cagg, key.New("A").String("B")) + cases := []testCase{ + { + name: "no labels", + resources: nil, + labels: nil, + expected: "test.name:123|c\n", + }, + { + name: "only resources", + resources: kvs(key.String("R", "S")), + labels: nil, + expected: "test.name:123|c|#R:S\n", + }, + { + name: "only labels", + resources: nil, + labels: kvs(key.String("A", "B")), + expected: "test.name:123|c|#A:B\n", + }, + { + name: "both resources and labels", + resources: kvs(key.String("R", "S")), + labels: kvs(key.String("A", "B")), + expected: "test.name:123|c|#R:S,A:B\n", + }, + { + resources: kvs(key.String("A", "R")), + labels: kvs(key.String("A", "B")), + expected: "test.name:123|c|#A:R,A:B\n", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + res := resource.New(tc.resources...) + ctx := context.Background() + checkpointSet := test.NewCheckpointSet() - var buf bytes.Buffer - exp, err := dogstatsd.NewRawExporter(dogstatsd.Config{ - Writer: &buf, - }) - require.Nil(t, err) + desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind) + cagg := sum.New() + _ = cagg.Update(ctx, core.NewInt64Number(123), &desc) + cagg.Checkpoint(ctx, &desc) - err = exp.Export(ctx, checkpointSet) - require.Nil(t, err) + checkpointSet.Add(&desc, cagg, tc.labels...) - require.Equal(t, "test.name:123|c|#R:S,A:B\n", buf.String()) + var buf bytes.Buffer + exp, err := dogstatsd.NewRawExporter(dogstatsd.Config{ + Writer: &buf, + }) + require.Nil(t, err) + + err = exp.Export(ctx, res, checkpointSet) + require.Nil(t, err) + + require.Equal(t, tc.expected, buf.String()) + }) + } } diff --git a/exporters/metric/dogstatsd/example_test.go b/exporters/metric/dogstatsd/example_test.go index 5f3630fe4c8..4322fbbaf43 100644 --- a/exporters/metric/dogstatsd/example_test.go +++ b/exporters/metric/dogstatsd/example_test.go @@ -25,6 +25,8 @@ import ( "go.opentelemetry.io/contrib/exporters/metric/dogstatsd" "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/resource" ) func ExampleNew() { @@ -60,7 +62,7 @@ func ExampleNew() { // In real code, use the URL field: // // URL: fmt.Sprint("unix://", path), - }, time.Minute) + }, time.Minute, push.WithResource(resource.New(key.String("host", "name")))) if err != nil { log.Fatal("Could not initialize dogstatsd exporter:", err) } @@ -73,7 +75,7 @@ func ExampleNew() { meter := pusher.Meter("example") // Create and update a single counter: - counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key)) + counter := metric.Must(meter).NewInt64Counter("a.counter") counter.Add(ctx, 100, key.String("value")) @@ -83,5 +85,5 @@ func ExampleNew() { wg.Wait() // Output: - // a.counter:100|c|#key:value + // a.counter:100|c|#host:name,key:value } diff --git a/exporters/metric/dogstatsd/go.mod b/exporters/metric/dogstatsd/go.mod index 17cdab6bad5..33254fd053d 100644 --- a/exporters/metric/dogstatsd/go.mod +++ b/exporters/metric/dogstatsd/go.mod @@ -4,5 +4,6 @@ go 1.14 require ( github.com/stretchr/testify v1.5.1 - go.opentelemetry.io/otel v0.4.2 + go.opentelemetry.io/otel v0.4.3 ) + diff --git a/exporters/metric/dogstatsd/go.sum b/exporters/metric/dogstatsd/go.sum index 74c25e4528d..92e2b9ecdfd 100644 --- a/exporters/metric/dogstatsd/go.sum +++ b/exporters/metric/dogstatsd/go.sum @@ -34,8 +34,11 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +go.opentelemetry.io v0.1.0 h1:EANZoRCOP+A3faIlw/iN6YEWoYb1vleZRKm1EvH8T48= go.opentelemetry.io/otel v0.4.2 h1:nT+GOqqRR1cIY92xmo1DeiXLHtIlXH1KLRgnsnhuNrs= go.opentelemetry.io/otel v0.4.2/go.mod h1:OgNpQOjrlt33Ew6Ds0mGjmcTQg/rhUctsbkRdk/g1fw= +go.opentelemetry.io/otel v0.4.3 h1:CroUX/0O1ZDcF0iWOO8gwYFWb5EbdSF0/C1yosO+Vhs= +go.opentelemetry.io/otel v0.4.3/go.mod h1:jzBIgIzK43Iu1BpDAXwqOd6UPsSAk+ewVZ5ofSXw4Ek= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/exporters/metric/dogstatsd/internal/statsd/conn.go b/exporters/metric/dogstatsd/internal/statsd/conn.go index 22363440a18..a6839db4193 100644 --- a/exporters/metric/dogstatsd/internal/statsd/conn.go +++ b/exporters/metric/dogstatsd/internal/statsd/conn.go @@ -51,12 +51,6 @@ type ( // MaxPacketSize this limits the packet size for packet-oriented transports. MaxPacketSize int - // Resource WILL BE REMOVED AFTER RESOURCES ARE ADDED - // TO THE EXPORT APIs. TODO: Remove this when Export() - // passes the Resource. See: - // https://github.com/open-telemetry/opentelemetry-go/pull/640 - Resource *resource.Resource - // TODO support Dial and Write timeouts } @@ -74,7 +68,7 @@ type ( // statsd vs. dogstatsd. Adapter interface { AppendName(export.Record, *bytes.Buffer) - AppendTags(export.Record, *bytes.Buffer) + AppendTags(export.Record, *resource.Resource, *bytes.Buffer) } ) @@ -166,7 +160,7 @@ func dial(endpoint string) (net.Conn, error) { } // Export is common code for any statsd-based metric.Exporter implementation. -func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { +func (e *Exporter) Export(_ context.Context, resource *resource.Resource, checkpointSet export.CheckpointSet) error { buf := &e.buffer buf.Reset() @@ -181,7 +175,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) for pt := 0; pt < pts; pt++ { before := buf.Len() - if err := e.formatMetric(rec, pt, buf); err != nil { + if err := e.formatMetric(rec, resource, pt, buf); err != nil { return err } @@ -249,7 +243,7 @@ func (e *Exporter) countPoints(rec export.Record) (int, error) { // formatMetric formats an individual export record. For some records // this will emit a single statistic, for some it will emit more than // one. -func (e *Exporter) formatMetric(rec export.Record, pos int, buf *bytes.Buffer) error { +func (e *Exporter) formatMetric(rec export.Record, res *resource.Resource, pos int, buf *bytes.Buffer) error { desc := rec.Descriptor() agg := rec.Aggregator() @@ -270,34 +264,34 @@ func (e *Exporter) formatMetric(rec export.Record, pos int, buf *bytes.Buffer) e if err != nil { return err } - e.formatSingleStat(rec, points[pos], format, buf) + e.formatSingleStat(rec, res, points[pos], format, buf) } else if sum, ok := agg.(aggregator.Sum); ok { sum, err := sum.Sum() if err != nil { return err } - e.formatSingleStat(rec, sum, formatCounter, buf) + e.formatSingleStat(rec, res, sum, formatCounter, buf) } else if lv, ok := agg.(aggregator.LastValue); ok { lv, _, err := lv.LastValue() if err != nil { return err } - e.formatSingleStat(rec, lv, formatGauge, buf) + e.formatSingleStat(rec, res, lv, formatGauge, buf) } return nil } // formatSingleStat encodes a single item of statsd data followed by a // newline. -func (e *Exporter) formatSingleStat(rec export.Record, val core.Number, fmtStr string, buf *bytes.Buffer) { +func (e *Exporter) formatSingleStat(rec export.Record, res *resource.Resource, val core.Number, fmtStr string, buf *bytes.Buffer) { e.adapter.AppendName(rec, buf) _, _ = buf.WriteRune(':') writeNumber(buf, val, rec.Descriptor().NumberKind()) _, _ = buf.WriteRune('|') _, _ = buf.WriteString(fmtStr) - e.adapter.AppendTags(rec, buf) + e.adapter.AppendTags(rec, res, buf) _, _ = buf.WriteRune('\n') } diff --git a/exporters/metric/dogstatsd/internal/statsd/conn_test.go b/exporters/metric/dogstatsd/internal/statsd/conn_test.go index 99ef236724d..a39f492449c 100644 --- a/exporters/metric/dogstatsd/internal/statsd/conn_test.go +++ b/exporters/metric/dogstatsd/internal/statsd/conn_test.go @@ -27,31 +27,32 @@ import ( "go.opentelemetry.io/contrib/exporters/metric/dogstatsd/internal/statsd" "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/unit" "go.opentelemetry.io/otel/exporters/metric/test" export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/resource" ) // withTagsAdapter tests a dogstatsd-style statsd exporter. type withTagsAdapter struct { - export.LabelEncoder + label.Encoder } func (*withTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) { _, _ = buf.WriteString(rec.Descriptor().Name()) } -func (ta *withTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) { - encoded := rec.Labels().Encoded(ta.LabelEncoder) - _, _ = buf.WriteString("++") - _, _ = buf.WriteString(encoded) +func (ta *withTagsAdapter) AppendTags(rec export.Record, _ *resource.Resource, buf *bytes.Buffer) { + _, _ = buf.WriteString("|#") + _, _ = buf.WriteString(rec.Labels().Encoded(ta.Encoder)) } func newWithTagsAdapter() *withTagsAdapter { return &withTagsAdapter{ // Note: This uses non-statsd syntax. (No problem.) - export.NewDefaultLabelEncoder(), + label.DefaultEncoder(), } } @@ -71,7 +72,7 @@ func (*noTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) { } } -func (*noTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) { +func (*noTagsAdapter) AppendTags(_ export.Record, _ *resource.Resource, _ *bytes.Buffer) { } func newNoTagsAdapter() *noTagsAdapter { @@ -95,10 +96,10 @@ func TestBasicFormat(t *testing.T) { for _, ao := range []adapterOutput{{ adapter: newWithTagsAdapter(), - expected: `counter:%s|c++A=B,C=D -observer:%s|g++A=B,C=D -measure:%s|h++A=B,C=D -timer:%s|ms++A=B,C=D + expected: `counter:%s|c|#A=B,C=D +observer:%s|g|#A=B,C=D +measure:%s|h|#A=B,C=D +timer:%s|ms|#A=B,C=D `}, { adapter: newNoTagsAdapter(), expected: `counter.B.D:%s|c @@ -126,7 +127,7 @@ timer.B.D:%s|ms t.Fatal("New error: ", err) } - checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet() cdesc := metric.NewDescriptor( "counter", metric.CounterKind, nkind) gdesc := metric.NewDescriptor( @@ -147,7 +148,7 @@ timer.B.D:%s|ms checkpointSet.AddMeasure(&mdesc, value, labels...) checkpointSet.AddMeasure(&tdesc, value, labels...) - err = exp.Export(ctx, checkpointSet) + err = exp.Export(ctx, nil, checkpointSet) require.Nil(t, err) var vfmt string @@ -288,7 +289,7 @@ func TestPacketSplit(t *testing.T) { t.Fatal("New error: ", err) } - checkpointSet := test.NewCheckpointSet(adapter.LabelEncoder) + checkpointSet := test.NewCheckpointSet() desc := metric.NewDescriptor("counter", metric.CounterKind, core.Int64NumberKind) var expected []string @@ -297,14 +298,14 @@ func TestPacketSplit(t *testing.T) { tcase.setup(func(nkeys int) { labels := makeLabels(offset, nkeys) offset += nkeys - iter := export.LabelSlice(labels).Iter() - encoded := adapter.LabelEncoder.Encode(iter) - expect := fmt.Sprint("counter:100|c++", encoded, "\n") + elabels := label.NewSet(labels...) + encoded := adapter.Encoder.Encode(elabels.Iter()) + expect := fmt.Sprint("counter:100|c|#", encoded, "\n") expected = append(expected, expect) checkpointSet.AddCounter(&desc, 100, labels...) }) - err = exp.Export(ctx, checkpointSet) + err = exp.Export(ctx, nil, checkpointSet) require.Nil(t, err) tcase.check(expected, writer.vec, t) @@ -325,14 +326,14 @@ func TestArraySplit(t *testing.T) { t.Fatal("New error: ", err) } - checkpointSet := test.NewCheckpointSet(adapter.LabelEncoder) + checkpointSet := test.NewCheckpointSet() desc := metric.NewDescriptor("measure", metric.MeasureKind, core.Int64NumberKind) for i := 0; i < 1024; i++ { checkpointSet.AddMeasure(&desc, 100) } - err = exp.Export(ctx, checkpointSet) + err = exp.Export(ctx, nil, checkpointSet) require.Nil(t, err) require.Greater(t, len(writer.vec), 1) diff --git a/exporters/metric/dogstatsd/labels.go b/exporters/metric/dogstatsd/labels.go index a515b02eba2..b62b5dad2bf 100644 --- a/exporters/metric/dogstatsd/labels.go +++ b/exporters/metric/dogstatsd/labels.go @@ -16,12 +16,10 @@ package dogstatsd import ( "bytes" - "sort" "sync" "go.opentelemetry.io/otel/api/core" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/api/label" ) // LabelEncoder encodes metric labels in the dogstatsd syntax. @@ -31,40 +29,30 @@ import ( // // https://github.com/stripe/veneur/blob/master/sinks/datadog/datadog.go type LabelEncoder struct { - pool sync.Pool - resources []core.KeyValue + pool sync.Pool } -var _ export.LabelEncoder = &LabelEncoder{} -var leID = export.NewLabelEncoderID() +var _ label.Encoder = &LabelEncoder{} +var leID = label.NewEncoderID() // NewLabelEncoder returns a new encoder for dogstatsd-syntax metric // labels. -func NewLabelEncoder(resource *resource.Resource) *LabelEncoder { - attrs := resource.Attributes() - sort.Slice(attrs[:], func(i, j int) bool { - return attrs[i].Key < attrs[j].Key - }) - +func NewLabelEncoder() *LabelEncoder { return &LabelEncoder{ pool: sync.Pool{ New: func() interface{} { return &bytes.Buffer{} }, }, - resources: attrs, } } // Encode emits a string like "|#key1:value1,key2:value2". -func (e *LabelEncoder) Encode(iter export.LabelIterator) string { +func (e *LabelEncoder) Encode(iter label.Iterator) string { buf := e.pool.Get().(*bytes.Buffer) defer e.pool.Put(buf) buf.Reset() - for _, kv := range e.resources { - e.encodeOne(buf, kv) - } for iter.Next() { e.encodeOne(buf, iter.Label()) } @@ -72,9 +60,7 @@ func (e *LabelEncoder) Encode(iter export.LabelIterator) string { } func (e *LabelEncoder) encodeOne(buf *bytes.Buffer, kv core.KeyValue) { - if buf.Len() == 0 { - _, _ = buf.WriteString("|#") - } else { + if buf.Len() != 0 { _, _ = buf.WriteRune(',') } _, _ = buf.WriteString(string(kv.Key)) @@ -82,6 +68,6 @@ func (e *LabelEncoder) encodeOne(buf *bytes.Buffer, kv core.KeyValue) { _, _ = buf.WriteString(kv.Value.Emit()) } -func (*LabelEncoder) ID() int64 { +func (*LabelEncoder) ID() label.EncoderID { return leID } diff --git a/exporters/metric/dogstatsd/labels_test.go b/exporters/metric/dogstatsd/labels_test.go index 1d2c7a113d5..63f67370c11 100644 --- a/exporters/metric/dogstatsd/labels_test.go +++ b/exporters/metric/dogstatsd/labels_test.go @@ -22,8 +22,7 @@ import ( "go.opentelemetry.io/contrib/exporters/metric/dogstatsd" "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/key" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/api/label" ) var testLabels = []core.KeyValue{ @@ -38,20 +37,20 @@ var testResources = []core.KeyValue{ } func TestLabelSyntax(t *testing.T) { - encoder := dogstatsd.NewLabelEncoder(resource.New()) + encoder := dogstatsd.NewLabelEncoder() - require.Equal(t, `|#A:B,C:D,E:1.5`, encoder.Encode(export.LabelSlice(testLabels).Iter())) + labels := label.NewSet(testLabels...) + require.Equal(t, `A:B,C:D,E:1.5`, encoder.Encode(labels.Iter())) kvs := []core.KeyValue{ key.String("A", "B"), } - require.Equal(t, `|#A:B`, encoder.Encode(export.LabelSlice(kvs).Iter())) + labels = label.NewSet(kvs...) + require.Equal(t, `A:B`, encoder.Encode(labels.Iter())) - require.Equal(t, "", encoder.Encode(export.LabelSlice(nil).Iter())) -} - -func TestLabelResources(t *testing.T) { - encoder := dogstatsd.NewLabelEncoder(resource.New(testResources...)) + labels = label.NewSet() + require.Equal(t, "", encoder.Encode(labels.Iter())) - require.Equal(t, `|#R1:V1,R2:V2,A:B,C:D,E:1.5`, encoder.Encode(export.LabelSlice(testLabels).Iter())) + labels = label.Set{} + require.Equal(t, "", encoder.Encode(labels.Iter())) }