@@ -18,26 +18,26 @@ import (
18
18
"encoding/json"
19
19
"fmt"
20
20
"io"
21
+ "log/slog"
21
22
"math"
22
23
"os"
23
24
"sort"
24
25
"strings"
25
26
"time"
26
27
27
28
"cloud.google.com/go/bigquery"
28
- "github.com/go-kit/log"
29
- "github.com/go-kit/log/level"
30
29
"github.com/pkg/errors"
31
30
"github.com/prometheus/client_golang/prometheus"
32
31
"github.com/prometheus/common/model"
32
+ "github.com/prometheus/common/promslog"
33
33
"github.com/prometheus/prometheus/prompb"
34
34
"google.golang.org/api/iterator"
35
35
"google.golang.org/api/option"
36
36
)
37
37
38
38
// BigqueryClient allows sending batches of Prometheus samples to Bigquery.
39
39
type BigqueryClient struct {
40
- logger log .Logger
40
+ logger * slog .Logger
41
41
client bigquery.Client
42
42
datasetID string
43
43
tableID string
@@ -50,13 +50,16 @@ type BigqueryClient struct {
50
50
}
51
51
52
52
// NewClient creates a new Client.
53
- func NewClient (logger log .Logger , googleAPIjsonkeypath , googleProjectID , googleAPIdatasetID , googleAPItableID string , remoteTimeout time.Duration ) * BigqueryClient {
53
+ func NewClient (logger * slog .Logger , googleAPIjsonkeypath , googleProjectID , googleAPIdatasetID , googleAPItableID string , remoteTimeout time.Duration ) * BigqueryClient {
54
54
ctx := context .Background ()
55
+ if logger == nil {
56
+ logger = promslog .NewNopLogger ()
57
+ }
55
58
bigQueryClientOptions := []option.ClientOption {}
56
59
if googleAPIjsonkeypath != "" {
57
60
jsonFile , err := os .Open (googleAPIjsonkeypath )
58
61
if err != nil {
59
- level .Error (logger ). Log ( "err " , err ) //nolint:errcheck
62
+ logger .Error ("failed to open google api json key" , slog . Any ( "error " , err ))
60
63
os .Exit (1 )
61
64
}
62
65
@@ -65,7 +68,7 @@ func NewClient(logger log.Logger, googleAPIjsonkeypath, googleProjectID, googleA
65
68
var result map [string ]interface {}
66
69
err = json .Unmarshal ([]byte (byteValue ), & result )
67
70
if err != nil {
68
- level .Error (logger ). Log ( "err " , err ) //nolint:errcheck
71
+ logger .Error ("failed to unmarshal google api json key" , slog . Any ( "error " , err ))
69
72
os .Exit (1 )
70
73
}
71
74
@@ -80,14 +83,10 @@ func NewClient(logger log.Logger, googleAPIjsonkeypath, googleProjectID, googleA
80
83
c , err := bigquery .NewClient (ctx , googleProjectID , bigQueryClientOptions ... )
81
84
82
85
if err != nil {
83
- level .Error (logger ). Log ( "err " , err ) //nolint:errcheck
86
+ logger .Error ("failed to create new bigquery client" , slog . Any ( "error " , err ))
84
87
os .Exit (1 )
85
88
}
86
89
87
- if logger == nil {
88
- logger = log .NewNopLogger ()
89
- }
90
-
91
90
return & BigqueryClient {
92
91
logger : logger ,
93
92
client : * c ,
@@ -180,7 +179,7 @@ func (c *BigqueryClient) Write(timeseries []*prompb.TimeSeries) error {
180
179
for _ , s := range samples {
181
180
v := float64 (s .Value )
182
181
if math .IsNaN (v ) || math .IsInf (v , 0 ) {
183
- //level.Debug( c.logger).Log("msg", " cannot send to BigQuery , skipping sample", "value", v, "sample", s)
182
+ c .logger . Debug ( " cannot send to bigquery , skipping sample" , slog . Any ( "value" , v ), slog . Any ( "sample" , s ) )
184
183
c .ignoredSamples .Inc ()
185
184
continue
186
185
}
@@ -259,7 +258,7 @@ func (c *BigqueryClient) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, er
259
258
}
260
259
duration := time .Since (begin ).Seconds ()
261
260
c .sqlQueryDuration .Observe (duration )
262
- level . Debug ( c .logger ). Log ( "msg" , "BigQuery SQL query" , "rows" , iter .TotalRows , "duration" , duration ) //nolint:errcheck
261
+ c .logger . Debug ( "bigquery sql query" , slog . Any ( "rows" , iter .TotalRows ), slog . Any ( "duration" , duration ))
263
262
}
264
263
265
264
resp := prompb.ReadResponse {
@@ -312,7 +311,7 @@ func (c *BigqueryClient) buildCommand(q *prompb.Query) (string, error) {
312
311
matchers = append (matchers , fmt .Sprintf ("timestamp <= TIMESTAMP_MILLIS(%v)" , q .EndTimestampMs ))
313
312
314
313
query := fmt .Sprintf ("SELECT metricname, tags, UNIX_MILLIS(timestamp) as timestamp, value FROM %s.%s WHERE %v ORDER BY timestamp" , c .datasetID , c .tableID , strings .Join (matchers , " AND " ))
315
- level . Debug ( c .logger ). Log ( "msg" , "BigQuery read" , "sql query" , query ) //nolint:errcheck
314
+ c .logger . Debug ( "bigquery read" , slog . Any ( "sql query" , query ))
316
315
317
316
return query , nil
318
317
}
0 commit comments