Skip to content

Commit 3f16c00

Browse files
committed
Set invocation ID in newResource
1 parent 58d80db commit 3f16c00

10 files changed

+13
-19
lines changed

scheduler/metrics/metrics.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (s *Metrics) Equal(other *Metrics) bool {
8585
return true
8686
}
8787

88-
func getOtelMeters(tableName, clientID, invocationID string) *OtelMeters {
88+
func getOtelMeters(tableName, clientID string) *OtelMeters {
8989
resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources",
9090
metric.WithDescription("Number of resources synced for a table"),
9191
metric.WithUnit("/{tot}"),
@@ -136,22 +136,21 @@ func getOtelMeters(tableName, clientID, invocationID string) *OtelMeters {
136136
attributes: []attribute.KeyValue{
137137
attribute.Key("sync.client.id").String(clientID),
138138
attribute.Key("sync.table.name").String(tableName),
139-
attribute.Key("sync.invocation.id").String(invocationID),
140139
},
141140
}
142141
}
143142

144-
func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta, invocationID string) {
143+
func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
145144
s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
146145
for _, client := range clients {
147146
tableName := table.Name
148147
clientID := client.ID()
149148
s.TableClient[tableName][clientID] = &TableClientMetrics{
150-
otelMeters: getOtelMeters(tableName, clientID, invocationID),
149+
otelMeters: getOtelMeters(tableName, clientID),
151150
}
152151
}
153152
for _, relation := range table.Relations {
154-
s.InitWithClients(relation, clients, invocationID)
153+
s.InitWithClients(relation, clients)
155154
}
156155
}
157156

scheduler/queue/scheduler_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestScheduler(t *testing.T) {
7979
}
8080

8181
for _, tc := range tableClients {
82-
m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client}, scheduler.invocationID)
82+
m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client})
8383
}
8484

8585
resolvedResources := make(chan *schema.Resource)

scheduler/queue/worker.go

-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s
7070
"sync.table."+table.Name,
7171
trace.WithAttributes(
7272
attribute.Key("sync.client.id").String(clientName),
73-
attribute.Key("sync.invocation.id").String(w.invocationID),
7473
),
7574
)
7675
defer span.End()

scheduler/scheduler.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ import (
1414
"github.com/rs/zerolog"
1515
"github.com/samber/lo"
1616
"go.opentelemetry.io/otel"
17-
"go.opentelemetry.io/otel/attribute"
18-
"go.opentelemetry.io/otel/trace"
1917
"golang.org/x/sync/semaphore"
2018
)
2119

@@ -196,10 +194,7 @@ func (s *Scheduler) SyncAll(ctx context.Context, client schema.ClientMeta, table
196194
}
197195

198196
func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables schema.Tables, res chan<- message.SyncMessage, opts ...SyncOption) error {
199-
ctx, span := otel.Tracer(metrics.OtelName).Start(ctx,
200-
"sync",
201-
trace.WithAttributes(attribute.Key("sync.invocation.id").String(s.invocationID)),
202-
)
197+
ctx, span := otel.Tracer(metrics.OtelName).Start(ctx, "sync")
203198
defer span.End()
204199
if len(tables) == 0 {
205200
return ErrNoTables

scheduler/scheduler_debug.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR
5050
preInitialisedClients[i] = clients
5151
// we do this here to avoid locks so we initialize the metrics structure once in the main goroutine
5252
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
53-
s.metrics.InitWithClients(table, clients, s.invocationID)
53+
s.metrics.InitWithClients(table, clients)
5454
}
5555

5656
// First interleave the tables like in round-robin

scheduler/scheduler_dfs.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche
3939
preInitialisedClients[i] = clients
4040
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
4141
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
42-
s.metrics.InitWithClients(table, clients, s.invocationID)
42+
s.metrics.InitWithClients(table, clients)
4343
}
4444

4545
tableClients := make([]tableClient, 0)
@@ -81,7 +81,6 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
8181
"sync.table."+table.Name,
8282
trace.WithAttributes(
8383
attribute.Key("sync.client.id").String(clientName),
84-
attribute.Key("sync.invocation.id").String(s.invocationID),
8584
),
8685
)
8786
defer span.End()

scheduler/scheduler_round_robin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan<
3333
preInitialisedClients[i] = clients
3434
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
3535
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
36-
s.metrics.InitWithClients(table, clients, s.invocationID)
36+
s.metrics.InitWithClients(table, clients)
3737
}
3838

3939
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)

scheduler/scheduler_shuffle.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *
3333
preInitialisedClients[i] = clients
3434
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
3535
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
36-
s.metrics.InitWithClients(table, clients, s.invocationID)
36+
s.metrics.InitWithClients(table, clients)
3737
}
3838

3939
// First interleave the tables like in round-robin

scheduler/scheduler_shuffle_queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha
2121
preInitialisedClients[i] = clients
2222
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
2323
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
24-
s.metrics.InitWithClients(table, clients, s.invocationID)
24+
s.metrics.InitWithClients(table, clients)
2525
}
2626

2727
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)

serve/opentelemetry.go

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/cloudquery/plugin-sdk/v4/plugin"
1111
"github.com/rs/zerolog"
12+
"go.opentelemetry.io/otel/attribute"
1213

1314
"go.opentelemetry.io/otel"
1415
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
@@ -32,6 +33,7 @@ func newResource(p *plugin.Plugin) *resource.Resource {
3233
semconv.SchemaURL,
3334
semconv.ServiceName("cloudquery-"+p.Name()),
3435
semconv.ServiceVersion(p.Version()),
36+
attribute.Key("sync.invocation.id").String(p.InvocationID()),
3537
),
3638
)
3739
if err != nil {

0 commit comments

Comments
 (0)