From cfe828d9843dd208009aaf95a1ae55ff2f49a765 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Wed, 4 Nov 2020 14:21:05 -0500 Subject: [PATCH] Change default OTLP port number This implements specification change https://github.com/open-telemetry/opentelemetry-specification/pull/1221 To make transition to new port numbers less painful OTLP receiver will also accept data on the legacy port numbers when it is configured to use the default endpoint. Users who use the default Collector config can continue sending data to the legacy ports and have a graceful period to update their senders to start sending to the new ports. Note that OTLP/HTTP continues using a separate port number from OTLP/gRPC. There is separate work in progress to use one port for both. --- receiver/otlpreceiver/README.md | 10 +-- receiver/otlpreceiver/config_test.go | 10 +-- receiver/otlpreceiver/factory.go | 26 ++++--- receiver/otlpreceiver/otlp.go | 111 +++++++++++++++++++-------- receiver/otlpreceiver/otlp_test.go | 5 +- 5 files changed, 110 insertions(+), 52 deletions(-) diff --git a/receiver/otlpreceiver/README.md b/receiver/otlpreceiver/README.md index b1d442800ab..0c428277697 100644 --- a/receiver/otlpreceiver/README.md +++ b/receiver/otlpreceiver/README.md @@ -25,9 +25,9 @@ receivers: The following settings are configurable: -- `endpoint` (default = 0.0.0.0:55680): host:port to which the receiver is - going to receive data. The valid syntax is described at - https://github.com/grpc/grpc/blob/master/doc/naming.md. +- `endpoint` (default = 0.0.0.0:4317 for grpc protocol, 0.0.0.0:4318 http protocol): + host:port to which the receiver is going to receive data. The valid syntax is + described at https://github.com/grpc/grpc/blob/master/doc/naming.md. ## Advanced Configuration @@ -48,7 +48,7 @@ IMPORTANT: bytes fields are encoded as base64 strings. To write traces with HTTP/JSON, `POST` to `[address]/v1/traces` for traces, to `[address]/v1/metrics` for metrics, to `[address]/v1/logs` for logs. The default -port is `55681`. +port is `4318`. The HTTP/JSON endpoint can also optionally configure [CORS](https://fetch.spec.whatwg.org/#cors-protocol), which is enabled by @@ -59,7 +59,7 @@ receivers: otlp: protocols: http: - endpoint: "localhost:55681" + endpoint: "localhost:4318" cors_allowed_origins: - http://test.com # Origins can have wildcards with *, use * by itself to match any origin. diff --git a/receiver/otlpreceiver/config_test.go b/receiver/otlpreceiver/config_test.go index 0c3129c5424..f18547dfad5 100644 --- a/receiver/otlpreceiver/config_test.go +++ b/receiver/otlpreceiver/config_test.go @@ -82,7 +82,7 @@ func TestLoadConfig(t *testing.T) { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:4317", Transport: "tcp", }, ReadBufferSize: 512 * 1024, @@ -112,7 +112,7 @@ func TestLoadConfig(t *testing.T) { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:4317", Transport: "tcp", }, MaxRecvMsgSizeMiB: 32, @@ -139,7 +139,7 @@ func TestLoadConfig(t *testing.T) { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:4317", Transport: "tcp", }, TLSSetting: &configtls.TLSServerSetting{ @@ -151,7 +151,7 @@ func TestLoadConfig(t *testing.T) { ReadBufferSize: 512 * 1024, }, HTTP: &confighttp.HTTPServerSettings{ - Endpoint: "0.0.0.0:55681", + Endpoint: "0.0.0.0:4318", TLSSetting: &configtls.TLSServerSetting{ TLSSetting: configtls.TLSSetting{ CertFile: "test.crt", @@ -170,7 +170,7 @@ func TestLoadConfig(t *testing.T) { }, Protocols: Protocols{ HTTP: &confighttp.HTTPServerSettings{ - Endpoint: "0.0.0.0:55681", + Endpoint: "0.0.0.0:4318", CorsOrigins: []string{"https://*.test.com", "https://test.com"}, }, }, diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 60659cd6ee0..2bde57cdca0 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/spf13/viper" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -37,6 +38,11 @@ const ( protoGRPC = "grpc" protoHTTP = "http" protocolsFieldName = "protocols" + + defaultGRPCEndpoint = "0.0.0.0:4317" + defaultHTTPEndpoint = "0.0.0.0:4318" + legacyGRPCEndpoint = "0.0.0.0:55680" + legacyHTTPEndpoint = "0.0.0.0:55681" ) func NewFactory() component.ReceiverFactory { @@ -59,14 +65,14 @@ func createDefaultConfig() configmodels.Receiver { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: defaultGRPCEndpoint, Transport: "tcp", }, // We almost write 0 bytes, so no need to tune WriteBufferSize. ReadBufferSize: 512 * 1024, }, HTTP: &confighttp.HTTPServerSettings{ - Endpoint: "0.0.0.0:55681", + Endpoint: defaultHTTPEndpoint, }, }, } @@ -117,11 +123,11 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{}) // CreateTracesReceiver creates a trace receiver based on provided config. func createTraceReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.TracesConsumer, ) (component.TracesReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -134,11 +140,11 @@ func createTraceReceiver( // CreateMetricsReceiver creates a metrics receiver based on provided config. func createMetricsReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.MetricsConsumer, ) (component.MetricsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -151,11 +157,11 @@ func createMetricsReceiver( // CreateLogReceiver creates a log receiver based on provided config. func createLogReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.LogsConsumer, ) (component.LogsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -165,7 +171,7 @@ func createLogReceiver( return r, nil } -func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { +func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceiver, error) { rCfg := cfg.(*Config) // There must be one receiver for both metrics and traces. We maintain a map of @@ -176,7 +182,7 @@ func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { if !ok { var err error // We don't have a receiver, so create one. - receiver, err = newOtlpReceiver(rCfg) + receiver, err = newOtlpReceiver(rCfg, logger) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 0b8d8e66077..92ea7c09a0a 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -22,10 +22,12 @@ import ( "sync" gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" + "go.uber.org/zap" "google.golang.org/grpc" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1" @@ -49,14 +51,17 @@ type otlpReceiver struct { stopOnce sync.Once startServerOnce sync.Once + + logger *zap.Logger } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { +func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) { r := &otlpReceiver{ - cfg: cfg, + cfg: cfg, + logger: logger, } if cfg.GRPC != nil { opts, err := cfg.GRPC.ToServerOption() @@ -84,43 +89,89 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { return r, nil } -// Start runs the trace receiver on the gRPC server. Currently -// it also enables the metrics receiver too. -func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { - if r.traceReceiver == nil && r.metricsReceiver == nil && r.logReceiver == nil { - return errors.New("cannot start receiver: no consumers were specified") +func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error { + r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint) + var gln net.Listener + gln, err := cfg.ToListener() + if err != nil { + return err } + go func() { + if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { + host.ReportFatalError(errGrpc) + } + }() + return nil +} +func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error { + r.logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint) + var hln net.Listener + hln, err := r.cfg.HTTP.ToListener() + if err != nil { + return err + } + go func() { + if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { + host.ReportFatalError(errHTTP) + } + }() + return nil +} + +func (r *otlpReceiver) startProtocolServers(host component.Host) error { var err error - r.startServerOnce.Do(func() { - if r.cfg.GRPC != nil { - var gln net.Listener - gln, err = r.cfg.GRPC.ToListener() + if r.cfg.GRPC != nil { + err = r.startGRPCServer(r.cfg.GRPC, host) + if err != nil { + return err + } + if r.cfg.GRPC.NetAddr.Endpoint == defaultGRPCEndpoint { + r.logger.Info("Setting up a second GRPC listener on legacy endpoint " + legacyGRPCEndpoint) + + // Copy the config. + cfgLegacyGRPC := r.cfg.GRPC + // And use the legacy endpoint. + cfgLegacyGRPC.NetAddr.Endpoint = legacyGRPCEndpoint + err = r.startGRPCServer(cfgLegacyGRPC, host) if err != nil { - return + return err } - go func() { - if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { - host.ReportFatalError(errGrpc) - } - }() } - if r.cfg.HTTP != nil { - r.serverHTTP = r.cfg.HTTP.ToServer( - r.gatewayMux, - confighttp.WithErrorHandler(errorHandler), - ) - var hln net.Listener - hln, err = r.cfg.HTTP.ToListener() + } + if r.cfg.HTTP != nil { + r.serverHTTP = r.cfg.HTTP.ToServer( + r.gatewayMux, + confighttp.WithErrorHandler(errorHandler), + ) + err = r.startHTTPServer(r.cfg.HTTP, host) + if err != nil { + return err + } + if r.cfg.HTTP.Endpoint == defaultHTTPEndpoint { + r.logger.Info("Setting up a second HTTP listener on legacy endpoint " + legacyHTTPEndpoint) + cfgLegacyHTTP := r.cfg.HTTP + cfgLegacyHTTP.Endpoint = legacyHTTPEndpoint + err = r.startHTTPServer(cfgLegacyHTTP, host) if err != nil { - return + return err } - go func() { - if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { - host.ReportFatalError(errHTTP) - } - }() } + } + + return err +} + +// Start runs the trace receiver on the gRPC server. Currently +// it also enables the metrics receiver too. +func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { + if r.traceReceiver == nil && r.metricsReceiver == nil && r.logReceiver == nil { + return errors.New("cannot start receiver: no consumers were specified") + } + + var err error + r.startServerOnce.Do(func() { + err = r.startProtocolServers(host) }) return err } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 7f5cc28f217..5fa87419b03 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -30,6 +30,7 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -698,7 +699,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) { } // TLS is resolved during Creation of the receiver for GRPC. - _, err := createReceiver(cfg) + _, err := createReceiver(cfg, zap.NewNop()) assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) } @@ -745,7 +746,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer, } func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, zap.NewNop()) require.NoError(t, err) if tc != nil { params := component.ReceiverCreateParams{}