diff --git a/.chloggen/tcplog-receiver-new-metrics.yaml b/.chloggen/tcplog-receiver-new-metrics.yaml
new file mode 100644
index 0000000000000..8e917d62e9844
--- /dev/null
+++ b/.chloggen/tcplog-receiver-new-metrics.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: receiver/tcplog
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add new metrics to track incoming connections and payload size in the tcplog receiver
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [45146]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/pkg/stanza/operator/input/tcp/config.go b/pkg/stanza/operator/input/tcp/config.go
index 7defea5754c4b..efdb2c7bc7e01 100644
--- a/pkg/stanza/operator/input/tcp/config.go
+++ b/pkg/stanza/operator/input/tcp/config.go
@@ -14,6 +14,7 @@ import (
 	"github.com/jpillora/backoff"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/configtls"
+	"go.opentelemetry.io/otel/metric"
 	"golang.org/x/text/encoding"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils"
@@ -72,6 +73,13 @@ type BaseConfig struct {
 	SplitConfig      split.Config            `mapstructure:"multiline,omitempty"`
 	TrimConfig       trim.Config             `mapstructure:",squash"`
 	SplitFuncBuilder SplitFuncBuilder        `mapstructure:"-"`
+	Metrics          MetricsConfig           `mapstructure:"metrics,omitempty"`
+}
+
+// MetricsConfig toggles the optional internal metrics of the tcp input operator.
+type MetricsConfig struct {
+	// Enabled turns on the connection counters and the payload size histogram.
+	Enabled bool `mapstructure:"enabled,omitempty"`
 }
 
 type SplitFuncBuilder func(enc encoding.Encoding) (bufio.SplitFunc, error)
@@ -122,10 +130,43 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
 	splitFunc = trim.WithFunc(splitFunc, c.TrimConfig.Func())
 
 	var resolver *helper.IPResolver
-	if c.AddAttributes {
+	if c.AddAttributes || c.Metrics.Enabled {
 		resolver = helper.NewIPResolver()
 	}
 
+	var (
+		metricPayloadSize        metric.Int64Histogram
+		metricConnectionsCreated metric.Int64Counter
+		metricConnectionsClosed  metric.Int64Counter
+	)
+
+	if c.Metrics.Enabled {
+		meter := set.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp")
+		if metricPayloadSize, err = meter.Int64Histogram(
+			"otelcol_tcplog_receiver_payload_size_bytes",
+			metric.WithDescription("Size of the payload received by the tcp log receiver"),
+			metric.WithUnit("By"),
+			metric.WithExplicitBucketBoundaries(64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536,
+				524288, 1048576, 2097152, 4194304, 8388608, 16777216, 33554432, 67108864, 134217728), // 64 bytes to 128MB
+		); err != nil {
+			return nil, err
+		}
+
+		if metricConnectionsCreated, err = meter.Int64Counter(
+			"otelcol_tcplog_receiver_connections_created_total",
+			metric.WithDescription("Total number of connections created by the tcp log receiver"),
+		); err != nil {
+			return nil, err
+		}
+
+		if metricConnectionsClosed, err = meter.Int64Counter(
+			"otelcol_tcplog_receiver_connections_closed_total",
+			metric.WithDescription("Total number of connections closed by the tcp log receiver"),
+		); err != nil {
+			return nil, err
+		}
+	}
+
 	tcpInput := &Input{
 		InputOperator:   inputOperator,
 		address:         c.ListenAddress,
@@ -138,6 +179,10 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
 			Max: 3 * time.Second,
 		},
 		resolver: resolver,
+
+		metricPayloadSize:        metricPayloadSize,
+		metricConnectionsCreated: metricConnectionsCreated,
+		metricConnectionsClosed:  metricConnectionsClosed,
 	}
 
 	if c.TLS != nil {
diff --git a/pkg/stanza/operator/input/tcp/input.go b/pkg/stanza/operator/input/tcp/input.go
index 77c60cc2fab04..d7f4c1e901d1b 100644
--- a/pkg/stanza/operator/input/tcp/input.go
+++ b/pkg/stanza/operator/input/tcp/input.go
@@ -17,6 +17,8 @@ import (
 	"time"
 
 	"github.com/jpillora/backoff"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
 	"go.uber.org/zap"
 	"golang.org/x/text/encoding"
 
@@ -42,6 +44,10 @@ type Input struct {
 	encoding  encoding.Encoding
 	splitFunc bufio.SplitFunc
 	resolver  *helper.IPResolver
+
+	metricPayloadSize        metric.Int64Histogram
+	metricConnectionsCreated metric.Int64Counter
+	metricConnectionsClosed  metric.Int64Counter
 }
 
 // Start will start listening for log entries over tcp.
@@ -78,7 +84,7 @@ func (i *Input) configureListener() error {
 	return nil
 }
 
-// goListenn will listen for tcp connections.
+// goListen will listen for tcp connections.
 func (i *Input) goListen(ctx context.Context) {
 	i.wg.Add(1)
 
@@ -100,6 +106,13 @@ func (i *Input) goListen(ctx context.Context) {
 			i.backoff.Reset()
 
 			i.Logger().Debug("Received connection", zap.String("address", conn.RemoteAddr().String()))
+
+			if i.metricConnectionsCreated != nil {
+				_, _, remoteHostname := i.getAddrAttributes(conn.RemoteAddr())
+				hostAttr := attribute.String("client.address", remoteHostname)
+				i.metricConnectionsCreated.Add(ctx, 1, metric.WithAttributes(hostAttr))
+			}
+
 			subctx, cancel := context.WithCancel(ctx)
 			i.goHandleClose(subctx, conn)
 			i.goHandleMessages(subctx, conn, cancel)
@@ -117,11 +130,17 @@ func (i *Input) goHandleClose(ctx context.Context, conn net.Conn) {
 		i.Logger().Debug("Closing connection", zap.String("address", conn.RemoteAddr().String()))
 		if err := conn.Close(); err != nil {
 			i.Logger().Error("Failed to close connection", zap.Error(err))
+			return
+		}
+		if i.metricConnectionsClosed != nil {
+			_, _, remoteHostname := i.getAddrAttributes(conn.RemoteAddr())
+			hostAttr := attribute.String("client.address", remoteHostname)
+			i.metricConnectionsClosed.Add(ctx, 1, metric.WithAttributes(hostAttr))
 		}
 	}()
 }
 
-// goHandleMessages will handles messages from a tcp connection.
+// goHandleMessages will handle messages from a tcp connection.
 func (i *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel context.CancelFunc) {
 	i.wg.Add(1)
 
@@ -159,6 +178,9 @@ func (i *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont
 }
 
 func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *encoding.Decoder, log []byte) {
+	if i.metricPayloadSize != nil {
+		i.metricPayloadSize.Record(ctx, int64(len(log)))
+	}
 	decoded, err := textutils.DecodeAsString(dec, log)
 	if err != nil {
 		i.Logger().Error("Failed to decode data", zap.Error(err))
@@ -173,18 +195,16 @@ func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *encoding.
 
 	if i.addAttributes {
 		entry.AddAttribute("net.transport", "IP.TCP")
-		if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
-			ip := addr.IP.String()
+		if ip, port, name := i.getAddrAttributes(conn.RemoteAddr()); ip != "" {
 			entry.AddAttribute("net.peer.ip", ip)
-			entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
-			entry.AddAttribute("net.peer.name", i.resolver.GetHostFromIP(ip))
+			entry.AddAttribute("net.peer.port", port)
+			entry.AddAttribute("net.peer.name", name)
 		}
 
-		if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
-			ip := addr.IP.String()
-			entry.AddAttribute("net.host.ip", addr.IP.String())
-			entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
-			entry.AddAttribute("net.host.name", i.resolver.GetHostFromIP(ip))
+		if ip, port, name := i.getAddrAttributes(conn.LocalAddr()); ip != "" {
+			entry.AddAttribute("net.host.ip", ip)
+			entry.AddAttribute("net.host.port", port)
+			entry.AddAttribute("net.host.name", name)
 		}
 	}
 
@@ -194,6 +214,18 @@ func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *encoding.
 	}
 }
 
+// getAddrAttributes returns the IP, the port and the resolver-provided host name of addr.
+// All three values are empty strings when addr is not a *net.TCPAddr.
+func (i *Input) getAddrAttributes(addr net.Addr) (ip, port, name string) {
+	if tcpAddr, ok := addr.(*net.TCPAddr); ok {
+		ip = tcpAddr.IP.String()
+		port = strconv.FormatInt(int64(tcpAddr.Port), 10)
+		name = i.resolver.GetHostFromIP(ip)
+		return ip, port, name
+	}
+	return "", "", ""
+}
+
 func truncateMaxLog(data []byte, maxLogSize int) (token []byte) {
 	if len(data) >= maxLogSize {
 		return data[:maxLogSize]
diff --git a/pkg/stanza/operator/input/tcp/input_test.go b/pkg/stanza/operator/input/tcp/input_test.go
index 0d1b12bcd8381..b6cdb57e3f2a0 100644
--- a/pkg/stanza/operator/input/tcp/input_test.go
+++ b/pkg/stanza/operator/input/tcp/input_test.go
@@ -10,6 +10,7 @@ import (
 	"net"
 	"os"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
@@ -18,6 +19,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/config/configtls"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
@@ -89,6 +91,10 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
 		tcpInput := op.(*Input)
 		tcpInput.OutputOperators = []operator.Operator{&mockOutput}
 
+		require.Nil(t, tcpInput.metricConnectionsCreated, "metricConnectionsCreated should be nil when metrics are disabled")
+		require.Nil(t, tcpInput.metricConnectionsClosed, "metricConnectionsClosed should be nil when metrics are disabled")
+		require.Nil(t, tcpInput.metricPayloadSize, "metricPayloadSize should be nil when metrics are disabled")
+
 		entryChan := make(chan *entry.Entry, 1)
 		mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
 			entryChan <- args.Get(1).(*entry.Entry)
@@ -163,18 +169,12 @@ func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T)
 				expectedAttributes := map[string]any{
 					"net.transport": "IP.TCP",
 				}
-				if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
-					ip := addr.IP.String()
-					expectedAttributes["net.host.ip"] = addr.IP.String()
-					expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10)
-					expectedAttributes["net.host.name"] = tcpInput.resolver.GetHostFromIP(ip)
-				}
-				if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
-					ip := addr.IP.String()
-					expectedAttributes["net.peer.ip"] = ip
-					expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10)
-					expectedAttributes["net.peer.name"] = tcpInput.resolver.GetHostFromIP(ip)
-				}
+				expectedAttributes["net.host.ip"],
+					expectedAttributes["net.host.port"],
+					expectedAttributes["net.host.name"] = tcpInput.getAddrAttributes(conn.RemoteAddr())
+				expectedAttributes["net.peer.ip"],
+					expectedAttributes["net.peer.port"],
+					expectedAttributes["net.peer.name"] = tcpInput.getAddrAttributes(conn.LocalAddr())
 				require.Equal(t, expectedMessage, entry.Body)
 				require.Equal(t, expectedAttributes, entry.Attributes)
 			case <-time.After(time.Second):
@@ -262,6 +262,86 @@ func tlsInputTest(input []byte, expected []string) func(t *testing.T) {
 	}
 }
 
+func tcpInputMetricsTest(input []byte, expected []string) func(t *testing.T) {
+	return func(t *testing.T) {
+		cfg := NewConfigWithID("test_id")
+		cfg.ListenAddress = ":0"
+		cfg.Metrics.Enabled = true
+
+		tt := componenttest.NewTelemetry()
+		set := tt.NewTelemetrySettings()
+		op, err := cfg.Build(set)
+		require.NoError(t, err)
+
+		mockOutput := testutil.Operator{}
+		tcpInput := op.(*Input)
+		tcpInput.OutputOperators = []operator.Operator{&mockOutput}
+
+		entryChan := make(chan *entry.Entry, 1)
+		mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
+			entryChan <- args.Get(1).(*entry.Entry)
+		}).Return(nil)
+
+		checkCounterMetricValue := func(metricName string, expectedValue int64) {
+			metricObj, errM := tt.GetMetric(metricName)
+			require.NoError(t, errM)
+			require.NotNil(t, metricObj)
+
+			data := metricObj.Data.(metricdata.Sum[int64])
+			require.NotEmpty(t, data.DataPoints, "metric %s has no data points", metricName)
+			require.Equal(t, expectedValue, data.DataPoints[0].Value, "metric %s has unexpected value", metricName)
+		}
+
+		err = tcpInput.Start(testutil.NewUnscopedMockPersister())
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, tcpInput.Stop(), "expected to stop tcp input operator without error")
+			checkCounterMetricValue("otelcol_tcplog_receiver_connections_closed_total", 1)
+		}()
+
+		conn, err := net.Dial("tcp", tcpInput.listener.Addr().String())
+		require.NoError(t, err)
+		defer conn.Close()
+
+		require.NotNil(t, tcpInput.metricConnectionsCreated, "metricConnectionsCreated should be initialized")
+		require.NotNil(t, tcpInput.metricConnectionsClosed, "metricConnectionsClosed should be initialized")
+		require.NotNil(t, tcpInput.metricPayloadSize, "metricPayloadSize should be initialized")
+
+		_, err = conn.Write(input)
+		require.NoError(t, err)
+
+		for _, expectedMessage := range expected {
+			select {
+			case entry := <-entryChan:
+				require.Equal(t, expectedMessage, entry.Body)
+			case <-time.After(time.Second):
+				require.FailNow(t, "Timed out waiting for message to be written")
+			}
+		}
+
+		select {
+		case entry := <-entryChan:
+			require.FailNow(t, fmt.Sprintf("Unexpected entry: %s", entry))
+		case <-time.After(100 * time.Millisecond):
+			break
+		}
+
+		// Check payload size metric value
+		{
+			metricObj, err := tt.GetMetric("otelcol_tcplog_receiver_payload_size_bytes")
+			require.NoError(t, err)
+			require.NotNil(t, metricObj)
+
+			data := metricObj.Data.(metricdata.Histogram[int64])
+			require.NotEmpty(t, data.DataPoints, "metric otelcol_tcplog_receiver_payload_size_bytes has no data points")
+			require.Equal(t, uint64(1), data.DataPoints[0].Count, "metric otelcol_tcplog_receiver_payload_size_bytes has unexpected count")
+			require.Equal(t, int64(len(strings.TrimSpace(string(input)))), data.DataPoints[0].Sum, "metric otelcol_tcplog_receiver_payload_size_bytes has unexpected sum")
+		}
+
+		checkCounterMetricValue("otelcol_tcplog_receiver_connections_created_total", 1)
+	}
+}
+
 func TestBuild(t *testing.T) {
 	cases := []struct {
 		name string
@@ -375,6 +455,11 @@ func TestTLSTCPInput(t *testing.T) {
 	t.Run("CarriageReturn", tlsInputTest([]byte("message\r\n"), []string{"message"}))
 }
 
+func TestTCPInputMetrics(t *testing.T) {
+	t.Run("Simple", tcpInputMetricsTest([]byte("message\n"), []string{"message"}))
+	t.Run("CarriageReturn", tcpInputMetricsTest([]byte("message\r\n"), []string{"message"}))
+}
+
 func TestFailToBind(t *testing.T) {
 	ip := "localhost"
 	port := 0