-
Notifications
You must be signed in to change notification settings - Fork 2k
client: add OTEL_RESOURCE_ATTRIBUTES
env var.
#14556
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
```release-note:improvement | ||
client: expose Nomad attributes to allocations using the `OTEL_RESOURCE_ATTRIBUTES` environment variable | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
package taskrunner | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/url" | ||
|
||
log "github.com/hashicorp/go-hclog" | ||
multierror "github.com/hashicorp/go-multierror" | ||
"github.com/hashicorp/nomad/client/allocrunner/interfaces" | ||
"github.com/hashicorp/nomad/nomad/structs" | ||
"go.opentelemetry.io/otel/baggage" | ||
) | ||
|
||
// envKeyOtelResourceAttrs is the name of the environment variable the otel
// task hook reads from the task environment and writes to the prestart
// response.
const envKeyOtelResourceAttrs = "OTEL_RESOURCE_ATTRIBUTES"
|
||
type otelHookConfig struct { | ||
logger log.Logger | ||
alloc *structs.Allocation | ||
node *structs.Node | ||
} | ||
|
||
type otelHook struct { | ||
alloc *structs.Allocation | ||
node *structs.Node | ||
logger log.Logger | ||
} | ||
|
||
func newOtelHook(config *otelHookConfig) *otelHook { | ||
hook := &otelHook{ | ||
alloc: config.alloc, | ||
node: config.node, | ||
} | ||
hook.logger = config.logger.Named(hook.Name()). | ||
With("alloc_id", config.alloc.ID) | ||
|
||
return hook | ||
} | ||
|
||
func (h *otelHook) Name() string { | ||
return "otel" | ||
} | ||
|
||
func (h *otelHook) Prestart(_ context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { | ||
logger := h.logger.With("task", req.Task.Name) | ||
|
||
resourceAttrsEnv, ok := req.TaskEnv.EnvMap[envKeyOtelResourceAttrs] | ||
if ok && resourceAttrsEnv == "" { | ||
logger.Debug("skipping OTEL_RESOURCE_ATTRIBUTES environment variable") | ||
return nil | ||
} | ||
|
||
resourceAttrs, err := generateBaggage(h.alloc, req.Task, h.node) | ||
if err != nil { | ||
logger.Warn("failed to generate OTEL_RESOURCE_ATTRIBUTES environment variable", "error", err) | ||
return nil | ||
} | ||
|
||
if resourceAttrsEnv != "" { | ||
logger.Debug("merging existing OTEL_RESOURCE_ATTRIBUTES environment variable values", "attrs", resourceAttrsEnv) | ||
|
||
taskBaggage, err := baggage.Parse(resourceAttrsEnv) | ||
if err != nil { | ||
logger.Warn("failed to parse task environment variable OTEL_RESOURCE_ATTRIBUTES as baggage", | ||
"otel_resource_attributes", resourceAttrsEnv, "error", err) | ||
} else { | ||
for _, m := range taskBaggage.Members() { | ||
k, v := m.Key(), m.Value() | ||
logger.Trace("found member", "key", k, "value", v) | ||
|
||
// TODO(luiz): don't create new member once baggage.Members() | ||
// returns values with `hasData` set to `true`. | ||
// https://github.com/open-telemetry/opentelemetry-go/issues/3164 | ||
member, err := baggage.NewMember(k, v) | ||
if err != nil { | ||
logger.Warn("failed to create new baggage member", "key", k, "value", v, "error", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The ridiculous name "baggage" aside, which isn't your fault, I'm not sure we should expose that inside-baseball terminology to end-users? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah right, good point 👍 |
||
continue | ||
} | ||
|
||
resourceAttrs, err = resourceAttrs.SetMember(member) | ||
if err != nil { | ||
logger.Warn("failed to set new baggage member", "key", k, "value", v, "error", err) | ||
continue | ||
} | ||
} | ||
} | ||
} | ||
|
||
// TODO(luiz): remove decode step once the Otel SDK handles it internally. | ||
// https://github.com/open-telemetry/opentelemetry-go/pull/2963 | ||
attrs, err := url.QueryUnescape(resourceAttrs.String()) | ||
if err != nil { | ||
attrs = resourceAttrs.String() | ||
} | ||
Comment on lines
+89
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I may be misunderstanding this PR, but this looks like it impacts the read side and not the write side. If the read side of the SDK gets fixed, don't we still need to encode on the write side so that older versions of the SDK aren't broken? (And are there lots of read-side SDKs for different languages? If there's only go, then it doesn't seem sensible for us to bake-in support for a single language in Nomad.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reading a bit more, this looks like we could end up double-encoding in the case where the user has something we're merging onto. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem was a mismatch between how the baggage spec and the collector handled encoding values. Only the baggage required encoding/decoding so the more general "fix" was an update to the spec. Other languages will need to be updated to handle this as well, but good point on supporting older versions. I will make sure to test it. Attempting to double-encode would result in an error that is handled by the using the original string. |
||
resp.Env = map[string]string{ | ||
envKeyOtelResourceAttrs: attrs, | ||
} | ||
return nil | ||
} | ||
|
||
func generateBaggage(alloc *structs.Allocation, task *structs.Task, node *structs.Node) (baggage.Baggage, error) { | ||
tgross marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var mErr *multierror.Error | ||
job := alloc.Job | ||
members := []baggage.Member{ | ||
newMember("nomad.alloc.createTime", fmt.Sprintf("%v", alloc.CreateTime), mErr), | ||
newMember("nomad.alloc.id", alloc.ID, mErr), | ||
newMember("nomad.alloc.name", alloc.Name, mErr), | ||
newMember("nomad.eval.id", alloc.EvalID, mErr), | ||
newMember("nomad.group.name", alloc.TaskGroup, mErr), | ||
newMember("nomad.job.id", job.ID, mErr), | ||
newMember("nomad.job.name", job.Name, mErr), | ||
newMember("nomad.job.region", job.Region, mErr), | ||
newMember("nomad.job.type", job.Type, mErr), | ||
newMember("nomad.namespace", alloc.Namespace, mErr), | ||
newMember("nomad.node.id", node.ID, mErr), | ||
newMember("nomad.node.name", node.Name, mErr), | ||
newMember("nomad.node.datacenter", node.Datacenter, mErr), | ||
newMember("nomad.task.name", task.Name, mErr), | ||
newMember("nomad.task.driver", task.Driver, mErr), | ||
} | ||
Comment on lines
+104
to
+120
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I understand the value to operators in providing these specific attributes to task processes. Ex. why do we want to expose the eval ID for every running process on the cluster? From a design standpoint I'm not sure it makes sense to have Nomad itself define a hard-coded set of attributes, rather than making this something the operators define (as either client configuration or in the jobspec). Could this whole thing be done via clever templating of an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see. I thought of some kind of configuration (either jobspec or client side), but that would be even more commitment to keep it updated, or Maybe an external project may be better for now then. I will think more about it, thanks! |
||
if job.ParentID != "" { | ||
members = append(members, newMember("nomad.job.parentId", job.ParentID, mErr)) | ||
} | ||
if node.NodeClass != "" { | ||
members = append(members, newMember("nomad.node.class", node.NodeClass, mErr)) | ||
} | ||
if err := mErr.ErrorOrNil(); err != nil { | ||
return baggage.Baggage{}, err | ||
} | ||
|
||
b, err := baggage.New(members...) | ||
if err != nil { | ||
_ = multierror.Append(mErr, err) | ||
} | ||
return b, mErr.ErrorOrNil() | ||
} | ||
|
||
func newMember(key, value string, mErr *multierror.Error) baggage.Member { | ||
m, err := baggage.NewMember(key, value) | ||
if err != nil { | ||
_ = multierror.Append(mErr, err) | ||
} | ||
return m | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
package taskrunner | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"os" | ||
"testing" | ||
|
||
"github.com/hashicorp/go-hclog" | ||
"github.com/hashicorp/nomad/ci" | ||
"github.com/hashicorp/nomad/client/allocdir" | ||
"github.com/hashicorp/nomad/client/allocrunner/interfaces" | ||
"github.com/hashicorp/nomad/client/taskenv" | ||
"github.com/hashicorp/nomad/nomad/mock" | ||
"go.opentelemetry.io/otel/baggage" | ||
|
||
"github.com/shoenig/test/must" | ||
) | ||
|
||
// Statically assert the otel hook implements the expected interfaces | ||
var _ interfaces.TaskPrestartHook = &otelHook{} | ||
|
||
func TestTaskRunner_OtelHook(t *testing.T) { | ||
ci.Parallel(t) | ||
|
||
testCases := []struct { | ||
name string | ||
taskEnv map[string]string | ||
expectNomadAttrs bool | ||
expectAdditionalAttrs map[string]string | ||
}{ | ||
{ | ||
name: "tasks have otel resource attributes env var", | ||
expectNomadAttrs: true, | ||
}, | ||
{ | ||
name: "disable otel resource attributes env var", | ||
taskEnv: map[string]string{ | ||
envKeyOtelResourceAttrs: "", | ||
}, | ||
expectNomadAttrs: false, | ||
}, | ||
{ | ||
name: "merge otel resource attributes env var", | ||
taskEnv: map[string]string{ | ||
envKeyOtelResourceAttrs: "test=true", | ||
}, | ||
expectNomadAttrs: true, | ||
expectAdditionalAttrs: map[string]string{ | ||
"test": "true", | ||
}, | ||
}, | ||
{ | ||
name: "invalid values are ignored", | ||
taskEnv: map[string]string{ | ||
envKeyOtelResourceAttrs: "not-valid", | ||
}, | ||
expectNomadAttrs: true, | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
alloc := mock.Alloc() | ||
node := mock.Node() | ||
task := mock.Job().TaskGroups[0].Tasks[0] | ||
|
||
otelHook := newOtelHook(&otelHookConfig{ | ||
logger: hclog.NewNullLogger(), | ||
alloc: alloc, | ||
node: node, | ||
}) | ||
|
||
// Setup task environment with addition test values. | ||
builder := taskenv.NewBuilder(node, alloc, task, "global") | ||
taskEnv := builder.Build() | ||
for k, v := range tc.taskEnv { | ||
taskEnv.EnvMap[k] = v | ||
} | ||
|
||
// Run hook. | ||
req := &interfaces.TaskPrestartRequest{ | ||
TaskEnv: taskEnv, | ||
TaskDir: &allocdir.TaskDir{Dir: os.TempDir()}, | ||
Task: task, | ||
} | ||
resp := interfaces.TaskPrestartResponse{} | ||
err := otelHook.Prestart(context.Background(), req, &resp) | ||
must.NoError(t, err) | ||
|
||
// Read and parse resulting OTEL_RESOURCE_ATTRIBUTES env var. | ||
got := resp.Env[envKeyOtelResourceAttrs] | ||
b, err := baggage.Parse(got) | ||
must.NoError(t, err) | ||
|
||
if tc.expectNomadAttrs { | ||
must.Eq(t, b.Member("nomad.alloc.id").Value(), alloc.ID) | ||
must.Eq(t, b.Member("nomad.alloc.name").Value(), alloc.Name) | ||
must.Eq(t, b.Member("nomad.alloc.createTime").Value(), fmt.Sprintf("%v", alloc.CreateTime)) | ||
must.Eq(t, b.Member("nomad.eval.id").Value(), alloc.EvalID) | ||
must.Eq(t, b.Member("nomad.job.id").Value(), alloc.Job.ID) | ||
must.Eq(t, b.Member("nomad.job.name").Value(), alloc.Job.Name) | ||
must.Eq(t, b.Member("nomad.job.region").Value(), alloc.Job.Region) | ||
must.Eq(t, b.Member("nomad.job.type").Value(), alloc.Job.Type) | ||
must.Eq(t, b.Member("nomad.namespace").Value(), alloc.Namespace) | ||
must.Eq(t, b.Member("nomad.node.id").Value(), node.ID) | ||
must.Eq(t, b.Member("nomad.node.name").Value(), node.Name) | ||
must.Eq(t, b.Member("nomad.node.datacenter").Value(), node.Datacenter) | ||
must.Eq(t, b.Member("nomad.task.name").Value(), task.Name) | ||
must.Eq(t, b.Member("nomad.task.driver").Value(), task.Driver) | ||
|
||
if alloc.Job.ParentID != "" { | ||
must.Eq(t, b.Member("nomad.job.parentId").Value(), alloc.Job.ParentID) | ||
} else { | ||
must.Eq(t, b.Member("nomad.job.parentId"), baggage.Member{}) | ||
} | ||
|
||
if node.NodeClass != "" { | ||
must.Eq(t, b.Member("nomad.node.class").Value(), node.NodeClass) | ||
} else { | ||
must.Eq(t, b.Member("nomad.node.class"), baggage.Member{}) | ||
} | ||
} else { | ||
must.Eq(t, got, "") | ||
} | ||
|
||
if len(tc.expectAdditionalAttrs) > 0 { | ||
for k, v := range tc.expectAdditionalAttrs { | ||
must.Eq(t, b.Member(k).Value(), v) | ||
} | ||
} else { | ||
for _, m := range b.Members() { | ||
// If not additional values are expected, all attributes | ||
// must be related to Nomad. | ||
must.StrContains(t, m.Key(), "nomad") | ||
} | ||
} | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They went with the name "baggage" for this concept? 😦
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah, that's another spec 😅
https://www.w3.org/TR/baggage/