diff --git a/exporters/otlp/README.md b/exporters/otlp/README.md new file mode 100644 index 00000000000..27cd9280d0c --- /dev/null +++ b/exporters/otlp/README.md @@ -0,0 +1,18 @@ +# OpenTelemetry Collector Go Exporter + +[![GoDoc][godoc-image]][godoc-url] + + +This exporter converts OpenTelemetry [SpanData](https://github.com/open-telemetry/opentelemetry-go/blob/6769330394f78192df01cb59299e9e0f2e5e977b/sdk/export/trace/trace.go#L49) +to OpenTelemetry Protocol [Span](https://github.com/open-telemetry/opentelemetry-proto/blob/c20698d5bb483cf05de1a7c0e134b7c57e359674/opentelemetry/proto/trace/v1/trace.proto#L46) +and exports them to OpenTelemetry Collector. + + +## Installation + +```bash +$ go get -u go.opentelemetry.io/otel/exporters/otlp +``` + +[godoc-url]: https://godoc.org/go.opentelemetry.io/otel/exporters/otlp + diff --git a/exporters/otlp/alignment_test.go b/exporters/otlp/alignment_test.go new file mode 100644 index 00000000000..7a9ccfac44f --- /dev/null +++ b/exporters/otlp/alignment_test.go @@ -0,0 +1,38 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + "os" + "testing" + "unsafe" + + ottest "go.opentelemetry.io/otel/internal/testing" +) + +// Ensure struct alignment prior to running tests. +func TestMain(m *testing.M) { + fields := []ottest.FieldOffset{ + { + Name: "Exporter.lastConnectErrPtr", + Offset: unsafe.Offsetof(Exporter{}.lastConnectErrPtr), + }, + } + if !ottest.Aligned8Byte(fields, os.Stderr) { + os.Exit(1) + } + + os.Exit(m.Run()) +} diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go new file mode 100644 index 00000000000..3e01f814888 --- /dev/null +++ b/exporters/otlp/connection.go @@ -0,0 +1,113 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + "math/rand" + "sync/atomic" + "time" + "unsafe" +) + +func (e *Exporter) lastConnectError() error { + errPtr := (*error)(atomic.LoadPointer(&e.lastConnectErrPtr)) + if errPtr == nil { + return nil + } + return *errPtr +} + +func (e *Exporter) saveLastConnectError(err error) { + var errPtr *error + if err != nil { + errPtr = &err + } + atomic.StorePointer(&e.lastConnectErrPtr, unsafe.Pointer(errPtr)) +} + +func (e *Exporter) setStateDisconnected(err error) { + e.saveLastConnectError(err) + select { + case e.disconnectedCh <- true: + default: + } +} + +func (e *Exporter) setStateConnected() { + e.saveLastConnectError(nil) +} + +func (e *Exporter) connected() bool { + return e.lastConnectError() == nil +} + +const defaultConnReattemptPeriod = 10 * time.Second + +func (e *Exporter) indefiniteBackgroundConnection() { + defer func() { + e.backgroundConnectionDoneCh <- true + }() + + connReattemptPeriod := e.c.reconnectionPeriod + if connReattemptPeriod <= 0 { + connReattemptPeriod = defaultConnReattemptPeriod + } + + // No strong seeding required, nano time can + // already help with pseudo uniqueness. + rng := rand.New(rand.NewSource(time.Now().UnixNano() + rand.Int63n(1024))) + + // maxJitterNanos: 70% of the connectionReattemptPeriod + maxJitterNanos := int64(0.7 * float64(connReattemptPeriod)) + + for { + // Otherwise these will be the normal scenarios to enable + // reconnection if we trip out. + // 1. If we've stopped, return entirely + // 2. Otherwise block until we are disconnected, and + // then retry connecting + select { + case <-e.stopCh: + return + + case <-e.disconnectedCh: + // Normal scenario that we'll wait for + } + + if err := e.connect(); err == nil { + e.setStateConnected() + } else { + e.setStateDisconnected(err) + } + + // Apply some jitter to avoid lockstep retrials of other + // collector-exporters. Lockstep retrials could result in an + // innocent DDOS, by clogging the machine's resources and network. + jitter := time.Duration(rng.Int63n(maxJitterNanos)) + select { + case <-e.stopCh: + return + case <-time.After(connReattemptPeriod + jitter): + } + } +} + +func (e *Exporter) connect() error { + cc, err := e.dialToCollector() + if err != nil { + return err + } + return e.enableConnections(cc) +} diff --git a/exporters/otlp/doc.go b/exporters/otlp/doc.go new file mode 100644 index 00000000000..0445f087943 --- /dev/null +++ b/exporters/otlp/doc.go @@ -0,0 +1,16 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package otlp contains an OpenTelemetry tracing exporter for OpenTelemetry Collector. +package otlp // import "go.opentelemetry.io/otel/exporters/otlp" diff --git a/exporters/otlp/example_test.go b/exporters/otlp/example_test.go new file mode 100644 index 00000000000..9030428ba89 --- /dev/null +++ b/exporters/otlp/example_test.go @@ -0,0 +1,103 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp_test + +import ( + "context" + "fmt" + "log" + "time" + + "google.golang.org/grpc/credentials" + + "go.opentelemetry.io/otel/api/global" + "go.opentelemetry.io/otel/exporters/otlp" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +func Example_insecure() { + exp, err := otlp.NewExporter(otlp.WithInsecure()) + if err != nil { + log.Fatalf("Failed to create the collector exporter: %v", err) + } + defer func() { + _ = exp.Stop() + }() + + tp, _ := sdktrace.NewProvider( + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), + sdktrace.WithBatcher(exp, // add following two options to ensure flush + sdktrace.WithScheduleDelayMillis(5), + sdktrace.WithMaxExportBatchSize(10), + )) + if err != nil { + log.Fatalf("error creating trace provider: %v\n", err) + } + + global.SetTraceProvider(tp) + + tracer := global.TraceProvider().Tracer("test-tracer") + + // Then use the OpenTelemetry tracing library, like we normally would. + ctx, span := tracer.Start(context.Background(), "CollectorExporter-Example") + defer span.End() + + for i := 0; i < 10; i++ { + _, iSpan := tracer.Start(ctx, fmt.Sprintf("Sample-%d", i)) + <-time.After(6 * time.Millisecond) + iSpan.End() + } +} + +func Example_withTLS() { + // Please take at look at https://godoc.org/google.golang.org/grpc/credentials#TransportCredentials + // for ways on how to initialize gRPC TransportCredentials. + creds, err := credentials.NewClientTLSFromFile("my-cert.pem", "") + if err != nil { + log.Fatalf("failed to create gRPC client TLS credentials: %v", err) + } + + exp, err := otlp.NewExporter(otlp.WithTLSCredentials(creds)) + if err != nil { + log.Fatalf("failed to create the collector exporter: %v", err) + } + defer func() { + _ = exp.Stop() + }() + + tp, err := sdktrace.NewProvider( + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), + sdktrace.WithBatcher(exp, // add following two options to ensure flush + sdktrace.WithScheduleDelayMillis(5), + sdktrace.WithMaxExportBatchSize(10), + )) + if err != nil { + log.Fatalf("error creating trace provider: %v\n", err) + } + + global.SetTraceProvider(tp) + + tracer := global.TraceProvider().Tracer("test-tracer") + + // Then use the OpenTelemetry tracing library, like we normally would. + ctx, span := tracer.Start(context.Background(), "Securely-Talking-To-Collector-Span") + defer span.End() + + for i := 0; i < 10; i++ { + _, iSpan := tracer.Start(ctx, fmt.Sprintf("Sample-%d", i)) + <-time.After(6 * time.Millisecond) + iSpan.End() + } +} diff --git a/exporters/otlp/go.mod b/exporters/otlp/go.mod new file mode 100644 index 00000000000..2285b61764f --- /dev/null +++ b/exporters/otlp/go.mod @@ -0,0 +1,17 @@ +module go.opentelemetry.io/otel/exporters/otlp + +replace go.opentelemetry.io/otel => ../.. + +require ( + github.com/golang/protobuf v1.3.2 + github.com/google/go-cmp v0.4.0 + github.com/open-telemetry/opentelemetry-proto v0.0.0-20200219184922-5e1d5bc66d5a + github.com/stretchr/testify v1.4.0 + go.opentelemetry.io/otel v0.2.3 + golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect + golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect + golang.org/x/text v0.3.2 // indirect + google.golang.org/grpc v1.27.1 +) + +go 1.13 diff --git a/exporters/otlp/go.sum b/exporters/otlp/go.sum new file mode 100644 index 00000000000..25a6ec2f1ed --- /dev/null +++ b/exporters/otlp/go.sum @@ -0,0 +1,94 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs= +github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60= +github.com/benbjohnson/clock v1.0.0 h1:78Jk/r6m4wCi6sndMpty7A//t4dw/RW5fV4ZgDVfX1w= +github.com/benbjohnson/clock v1.0.0/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= +github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/open-telemetry/opentelemetry-proto v0.0.0-20200219184922-5e1d5bc66d5a h1:Cw8YiI/nHPPiLUesxJCtZfOBMvWuUNqItCMNZpOMYro= +github.com/open-telemetry/opentelemetry-proto v0.0.0-20200219184922-5e1d5bc66d5a/go.mod h1:PMR5GI0F7BSpio+rBGFxNm6SLzg3FypDTcFuQZnO+F8= +github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03 h1:4HYDjxeNXAOTv3o1N2tjo8UUSlhQgAD52FVkwxnWgM8= +google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk= +google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/exporters/otlp/mock_collector_test.go b/exporters/otlp/mock_collector_test.go new file mode 100644 index 00000000000..b83704976e7 --- /dev/null +++ b/exporters/otlp/mock_collector_test.go @@ -0,0 +1,112 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp_test + +import ( + "context" + "fmt" + "net" + "sync" + "testing" + "time" + + "google.golang.org/grpc" + + coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/traces/v1" + tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1" +) + +func makeMockCollector(t *testing.T) *mockCol { + return &mockCol{t: t, wg: new(sync.WaitGroup)} +} + +type mockCol struct { + t *testing.T + + spans []*tracepb.Span + mu sync.Mutex + wg *sync.WaitGroup + + address string + stopFunc func() error + stopOnce sync.Once +} + +var _ coltracepb.TraceServiceServer = (*mockCol)(nil) + +func (mc *mockCol) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) { + resourceSpans := exp.GetResourceSpans() + // TODO (rghetia): handle Resources + for _, rs := range resourceSpans { + mc.spans = append(mc.spans, rs.Spans...) + } + return &coltracepb.ExportTraceServiceResponse{}, nil +} + +var errAlreadyStopped = fmt.Errorf("already stopped") + +func (mc *mockCol) stop() error { + var err = errAlreadyStopped + mc.stopOnce.Do(func() { + if mc.stopFunc != nil { + err = mc.stopFunc() + } + }) + // Give it sometime to shutdown. + <-time.After(160 * time.Millisecond) + mc.mu.Lock() + mc.wg.Wait() + mc.mu.Unlock() + return err +} + +// runMockCol is a helper function to create a mockCol +func runMockCol(t *testing.T) *mockCol { + return runMockColAtAddr(t, "localhost:0") +} + +func runMockColAtAddr(t *testing.T, addr string) *mockCol { + ln, err := net.Listen("tcp", addr) + if err != nil { + t.Fatalf("Failed to get an address: %v", err) + } + + srv := grpc.NewServer() + mc := makeMockCollector(t) + coltracepb.RegisterTraceServiceServer(srv, mc) + go func() { + _ = srv.Serve(ln) + }() + + deferFunc := func() error { + srv.Stop() + return ln.Close() + } + + _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) + + mc.address = "localhost:" + collectorPortStr + mc.stopFunc = deferFunc + + return mc +} + +func (mc *mockCol) getSpans() []*tracepb.Span { + mc.mu.Lock() + spans := append([]*tracepb.Span{}, mc.spans...) + mc.mu.Unlock() + + return spans +} diff --git a/exporters/otlp/options.go b/exporters/otlp/options.go new file mode 100644 index 00000000000..ede4a8f7f9a --- /dev/null +++ b/exporters/otlp/options.go @@ -0,0 +1,104 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +const ( + DefaultCollectorPort uint16 = 55678 + DefaultCollectorHost string = "localhost" +) + +type ExporterOption func(*Config) + +type Config struct { + canDialInsecure bool + collectorAddr string + compressor string + reconnectionPeriod time.Duration + grpcDialOptions []grpc.DialOption + headers map[string]string + clientCredentials credentials.TransportCredentials +} + +// WithInsecure disables client transport security for the exporter's gRPC connection +// just like grpc.WithInsecure() https://godoc.org/google.golang.org/grpc#WithInsecure +// does. Note, by default, client security is required unless WithInsecure is used. +func WithInsecure() ExporterOption { + return func(cfg *Config) { + cfg.canDialInsecure = true + } +} + +// WithAddress allows one to set the address that the exporter will +// connect to the collector on. If unset, it will instead try to use +// connect to DefaultCollectorHost:DefaultCollectorPort. +func WithAddress(addr string) ExporterOption { + return func(cfg *Config) { + cfg.collectorAddr = addr + } +} + +// WithReconnectionPeriod allows one to set the delay between next connection attempt +// after failing to connect with the collector. +func WithReconnectionPeriod(rp time.Duration) ExporterOption { + return func(cfg *Config) { + cfg.reconnectionPeriod = rp + } +} + +// WithCompressor will set the compressor for the gRPC client to use when sending requests. +// It is the responsibility of the caller to ensure that the compressor set has been registered +// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some +// compressors auto-register on import, such as gzip, which can be registered by calling +// `import _ "google.golang.org/grpc/encoding/gzip"` +func WithCompressor(compressor string) ExporterOption { + return func(cfg *Config) { + cfg.compressor = compressor + } +} + +// WithHeaders will send the provided headers when the gRPC stream connection +// is instantiated. +func WithHeaders(headers map[string]string) ExporterOption { + return func(cfg *Config) { + cfg.headers = headers + } +} + +// WithTLSCredentials allows the connection to use TLS credentials +// when talking to the server. It takes in grpc.TransportCredentials instead +// of say a Certificate file or a tls.Certificate, because the retrieving +// these credentials can be done in many ways e.g. plain file, in code tls.Config +// or by certificate rotation, so it is up to the caller to decide what to use. +func WithTLSCredentials(creds credentials.TransportCredentials) ExporterOption { + return func(cfg *Config) { + cfg.clientCredentials = creds + } +} + +// WithGRPCDialOption opens support to any grpc.DialOption to be used. If it conflicts +// with some other configuration the GRPC specified via the collector the ones here will +// take preference since they are set last. +func WithGRPCDialOption(opts ...grpc.DialOption) ExporterOption { + return func(cfg *Config) { + cfg.grpcDialOptions = opts + } +} diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go new file mode 100644 index 00000000000..d13fc5b2e45 --- /dev/null +++ b/exporters/otlp/otlp.go @@ -0,0 +1,249 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// code in this package is mostly copied from contrib.go.opencensus.io/exporter/ocagent/connection.go +package otlp + +import ( + "context" + "errors" + "fmt" + "sync" + "unsafe" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/traces/v1" + tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1" + + export "go.opentelemetry.io/otel/sdk/export/trace" +) + +type Exporter struct { + // mu protects the non-atomic and non-channel variables + mu sync.RWMutex + // senderMu protects the concurrent unsafe send on traceExporter client + senderMu sync.Mutex + started bool + traceExporter coltracepb.TraceServiceClient + grpcClientConn *grpc.ClientConn + lastConnectErrPtr unsafe.Pointer + + startOnce sync.Once + stopCh chan bool + disconnectedCh chan bool + + backgroundConnectionDoneCh chan bool + + c Config +} + +var _ export.SpanBatcher = (*Exporter)(nil) + +func configureOptions(cfg *Config, opts ...ExporterOption) { + for _, opt := range opts { + opt(cfg) + } +} + +func NewExporter(opts ...ExporterOption) (*Exporter, error) { + exp := NewUnstartedExporter(opts...) + if err := exp.Start(); err != nil { + return nil, err + } + return exp, nil +} + +func NewUnstartedExporter(opts ...ExporterOption) *Exporter { + e := new(Exporter) + e.c = Config{} + configureOptions(&e.c, opts...) + + // TODO (rghetia): add resources + + return e +} + +var ( + errAlreadyStarted = errors.New("already started") + errNotStarted = errors.New("not started") +) + +// Start dials to the collector, establishing a connection to it. It also +// initiates the Config and Trace services by sending over the initial +// messages that consist of the node identifier. Start invokes a background +// connector that will reattempt connections to the collector periodically +// if the connection dies. +func (e *Exporter) Start() error { + var err = errAlreadyStarted + e.startOnce.Do(func() { + e.mu.Lock() + e.started = true + e.disconnectedCh = make(chan bool, 1) + e.stopCh = make(chan bool) + e.backgroundConnectionDoneCh = make(chan bool) + e.mu.Unlock() + + // An optimistic first connection attempt to ensure that + // applications under heavy load can immediately process + // data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63 + if err := e.connect(); err == nil { + e.setStateConnected() + } else { + e.setStateDisconnected(err) + } + go e.indefiniteBackgroundConnection() + + err = nil + }) + + return err +} + +func (e *Exporter) prepareCollectorAddress() string { + if e.c.collectorAddr != "" { + return e.c.collectorAddr + } + return fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort) +} + +func (e *Exporter) enableConnections(cc *grpc.ClientConn) error { + e.mu.RLock() + started := e.started + e.mu.RUnlock() + + if !started { + return errNotStarted + } + + e.mu.Lock() + // If previous clientConn is same as the current then just return. + // This doesn't happen right now as this func is only called with new ClientConn. + // It is more about future-proofing. + if e.grpcClientConn == cc { + e.mu.Unlock() + return nil + } + // If the previous clientConn was non-nil, close it + if e.grpcClientConn != nil { + _ = e.grpcClientConn.Close() + } + e.grpcClientConn = cc + e.traceExporter = coltracepb.NewTraceServiceClient(cc) + e.mu.Unlock() + + return nil +} + +func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) { + addr := e.prepareCollectorAddress() + var dialOpts []grpc.DialOption + if e.c.clientCredentials != nil { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(e.c.clientCredentials)) + } else if e.c.canDialInsecure { + dialOpts = append(dialOpts, grpc.WithInsecure()) + } + if e.c.compressor != "" { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(e.c.compressor))) + } + if len(e.c.grpcDialOptions) != 0 { + dialOpts = append(dialOpts, e.c.grpcDialOptions...) + } + + ctx := context.Background() + if len(e.c.headers) > 0 { + ctx = metadata.NewOutgoingContext(ctx, metadata.New(e.c.headers)) + } + return grpc.DialContext(ctx, addr, dialOpts...) +} + +// Stop shuts down all the connections and resources +// related to the exporter. +// If the exporter is not started then this func does nothing. +func (e *Exporter) Stop() error { + e.mu.RLock() + cc := e.grpcClientConn + started := e.started + e.mu.RUnlock() + + if !started { + return nil + } + + // Now close the underlying gRPC connection. + var err error + if cc != nil { + err = cc.Close() + } + + // At this point we can change the state variable started + e.mu.Lock() + e.started = false + e.mu.Unlock() + close(e.stopCh) + + // Ensure that the backgroundConnector returns + <-e.backgroundConnectionDoneCh + + return err +} + +func (e *Exporter) ExportSpans(ctx context.Context, sds []*export.SpanData) { + e.uploadTraces(ctx, sds) +} + +func otSpanDataToPbSpans(sdl []*export.SpanData) []*tracepb.ResourceSpans { + if len(sdl) == 0 { + return nil + } + protoSpans := make([]*tracepb.Span, 0, len(sdl)) + for _, sd := range sdl { + if sd != nil { + protoSpans = append(protoSpans, otSpanToProtoSpan(sd)) + } + } + return []*tracepb.ResourceSpans{ + { + Resource: nil, + Spans: protoSpans, + }, + } +} + +func (e *Exporter) uploadTraces(ctx context.Context, sdl []*export.SpanData) { + select { + case <-e.stopCh: + return + + default: + if !e.connected() { + return + } + + protoSpans := otSpanDataToPbSpans(sdl) + if len(protoSpans) == 0 { + return + } + + e.senderMu.Lock() + _, err := e.traceExporter.Export(ctx, &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: protoSpans, + }) + e.senderMu.Unlock() + if err != nil { + e.setStateDisconnected(err) + } + } +} diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go new file mode 100644 index 00000000000..9d725e442fd --- /dev/null +++ b/exporters/otlp/otlp_test.go @@ -0,0 +1,248 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp_test + +import ( + "context" + "fmt" + "net" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/exporters/otlp" + export "go.opentelemetry.io/otel/sdk/export/trace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +func TestNewExporter_endToEnd(t *testing.T) { + tests := []struct { + name string + additionalOpts []otlp.ExporterOption + }{ + { + name: "StandardExporter", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + newExporterEndToEndTest(t, test.additionalOpts) + }) + } +} + +func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) { + mc := runMockColAtAddr(t, "localhost:56561") + + defer func() { + _ = mc.stop() + }() + + <-time.After(5 * time.Millisecond) + + opts := []otlp.ExporterOption{ + otlp.WithInsecure(), + otlp.WithAddress(mc.address), + otlp.WithReconnectionPeriod(50 * time.Millisecond), + } + + opts = append(opts, additionalOpts...) + exp, err := otlp.NewExporter(opts...) + if err != nil { + t.Fatalf("failed to create a new collector exporter: %v", err) + } + defer func() { + _ = exp.Stop() + }() + + tp, err := sdktrace.NewProvider( + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), + sdktrace.WithBatcher(exp, // add following two options to ensure flush + sdktrace.WithScheduleDelayMillis(15), + sdktrace.WithMaxExportBatchSize(10), + )) + assert.NoError(t, err) + + //global.SetTraceProvider(tp) + + tr := tp.Tracer("test-tracer") + // Now create few spans + m := 4 + for i := 0; i < m; i++ { + _, span := tr.Start(context.Background(), "AlwaysSample") + span.SetAttributes(core.Key("i").Int64(int64(i))) + span.End() + } + + <-time.After(40 * time.Millisecond) + + // Now shutdown the exporter + if err := exp.Stop(); err != nil { + t.Fatalf("failed to stop the exporter: %v", err) + } + + // Shutdown the collector too so that we can begin + // verification checks of expected data back. + _ = mc.stop() + + spans := mc.getSpans() + + // Now verify that we received all spans. + if got, want := len(spans), m; got != want { + t.Fatalf("span counts: got %d, want %d", got, want) + } + for i := 0; i < 4; i++ { + if gotName, want := spans[i].Name, "AlwaysSample"; gotName != want { + t.Fatalf("span name: got %s, want %s", gotName, want) + } + if got, want := spans[i].Attributes[0].IntValue, int64(i); got != want { + t.Fatalf("span attribute value: got %d, want %d", got, want) + } + } +} + +func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { + mc := runMockCol(t) + defer func() { + _ = mc.stop() + }() + + exp, err := otlp.NewExporter(otlp.WithInsecure(), + otlp.WithReconnectionPeriod(50*time.Millisecond), + otlp.WithAddress(mc.address)) + if err != nil { + t.Fatalf("error creating exporter: %v", err) + } + defer func() { + _ = exp.Stop() + }() + + // Invoke Start numerous times, should return errAlreadyStarted + for i := 0; i < 10; i++ { + if err := exp.Start(); err == nil || !strings.Contains(err.Error(), "already started") { + t.Fatalf("#%d unexpected Start error: %v", i, err) + } + } + + _ = exp.Stop() + // Invoke Stop numerous times + for i := 0; i < 10; i++ { + if err := exp.Stop(); err != nil { + t.Fatalf(`#%d got error (%v) expected none`, i, err) + } + } +} + +func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { + mc := runMockCol(t) + + reconnectionPeriod := 20 * time.Millisecond + exp, err := otlp.NewExporter(otlp.WithInsecure(), + otlp.WithAddress(mc.address), + otlp.WithReconnectionPeriod(reconnectionPeriod)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer func() { + _ = exp.Stop() + }() + + // We'll now stop the collector right away to simulate a connection + // dying in the midst of communication or even not existing before. + _ = mc.stop() + + // In the test below, we'll stop the collector many times, + // while exporting traces and test to ensure that we can + // reconnect. + for j := 0; j < 3; j++ { + + exp.ExportSpans(context.Background(), []*export.SpanData{{Name: "in the midst"}}) + + // Now resurrect the collector by making a new one but reusing the + // old address, and the collector should reconnect automatically. + nmc := runMockColAtAddr(t, mc.address) + + // Give the exporter sometime to reconnect + <-time.After(reconnectionPeriod * 4) + + n := 10 + for i := 0; i < n; i++ { + exp.ExportSpans(context.Background(), []*export.SpanData{{Name: "Resurrected"}}) + } + + nmaSpans := nmc.getSpans() + // Expecting 10 spanData that were sampled, given that + if g, w := len(nmaSpans), n; g != w { + t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w) + } + + dSpans := mc.getSpans() + // Expecting 0 spans to have been received by the original but now dead collector + if g, w := len(dSpans), 0; g != w { + t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w) + } + _ = nmc.stop() + } +} + +// This test takes a long time to run: to skip it, run tests using: -short +func TestNewExporter_collectorOnBadConnection(t *testing.T) { + if testing.Short() { + t.Skipf("Skipping this long running test") + } + + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to grab an available port: %v", err) + } + // Firstly close the "collector's" channel: optimistically this address won't get reused ASAP + // However, our goal of closing it is to simulate an unavailable connection + _ = ln.Close() + + _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) + + address := fmt.Sprintf("localhost:%s", collectorPortStr) + exp, err := otlp.NewExporter(otlp.WithInsecure(), + otlp.WithReconnectionPeriod(50*time.Millisecond), + otlp.WithAddress(address)) + if err != nil { + t.Fatalf("Despite an indefinite background reconnection, got error: %v", err) + } + _ = exp.Stop() +} + +func TestNewExporter_withAddress(t *testing.T) { + mc := runMockCol(t) + defer func() { + _ = mc.stop() + }() + + exp := otlp.NewUnstartedExporter( + otlp.WithInsecure(), + otlp.WithReconnectionPeriod(50*time.Millisecond), + otlp.WithAddress(mc.address)) + + defer func() { + _ = exp.Stop() + }() + + if err := exp.Start(); err != nil { + t.Fatalf("Unexpected Start error: %v", err) + } +} diff --git a/exporters/otlp/transform_spans.go b/exporters/otlp/transform_spans.go new file mode 100644 index 00000000000..4452b56a79b --- /dev/null +++ b/exporters/otlp/transform_spans.go @@ -0,0 +1,170 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + "google.golang.org/grpc/codes" + + commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1" + tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1" + + "go.opentelemetry.io/otel/api/core" + apitrace "go.opentelemetry.io/otel/api/trace" + export "go.opentelemetry.io/otel/sdk/export/trace" +) + +const ( + maxMessageEventsPerSpan = 128 +) + +func otSpanToProtoSpan(sd *export.SpanData) *tracepb.Span { + if sd == nil { + return nil + } + return &tracepb.Span{ + TraceId: sd.SpanContext.TraceID[:], + SpanId: sd.SpanContext.SpanID[:], + ParentSpanId: sd.ParentSpanID[:], + Status: otStatusToProtoStatus(sd.Status), + StartTimeUnixnano: uint64(sd.StartTime.Nanosecond()), + EndTimeUnixnano: uint64(sd.EndTime.Nanosecond()), + Links: otLinksToProtoLinks(sd.Links), + Kind: otSpanKindToProtoSpanKind(sd.SpanKind), + Name: sd.Name, + Attributes: otAttributesToProtoAttributes(sd.Attributes), + Events: otTimeEventsToProtoTimeEvents(sd.MessageEvents), + // TODO (rghetia): Add Tracestate: when supported. + DroppedAttributesCount: uint32(sd.DroppedAttributeCount), + DroppedEventsCount: uint32(sd.DroppedMessageEventCount), + DroppedLinksCount: uint32(sd.DroppedLinkCount), + } +} + +func otStatusToProtoStatus(status codes.Code) *tracepb.Status { + return &tracepb.Status{ + Code: tracepb.Status_StatusCode(status), + // TODO (rghetia) : Add Status Message: when supported. + } +} + +func otLinksToProtoLinks(links []apitrace.Link) []*tracepb.Span_Link { + if len(links) == 0 { + return nil + } + + sl := make([]*tracepb.Span_Link, 0, len(links)) + for _, otLink := range links { + // This redefinition is necessary to prevent otLink.*ID[:] copies + // being reused -- in short we need a new otLink per iteration. + otLink := otLink + + sl = append(sl, &tracepb.Span_Link{ + TraceId: otLink.TraceID[:], + SpanId: otLink.SpanID[:], + Attributes: otAttributesToProtoAttributes(otLink.Attributes), + }) + } + return sl +} + +func otAttributesToProtoAttributes(attrs []core.KeyValue) []*commonpb.AttributeKeyValue { + if len(attrs) == 0 { + return nil + } + out := make([]*commonpb.AttributeKeyValue, 0, len(attrs)) + for _, v := range attrs { + switch v.Value.Type() { + case core.BOOL: + out = append(out, &commonpb.AttributeKeyValue{ + Key: string(v.Key), + Type: commonpb.AttributeKeyValue_BOOL, + BoolValue: v.Value.AsBool(), + }) + case core.INT64, core.INT32, core.UINT32, core.UINT64: + out = append(out, &commonpb.AttributeKeyValue{ + Key: string(v.Key), + Type: commonpb.AttributeKeyValue_INT, + IntValue: v.Value.AsInt64(), + }) + case core.FLOAT32: + f32 := v.Value.AsFloat32() + out = append(out, &commonpb.AttributeKeyValue{ + Key: string(v.Key), + Type: commonpb.AttributeKeyValue_DOUBLE, + DoubleValue: float64(f32), + }) + case core.FLOAT64: + out = append(out, &commonpb.AttributeKeyValue{ + Key: string(v.Key), + Type: commonpb.AttributeKeyValue_DOUBLE, + DoubleValue: v.Value.AsFloat64(), + }) + case core.STRING: + out = append(out, &commonpb.AttributeKeyValue{ + Key: string(v.Key), + Type: commonpb.AttributeKeyValue_STRING, + StringValue: v.Value.AsString(), + }) + } + } + return out +} + +func otTimeEventsToProtoTimeEvents(es []export.Event) []*tracepb.Span_Event { + if len(es) == 0 { + return nil + } + + evCount := len(es) + if evCount > maxMessageEventsPerSpan { + evCount = maxMessageEventsPerSpan + } + events := make([]*tracepb.Span_Event, 0, evCount) + messageEvents := 0 + + // Transform message events + for _, e := range es { + if messageEvents >= maxMessageEventsPerSpan { + break + } + messageEvents++ + events = append(events, + &tracepb.Span_Event{ + TimeUnixnano: uint64(e.Time.Nanosecond()), + Attributes: otAttributesToProtoAttributes(e.Attributes), + // TODO (rghetia) : Add Drop Counts when supported. + }, + ) + } + + return events +} + +func otSpanKindToProtoSpanKind(kind apitrace.SpanKind) tracepb.Span_SpanKind { + switch kind { + case apitrace.SpanKindInternal: + return tracepb.Span_INTERNAL + case apitrace.SpanKindClient: + return tracepb.Span_CLIENT + case apitrace.SpanKindServer: + return tracepb.Span_SERVER + case apitrace.SpanKindProducer: + return tracepb.Span_PRODUCER + case apitrace.SpanKindConsumer: + return tracepb.Span_CONSUMER + default: + return tracepb.Span_SPAN_KIND_UNSPECIFIED + } +} diff --git a/exporters/otlp/transform_spans_test.go b/exporters/otlp/transform_spans_test.go new file mode 100644 index 00000000000..8722039678a --- /dev/null +++ b/exporters/otlp/transform_spans_test.go @@ -0,0 +1,406 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp_test + +import ( + "context" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/codes" + + commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1" + tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1" + + "go.opentelemetry.io/otel/api/core" + apitrace "go.opentelemetry.io/otel/api/trace" + "go.opentelemetry.io/otel/exporters/otlp" + export "go.opentelemetry.io/otel/sdk/export/trace" +) + +type testCases struct { + otSpan *export.SpanData + otlpSpan *tracepb.Span +} + +func TestOtSpanToOtlpSpan_Basic(t *testing.T) { + // The goal of this test is to ensure that each + // spanData is transformed and exported correctly! + testAndVerify("Basic End-2-End", t, func(t *testing.T) []testCases { + + startTime := time.Now() + endTime := startTime.Add(10 * time.Second) + + tcs := []testCases{ + { + otSpan: &export.SpanData{ + SpanContext: core.SpanContext{ + TraceID: core.TraceID{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, + SpanID: core.SpanID{0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA, 0xF9, 0xF8}, + }, + SpanKind: apitrace.SpanKindServer, + ParentSpanID: core.SpanID{0xEF, 0xEE, 0xED, 0xEC, 0xEB, 0xEA, 0xE9, 0xE8}, + Name: "End-To-End Here", + StartTime: startTime, + EndTime: endTime, + MessageEvents: []export.Event{ + {Time: startTime, + Attributes: []core.KeyValue{ + core.Key("CompressedByteSize").Uint64(512), + }, + }, + {Time: endTime, + Attributes: []core.KeyValue{ + core.Key("MessageEventType").String("Recv"), + }, + }, + }, + Links: []apitrace.Link{ + { + SpanContext: core.SpanContext{ + TraceID: core.TraceID{0xC0, 0xC1, 0xC2, 0xC3, 0xC4, 0xC5, 0xC6, 0xC7, 0xC8, 0xC9, 0xCA, 0xCB, 0xCC, 0xCD, 0xCE, 0xCF}, + SpanID: core.SpanID{0xB0, 0xB1, 0xB2, 0xB3, 0xB4, 0xB5, 0xB6, 0xB7}, + TraceFlags: 0, + }, + Attributes: []core.KeyValue{ + core.Key("LinkType").String("Parent"), + }, + }, + { + SpanContext: core.SpanContext{ + TraceID: core.TraceID{0xE0, 0xE1, 0xE2, 0xE3, 0xE4, 0xE5, 0xE6, 0xE7, 0xE8, 0xE9, 0xEA, 0xEB, 0xEC, 0xED, 0xEE, 0xEF}, + SpanID: core.SpanID{0xD0, 0xD1, 0xD2, 0xD3, 0xD4, 0xD5, 0xD6, 0xD7}, + TraceFlags: 0, + }, + Attributes: []core.KeyValue{ + core.Key("LinkType").String("Child"), + }, + }, + }, + Status: codes.Internal, + HasRemoteParent: true, + Attributes: []core.KeyValue{ + core.Key("timeout_ns").Int64(12e9), + }, + DroppedAttributeCount: 1, + DroppedMessageEventCount: 2, + DroppedLinkCount: 3, + }, + otlpSpan: &tracepb.Span{ + TraceId: []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, + SpanId: []byte{0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA, 0xF9, 0xF8}, + ParentSpanId: []byte{0xEF, 0xEE, 0xED, 0xEC, 0xEB, 0xEA, 0xE9, 0xE8}, + Name: "End-To-End Here", + Kind: tracepb.Span_SERVER, + StartTimeUnixnano: uint64(startTime.Nanosecond()), + EndTimeUnixnano: uint64(endTime.Nanosecond()), + Status: &tracepb.Status{ + Code: 13, + }, + Events: []*tracepb.Span_Event{ + { + TimeUnixnano: uint64(startTime.Nanosecond()), + Attributes: []*commonpb.AttributeKeyValue{ + { + Key: "CompressedByteSize", + Type: commonpb.AttributeKeyValue_INT, + StringValue: "", + IntValue: 512, + DoubleValue: 0, + BoolValue: false, + }, + }, + }, + { + TimeUnixnano: uint64(endTime.Nanosecond()), + Attributes: []*commonpb.AttributeKeyValue{ + { + Key: "MessageEventType", + Type: commonpb.AttributeKeyValue_STRING, + StringValue: "Recv", + IntValue: 0, + DoubleValue: 0, + BoolValue: false, + }, + }, + }, + }, + Links: []*tracepb.Span_Link{ + { + TraceId: []byte{0xC0, 0xC1, 0xC2, 0xC3, 0xC4, 0xC5, 0xC6, 0xC7, 0xC8, 0xC9, 0xCA, 0xCB, 0xCC, 0xCD, 0xCE, 0xCF}, + SpanId: []byte{0xB0, 0xB1, 0xB2, 0xB3, 0xB4, 0xB5, 0xB6, 0xB7}, + Attributes: []*commonpb.AttributeKeyValue{ + { + Key: "LinkType", + Type: commonpb.AttributeKeyValue_STRING, + StringValue: "Parent", + IntValue: 0, + DoubleValue: 0, + BoolValue: false, + }, + }, + }, + { + TraceId: []byte{0xE0, 0xE1, 0xE2, 0xE3, 0xE4, 0xE5, 0xE6, 0xE7, 0xE8, 0xE9, 0xEA, 0xEB, 0xEC, 0xED, 0xEE, 0xEF}, + SpanId: []byte{0xD0, 0xD1, 0xD2, 0xD3, 0xD4, 0xD5, 0xD6, 0xD7}, + Attributes: []*commonpb.AttributeKeyValue{ + { + Key: "LinkType", + Type: commonpb.AttributeKeyValue_STRING, + StringValue: "Child", + IntValue: 0, + DoubleValue: 0, + BoolValue: false, + }, + }, + }, + }, + Attributes: []*commonpb.AttributeKeyValue{ + { + Key: "timeout_ns", + Type: commonpb.AttributeKeyValue_INT, + StringValue: "", + IntValue: 12e9, + DoubleValue: 0, + BoolValue: false, + }, + }, + DroppedAttributesCount: 1, + DroppedEventsCount: 2, + DroppedLinksCount: 3, + }, + }, + } + return tcs + }) +} + +func TestOtSpanToOtlpSpan_SpanKind(t *testing.T) { + testAndVerify("Test SpanKind", t, func(t *testing.T) []testCases { + kinds := []struct { + in apitrace.SpanKind + out tracepb.Span_SpanKind + }{ + { + in: apitrace.SpanKindClient, + out: tracepb.Span_CLIENT, + }, + { + in: apitrace.SpanKindServer, + out: tracepb.Span_SERVER, + }, + { + in: apitrace.SpanKindProducer, + out: tracepb.Span_PRODUCER, + }, + { + in: apitrace.SpanKindConsumer, + out: tracepb.Span_CONSUMER, + }, + { + in: apitrace.SpanKindInternal, + out: tracepb.Span_INTERNAL, + }, + { + in: apitrace.SpanKindUnspecified, + out: tracepb.Span_SPAN_KIND_UNSPECIFIED, + }, + } + + tcs := make([]testCases, 0, len(kinds)) + for _, kind := range kinds { + otSpan, otlpSpan := getSpan() + otSpan.SpanKind = kind.in + otlpSpan.Kind = kind.out + tc := testCases{ + otSpan: otSpan, + otlpSpan: otlpSpan, + } + tcs = append(tcs, tc) + } + return tcs + }) +} + +func TestOtSpanToOtlpSpan_Attribute(t *testing.T) { + testAndVerify("Test SpanAttribute", t, func(t *testing.T) []testCases { + attrInt := &commonpb.AttributeKeyValue{ + Key: "commonInt", + Type: commonpb.AttributeKeyValue_INT, + IntValue: 25, + } + attrInt64 := &commonpb.AttributeKeyValue{ + Key: "commonInt64", + Type: commonpb.AttributeKeyValue_INT, + IntValue: 12e9, + } + kinds := []struct { + in core.KeyValue + out *commonpb.AttributeKeyValue + }{ + { + in: core.Key("commonInt").Int(25), + out: attrInt, + }, + { + in: core.Key("commonInt").Uint(25), + out: attrInt, + }, + { + in: core.Key("commonInt").Int32(25), + out: attrInt, + }, + { + in: core.Key("commonInt").Uint32(25), + out: attrInt, + }, + { + in: core.Key("commonInt64").Int64(12e9), + out: attrInt64, + }, + { + in: core.Key("commonInt64").Uint64(12e9), + out: attrInt64, + }, + { + in: core.Key("float32").Float32(3.598549), + out: &commonpb.AttributeKeyValue{ + + Key: "float32", + Type: commonpb.AttributeKeyValue_DOUBLE, + DoubleValue: 3.5985488891601562, + }, + }, + { + in: core.Key("float64").Float64(14.598549), + out: &commonpb.AttributeKeyValue{ + + Key: "float64", + Type: commonpb.AttributeKeyValue_DOUBLE, + DoubleValue: 14.598549, + }, + }, + { + in: core.Key("string").String("string"), + out: &commonpb.AttributeKeyValue{ + + Key: "string", + Type: commonpb.AttributeKeyValue_STRING, + StringValue: "string", + }, + }, + { + in: core.Key("bool").Bool(true), + out: &commonpb.AttributeKeyValue{ + + Key: "bool", + Type: commonpb.AttributeKeyValue_BOOL, + BoolValue: true, + }, + }, + } + + tcs := make([]testCases, 0, len(kinds)) + for _, kind := range kinds { + otSpan, otlpSpan := getSpan() + otSpan.Attributes = []core.KeyValue{kind.in} + otlpSpan.Attributes = []*commonpb.AttributeKeyValue{kind.out} + tc := testCases{ + otSpan: otSpan, + otlpSpan: otlpSpan, + } + tcs = append(tcs, tc) + } + return tcs + }) +} + +func getSpan() (*export.SpanData, *tracepb.Span) { + startTime := time.Now() + endTime := startTime.Add(10 * time.Second) + + otSpan := &export.SpanData{ + SpanContext: core.SpanContext{ + TraceID: core.TraceID{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, + SpanID: core.SpanID{0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA, 0xF9, 0xF8}, + }, + SpanKind: apitrace.SpanKindServer, + ParentSpanID: core.SpanID{0xEF, 0xEE, 0xED, 0xEC, 0xEB, 0xEA, 0xE9, 0xE8}, + Name: "Test Span", + StartTime: startTime, + EndTime: endTime, + } + otlpSpan := &tracepb.Span{ + TraceId: []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, + SpanId: []byte{0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA, 0xF9, 0xF8}, + ParentSpanId: []byte{0xEF, 0xEE, 0xED, 0xEC, 0xEB, 0xEA, 0xE9, 0xE8}, + Name: "Test Span", + Kind: tracepb.Span_SERVER, + StartTimeUnixnano: uint64(startTime.Nanosecond()), + EndTimeUnixnano: uint64(endTime.Nanosecond()), + Status: &tracepb.Status{ + Code: 0, + }, + } + + return otSpan, otlpSpan +} + +func testAndVerify(name string, t *testing.T, f func(t *testing.T) []testCases) { + // The goal of this test is to ensure that each + // spanData is transformed and exported correctly! + + collector := runMockCol(t) + defer func() { + _ = collector.stop() + }() + + exp, err := otlp.NewExporter(otlp.WithInsecure(), + otlp.WithAddress(collector.address), + otlp.WithReconnectionPeriod(50*time.Millisecond)) + if err != nil { + t.Fatalf("Failed to create a new collector exporter: %v", err) + } + defer func() { + _ = exp.Stop() + }() + + // Give the background collector connection sometime to setup. + <-time.After(20 * time.Millisecond) + + tcs := f(t) + + for _, tc := range tcs { + exp.ExportSpans(context.Background(), []*export.SpanData{tc.otSpan}) + } + + _ = exp.Stop() + _ = collector.stop() + + spans := collector.getSpans() + gotCount := len(spans) + wantCount := len(tcs) + if gotCount != wantCount { + t.Fatalf("%s: got %d spans, want %d spans", name, gotCount, wantCount) + } + for i, tc := range tcs { + exp.ExportSpans(context.Background(), []*export.SpanData{tc.otSpan}) + if diff := cmp.Diff(spans[i], tc.otlpSpan, cmp.Comparer(proto.Equal)); diff != "" { + t.Fatalf("%s transformed span differs %v\n", name, diff) + } + } +}