Skip to content

Commit b27f3c9

Browse files
elekElek, Márton
authored and
Elek, Márton
committed
eventstat: publisher to send out telemetry compatible UDP packages
Biggest part of this patch is just moving code out from telemetry/client.go to it them reusable. Other important part is the eventstat/publisher.go which re-uses the existing code from telemetry/client.go to send out UDP packages in the same format. Change-Id: I68be36017bcb4ef010bacf8b8d843d541150ded9
1 parent ee3a77c commit b27f3c9

File tree

7 files changed

+367
-156
lines changed

7 files changed

+367
-156
lines changed

eventstat/publisher.go

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright (C) 2022 Storj Labs, Inc.
2+
// See LICENSE for copying information.
3+
4+
package eventstat
5+
6+
import (
7+
"context"
8+
"os"
9+
"strings"
10+
"time"
11+
12+
"storj.io/common/telemetry"
13+
)
14+
15+
// ClientOpts allows you to set Client Options.
16+
type ClientOpts struct {
17+
// Interval is how frequently stats from the provided Registry will be
18+
// sent up. Note that this interval is "jittered", so the actual interval
19+
// is taken from a normal distribution with a mean of Interval and a
20+
// variance of Interval/4. Defaults to DefaultInterval.
21+
Interval time.Duration
22+
23+
// Application is the application name, usually prepended to metric names.
24+
// By default it will be os.Args[0].
25+
Application string
26+
27+
// Instance is a string that identifies this particular server. Could be a
28+
// node id, but defaults to the result of DefaultInstanceId().
29+
Instance string
30+
31+
// PacketSize controls how we fragment the data as it goes out in UDP
32+
// packets. Defaults to DefaultPacketSize.
33+
PacketSize int
34+
}
35+
36+
// UDPPublisher is an eventstat telemetry client for sending UDP packets at a regular interval.
37+
type UDPPublisher struct {
38+
reporter *telemetry.Reporter
39+
}
40+
41+
// NewUDPPublisher constructs a telemetry client that sends packets to remoteAddr
42+
// over UDP.
43+
func NewUDPPublisher(remoteAddr string, registry *Registry, opts ClientOpts) (rv *UDPPublisher, err error) {
44+
if opts.Interval == 0 {
45+
opts.Interval = telemetry.DefaultInterval
46+
}
47+
if opts.Application == "" {
48+
if len(os.Args) > 0 {
49+
opts.Application = os.Args[0]
50+
} else {
51+
// what the actual heck
52+
opts.Application = telemetry.DefaultApplication
53+
}
54+
}
55+
if opts.Instance == "" {
56+
opts.Instance = telemetry.DefaultInstanceID()
57+
}
58+
if opts.PacketSize == 0 {
59+
opts.PacketSize = telemetry.DefaultPacketSize
60+
}
61+
62+
udpOptions := telemetry.Options{
63+
Application: opts.Application,
64+
InstanceID: []byte(opts.Instance),
65+
Address: remoteAddr,
66+
PacketSize: opts.PacketSize,
67+
}
68+
reporter, err := telemetry.NewReporter(opts.Interval, func(ctx context.Context) error {
69+
return telemetry.Send(ctx, udpOptions, func(publishEntry func(key string, value float64)) {
70+
registry.PublishAndReset(func(name string, tags Tags, value float64) {
71+
telemetryKey := telemetryKey(name, tags)
72+
publishEntry(telemetryKey, value)
73+
})
74+
})
75+
})
76+
if err != nil {
77+
return nil, err
78+
}
79+
return &UDPPublisher{
80+
reporter: reporter,
81+
}, nil
82+
}
83+
84+
func telemetryKey(name string, tags Tags) string {
85+
builder := strings.Builder{}
86+
writeTag(&builder, name)
87+
if len(tags) > 0 {
88+
builder.WriteString(",")
89+
builder.WriteString(tags.String())
90+
}
91+
builder.WriteString(" value")
92+
telemetryKey := builder.String()
93+
return telemetryKey
94+
}
95+
96+
// Run calls Report roughly every Interval.
97+
func (c *UDPPublisher) Run(ctx context.Context) {
98+
c.reporter.Run(ctx)
99+
}
100+
101+
// publish sends out message immediately independent on Interval.
102+
func (c *UDPPublisher) publish(ctx context.Context) error {
103+
return c.reporter.Publish(ctx)
104+
}
105+
106+
// writeTag writes a tag key, value, or field key to the builder.
107+
func writeTag(builder *strings.Builder, tag string) {
108+
if strings.IndexByte(tag, ',') == -1 &&
109+
strings.IndexByte(tag, '=') == -1 &&
110+
strings.IndexByte(tag, ' ') == -1 {
111+
112+
builder.WriteString(tag)
113+
return
114+
}
115+
116+
for i := 0; i < len(tag); i++ {
117+
if tag[i] == ',' ||
118+
tag[i] == '=' ||
119+
tag[i] == ' ' {
120+
builder.WriteByte('\\')
121+
}
122+
builder.WriteByte(tag[i])
123+
}
124+
}

eventstat/publisher_test.go

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright (C) 2022 Storj Labs, Inc.
2+
// See LICENSE for copying information.
3+
4+
package eventstat
5+
6+
import (
7+
"fmt"
8+
"runtime"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
15+
"storj.io/common/telemetry"
16+
"storj.io/common/testcontext"
17+
)
18+
19+
func TestMetrics(t *testing.T) {
20+
ctx := testcontext.New(t)
21+
22+
if runtime.GOOS == "windows" {
23+
// TODO(windows): currently closing doesn't seem to be shutting down the server
24+
t.Skip("broken")
25+
}
26+
27+
s, err := telemetry.Listen("127.0.0.1:0")
28+
assert.NoError(t, err)
29+
defer func() { _ = s.Close() }()
30+
31+
r := Registry{}
32+
counter := r.NewTagCounter("http_user_agent", "agent")
33+
fmt.Println(s.Addr())
34+
c, err := NewUDPPublisher(s.Addr(), &r, ClientOpts{
35+
Application: "testapp",
36+
Instance: "testinst",
37+
Interval: 10 * time.Millisecond,
38+
})
39+
require.NoError(t, err)
40+
41+
counter("aws")
42+
counter("aws")
43+
counter("aws")
44+
counter("rclone")
45+
46+
err = c.publish(ctx)
47+
require.NoError(t, err)
48+
49+
expectedMetric := 4
50+
51+
keys := make(chan string, expectedMetric)
52+
values := make(chan float64, expectedMetric)
53+
defer close(keys)
54+
defer close(values)
55+
56+
ctx.Go(func() error {
57+
fmt.Println("Listening on " + s.Addr())
58+
// note: this is the telemetry server which guarantees that our sender is still compatible with the format
59+
_ = s.Serve(ctx, telemetry.HandlerFunc(
60+
func(application, instance string, key []byte, val float64) {
61+
assert.Equal(t, application, "testapp")
62+
assert.Equal(t, instance, "testinst")
63+
keys <- string(key)
64+
values <- val
65+
}))
66+
return nil
67+
})
68+
69+
for i := 0; i < expectedMetric; i++ {
70+
key := <-keys
71+
value := <-values
72+
73+
switch key {
74+
case "http_user_agent_count,agent=aws value":
75+
assert.Equal(t, float64(3), value)
76+
case "http_user_agent_count,agent=rclone value":
77+
assert.Equal(t, float64(1), value)
78+
case "http_user_agent_discarded value":
79+
assert.Equal(t, float64(0), value)
80+
case "http_user_agent_buckets value":
81+
assert.Equal(t, float64(2), value)
82+
default:
83+
require.Failf(t, "Unexpected UDP metric", "key=%s", key)
84+
}
85+
86+
}
87+
88+
}
89+
90+
func TestTelemetryKey(t *testing.T) {
91+
assert.Equal(t, "key1 value", telemetryKey("key1", Tags{}))
92+
assert.Equal(t, "key2,foo=bar value", telemetryKey("key2", Tags{"foo": "bar"}))
93+
assert.Equal(t, "key3,f\\==ba\\,r value", telemetryKey("key3", Tags{"f=": "ba,r"}))
94+
}

eventstat/registry.go

+15
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,27 @@
44
package eventstat
55

66
import (
7+
"strings"
78
"sync"
89
)
910

1011
// Tags represent key/values for any event.
1112
type Tags map[string]string
1213

14+
func (t *Tags) String() string {
15+
var builder strings.Builder
16+
for k, v := range *t {
17+
if builder.Len() > 0 {
18+
builder.WriteString(",")
19+
}
20+
writeTag(&builder, k)
21+
builder.WriteString("=")
22+
writeTag(&builder, v)
23+
24+
}
25+
return builder.String()
26+
}
27+
1328
// Publisher is a function which sends out statistics.
1429
type Publisher func(name string, tags Tags, value float64)
1530

eventstat/registry_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ func TestRegistry_WithLimit(t *testing.T) {
7575
}, p.sortedEvents())
7676
}
7777

78+
func TestTags(t *testing.T) {
79+
require.Equal(t, "foo=bar", (&eventstat.Tags{"foo": "bar"}).String())
80+
twoKeys := (&eventstat.Tags{"foo1": "bar", "foo2": "bar"}).String()
81+
require.True(t, twoKeys == "foo1=bar,foo2=bar" || twoKeys == "foo2=bar,foo1=bar")
82+
require.Equal(t, "f\\=oo=b\\,a\\ r", (&eventstat.Tags{"f=oo": "b,a r"}).String())
83+
}
84+
7885
func BenchmarkRegistry(b *testing.B) {
7986
b.ReportAllocs()
8087
r := eventstat.Registry{}

0 commit comments

Comments
 (0)