diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 51dc88f6211..09732e6b839 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -87,9 +87,10 @@ type ProcessorConfiguration struct { // ServerConfiguration holds config for a server that receives spans from the network type ServerConfiguration struct { - QueueSize int `yaml:"queueSize"` - MaxPacketSize int `yaml:"maxPacketSize"` - HostPort string `yaml:"hostPort" validate:"nonzero"` + QueueSize int `yaml:"queueSize"` + MaxPacketSize int `yaml:"maxPacketSize"` + SocketBufferSize int `yaml:"socketBufferSize"` + HostPort string `yaml:"hostPort" validate:"nonzero"` } // HTTPServerConfiguration holds config for a server providing sampling strategies and baggage restrictions to clients @@ -185,6 +186,7 @@ func (c *ProcessorConfiguration) applyDefaults() { func (c *ServerConfiguration) applyDefaults() { c.QueueSize = defaultInt(c.QueueSize, defaultQueueSize) c.MaxPacketSize = defaultInt(c.MaxPacketSize, defaultMaxPacketSize) + c.SocketBufferSize = defaultInt(c.SocketBufferSize, 0) } // getUDPServer gets a TBufferedServer backed server using the server configuration @@ -198,6 +200,11 @@ func (c *ServerConfiguration) getUDPServer(mFactory metrics.Factory) (servers.Se if err != nil { return nil, fmt.Errorf("cannot create UDPServerTransport: %w", err) } + if c.SocketBufferSize != 0 { + if err := transport.SetSocketBufferSize(c.SocketBufferSize); err != nil { + return nil, fmt.Errorf("cannot set UDP socket buffer size: %w", err) + } + } return servers.NewTBufferedServer(transport, c.QueueSize, c.MaxPacketSize, mFactory) } diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 14cd735e328..58503c72bb9 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -54,6 +54,11 @@ processors: protocol: compact server: hostPort: 2.2.2.2:6831 + - model: jaeger + protocol: compact + server: + hostPort: 3.3.3.3:6831 + socketBufferSize: 16384 - model: jaeger protocol: binary workers: 20 @@ -70,7 +75,7 @@ func TestBuilderFromConfig(t *testing.T) { cfg := Builder{} err := yaml.Unmarshal([]byte(yamlConfig), &cfg) require.NoError(t, err) - assert.Len(t, cfg.Processors, 3) + assert.Len(t, cfg.Processors, 4) for i := range cfg.Processors { cfg.Processors[i].applyDefaults() cfg.Processors[i].Server.applyDefaults() @@ -95,6 +100,17 @@ func TestBuilderFromConfig(t *testing.T) { HostPort: "2.2.2.2:6831", }, }, cfg.Processors[1]) + assert.Equal(t, ProcessorConfiguration{ + Model: jaegerModel, + Protocol: compactProtocol, + Workers: 10, + Server: ServerConfiguration{ + QueueSize: 1000, + MaxPacketSize: 65000, + HostPort: "3.3.3.3:6831", + SocketBufferSize: 16384, + }, + }, cfg.Processors[2]) assert.Equal(t, ProcessorConfiguration{ Model: jaegerModel, Protocol: binaryProtocol, @@ -104,7 +120,7 @@ func TestBuilderFromConfig(t *testing.T) { MaxPacketSize: 65001, HostPort: "3.3.3.3:6832", }, - }, cfg.Processors[2]) + }, cfg.Processors[3]) assert.Equal(t, "4.4.4.4:5778", cfg.HTTPServer.HostPort) } @@ -125,7 +141,7 @@ func TestBuilderWithProcessorErrors(t *testing.T) { }{ {protocol: Protocol("bad"), err: "cannot find protocol factory for protocol bad"}, {protocol: compactProtocol, model: Model("bad"), err: "cannot find agent processor for data model bad"}, - {protocol: compactProtocol, model: jaegerModel, err: "no host:port provided for udp server: {QueueSize:1000 MaxPacketSize:65000 HostPort:}"}, + {protocol: compactProtocol, model: jaegerModel, err: "no host:port provided for udp server: {QueueSize:1000 MaxPacketSize:65000 SocketBufferSize:0 HostPort:}"}, {protocol: compactProtocol, model: zipkinModel, hostPort: "bad-host-port", errContains: "bad-host-port"}, } for _, tc := range testCases { diff --git a/cmd/agent/app/flags.go b/cmd/agent/app/flags.go index 7032d79873c..f0fd4e03063 100644 --- a/cmd/agent/app/flags.go +++ b/cmd/agent/app/flags.go @@ -26,10 +26,11 @@ import ( ) const ( - suffixWorkers = "workers" - suffixServerQueueSize = "server-queue-size" - suffixServerMaxPacketSize = "server-max-packet-size" - suffixServerHostPort = "server-host-port" + suffixWorkers = "workers" + suffixServerQueueSize = "server-queue-size" + suffixServerMaxPacketSize = "server-max-packet-size" + suffixServerSocketBufferSize = "server-socket-buffer-size" + suffixServerHostPort = "server-host-port" // HTTPServerHostPort is the flag for HTTP endpoint HTTPServerHostPort = "http-server.host-port" ) @@ -51,6 +52,7 @@ func AddFlags(flags *flag.FlagSet) { flags.Int(prefix+suffixWorkers, defaultServerWorkers, "how many workers the processor should run") flags.Int(prefix+suffixServerQueueSize, defaultQueueSize, "length of the queue for the UDP server") flags.Int(prefix+suffixServerMaxPacketSize, defaultMaxPacketSize, "max packet size for the UDP server") + flags.Int(prefix+suffixServerSocketBufferSize, 0, "socket buffer size for UDP packets in bytes") flags.String(prefix+suffixServerHostPort, ":"+strconv.Itoa(p.port), "host:port for the UDP server") } AddOTELFlags(flags) @@ -72,6 +74,7 @@ func (b *Builder) InitFromViper(v *viper.Viper) *Builder { p.Workers = v.GetInt(prefix + suffixWorkers) p.Server.QueueSize = v.GetInt(prefix + suffixServerQueueSize) p.Server.MaxPacketSize = v.GetInt(prefix + suffixServerMaxPacketSize) + p.Server.SocketBufferSize = v.GetInt(prefix + suffixServerSocketBufferSize) p.Server.HostPort = portNumToHostPort(v.GetString(prefix + suffixServerHostPort)) b.Processors = append(b.Processors, *p) } diff --git a/cmd/agent/app/servers/thriftudp/socket_buffer.go b/cmd/agent/app/servers/thriftudp/socket_buffer.go new file mode 100644 index 00000000000..0d547449fbc --- /dev/null +++ b/cmd/agent/app/servers/thriftudp/socket_buffer.go @@ -0,0 +1,31 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !windows + +package thriftudp + +import ( + "net" + "syscall" +) + +func setSocketBuffer(conn *net.UDPConn, bufferSize int) error { + file, err := conn.File() + if err != nil { + return err + } + + return syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, bufferSize) +} diff --git a/cmd/agent/app/servers/thriftudp/socket_buffer_windows.go b/cmd/agent/app/servers/thriftudp/socket_buffer_windows.go new file mode 100644 index 00000000000..dd763ec4c86 --- /dev/null +++ b/cmd/agent/app/servers/thriftudp/socket_buffer_windows.go @@ -0,0 +1,24 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package thriftudp + +import ( + "net" +) + +// Not supported on windows, so windows version just returns nil +func setSocketBuffer(_ *net.UDPConn, _ int) error { + return nil +} diff --git a/cmd/agent/app/servers/thriftudp/transport.go b/cmd/agent/app/servers/thriftudp/transport.go index f900d3da5d7..f0d4fb2821a 100644 --- a/cmd/agent/app/servers/thriftudp/transport.go +++ b/cmd/agent/app/servers/thriftudp/transport.go @@ -26,7 +26,9 @@ import ( ) //MaxLength of UDP packet -const MaxLength = 65000 +const ( + MaxLength = 65000 +) var errConnAlreadyClosed = errors.New("connection already closed") @@ -83,6 +85,7 @@ func NewTUDPServerTransport(hostPort string) (*TUDPTransport, error) { if err != nil { return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error()) } + return &TUDPTransport{addr: conn.LocalAddr(), conn: conn}, nil } @@ -153,3 +156,8 @@ func (p *TUDPTransport) Flush(_ context.Context) error { p.writeBuf.Reset() // always reset the buffer, even in case of an error return err } + +// SetSocketBufferSize sets udp buffer size +func (p *TUDPTransport) SetSocketBufferSize(bufferSize int) error { + return setSocketBuffer(p.Conn(), bufferSize) +} diff --git a/cmd/agent/app/servers/thriftudp/transport_test.go b/cmd/agent/app/servers/thriftudp/transport_test.go index 214548bced8..77201fb183f 100644 --- a/cmd/agent/app/servers/thriftudp/transport_test.go +++ b/cmd/agent/app/servers/thriftudp/transport_test.go @@ -77,6 +77,20 @@ func TestNewTUDPServerTransport(t *testing.T) { require.False(t, trans.IsOpen()) } +func TestSetSocketBufferSize(t *testing.T) { + trans, err := NewTUDPServerTransport(localListenAddr.String()) + require.Nil(t, err) + require.True(t, trans.IsOpen()) + require.Equal(t, ^uint64(0), trans.RemainingBytes()) + + err = trans.SetSocketBufferSize(1024) + require.Nil(t, err) + + err = trans.Close() + require.Nil(t, err) + require.False(t, trans.IsOpen()) +} + func TestTUDPServerTransportIsOpen(t *testing.T) { _, err := NewTUDPServerTransport("fakeAddressAndPort") require.NotNil(t, err)