-
Notifications
You must be signed in to change notification settings - Fork 268
feat(tracing): adding tracing to DA client #2968
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
78b54f2
wip: adding tracing
chatton ae87a59
chore: only having the first tracing decorator
chatton fdd943b
chore: remove comment
chatton 74d4a5f
deps: adding pin to genproto version
chatton d796589
chore: ensuring errors reported, adding unit tests
chatton d52f187
chore: add check to validate basic
chatton b2b3218
chore: modified default
chatton 41bce54
chore: adding logging of possible error
chatton fd34073
chore: updated flag test
chatton fd7425a
chore: bump endpoint to correct port
chatton f30a577
wip: adding propagating client to engine and eth client
chatton 570509b
chore: simplify construction of rpc opts
chatton caa0684
chore: address PR feedback
chatton c154f23
chore: ensure consistent propagation settings
chatton 607f4a3
chore: adding interface for engine client and tracing implementation
chatton c5d7c41
chore: mrege main
chatton 80e2b17
chore: refactored wiring to use bool
chatton 07a45b6
chore: tidy all fix
chatton 423bb15
chore: fix go mod conflicts
chatton 3e373ce
chore: addressing PR feedback
chatton ed217d7
chore: adding eth client tracing
chatton 17eb5aa
chore: merge main
chatton 931d2ac
chore: add payload id as attribute
chatton ee2d158
chore: handle merge conflicts
chatton 8d7fc84
Merge branch 'cian/add-tracing-part-3' into cian/add-tracing-part-4
chatton 32db6c8
chore: merge main
chatton a3fa329
chore: adding tracing for DA client
chatton 776a2ea
chore: add *.test to gitignore
chatton d3bdb1e
chore: updated test
chatton 4d1d3ff
chore: adding hex encoded namespace
chatton 1ff13f3
chore: merge main
chatton 8df2f12
chore: remove accidentally committed test binary
chatton File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ types/pb/tendermint | |
| .vscode/launch.json | ||
| .vscode/settings.json | ||
| vendor | ||
| *.test | ||
| */**.html | ||
| *.idea | ||
| *.env | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| package da | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/hex" | ||
|
|
||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/attribute" | ||
| "go.opentelemetry.io/otel/codes" | ||
| "go.opentelemetry.io/otel/trace" | ||
|
|
||
| datypes "github.com/evstack/ev-node/pkg/da/types" | ||
| ) | ||
|
|
||
| // tracedClient decorates a FullClient with OpenTelemetry spans. | ||
| type tracedClient struct { | ||
| inner FullClient | ||
| tracer trace.Tracer | ||
| } | ||
|
|
||
| // WithTracingClient decorates the provided client with tracing spans. | ||
| func WithTracingClient(inner FullClient) FullClient { | ||
| return &tracedClient{inner: inner, tracer: otel.Tracer("ev-node/da")} | ||
| } | ||
|
|
||
| func (t *tracedClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit { | ||
| total := 0 | ||
| for _, b := range data { | ||
| total += len(b) | ||
| } | ||
| ctx, span := t.tracer.Start(ctx, "DA.Submit", | ||
| trace.WithAttributes( | ||
| attribute.Int("blob.count", len(data)), | ||
| attribute.Int("blob.total_size_bytes", total), | ||
| attribute.String("da.namespace", hex.EncodeToString(namespace)), | ||
| ), | ||
| ) | ||
| defer span.End() | ||
|
|
||
| res := t.inner.Submit(ctx, data, gasPrice, namespace, options) | ||
| if res.Code != datypes.StatusSuccess { | ||
| span.RecordError(&submitError{msg: res.Message}) | ||
| span.SetStatus(codes.Error, res.Message) | ||
| } else { | ||
| span.SetAttributes(attribute.Int64("da.height", int64(res.Height))) | ||
| } | ||
| return res | ||
| } | ||
|
|
||
| func (t *tracedClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { | ||
| ctx, span := t.tracer.Start(ctx, "DA.Retrieve", | ||
| trace.WithAttributes( | ||
| attribute.Int("ns.length", len(namespace)), | ||
| attribute.String("da.namespace", hex.EncodeToString(namespace)), | ||
| ), | ||
| ) | ||
| defer span.End() | ||
|
|
||
| res := t.inner.Retrieve(ctx, height, namespace) | ||
|
|
||
| if res.Code != datypes.StatusSuccess && res.Code != datypes.StatusNotFound { | ||
| span.RecordError(&submitError{msg: res.Message}) | ||
| span.SetStatus(codes.Error, res.Message) | ||
| } else { | ||
| span.SetAttributes(attribute.Int("blob.count", len(res.Data))) | ||
| } | ||
| return res | ||
| } | ||
|
|
||
| func (t *tracedClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { | ||
| ctx, span := t.tracer.Start(ctx, "DA.Get", | ||
| trace.WithAttributes( | ||
| attribute.Int("id.count", len(ids)), | ||
| attribute.String("da.namespace", hex.EncodeToString(namespace)), | ||
| ), | ||
| ) | ||
| defer span.End() | ||
|
|
||
| blobs, err := t.inner.Get(ctx, ids, namespace) | ||
| if err != nil { | ||
| span.RecordError(err) | ||
| span.SetStatus(codes.Error, err.Error()) | ||
| return nil, err | ||
| } | ||
| span.SetAttributes(attribute.Int("blob.count", len(blobs))) | ||
| return blobs, nil | ||
| } | ||
|
|
||
| func (t *tracedClient) GetProofs(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) { | ||
| ctx, span := t.tracer.Start(ctx, "DA.GetProofs", | ||
| trace.WithAttributes( | ||
| attribute.Int("id.count", len(ids)), | ||
| attribute.String("da.namespace", hex.EncodeToString(namespace)), | ||
| ), | ||
| ) | ||
| defer span.End() | ||
|
|
||
| proofs, err := t.inner.GetProofs(ctx, ids, namespace) | ||
| if err != nil { | ||
| span.RecordError(err) | ||
| span.SetStatus(codes.Error, err.Error()) | ||
| return nil, err | ||
| } | ||
| span.SetAttributes(attribute.Int("proof.count", len(proofs))) | ||
| return proofs, nil | ||
| } | ||
|
|
||
| func (t *tracedClient) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) { | ||
| ctx, span := t.tracer.Start(ctx, "DA.Validate", | ||
| trace.WithAttributes( | ||
| attribute.Int("id.count", len(ids)), | ||
| attribute.String("da.namespace", hex.EncodeToString(namespace)), | ||
| ), | ||
| ) | ||
| defer span.End() | ||
| res, err := t.inner.Validate(ctx, ids, proofs, namespace) | ||
| if err != nil { | ||
| span.RecordError(err) | ||
| span.SetStatus(codes.Error, err.Error()) | ||
| return nil, err | ||
| } | ||
| span.SetAttributes(attribute.Int("result.count", len(res))) | ||
| return res, nil | ||
| } | ||
|
|
||
| func (t *tracedClient) GetHeaderNamespace() []byte { return t.inner.GetHeaderNamespace() } | ||
| func (t *tracedClient) GetDataNamespace() []byte { return t.inner.GetDataNamespace() } | ||
| func (t *tracedClient) GetForcedInclusionNamespace() []byte { | ||
| return t.inner.GetForcedInclusionNamespace() | ||
| } | ||
| func (t *tracedClient) HasForcedInclusionNamespace() bool { | ||
| return t.inner.HasForcedInclusionNamespace() | ||
| } | ||
|
|
||
| type submitError struct{ msg string } | ||
|
|
||
| func (e *submitError) Error() string { return e.msg } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,222 @@ | ||
| package da | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/attribute" | ||
| "go.opentelemetry.io/otel/codes" | ||
| sdktrace "go.opentelemetry.io/otel/sdk/trace" | ||
| "go.opentelemetry.io/otel/sdk/trace/tracetest" | ||
|
|
||
| datypes "github.com/evstack/ev-node/pkg/da/types" | ||
| ) | ||
|
|
||
| // mockFullClient provides function hooks for testing the tracing decorator. | ||
| type mockFullClient struct { | ||
| submitFn func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit | ||
| retrieveFn func(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve | ||
| getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) | ||
| getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) | ||
| validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) | ||
| } | ||
|
|
||
| func (m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit { | ||
| if m.submitFn != nil { | ||
| return m.submitFn(ctx, data, gasPrice, namespace, options) | ||
| } | ||
| return datypes.ResultSubmit{} | ||
| } | ||
| func (m *mockFullClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { | ||
| if m.retrieveFn != nil { | ||
| return m.retrieveFn(ctx, height, namespace) | ||
| } | ||
| return datypes.ResultRetrieve{} | ||
| } | ||
| func (m *mockFullClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { | ||
| if m.getFn != nil { | ||
| return m.getFn(ctx, ids, namespace) | ||
| } | ||
| return nil, nil | ||
| } | ||
| func (m *mockFullClient) GetProofs(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) { | ||
| if m.getProofsFn != nil { | ||
| return m.getProofsFn(ctx, ids, namespace) | ||
| } | ||
| return nil, nil | ||
| } | ||
| func (m *mockFullClient) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) { | ||
| if m.validateFn != nil { | ||
| return m.validateFn(ctx, ids, proofs, namespace) | ||
| } | ||
| return nil, nil | ||
| } | ||
| func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } | ||
| func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } | ||
| func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } | ||
| func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } | ||
|
|
||
| // setup a tracer provider + span recorder | ||
| func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { | ||
| t.Helper() | ||
| sr := tracetest.NewSpanRecorder() | ||
| tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) | ||
| t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) | ||
| otel.SetTracerProvider(tp) | ||
| return WithTracingClient(inner), sr | ||
| } | ||
|
|
||
| func TestTracedDA_Submit_Success(t *testing.T) { | ||
| mock := &mockFullClient{ | ||
| submitFn: func(ctx context.Context, data [][]byte, _ float64, _ []byte, _ []byte) datypes.ResultSubmit { | ||
| return datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Height: 123}} | ||
| }, | ||
| } | ||
| client, sr := setupDATrace(t, mock) | ||
| ctx := context.Background() | ||
|
|
||
| _ = client.Submit(ctx, [][]byte{[]byte("a"), []byte("bc")}, -1.0, []byte{0xaa, 0xbb}, nil) | ||
|
|
||
| spans := sr.Ended() | ||
| require.Len(t, spans, 1) | ||
| span := spans[0] | ||
| require.Equal(t, "DA.Submit", span.Name()) | ||
| require.Equal(t, codes.Unset, span.Status().Code) | ||
|
|
||
| attrs := span.Attributes() | ||
| requireAttribute(t, attrs, "blob.count", 2) | ||
| requireAttribute(t, attrs, "blob.total_size_bytes", 3) | ||
| // namespace hex string length assertion | ||
| // 2 bytes = 4 hex characters | ||
| foundNS := false | ||
| for _, a := range attrs { | ||
| if string(a.Key) == "da.namespace" { | ||
| foundNS = true | ||
| require.Equal(t, 4, len(a.Value.AsString())) | ||
| } | ||
| } | ||
| require.True(t, foundNS, "attribute da.namespace not found") | ||
| } | ||
|
|
||
| func TestTracedDA_Submit_Error(t *testing.T) { | ||
| mock := &mockFullClient{ | ||
| submitFn: func(ctx context.Context, data [][]byte, _ float64, _ []byte, _ []byte) datypes.ResultSubmit { | ||
| return datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "boom"}} | ||
| }, | ||
| } | ||
| client, sr := setupDATrace(t, mock) | ||
| ctx := context.Background() | ||
|
|
||
| _ = client.Submit(ctx, [][]byte{[]byte("a")}, -1.0, []byte{0xaa}, nil) | ||
|
|
||
| spans := sr.Ended() | ||
| require.Len(t, spans, 1) | ||
| span := spans[0] | ||
| require.Equal(t, codes.Error, span.Status().Code) | ||
| require.Equal(t, "boom", span.Status().Description) | ||
| } | ||
|
|
||
| func TestTracedDA_Retrieve_Success(t *testing.T) { | ||
| mock := &mockFullClient{ | ||
| retrieveFn: func(ctx context.Context, height uint64, _ []byte) datypes.ResultRetrieve { | ||
| return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Height: height}, Data: []datypes.Blob{{}, {}}} | ||
| }, | ||
| } | ||
| client, sr := setupDATrace(t, mock) | ||
| ctx := context.Background() | ||
|
|
||
| _ = client.Retrieve(ctx, 42, []byte{0x01}) | ||
| spans := sr.Ended() | ||
| require.Len(t, spans, 1) | ||
| span := spans[0] | ||
| require.Equal(t, "DA.Retrieve", span.Name()) | ||
| attrs := span.Attributes() | ||
| requireAttribute(t, attrs, "ns.length", 1) | ||
| requireAttribute(t, attrs, "blob.count", 2) | ||
| } | ||
|
|
||
| func TestTracedDA_Retrieve_Error(t *testing.T) { | ||
| mock := &mockFullClient{ | ||
| retrieveFn: func(ctx context.Context, height uint64, _ []byte) datypes.ResultRetrieve { | ||
| return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "oops"}} | ||
| }, | ||
| } | ||
| client, sr := setupDATrace(t, mock) | ||
| ctx := context.Background() | ||
|
|
||
| _ = client.Retrieve(ctx, 7, []byte{0x02}) | ||
| spans := sr.Ended() | ||
| require.Len(t, spans, 1) | ||
| span := spans[0] | ||
| require.Equal(t, codes.Error, span.Status().Code) | ||
| require.Equal(t, "oops", span.Status().Description) | ||
| } | ||
|
|
||
| func TestTracedDA_Get_Success(t *testing.T) { | ||
| mock := &mockFullClient{ | ||
| getFn: func(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { | ||
| return []datypes.Blob{{}, {}}, nil | ||
| }, | ||
| } | ||
| client, sr := setupDATrace(t, mock) | ||
| ctx := context.Background() | ||
| ids := []datypes.ID{[]byte{0x01}, []byte{0x02}} | ||
|
|
||
| blobs, err := client.Get(ctx, ids, []byte{0x01}) | ||
| require.NoError(t, err) | ||
| require.Len(t, blobs, 2) | ||
|
|
||
| spans := sr.Ended() | ||
| require.Len(t, spans, 1) | ||
| span := spans[0] | ||
| require.Equal(t, "DA.Get", span.Name()) | ||
| attrs := span.Attributes() | ||
| requireAttribute(t, attrs, "id.count", 2) | ||
| requireAttribute(t, attrs, "blob.count", 2) | ||
| } | ||
|
|
||
| func TestTracedDA_Get_Error(t *testing.T) { | ||
| mock := &mockFullClient{ | ||
| getFn: func(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { | ||
| return nil, errors.New("get failed") | ||
| }, | ||
| } | ||
| client, sr := setupDATrace(t, mock) | ||
| ctx := context.Background() | ||
| ids := []datypes.ID{[]byte{0x01}} | ||
|
|
||
| _, err := client.Get(ctx, ids, []byte{0x01}) | ||
| require.Error(t, err) | ||
|
|
||
| spans := sr.Ended() | ||
| require.Len(t, spans, 1) | ||
| span := spans[0] | ||
| require.Equal(t, codes.Error, span.Status().Code) | ||
| require.Equal(t, "get failed", span.Status().Description) | ||
| } | ||
|
|
||
| // helper copied from eth tracing tests | ||
| func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { | ||
| t.Helper() | ||
| found := false | ||
| for _, attr := range attrs { | ||
| if string(attr.Key) == key { | ||
| found = true | ||
| switch v := expected.(type) { | ||
| case string: | ||
| require.Equal(t, v, attr.Value.AsString()) | ||
| case int64: | ||
| require.Equal(t, v, attr.Value.AsInt64()) | ||
| case int: | ||
| require.Equal(t, int64(v), attr.Value.AsInt64()) | ||
| default: | ||
| t.Fatalf("unsupported attribute type: %T", expected) | ||
| } | ||
| break | ||
| } | ||
| } | ||
| require.True(t, found, "attribute %s not found", key) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah just saw the comment, removed it.