Skip to content

Commit

Permalink
Change default OTLP port number
Browse files Browse the repository at this point in the history
This implements specification change open-telemetry/opentelemetry-specification#1221

To make transition to new port numbers less painful OTLP receiver will
also accept data on the legacy port numbers when it is configured to
use the default endpoint. Users who use the default Collector config
can continue sending data to the legacy ports and have a graceful period
to update their senders to start sending to the new ports.

Note that OTLP/HTTP continues using a separate port number from OTLP/gRPC.
There is separate work in progress to use one port for both.
  • Loading branch information
Tigran Najaryan authored and tigrannajaryan committed Nov 10, 2020
1 parent bf818a2 commit dbe5792
Show file tree
Hide file tree
Showing 6 changed files with 6,773 additions and 36 deletions.
6,686 changes: 6,686 additions & 0 deletions build.log

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions receiver/otlpreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ receivers:
The following settings are configurable:
- `endpoint` (default = 0.0.0.0:55680): host:port to which the receiver is
going to receive data. The valid syntax is described at
https://github.com/grpc/grpc/blob/master/doc/naming.md.
- `endpoint` (default = 0.0.0.0:4317 for grpc protocol, 0.0.0.0:4318 http protocol):
host:port to which the receiver is going to receive data. The valid syntax is
described at https://github.com/grpc/grpc/blob/master/doc/naming.md.

## Advanced Configuration

Expand All @@ -48,7 +48,7 @@ IMPORTANT: bytes fields are encoded as base64 strings.

To write traces with HTTP/JSON, `POST` to `[address]/v1/traces` for traces,
to `[address]/v1/metrics` for metrics, to `[address]/v1/logs` for logs. The default
port is `55681`.
port is `4318`.

The HTTP/JSON endpoint can also optionally configure
[CORS](https://fetch.spec.whatwg.org/#cors-protocol), which is enabled by
Expand All @@ -59,7 +59,7 @@ receivers:
otlp:
protocols:
http:
endpoint: "localhost:55681"
endpoint: "localhost:4318"
cors_allowed_origins:
- http://test.com
# Origins can have wildcards with *, use * by itself to match any origin.
Expand Down
10 changes: 5 additions & 5 deletions receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestLoadConfig(t *testing.T) {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
},
ReadBufferSize: 512 * 1024,
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestLoadConfig(t *testing.T) {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
},
MaxRecvMsgSizeMiB: 32,
Expand All @@ -139,7 +139,7 @@ func TestLoadConfig(t *testing.T) {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
},
TLSSetting: &configtls.TLSServerSetting{
Expand All @@ -151,7 +151,7 @@ func TestLoadConfig(t *testing.T) {
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
Endpoint: "0.0.0.0:4318",
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
Expand All @@ -170,7 +170,7 @@ func TestLoadConfig(t *testing.T) {
},
Protocols: Protocols{
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
Endpoint: "0.0.0.0:4318",
CorsOrigins: []string{"https://*.test.com", "https://test.com"},
},
},
Expand Down
26 changes: 16 additions & 10 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand All @@ -37,6 +38,11 @@ const (
protoGRPC = "grpc"
protoHTTP = "http"
protocolsFieldName = "protocols"

defaultGRPCEndpoint = "0.0.0.0:4317"
defaultHTTPEndpoint = "0.0.0.0:4318"
legacyGRPCEndpoint = "0.0.0.0:55680"
legacyHTTPEndpoint = "0.0.0.0:55681"
)

func NewFactory() component.ReceiverFactory {
Expand All @@ -59,14 +65,14 @@ func createDefaultConfig() configmodels.Receiver {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: defaultGRPCEndpoint,
Transport: "tcp",
},
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
Endpoint: defaultHTTPEndpoint,
},
},
}
Expand Down Expand Up @@ -117,11 +123,11 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{})
// CreateTracesReceiver creates a trace receiver based on provided config.
func createTraceReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.TracesConsumer,
) (component.TracesReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -134,11 +140,11 @@ func createTraceReceiver(
// CreateMetricsReceiver creates a metrics receiver based on provided config.
func createMetricsReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -151,11 +157,11 @@ func createMetricsReceiver(
// CreateLogReceiver creates a log receiver based on provided config.
func createLogReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.LogsConsumer,
) (component.LogsReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -165,7 +171,7 @@ func createLogReceiver(
return r, nil
}

func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) {
func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceiver, error) {
rCfg := cfg.(*Config)

// There must be one receiver for both metrics and traces. We maintain a map of
Expand All @@ -176,7 +182,7 @@ func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) {
if !ok {
var err error
// We don't have a receiver, so create one.
receiver, err = newOtlpReceiver(rCfg)
receiver, err = newOtlpReceiver(rCfg, logger)
if err != nil {
return nil, err
}
Expand Down
72 changes: 58 additions & 14 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"sync"

gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1"
Expand All @@ -49,14 +51,17 @@ type otlpReceiver struct {

stopOnce sync.Once
startServerOnce sync.Once

logger *zap.Logger
}

// newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's
// responsibility to invoke the respective Start*Reception methods as well
// as the various Stop*Reception methods to end it.
func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) {
r := &otlpReceiver{
cfg: cfg,
cfg: cfg,
logger: logger,
}
if cfg.GRPC != nil {
opts, err := cfg.GRPC.ToServerOption()
Expand Down Expand Up @@ -84,6 +89,36 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
return r, nil
}

func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error {
r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint)
var gln net.Listener
gln, err := cfg.ToListener()
if err != nil {
return err
}
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
host.ReportFatalError(errGrpc)
}
}()
return nil
}

func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error {
r.logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint)
var hln net.Listener
hln, err := r.cfg.HTTP.ToListener()
if err != nil {
return err
}
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
host.ReportFatalError(errHTTP)
}
}()
return nil
}

// Start runs the trace receiver on the gRPC server. Currently
// it also enables the metrics receiver too.
func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
Expand All @@ -94,32 +129,41 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
var err error
r.startServerOnce.Do(func() {
if r.cfg.GRPC != nil {
var gln net.Listener
gln, err = r.cfg.GRPC.ToListener()
err = r.startGRPCServer(r.cfg.GRPC, host)
if err != nil {
return
}
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
host.ReportFatalError(errGrpc)
if r.cfg.GRPC.NetAddr.Endpoint == defaultGRPCEndpoint {
r.logger.Info("Setting up a second GRPC listener on legacy endpoint " + legacyGRPCEndpoint)

// Copy the config.
cfgLegacyGRPC := r.cfg.GRPC
// And use the legacy endpoint.
cfgLegacyGRPC.NetAddr.Endpoint = legacyGRPCEndpoint
err = r.startGRPCServer(cfgLegacyGRPC, host)
if err != nil {
return
}
}()
}
}
if r.cfg.HTTP != nil {
r.serverHTTP = r.cfg.HTTP.ToServer(
r.gatewayMux,
confighttp.WithErrorHandler(errorHandler),
)
var hln net.Listener
hln, err = r.cfg.HTTP.ToListener()
err = r.startHTTPServer(r.cfg.HTTP, host)
if err != nil {
return
}
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
host.ReportFatalError(errHTTP)
if r.cfg.HTTP.Endpoint == defaultHTTPEndpoint {
r.logger.Info("Setting up a second HTTP listener on legacy endpoint " + legacyHTTPEndpoint)
cfgLegacyHTTP := r.cfg.HTTP
cfgLegacyHTTP.Endpoint = legacyHTTPEndpoint
err = r.startHTTPServer(cfgLegacyHTTP, host)
if err != nil {
return
}
}()
}
}
})
return err
Expand Down
5 changes: 3 additions & 2 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -698,7 +699,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) {
}

// TLS is resolved during Creation of the receiver for GRPC.
_, err := createReceiver(cfg)
_, err := createReceiver(cfg, zap.NewNop())
assert.EqualError(t, err,
`failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`)
}
Expand Down Expand Up @@ -745,7 +746,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer,
}

func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, zap.NewNop())
require.NoError(t, err)
if tc != nil {
params := component.ReceiverCreateParams{}
Expand Down

0 comments on commit dbe5792

Please sign in to comment.