Skip to content

Commit

Permalink
client: add OTEL_RESOURCE_ATTRIBUTES env var.
Browse files Browse the repository at this point in the history
Add a new task hook that injects an `OTEL_RESOURCE_ATTRIBUTES` environment
variable with Nomad attributes into tasks. The attributes describe the
alloc and the specific task that is running, the node where the alloc is
running, and the job and eval that generated the alloc.

If the task already defines an `OTEL_RESOURCE_ATTRIBUTES` environment
variable, the Nomad attributes are merged with it. If the task sets the
variable to an empty string, the injection is disabled for that task.
  • Loading branch information
lgfa29 committed Sep 13, 2022
1 parent df04cd1 commit fd7aff3
Show file tree
Hide file tree
Showing 7 changed files with 400 additions and 0 deletions.
144 changes: 144 additions & 0 deletions client/allocrunner/taskrunner/otel_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package taskrunner

import (
"context"
"fmt"
"net/url"

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
"go.opentelemetry.io/otel/baggage"
)

// envKeyOtelResourceAttrs is the name of the environment variable the otel
// hook reads from the task environment and sets in its prestart response.
const envKeyOtelResourceAttrs = "OTEL_RESOURCE_ATTRIBUTES"

// otelHookConfig carries the values newOtelHook needs to build an otelHook.
type otelHookConfig struct {
// logger is the base logger; newOtelHook derives the hook's logger from it.
logger log.Logger
// alloc is the allocation whose attributes the hook exports.
alloc *structs.Allocation
// node is the node whose attributes the hook exports.
node *structs.Node
}

// otelHook is a task prestart hook that sets the OTEL_RESOURCE_ATTRIBUTES
// environment variable for a task, using attributes taken from alloc, node
// and the task being started.
type otelHook struct {
// alloc and node are passed to generateBaggage on every Prestart call.
alloc *structs.Allocation
node *structs.Node
// logger is named after the hook and tagged with the alloc ID by
// newOtelHook.
logger log.Logger
}

// newOtelHook builds an otelHook from config. The hook's logger is derived
// from config.logger: it is named after the hook and carries the alloc ID
// as the "alloc_id" field.
func newOtelHook(config *otelHookConfig) *otelHook {
	h := &otelHook{alloc: config.alloc, node: config.node}

	named := config.logger.Named(h.Name())
	h.logger = named.With("alloc_id", config.alloc.ID)

	return h
}

// Name returns the hook's identifier, "otel". newOtelHook also uses this
// value to name the hook's logger.
func (h *otelHook) Name() string {
return "otel"
}

// Prestart sets OTEL_RESOURCE_ATTRIBUTES in the prestart response for the
// task in req.
//
// Behavior depends on what the task environment already contains:
//   - variable defined but empty: the hook does nothing.
//   - variable defined and non-empty: it is parsed as baggage and each of its
//     members is set on top of the Nomad-generated attributes; a value that
//     fails to parse is logged and otherwise ignored.
//   - variable not defined: only the Nomad-generated attributes are used.
//
// The hook is best effort: every failure is logged as a warning and Prestart
// always returns nil.
func (h *otelHook) Prestart(_ context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
	taskLogger := h.logger.With("task", req.Task.Name)

	// An explicitly empty value opts the task out of the injection.
	taskAttrs, defined := req.TaskEnv.EnvMap[envKeyOtelResourceAttrs]
	if defined && taskAttrs == "" {
		taskLogger.Debug("skipping OTEL_RESOURCE_ATTRIBUTES environment variable")
		return nil
	}

	merged, err := generateBaggage(h.alloc, req.Task, h.node)
	if err != nil {
		taskLogger.Warn("failed to generate OTEL_RESOURCE_ATTRIBUTES environment variable", "error", err)
		return nil
	}

	if taskAttrs != "" {
		taskLogger.Debug("merging existing OTEL_RESOURCE_ATTRIBUTES environment variable values", "attrs", taskAttrs)

		if parsed, parseErr := baggage.Parse(taskAttrs); parseErr == nil {
			for _, existing := range parsed.Members() {
				key, val := existing.Key(), existing.Value()
				taskLogger.Trace("found member", "key", key, "value", val)

				// TODO(luiz): don't create new member once baggage.Members()
				// returns values with `hasData` set to `true`.
				// https://github.com/open-telemetry/opentelemetry-go/issues/3164
				fresh, newErr := baggage.NewMember(key, val)
				if newErr != nil {
					taskLogger.Warn("failed to create new baggage member", "key", key, "value", val, "error", newErr)
					continue
				}

				// The result of SetMember is stored even when it reports an
				// error, then the loop moves on to the next member.
				var setErr error
				merged, setErr = merged.SetMember(fresh)
				if setErr != nil {
					taskLogger.Warn("failed to set new baggage member", "key", key, "value", val, "error", setErr)
				}
			}
		} else {
			taskLogger.Warn("failed to parse task environment variable OTEL_RESOURCE_ATTRIBUTES as baggage",
				"otel_resource_attributes", taskAttrs, "error", parseErr)
		}
	}

	// TODO(luiz): remove decode step once the Otel SDK handles it internally.
	// https://github.com/open-telemetry/opentelemetry-go/pull/2963
	encoded := merged.String()
	attrs, err := url.QueryUnescape(encoded)
	if err != nil {
		// Fall back to the encoded form if it cannot be unescaped.
		attrs = encoded
	}

	resp.Env = map[string]string{
		envKeyOtelResourceAttrs: attrs,
	}
	return nil
}

// generateBaggage builds the Nomad resource attributes for a task as an
// OpenTelemetry baggage.
//
// The members describe the alloc (create time, ID, name, eval, group and
// namespace), its job (ID, name, region, type), the node (ID, name,
// datacenter) and the task (name, driver). The job parent ID and the node
// class are only added when they are non-empty.
//
// If any attribute cannot be turned into a baggage member, or the members
// cannot be assembled into a baggage, an empty baggage and a non-nil error
// are returned.
func generateBaggage(alloc *structs.Allocation, task *structs.Task, node *structs.Node) (baggage.Baggage, error) {
	// mErr must point at an allocated Error. newMember receives the pointer by
	// value and discards the result of multierror.Append, so a failure is only
	// visible here when Append can add it to the Error this pointer refers
	// to. With a nil pointer Append allocates a private Error and every
	// failure is silently lost.
	mErr := new(multierror.Error)

	job := alloc.Job
	members := []baggage.Member{
		newMember("nomad.alloc.createTime", fmt.Sprintf("%v", alloc.CreateTime), mErr),
		newMember("nomad.alloc.id", alloc.ID, mErr),
		newMember("nomad.alloc.name", alloc.Name, mErr),
		newMember("nomad.eval.id", alloc.EvalID, mErr),
		newMember("nomad.group.name", alloc.TaskGroup, mErr),
		newMember("nomad.job.id", job.ID, mErr),
		newMember("nomad.job.name", job.Name, mErr),
		newMember("nomad.job.region", job.Region, mErr),
		newMember("nomad.job.type", job.Type, mErr),
		newMember("nomad.namespace", alloc.Namespace, mErr),
		newMember("nomad.node.id", node.ID, mErr),
		newMember("nomad.node.name", node.Name, mErr),
		newMember("nomad.node.datacenter", node.Datacenter, mErr),
		newMember("nomad.task.name", task.Name, mErr),
		newMember("nomad.task.driver", task.Driver, mErr),
	}
	if job.ParentID != "" {
		members = append(members, newMember("nomad.job.parentId", job.ParentID, mErr))
	}
	if node.NodeClass != "" {
		members = append(members, newMember("nomad.node.class", node.NodeClass, mErr))
	}

	// Stop before assembling the baggage if any member failed to build.
	if err := mErr.ErrorOrNil(); err != nil {
		return baggage.Baggage{}, err
	}

	b, err := baggage.New(members...)
	if err != nil {
		return baggage.Baggage{}, err
	}
	return b, nil
}

// newMember creates a baggage member for key and value and returns whatever
// baggage.NewMember returned. If creation fails, the error is appended to
// mErr.
//
// mErr must be non-nil for the error to be recorded: the result of
// multierror.Append is discarded, so with a nil mErr the error is lost.
func newMember(key, value string, mErr *multierror.Error) baggage.Member {
	member, err := baggage.NewMember(key, value)
	if err == nil {
		return member
	}

	_ = multierror.Append(mErr, err)
	return member
}
140 changes: 140 additions & 0 deletions client/allocrunner/taskrunner/otel_hook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package taskrunner

import (
"context"
"fmt"
"os"
"testing"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/mock"
"go.opentelemetry.io/otel/baggage"

"github.com/shoenig/test/must"
)

// Compile-time check that *otelHook satisfies the task prestart hook
// interface.
var _ interfaces.TaskPrestartHook = (*otelHook)(nil)

// TestTaskRunner_OtelHook runs the otel prestart hook against a mock alloc,
// node and task, and checks the OTEL_RESOURCE_ATTRIBUTES value it produces
// when the task defines no value, an empty value, a valid value and an
// unparsable value for that variable.
func TestTaskRunner_OtelHook(t *testing.T) {
	ci.Parallel(t)

	testCases := []struct {
		name                  string
		taskEnv               map[string]string
		expectNomadAttrs      bool
		expectAdditionalAttrs map[string]string
	}{
		{
			name:             "tasks have otel resource attributes env var",
			expectNomadAttrs: true,
		},
		{
			name: "disable otel resource attributes env var",
			taskEnv: map[string]string{
				envKeyOtelResourceAttrs: "",
			},
			expectNomadAttrs: false,
		},
		{
			name: "merge otel resource attributes env var",
			taskEnv: map[string]string{
				envKeyOtelResourceAttrs: "test=true",
			},
			expectNomadAttrs: true,
			expectAdditionalAttrs: map[string]string{
				"test": "true",
			},
		},
		{
			name: "invalid values are ignored",
			taskEnv: map[string]string{
				envKeyOtelResourceAttrs: "not-valid",
			},
			expectNomadAttrs: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			alloc := mock.Alloc()
			node := mock.Node()
			task := mock.Job().TaskGroups[0].Tasks[0]

			hook := newOtelHook(&otelHookConfig{
				logger: hclog.NewNullLogger(),
				alloc:  alloc,
				node:   node,
			})

			// Setup task environment with additional test values.
			builder := taskenv.NewBuilder(node, alloc, task, "global")
			taskEnv := builder.Build()
			for k, v := range tc.taskEnv {
				taskEnv.EnvMap[k] = v
			}

			// Run hook.
			req := &interfaces.TaskPrestartRequest{
				TaskEnv: taskEnv,
				TaskDir: &allocdir.TaskDir{Dir: os.TempDir()},
				Task:    task,
			}
			resp := interfaces.TaskPrestartResponse{}
			err := hook.Prestart(context.Background(), req, &resp)
			must.NoError(t, err)

			// Read and parse resulting OTEL_RESOURCE_ATTRIBUTES env var.
			got := resp.Env[envKeyOtelResourceAttrs]
			b, err := baggage.Parse(got)
			must.NoError(t, err)

			if tc.expectNomadAttrs {
				// Every attribute generateBaggage always emits.
				want := map[string]string{
					"nomad.alloc.createTime": fmt.Sprintf("%v", alloc.CreateTime),
					"nomad.alloc.id":         alloc.ID,
					"nomad.alloc.name":       alloc.Name,
					"nomad.eval.id":          alloc.EvalID,
					"nomad.group.name":       alloc.TaskGroup,
					"nomad.job.id":           alloc.Job.ID,
					"nomad.job.name":         alloc.Job.Name,
					"nomad.job.region":       alloc.Job.Region,
					"nomad.job.type":         alloc.Job.Type,
					"nomad.namespace":        alloc.Namespace,
					"nomad.node.id":          node.ID,
					"nomad.node.name":        node.Name,
					"nomad.node.datacenter":  node.Datacenter,
					"nomad.task.name":        task.Name,
					"nomad.task.driver":      task.Driver,
				}
				for key, exp := range want {
					must.Eq(t, exp, b.Member(key).Value())
				}

				// Attributes that are only emitted when their source value is
				// non-empty. Absence is checked through the member's key so
				// the assertion does not rely on comparing Member values.
				optional := map[string]string{
					"nomad.job.parentId": alloc.Job.ParentID,
					"nomad.node.class":   node.NodeClass,
				}
				for key, exp := range optional {
					if exp != "" {
						must.Eq(t, exp, b.Member(key).Value())
					} else {
						must.Eq(t, "", b.Member(key).Key())
					}
				}
			} else {
				must.Eq(t, "", got)
			}

			if len(tc.expectAdditionalAttrs) > 0 {
				for k, v := range tc.expectAdditionalAttrs {
					must.Eq(t, v, b.Member(k).Value())
				}
			} else {
				for _, m := range b.Members() {
					// If no additional values are expected, all attributes
					// must be related to Nomad.
					must.StrContains(t, m.Key(), "nomad")
				}
			}
		})
	}
}
5 changes: 5 additions & 0 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (tr *TaskRunner) initHooks() {
newArtifactHook(tr, tr.getter, hookLogger),
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
newDeviceHook(tr.devicemanager, hookLogger),
newOtelHook(&otelHookConfig{
logger: hookLogger,
alloc: tr.Alloc(),
node: tr.clientConfig.Node,
}),
}

// If the task has a CSI stanza, add the hook.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ require (
github.com/zclconf/go-cty v1.8.0
github.com/zclconf/go-cty-yaml v1.0.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/otel v1.9.0
go.uber.org/goleak v1.1.12
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/exp v0.0.0-20220609121020-a51bd0440498
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,8 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.9.0 h1:8WZNQFIB2a71LnANS9JeyidJKKGOOremcUtb/OtHISw=
go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4AC+0Eo=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
Loading

0 comments on commit fd7aff3

Please sign in to comment.