Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/tcplog-receiver-new-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/tcplog

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new metrics to track incoming connections and payload size in the tcplog receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [45146]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
45 changes: 44 additions & 1 deletion pkg/stanza/operator/input/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jpillora/backoff"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/otel/metric"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils"
Expand Down Expand Up @@ -72,6 +73,11 @@ type BaseConfig struct {
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
SplitFuncBuilder SplitFuncBuilder `mapstructure:"-"`
Metrics MetricsConfig `mapstructure:"metrics,omitempty"`
}

// MetricsConfig controls the optional telemetry emitted by the tcp input operator.
type MetricsConfig struct {
// Enabled turns on creation of the payload-size histogram and the
// connection created/closed counters when the operator is built.
// When false, Build leaves those instruments nil and nothing is recorded.
Enabled bool `mapstructure:"enabled,omitempty"`
}

type SplitFuncBuilder func(enc encoding.Encoding) (bufio.SplitFunc, error)
Expand Down Expand Up @@ -122,10 +128,43 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
splitFunc = trim.WithFunc(splitFunc, c.TrimConfig.Func())

var resolver *helper.IPResolver
if c.AddAttributes {
if c.AddAttributes || c.Metrics.Enabled {
resolver = helper.NewIPResolver()
}

var (
metricPayloadSize metric.Int64Histogram
metricConnectionsCreated metric.Int64Counter
metricConnectionsClosed metric.Int64Counter
)

if c.Metrics.Enabled {
meter := set.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp")
if metricPayloadSize, err = meter.Int64Histogram(
"otelcol_tcplog_receiver_payload_size_bytes",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"otelcol_tcplog_receiver_payload_size_bytes",
"otelcol_receiver_payload_size_bytes",

Or to follow semantic conventions on namespacing:

Suggested change
"otelcol_tcplog_receiver_payload_size_bytes",
"otel.col.receiver.payload.size",

Or, following additional semconv patterns:

Suggested change
"otelcol_tcplog_receiver_payload_size_bytes",
"otel.col.receiver.network.io",

We could even use the following if we add an otel.component.type attribute capturing that it is a collector receiver.

Suggested change
"otelcol_tcplog_receiver_payload_size_bytes",
"otel.col.network.io",

All options use attributes to identify the receiver (i.e. otel.component.name) as well as the network direction.

metric.WithDescription("Size of the payload size received by the tcp log receiver"),
metric.WithUnit("bytes"),
metric.WithExplicitBucketBoundaries(64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536,
524288, 1048576, 2097152, 4194304, 8388608, 16777216, 33554432, 67108864, 134217728), // 64 bytes to 128MB
); err != nil {
return nil, err
}

if metricConnectionsCreated, err = meter.Int64Counter(
"otelcol_tcplog_receiver_connections_created_total",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"otelcol_tcplog_receiver_connections_created_total",
"otelcol_receiver_connections_created_total",

Or to follow semantic conventions on namespacing:

Suggested change
"otelcol_tcplog_receiver_connections_created_total",
"otel.col.receiver.network.connection.created",

Or even one that fits better with other conventions and allows consolidation:

Suggested change
"otelcol_tcplog_receiver_connections_created_total",
"otel.col.receiver.network.connection.status",

With network.connection.state attribute = established

We could even use the following if we add an otel.component.type attribute capturing that it is a collector receiver.

Suggested change
"otelcol_tcplog_receiver_connections_created_total",
"otel.col.network.connection.status",

All options use attributes to identify the receiver, i.e. otel.component.name.

metric.WithDescription("Total number of connections created by the tcp log receiver"),
); err != nil {
return nil, err
}

if metricConnectionsClosed, err = meter.Int64Counter(
"otelcol_tcplog_receiver_connections_closed_total",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"otelcol_tcplog_receiver_connections_closed_total",
"otelcol_receiver_connections_closed_total",

Or to follow semantic conventions on namespacing:

Suggested change
"otelcol_tcplog_receiver_connections_closed_total",
"otel.col.receiver.network.connection.closed",

Or even one that fits better with other conventions and allows consolidation:

Suggested change
"otelcol_tcplog_receiver_connections_closed_total",
"otel.col.receiver.network.connection.status",

With network.connection.state attribute = closed

We could even use the following if we add an otel.component.type attribute capturing that it is a collector receiver.

Suggested change
"otelcol_tcplog_receiver_connections_closed_total",
"otel.col.network.connection.status",

All options use attributes to identify the receiver, i.e. otel.component.name.

metric.WithDescription("Total number of connections closed by the tcp log receiver"),
); err != nil {
return nil, err
}
}

tcpInput := &Input{
InputOperator: inputOperator,
address: c.ListenAddress,
Expand All @@ -138,6 +177,10 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
Max: 3 * time.Second,
},
resolver: resolver,

metricPayloadSize: metricPayloadSize,
metricConnectionsCreated: metricConnectionsCreated,
metricConnectionsClosed: metricConnectionsClosed,
}

if c.TLS != nil {
Expand Down
52 changes: 41 additions & 11 deletions pkg/stanza/operator/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"time"

"github.com/jpillora/backoff"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/text/encoding"

Expand All @@ -42,6 +44,10 @@ type Input struct {
encoding encoding.Encoding
splitFunc bufio.SplitFunc
resolver *helper.IPResolver

metricPayloadSize metric.Int64Histogram
metricConnectionsCreated metric.Int64Counter
metricConnectionsClosed metric.Int64Counter
}

// Start will start listening for log entries over tcp.
Expand Down Expand Up @@ -78,7 +84,7 @@ func (i *Input) configureListener() error {
return nil
}

// goListenn will listen for tcp connections.
// goListen will listen for tcp connections.
func (i *Input) goListen(ctx context.Context) {
i.wg.Add(1)

Expand All @@ -100,6 +106,13 @@ func (i *Input) goListen(ctx context.Context) {
i.backoff.Reset()

i.Logger().Debug("Received connection", zap.String("address", conn.RemoteAddr().String()))

if i.metricConnectionsCreated != nil {
_, _, remoteHostname := i.getAddrAttributes(conn.RemoteAddr())
hostAttr := attribute.KeyValue{Key: "client.address", Value: attribute.StringValue(remoteHostname)}
i.metricConnectionsCreated.Add(ctx, 1, metric.WithAttributes(hostAttr))
}

subctx, cancel := context.WithCancel(ctx)
i.goHandleClose(subctx, conn)
i.goHandleMessages(subctx, conn, cancel)
Expand All @@ -117,11 +130,17 @@ func (i *Input) goHandleClose(ctx context.Context, conn net.Conn) {
i.Logger().Debug("Closing connection", zap.String("address", conn.RemoteAddr().String()))
if err := conn.Close(); err != nil {
i.Logger().Error("Failed to close connection", zap.Error(err))
return
}
if i.metricConnectionsClosed != nil {
_, _, remoteHostname := i.getAddrAttributes(conn.RemoteAddr())
hostAttr := attribute.KeyValue{Key: "client.address", Value: attribute.StringValue(remoteHostname)}
i.metricConnectionsClosed.Add(ctx, 1, metric.WithAttributes(hostAttr))
}
}()
}

// goHandleMessages will handles messages from a tcp connection.
// goHandleMessages will handle messages from a tcp connection.
func (i *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel context.CancelFunc) {
i.wg.Add(1)

Expand Down Expand Up @@ -159,6 +178,9 @@ func (i *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont
}

func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *encoding.Decoder, log []byte) {
if i.metricPayloadSize != nil {
i.metricPayloadSize.Record(ctx, int64(len(log)))
}
decoded, err := textutils.DecodeAsString(dec, log)
if err != nil {
i.Logger().Error("Failed to decode data", zap.Error(err))
Expand All @@ -173,18 +195,16 @@ func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *encoding.

if i.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
if ip, port, name := i.getAddrAttributes(conn.RemoteAddr()); ip != "" {
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", i.resolver.GetHostFromIP(ip))
entry.AddAttribute("net.peer.port", port)
entry.AddAttribute("net.peer.name", name)
}

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", i.resolver.GetHostFromIP(ip))
if ip, port, name := i.getAddrAttributes(conn.LocalAddr()); ip != "" {
entry.AddAttribute("net.host.ip", ip)
entry.AddAttribute("net.host.port", port)
entry.AddAttribute("net.host.name", name)
}
}

Expand All @@ -194,6 +214,16 @@ func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *encoding.
}
}

// getAddrAttributes splits addr into its IP, its port (formatted in base 10),
// and the host name the operator's resolver returns for that IP.
// All three results are empty strings when addr is not a *net.TCPAddr.
func (i *Input) getAddrAttributes(addr net.Addr) (ip, port, name string) {
	tcpAddr, isTCP := addr.(*net.TCPAddr)
	if !isTCP {
		return "", "", ""
	}

	ip = tcpAddr.IP.String()
	port = strconv.FormatInt(int64(tcpAddr.Port), 10)
	// The resolver is queried with the same textual IP that is returned to the caller.
	name = i.resolver.GetHostFromIP(ip)
	return ip, port, name
}

func truncateMaxLog(data []byte, maxLogSize int) (token []byte) {
if len(data) >= maxLogSize {
return data[:maxLogSize]
Expand Down
107 changes: 95 additions & 12 deletions pkg/stanza/operator/input/tcp/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"os"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -18,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/otel/sdk/metric/metricdata"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -89,6 +91,10 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
tcpInput := op.(*Input)
tcpInput.OutputOperators = []operator.Operator{&mockOutput}

require.Nil(t, tcpInput.metricConnectionsCreated, "metricConnectionsCreated should be nil when metrics are disabled")
require.Nil(t, tcpInput.metricConnectionsClosed, "metricConnectionsClosed should be nil when metrics are disabled")
require.Nil(t, tcpInput.metricPayloadSize, "metricPayloadSize should be nil when metrics are disabled")

entryChan := make(chan *entry.Entry, 1)
mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
entryChan <- args.Get(1).(*entry.Entry)
Expand Down Expand Up @@ -163,18 +169,12 @@ func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T)
expectedAttributes := map[string]any{
"net.transport": "IP.TCP",
}
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
expectedAttributes["net.host.ip"] = addr.IP.String()
expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10)
expectedAttributes["net.host.name"] = tcpInput.resolver.GetHostFromIP(ip)
}
if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
expectedAttributes["net.peer.ip"] = ip
expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10)
expectedAttributes["net.peer.name"] = tcpInput.resolver.GetHostFromIP(ip)
}
expectedAttributes["net.host.ip"],
expectedAttributes["net.host.port"],
expectedAttributes["net.host.name"] = tcpInput.getAddrAttributes(conn.RemoteAddr())
expectedAttributes["net.peer.ip"],
expectedAttributes["net.peer.port"],
expectedAttributes["net.peer.name"] = tcpInput.getAddrAttributes(conn.LocalAddr())
require.Equal(t, expectedMessage, entry.Body)
require.Equal(t, expectedAttributes, entry.Attributes)
case <-time.After(time.Second):
Expand Down Expand Up @@ -262,6 +262,84 @@ func tlsInputTest(input []byte, expected []string) func(t *testing.T) {
}
}

// tcpInputMetricsTest returns a subtest that builds a tcp input with metrics
// enabled, sends input over a single connection, and verifies both the entries
// delivered downstream and the values recorded for the payload-size histogram
// and the connection created/closed counters.
//
// input is the raw payload written to the connection; expected lists the entry
// bodies that must be emitted, in order.
func tcpInputMetricsTest(input []byte, expected []string) func(t *testing.T) {
	return func(t *testing.T) {
		cfg := NewConfigWithID("test_id")
		cfg.ListenAddress = ":0"
		cfg.Metrics.Enabled = true

		tt := componenttest.NewTelemetry()
		set := tt.NewTelemetrySettings()
		op, err := cfg.Build(set)
		require.NoError(t, err)

		mockOutput := testutil.Operator{}
		tcpInput := op.(*Input)
		tcpInput.OutputOperators = []operator.Operator{&mockOutput}

		entryChan := make(chan *entry.Entry, 1)
		mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
			entryChan <- args.Get(1).(*entry.Entry)
		}).Return(nil)

		// checkCounterMetricValue asserts that the named int64 sum metric exists
		// and that its first data point carries expectedValue. The type and
		// length checks make a missing or mistyped metric a test failure
		// rather than a panic.
		checkCounterMetricValue := func(metricName string, expectedValue int64) {
			metricObj, errM := tt.GetMetric(metricName)
			require.NoError(t, errM)
			require.NotNil(t, metricObj)

			data, ok := metricObj.Data.(metricdata.Sum[int64])
			require.True(t, ok, "metric %s is not an int64 sum", metricName)
			require.NotEmpty(t, data.DataPoints, "metric %s has no data points", metricName)
			require.Equal(t, expectedValue, data.DataPoints[0].Value, "metric %s has unexpected value", metricName)
		}

		err = tcpInput.Start(testutil.NewUnscopedMockPersister())
		require.NoError(t, err)
		// Deferred calls run last-in-first-out: the client connection below is
		// closed first, then the operator is stopped, and only then is the
		// closed-connections counter inspected.
		defer func() {
			require.NoError(t, tcpInput.Stop(), "expected to stop tcp input operator without error")
			checkCounterMetricValue("otelcol_tcplog_receiver_connections_closed_total", 1)
		}()

		conn, err := net.Dial("tcp", tcpInput.listener.Addr().String())
		require.NoError(t, err)
		defer conn.Close()

		require.NotNil(t, tcpInput.metricConnectionsCreated, "metricConnectionsCreated should be initialized")
		require.NotNil(t, tcpInput.metricConnectionsClosed, "metricConnectionsClosed should be initialized")
		require.NotNil(t, tcpInput.metricPayloadSize, "metricPayloadSize should be initialized")

		_, err = conn.Write(input)
		require.NoError(t, err)

		for _, expectedMessage := range expected {
			select {
			case entry := <-entryChan:
				require.Equal(t, expectedMessage, entry.Body)
			case <-time.After(time.Second):
				require.FailNow(t, "Timed out waiting for message to be written")
			}
		}

		// No further entries may arrive once every expected message was seen.
		select {
		case entry := <-entryChan:
			require.FailNow(t, fmt.Sprintf("Unexpected entry: %s", entry))
		case <-time.After(100 * time.Millisecond):
			break
		}

		// Check payload size metric value
		{
			metricObj, err := tt.GetMetric("otelcol_tcplog_receiver_payload_size_bytes")
			require.NoError(t, err)
			require.NotNil(t, metricObj)

			data, ok := metricObj.Data.(metricdata.Histogram[int64])
			require.True(t, ok, "metric otelcol_tcplog_receiver_payload_size_bytes is not an int64 histogram")
			require.NotEmpty(t, data.DataPoints, "metric otelcol_tcplog_receiver_payload_size_bytes has no data points")
			require.Equal(t, uint64(1), data.DataPoints[0].Count, "metric otelcol_tcplog_receiver_payload_size_bytes has unexpected count")
			// The expected sum is the input length with surrounding whitespace
			// (the line terminator) trimmed off.
			require.Equal(t, int64(len(strings.TrimSpace(string(input)))), data.DataPoints[0].Sum, "metric otelcol_tcplog_receiver_payload_size_bytes has unexpected sum")
		}

		checkCounterMetricValue("otelcol_tcplog_receiver_connections_created_total", 1)
	}
}

func TestBuild(t *testing.T) {
cases := []struct {
name string
Expand Down Expand Up @@ -375,6 +453,11 @@ func TestTLSTCPInput(t *testing.T) {
t.Run("CarriageReturn", tlsInputTest([]byte("message\r\n"), []string{"message"}))
}

// TestTCPInputMetrics runs the metrics subtest against a newline-terminated
// payload and a CRLF-terminated payload; both must yield the single entry "message".
func TestTCPInputMetrics(t *testing.T) {
	cases := []struct {
		name    string
		payload string
	}{
		{name: "Simple", payload: "message\n"},
		{name: "CarriageReturn", payload: "message\r\n"},
	}

	for _, tc := range cases {
		t.Run(tc.name, tcpInputMetricsTest([]byte(tc.payload), []string{"message"}))
	}
}

func TestFailToBind(t *testing.T) {
ip := "localhost"
port := 0
Expand Down
Loading