Skip to content

Commit 8f119bd

Browse files
committed
Revert "Set invocation ID in newResource"
This reverts commit 3f16c00.
1 parent 3f16c00 commit 8f119bd

10 files changed

+19
-13
lines changed

scheduler/metrics/metrics.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (s *Metrics) Equal(other *Metrics) bool {
8585
return true
8686
}
8787

88-
func getOtelMeters(tableName, clientID string) *OtelMeters {
88+
func getOtelMeters(tableName, clientID, invocationID string) *OtelMeters {
8989
resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources",
9090
metric.WithDescription("Number of resources synced for a table"),
9191
metric.WithUnit("/{tot}"),
@@ -136,21 +136,22 @@ func getOtelMeters(tableName, clientID string) *OtelMeters {
136136
attributes: []attribute.KeyValue{
137137
attribute.Key("sync.client.id").String(clientID),
138138
attribute.Key("sync.table.name").String(tableName),
139+
attribute.Key("sync.invocation.id").String(invocationID),
139140
},
140141
}
141142
}
142143

143-
func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
144+
func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta, invocationID string) {
144145
s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
145146
for _, client := range clients {
146147
tableName := table.Name
147148
clientID := client.ID()
148149
s.TableClient[tableName][clientID] = &TableClientMetrics{
149-
otelMeters: getOtelMeters(tableName, clientID),
150+
otelMeters: getOtelMeters(tableName, clientID, invocationID),
150151
}
151152
}
152153
for _, relation := range table.Relations {
153-
s.InitWithClients(relation, clients)
154+
s.InitWithClients(relation, clients, invocationID)
154155
}
155156
}
156157

scheduler/queue/scheduler_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestScheduler(t *testing.T) {
7979
}
8080

8181
for _, tc := range tableClients {
82-
m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client})
82+
m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client}, scheduler.invocationID)
8383
}
8484

8585
resolvedResources := make(chan *schema.Resource)

scheduler/queue/worker.go

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s
7070
"sync.table."+table.Name,
7171
trace.WithAttributes(
7272
attribute.Key("sync.client.id").String(clientName),
73+
attribute.Key("sync.invocation.id").String(w.invocationID),
7374
),
7475
)
7576
defer span.End()

scheduler/scheduler.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"github.com/rs/zerolog"
1515
"github.com/samber/lo"
1616
"go.opentelemetry.io/otel"
17+
"go.opentelemetry.io/otel/attribute"
18+
"go.opentelemetry.io/otel/trace"
1719
"golang.org/x/sync/semaphore"
1820
)
1921

@@ -194,7 +196,10 @@ func (s *Scheduler) SyncAll(ctx context.Context, client schema.ClientMeta, table
194196
}
195197

196198
func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables schema.Tables, res chan<- message.SyncMessage, opts ...SyncOption) error {
197-
ctx, span := otel.Tracer(metrics.OtelName).Start(ctx, "sync")
199+
ctx, span := otel.Tracer(metrics.OtelName).Start(ctx,
200+
"sync",
201+
trace.WithAttributes(attribute.Key("sync.invocation.id").String(s.invocationID)),
202+
)
198203
defer span.End()
199204
if len(tables) == 0 {
200205
return ErrNoTables

scheduler/scheduler_debug.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR
5050
preInitialisedClients[i] = clients
5151
// we do this here to avoid locks so we initialize the metrics structure once in the main goroutine
5252
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
53-
s.metrics.InitWithClients(table, clients)
53+
s.metrics.InitWithClients(table, clients, s.invocationID)
5454
}
5555

5656
// First interleave the tables like in round-robin

scheduler/scheduler_dfs.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche
3939
preInitialisedClients[i] = clients
4040
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
4141
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
42-
s.metrics.InitWithClients(table, clients)
42+
s.metrics.InitWithClients(table, clients, s.invocationID)
4343
}
4444

4545
tableClients := make([]tableClient, 0)
@@ -81,6 +81,7 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
8181
"sync.table."+table.Name,
8282
trace.WithAttributes(
8383
attribute.Key("sync.client.id").String(clientName),
84+
attribute.Key("sync.invocation.id").String(s.invocationID),
8485
),
8586
)
8687
defer span.End()

scheduler/scheduler_round_robin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan<
3333
preInitialisedClients[i] = clients
3434
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
3535
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
36-
s.metrics.InitWithClients(table, clients)
36+
s.metrics.InitWithClients(table, clients, s.invocationID)
3737
}
3838

3939
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)

scheduler/scheduler_shuffle.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *
3333
preInitialisedClients[i] = clients
3434
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
3535
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
36-
s.metrics.InitWithClients(table, clients)
36+
s.metrics.InitWithClients(table, clients, s.invocationID)
3737
}
3838

3939
// First interleave the tables like in round-robin

scheduler/scheduler_shuffle_queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha
2121
preInitialisedClients[i] = clients
2222
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
2323
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
24-
s.metrics.InitWithClients(table, clients)
24+
s.metrics.InitWithClients(table, clients, s.invocationID)
2525
}
2626

2727
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)

serve/opentelemetry.go

-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/cloudquery/plugin-sdk/v4/plugin"
1111
"github.com/rs/zerolog"
12-
"go.opentelemetry.io/otel/attribute"
1312

1413
"go.opentelemetry.io/otel"
1514
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
@@ -33,7 +32,6 @@ func newResource(p *plugin.Plugin) *resource.Resource {
3332
semconv.SchemaURL,
3433
semconv.ServiceName("cloudquery-"+p.Name()),
3534
semconv.ServiceVersion(p.Version()),
36-
attribute.Key("sync.invocation.id").String(p.InvocationID()),
3735
),
3836
)
3937
if err != nil {

0 commit comments

Comments
 (0)