diff --git a/.chloggen/so-reuse-port.yaml b/.chloggen/so-reuse-port.yaml new file mode 100644 index 000000000000..595a213a8b12 --- /dev/null +++ b/.chloggen/so-reuse-port.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pkg/config/confignet + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added ReusePort option to confignet.AddrConfig to enable SO_REUSEPORT on supported platforms. + +# One or more tracking issues or pull requests related to the change +issues: [14046] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.github/workflows/utils/cspell.json b/.github/workflows/utils/cspell.json index 2d768d5c1740..f7350047c722 100644 --- a/.github/workflows/utils/cspell.json +++ b/.github/workflows/utils/cspell.json @@ -72,6 +72,7 @@ "RCPC", "Rahul", "SASL", + "SO_REUSEPORT", "Samplingdecision", "Sharma", "Statefulness", diff --git a/config/confignet/README.md b/config/confignet/README.md index 02bc3d1b2372..16b74980d9a3 100644 --- a/config/confignet/README.md +++ b/config/confignet/README.md @@ -13,6 +13,7 @@ leverage network configuration to set connection and transport information. - `transport`: Known protocols are "tcp", "tcp4" (IPv4-only), "tcp6" (IPv6-only), "udp", "udp4" (IPv4-only), "udp6" (IPv6-only), "ip", "ip4" (IPv4-only), "ip6" (IPv6-only), "unix", "unixgram" and "unixpacket". +- `reuse_port`: If set to `true`, enables the SO_REUSEPORT socket option on the listener, allowing multiple processes to listen on the same port - `dialer`: Dialer configuration - `timeout`: Dialer timeout is the maximum amount of time a dial will wait for a connect to complete. The default is no timeout. diff --git a/config/confignet/confignet.go b/config/confignet/confignet.go index 5cbb3ff114d5..77abb14cb905 100644 --- a/config/confignet/confignet.go +++ b/config/confignet/confignet.go @@ -84,6 +84,13 @@ type AddrConfig struct { // DialerConfig contains options for connecting to an address. DialerConfig DialerConfig `mapstructure:"dialer,omitempty"` + + // ReusePort enables the SO_REUSEPORT socket option on the listener. + // This allows multiple server instances to bind to the same address/port. + // This is useful for horizontal scaling and zero-downtime restarts. + // Note: This option is only supported on Linux and Darwin-based operating systems. + ReusePort bool `mapstructure:"reuse_port,omitempty"` + // prevent unkeyed literal initialization _ struct{} } @@ -103,11 +110,19 @@ func (na *AddrConfig) Dial(ctx context.Context) (net.Conn, error) { // Listen equivalent with net.ListenConfig's Listen for this address. func (na *AddrConfig) Listen(ctx context.Context) (net.Listener, error) { - lc := net.ListenConfig{} + lc, err := na.getListenConfig() + if err != nil { + return nil, err + } return lc.Listen(ctx, string(na.Transport), na.Endpoint) } func (na *AddrConfig) Validate() error { + _, err := na.getListenConfig() + if err != nil { + return err + } + switch na.Transport { case TransportTypeTCP, TransportTypeTCP4, diff --git a/config/confignet/confignet_test.go b/config/confignet/confignet_test.go index 84ea79373d85..6298c4f43fa3 100644 --- a/config/confignet/confignet_test.go +++ b/config/confignet/confignet_test.go @@ -7,6 +7,7 @@ import ( "context" "errors" "net" + "runtime" "testing" "time" @@ -159,3 +160,73 @@ func Test_TransportType_UnmarshalText(t *testing.T) { err = tt.UnmarshalText([]byte("invalid")) require.Error(t, err) } + +func TestServerReusePort(t *testing.T) { + if runtime.GOOS != "linux" && runtime.GOOS != "darwin" { + sc := &AddrConfig{ + Endpoint: "localhost:4318", + Transport: TransportTypeTCP, + ReusePort: true, + } + + _, err := sc.Listen(t.Context()) + require.EqualError(t, err, "ReusePort is not supported on this platform") + return + } + + tests := []struct { + name string + reusePort bool + expectedError bool + }{ + { + name: "ReusePort enabled", + reusePort: true, + expectedError: false, + }, + { + name: "ReusePort disabled", + reusePort: false, + expectedError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sc := &AddrConfig{ + Endpoint: "localhost:4318", + Transport: TransportTypeTCP, + ReusePort: tt.reusePort, + } + + ln1, err := sc.Listen(t.Context()) + require.NoError(t, err) + defer ln1.Close() + + ln2, err := sc.Listen(t.Context()) + if tt.expectedError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + if ln2 != nil { + ln2.Close() + } + }) + } +} + +func TestServerConfigValidate(t *testing.T) { + sc := &AddrConfig{ + Endpoint: "localhost:4318", + Transport: TransportTypeTCP, + ReusePort: true, + } + + if runtime.GOOS != "linux" && runtime.GOOS != "darwin" { + require.Error(t, sc.Validate()) + } else { + require.NoError(t, sc.Validate()) + } +} diff --git a/config/confignet/go.mod b/config/confignet/go.mod index 42c371d55079..b2bfa14a5744 100644 --- a/config/confignet/go.mod +++ b/config/confignet/go.mod @@ -5,6 +5,7 @@ go 1.25.0 require ( github.com/stretchr/testify v1.11.1 go.uber.org/goleak v1.3.0 + golang.org/x/sys v0.39.0 ) require ( diff --git a/config/confignet/go.sum b/config/confignet/go.sum index f989d17ba1ed..02b8bd41ec50 100644 --- a/config/confignet/go.sum +++ b/config/confignet/go.sum @@ -18,6 +18,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/config/confignet/listen_config_other.go b/config/confignet/listen_config_other.go new file mode 100644 index 000000000000..6f014061d6ad --- /dev/null +++ b/config/confignet/listen_config_other.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +//go:build !(linux || darwin) + +package confignet // import "go.opentelemetry.io/collector/config/confignet" + +import ( + "errors" + "net" +) + +func (na *AddrConfig) getListenConfig() (net.ListenConfig, error) { + if na.ReusePort { + return net.ListenConfig{}, errors.New("ReusePort is not supported on this platform") + } + + return net.ListenConfig{}, nil +} diff --git a/config/confignet/listen_config_unix.go b/config/confignet/listen_config_unix.go new file mode 100644 index 000000000000..509687b5510f --- /dev/null +++ b/config/confignet/listen_config_unix.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +//go:build linux || darwin + +package confignet // import "go.opentelemetry.io/collector/config/confignet" + +import ( + "net" + "syscall" + + "golang.org/x/sys/unix" +) + +func (na *AddrConfig) getListenConfig() (net.ListenConfig, error) { + cfg := net.ListenConfig{} + if na.ReusePort { + cfg.Control = func(_, _ string, c syscall.RawConn) error { + var controlErr error + err := c.Control(func(fd uintptr) { + controlErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + }) + if err != nil { + return err + } + return controlErr + } + } + + return cfg, nil +}