From c649b278debd6a42bf632d7a0296f92369c73360 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Wed, 4 Nov 2020 14:21:05 -0500 Subject: [PATCH] Change default OTLP port number This implements specification change https://github.com/open-telemetry/opentelemetry-specification/pull/1221 To make transition to new port numbers less painful OTLP receiver will also accept data on the legacy port numbers when it is configured to use the default endpoint. Users who use the default Collector config can continue sending data to the legacy ports and have a graceful period to update their senders to start sending to the new ports. Note that OTLP/HTTP continues using a separate port number from OTLP/gRPC. There is separate work in progress to use one port for both. --- receiver/otlpreceiver/factory.go | 26 +++++++---- receiver/otlpreceiver/otlp.go | 74 ++++++++++++++++++++++++------ receiver/otlpreceiver/otlp_test.go | 5 +- service/internal/resources.go | 26 +++-------- testbed/testbed/senders.go | 18 ++++++++ testbed/tests/trace_test.go | 5 +- 6 files changed, 106 insertions(+), 48 deletions(-) diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 60659cd6ee0..2bde57cdca0 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/spf13/viper" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -37,6 +38,11 @@ const ( protoGRPC = "grpc" protoHTTP = "http" protocolsFieldName = "protocols" + + defaultGRPCEndpoint = "0.0.0.0:4317" + defaultHTTPEndpoint = "0.0.0.0:4318" + legacyGRPCEndpoint = "0.0.0.0:55680" + legacyHTTPEndpoint = "0.0.0.0:55681" ) func NewFactory() component.ReceiverFactory { @@ -59,14 +65,14 @@ func createDefaultConfig() configmodels.Receiver { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: defaultGRPCEndpoint, Transport: "tcp", }, // We almost write 0 bytes, so no need to tune WriteBufferSize. ReadBufferSize: 512 * 1024, }, HTTP: &confighttp.HTTPServerSettings{ - Endpoint: "0.0.0.0:55681", + Endpoint: defaultHTTPEndpoint, }, }, } @@ -117,11 +123,11 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{}) // CreateTracesReceiver creates a trace receiver based on provided config. func createTraceReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.TracesConsumer, ) (component.TracesReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -134,11 +140,11 @@ func createTraceReceiver( // CreateMetricsReceiver creates a metrics receiver based on provided config. func createMetricsReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.MetricsConsumer, ) (component.MetricsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -151,11 +157,11 @@ func createMetricsReceiver( // CreateLogReceiver creates a log receiver based on provided config. func createLogReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.LogsConsumer, ) (component.LogsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -165,7 +171,7 @@ func createLogReceiver( return r, nil } -func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { +func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceiver, error) { rCfg := cfg.(*Config) // There must be one receiver for both metrics and traces. We maintain a map of @@ -176,7 +182,7 @@ func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { if !ok { var err error // We don't have a receiver, so create one. - receiver, err = newOtlpReceiver(rCfg) + receiver, err = newOtlpReceiver(rCfg, logger) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 0b8d8e66077..b5322cee25a 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -22,10 +22,12 @@ import ( "sync" gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" + "go.uber.org/zap" "google.golang.org/grpc" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1" @@ -49,14 +51,17 @@ type otlpReceiver struct { stopOnce sync.Once startServerOnce sync.Once + + logger *zap.Logger } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { +func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) { r := &otlpReceiver{ - cfg: cfg, + cfg: cfg, + logger: logger, } if cfg.GRPC != nil { opts, err := cfg.GRPC.ToServerOption() @@ -84,6 +89,38 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { return r, nil } +func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error { + r.logger.Info("Starting GRPC server", zap.String("endpoint", cfg.NetAddr.Endpoint)) + var gln net.Listener + gln, err := cfg.ToListener() + if err != nil { + r.logger.Error("Failed to setup a listener", zap.Error(err)) + return err + } + go func() { + if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { + host.ReportFatalError(errGrpc) + } + }() + return nil +} + +func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error { + r.logger.Info("Starting HTTP server", zap.String("endpoint", cfg.Endpoint)) + var hln net.Listener + hln, err := r.cfg.HTTP.ToListener() + if err != nil { + r.logger.Error("Failed to setup a listener", zap.Error(err)) + return err + } + go func() { + if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { + host.ReportFatalError(errHTTP) + } + }() + return nil +} + // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { @@ -94,32 +131,41 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { var err error r.startServerOnce.Do(func() { if r.cfg.GRPC != nil { - var gln net.Listener - gln, err = r.cfg.GRPC.ToListener() + err = r.startGRPCServer(r.cfg.GRPC, host) if err != nil { return } - go func() { - if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { - host.ReportFatalError(errGrpc) + if r.cfg.GRPC.NetAddr.Endpoint == defaultGRPCEndpoint { + r.logger.Info("Setting up a second GRPC listener on legacy endpoint") + + // Copy the config. + cfgLegacyGRPC := r.cfg.GRPC + // And use the legacy endpoint. + cfgLegacyGRPC.NetAddr.Endpoint = legacyGRPCEndpoint + err = r.startGRPCServer(cfgLegacyGRPC, host) + if err != nil { + return } - }() + } } if r.cfg.HTTP != nil { r.serverHTTP = r.cfg.HTTP.ToServer( r.gatewayMux, confighttp.WithErrorHandler(errorHandler), ) - var hln net.Listener - hln, err = r.cfg.HTTP.ToListener() + err = r.startHTTPServer(r.cfg.HTTP, host) if err != nil { return } - go func() { - if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { - host.ReportFatalError(errHTTP) + if r.cfg.HTTP.Endpoint == defaultHTTPEndpoint { + r.logger.Info("Setting up a second HTTP listener on legacy endpoint") + cfgLegacyHTTP := r.cfg.HTTP + cfgLegacyHTTP.Endpoint = legacyHTTPEndpoint + err = r.startHTTPServer(cfgLegacyHTTP, host) + if err != nil { + return } - }() + } } }) return err diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 7f5cc28f217..5fa87419b03 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -30,6 +30,7 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -698,7 +699,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) { } // TLS is resolved during Creation of the receiver for GRPC. - _, err := createReceiver(cfg) + _, err := createReceiver(cfg, zap.NewNop()) assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) } @@ -745,7 +746,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer, } func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, zap.NewNop()) require.NoError(t, err) if tc != nil { params := component.ReceiverCreateParams{} diff --git a/service/internal/resources.go b/service/internal/resources.go index 3777769f2b3..9806a206ec6 100644 --- a/service/internal/resources.go +++ b/service/internal/resources.go @@ -1,17 +1,3 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - // Code generated by "esc -pkg internal -o resources.go templates/"; DO NOT EDIT. package internal @@ -227,7 +213,7 @@ var _escData = map[string]*_escFile{ name: "component_header.html", local: "templates/component_header.html", size: 156, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/1SMsQqDMBRFd7/iIq7q5lBiltKt9B8CPklQX6R1e9x/L6ZQ2vXcc65ZE3AZ0V3ztmcV PW467TnpQVZmzZp0Kfs96VJQizTjw1uyAgAXB+8C4lPmsT4fydqbdY+wCen64F0fB19iWV/yF/54X0en @@ -239,7 +225,7 @@ U3kHAAD//zT+SdCcAAAA name: "extensions_table.html", local: "templates/extensions_table.html", size: 353, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/2SQwU7DMBBE7/2KlemRNJwjxxwQHDnwB248DRbOOnK2tGD531HTQIvqk1fzZjU7Wuw2 gCb5CmjVNiaHVE2j7Tz3DT0osyIiynltqWlp8xSHMTJYntmN0bOUsgDJcg9ap3jw7HC8n7+z5y0epgU7 @@ -252,7 +238,7 @@ oxX5HeETfMGv9NPTkv4i2e6jT3HPrqE7AEui8yaECbdWkzPYUXWlaHFkg++5VR1YkJTRlt4Tdq06HVfK name: "footer.html", local: "templates/footer.html", size: 15, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/7LRT8pPqbTjstHPKMnNsQMEAAD//wEFevAPAAAA `, @@ -262,7 +248,7 @@ H4sIAAAAAAAC/7LRT8pPqbTjstHPKMnNsQMEAAD//wEFevAPAAAA name: "header.html", local: "templates/header.html", size: 467, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/5TRMU8sIRAH8P4+BY/25eC9szGGxUItLIwW11giO7uMB8wG5rxsLvfdDdnTxNhoBeFP fpnM3/y5fbzZPj/dicAp2pVph4guj52ELK0J4Hq7EkIIk4Cd8MGVCtzJPQ/rS3mOGDmCPR7Vtl1OJ6OX @@ -276,7 +262,7 @@ vuDEoocBiqjF/5RszGuV1uhFsCujl0bMC/Vz62vzZe1hY98DAAD//7qRGmLTAQAA name: "pipelines_table.html", local: "templates/pipelines_table.html", size: 1946, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/7SVwXLTMBCG7zyFxnRyIjVcU1scSpnhAMN0eAFZ2gRNlZVmJbdujd+dsWyrTp0LtL5k rOjX/tlv/8hFEJUB5sOjgTKrLCmgrXdCajzs2MeMv2OMsSLQ8DAsFJPWeCew/MSE0QcsDewDLyr+tTbm @@ -293,7 +279,7 @@ QeMmXNC4hCvdNKvQgsYtacFoGWFFxSvCNl+lu3HQFXl8JfO/AQAA//9We3KLmgcAAA== name: "properties_table.html", local: "templates/properties_table.html", size: 420, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/2SRwW7DIBBE7/6KVRr1VMc5u5gfqFT11Ds2U8sqWVuwqRoR/r1yTCpb4YAEO48ZDarV MR7ezQkp1apqdaHEtA4U5OLQ7NrRW/gyTKYbuK/puNMFEVGMtB/Y4pfqho6UUr71hnvk0Qvt4XACyyw6 diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index 14af5c48cad..2bee5f6552f 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -317,6 +317,15 @@ func (ote *OTLPHTTPTraceDataSender) Start() error { return exp.Start(context.Background(), ote) } +func (ote *OTLPHTTPTraceDataSender) GenConfigYAMLStr() string { + // Note that this generates a receiver config for agent. + return ` + otlp: + protocols: + http: + endpoint: "0.0.0.0:4318"` +} + // OTLPHTTPMetricsDataSender implements MetricDataSender for OTLP/HTTP metrics exporterType. type OTLPHTTPMetricsDataSender struct { otlpHTTPDataSender @@ -447,6 +456,15 @@ func (ote *OTLPTraceDataSender) Start() error { return exp.Start(context.Background(), ote) } +func (ote *OTLPTraceDataSender) GenConfigYAMLStr() string { + // Note that this generates a receiver config for agent. + return ` + otlp: + protocols: + grpc: + endpoint: "0.0.0.0:4317"` +} + // OTLPMetricsDataSender implements MetricDataSender for OTLP metrics exporterType. type OTLPMetricsDataSender struct { otlpDataSender diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index 7424d78dd3e..11449a503c3 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -67,7 +67,8 @@ func TestTrace10kSPS(t *testing.T) { }, { "OTLP", - testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + // testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPTraceDataSender(testbed.DefaultHost, 55680), testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 20, @@ -76,7 +77,7 @@ func TestTrace10kSPS(t *testing.T) { }, { "OTLP-HTTP", - testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, 55681), testbed.NewOTLPHTTPDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 20,