p|r|s*: Reference ScaledObject's/ScaledJob's name in the scalers log #3494

Merged: 1 commit, Aug 3, 2022
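
This PR replaces each scaler's package-level logger with a per-instance logr.Logger obtained from InitializeLogger(config, "<name>_scaler"), so every log line can reference the ScaledObject or ScaledJob that owns the scaler. The helper itself is not part of this diff; a minimal sketch of what it could look like is shown below (the ScalableObjectName and ScalableObjectNamespace fields on ScalerConfig are assumptions for illustration, not taken from this change):

package scalers

import (
	"github.com/go-logr/logr"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// InitializeLogger builds a logger scoped to a single scaler instance and
// tags it with the owning ScaledObject's/ScaledJob's identity so that log
// lines from different scaler instances can be told apart.
// NOTE: the ScalableObject* field names below are assumed for this sketch.
func InitializeLogger(config *ScalerConfig, scalerName string) logr.Logger {
	return logf.Log.
		WithName(scalerName).
		WithValues(
			"scaledObject.Namespace", config.ScalableObjectNamespace,
			"scaledObject.Name", config.ScalableObjectName,
		)
}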
20 changes: 11 additions & 9 deletions pkg/scalers/postgresql_scaler.go
@@ -6,12 +6,12 @@ import (
"fmt"
"strconv"

"github.com/go-logr/logr"
// PostgreSQL driver required for this scaler
_ "github.com/lib/pq"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
@@ -20,6 +20,7 @@ type postgreSQLScaler struct {
metricType v2beta2.MetricTargetType
metadata *postgreSQLMetadata
connection *sql.DB
logger logr.Logger
}

type postgreSQLMetadata struct {
@@ -31,28 +31,29 @@ type postgreSQLMetadata struct {
scalerIndex int
}

var postgreSQLLog = logf.Log.WithName("postgreSQL_scaler")

// NewPostgreSQLScaler creates a new postgreSQL scaler
func NewPostgreSQLScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %s", err)
}

logger := InitializeLogger(config, "postgresql_scaler")

meta, err := parsePostgreSQLMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing postgreSQL metadata: %s", err)
}

conn, err := getConnection(meta)
conn, err := getConnection(meta, logger)
if err != nil {
return nil, fmt.Errorf("error establishing postgreSQL connection: %s", err)
}
return &postgreSQLScaler{
metricType: metricType,
metadata: meta,
connection: conn,
logger: logger,
}, nil
}

@@ -142,15 +144,15 @@ func parsePostgreSQLMetadata(config *ScalerConfig) (*postgreSQLMetadata, error)
return &meta, nil
}

func getConnection(meta *postgreSQLMetadata) (*sql.DB, error) {
func getConnection(meta *postgreSQLMetadata, logger logr.Logger) (*sql.DB, error) {
db, err := sql.Open("postgres", meta.connection)
if err != nil {
postgreSQLLog.Error(err, fmt.Sprintf("Found error opening postgreSQL: %s", err))
logger.Error(err, fmt.Sprintf("Found error opening postgreSQL: %s", err))
return nil, err
}
err = db.Ping()
if err != nil {
postgreSQLLog.Error(err, fmt.Sprintf("Found error pinging postgreSQL: %s", err))
logger.Error(err, fmt.Sprintf("Found error pinging postgreSQL: %s", err))
return nil, err
}
return db, nil
@@ -160,7 +162,7 @@ func getConnection(meta *postgreSQLMetadata) (*sql.DB, error) {
func (s *postgreSQLScaler) Close(context.Context) error {
err := s.connection.Close()
if err != nil {
postgreSQLLog.Error(err, "Error closing postgreSQL connection")
s.logger.Error(err, "Error closing postgreSQL connection")
return err
}
return nil
@@ -180,7 +182,7 @@ func (s *postgreSQLScaler) getActiveNumber(ctx context.Context) (float64, error)
var id float64
err := s.connection.QueryRowContext(ctx, s.metadata.query).Scan(&id)
if err != nil {
postgreSQLLog.Error(err, fmt.Sprintf("could not query postgreSQL: %s", err))
s.logger.Error(err, fmt.Sprintf("could not query postgreSQL: %s", err))
return 0, fmt.Errorf("could not query postgreSQL: %s", err)
}
return id, nil
4 changes: 3 additions & 1 deletion pkg/scalers/postgresql_scaler_test.go
@@ -3,6 +3,8 @@ package scalers
import (
"context"
"testing"

"github.com/go-logr/logr"
)

type parsePostgreSQLMetadataTestData struct {
@@ -45,7 +47,7 @@ func TestPosgresSQLGetMetricSpecForScaling(t *testing.T) {
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockPostgresSQLScaler := postgreSQLScaler{"", meta, nil}
mockPostgresSQLScaler := postgreSQLScaler{"", meta, nil, logr.Discard()}

metricSpec := mockPostgresSQLScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
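In the unit tests the new logger field is populated with logr.Discard(), which returns a no-op logr.Logger. A tiny standalone illustration of that behavior (hypothetical example, not part of the PR):

package main

import "github.com/go-logr/logr"

func main() {
	// logr.Discard() returns a Logger whose sink drops every record,
	// so test scalers can satisfy the logger field without emitting output.
	logger := logr.Discard()
	logger.Info("never printed")
	logger.Error(nil, "never printed either")
}
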
29 changes: 15 additions & 14 deletions pkg/scalers/predictkube_scaler.go
@@ -15,6 +15,7 @@ import (
tc "github.com/dysnix/predictkube-libs/external/types_convertation"
"github.com/dysnix/predictkube-proto/external/proto/commonproto"
pb "github.com/dysnix/predictkube-proto/external/proto/services"
"github.com/go-logr/logr"
"github.com/go-playground/validator/v10"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
@@ -28,7 +29,6 @@ import (
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kedacore/keda/v2/pkg/scalers/authentication"
kedautil "github.com/kedacore/keda/v2/pkg/util"
@@ -81,6 +81,7 @@ type PredictKubeScaler struct {
grpcClient pb.MlEngineServiceClient
healthClient health.HealthClient
api v1.API
logger logr.Logger
}

type predictKubeMetadata struct {
@@ -96,8 +97,6 @@ type predictKubeMetadata struct {
scalerIndex int
}

var predictKubeLog = logf.Log.WithName("predictkube_scaler")

func (s *PredictKubeScaler) setupClientConn() error {
clientOpt, err := pc.SetGrpcClientOptions(grpcConf,
&libs.Base{
@@ -141,31 +140,33 @@ func (s *PredictKubeScaler) setupClientConn() error {
func NewPredictKubeScaler(ctx context.Context, config *ScalerConfig) (*PredictKubeScaler, error) {
s := &PredictKubeScaler{}

logger := InitializeLogger(config, "predictkube_scaler")
s.logger = logger

metricType, err := GetMetricTargetType(config)
if err != nil {
predictKubeLog.Error(err, "error getting scaler metric type")
logger.Error(err, "error getting scaler metric type")
return nil, fmt.Errorf("error getting scaler metric type: %s", err)
}

s.metricType = metricType

meta, err := parsePredictKubeMetadata(config)
if err != nil {
predictKubeLog.Error(err, "error parsing PredictKube metadata")
logger.Error(err, "error parsing PredictKube metadata")
return nil, fmt.Errorf("error parsing PredictKube metadata: %3s", err)
}

s.metadata = meta

err = s.initPredictKubePrometheusConn(ctx)
if err != nil {
predictKubeLog.Error(err, "error create Prometheus client and API objects")
logger.Error(err, "error create Prometheus client and API objects")
return nil, fmt.Errorf("error create Prometheus client and API objects: %3s", err)
}

err = s.setupClientConn()
if err != nil {
predictKubeLog.Error(err, "error init GRPC client")
logger.Error(err, "error init GRPC client")
return nil, fmt.Errorf("error init GRPC client: %3s", err)
}

@@ -222,17 +223,17 @@ func (s *PredictKubeScaler) GetMetricSpecForScaling(context.Context) []v2beta2.M
func (s *PredictKubeScaler) GetMetrics(ctx context.Context, metricName string, _ labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
value, err := s.doPredictRequest(ctx)
if err != nil {
predictKubeLog.Error(err, "error executing query to predict controller service")
s.logger.Error(err, "error executing query to predict controller service")
return []external_metrics.ExternalMetricValue{}, err
}

if value == 0 {
err = errors.New("empty response after predict request")
predictKubeLog.Error(err, "")
s.logger.Error(err, "")
return nil, err
}

predictKubeLog.V(1).Info(fmt.Sprintf("predict value is: %f", value))
s.logger.V(1).Info(fmt.Sprintf("predict value is: %f", value))

metric := GenerateMetricInMili(metricName, value)

@@ -285,7 +286,7 @@ func (s *PredictKubeScaler) doQuery(ctx context.Context) ([]*commonproto.Item, e
val, warns, err := s.api.QueryRange(ctx, s.metadata.query, r)

if len(warns) > 0 {
predictKubeLog.V(1).Info("warnings", warns)
s.logger.V(1).Info("warnings", warns)
}

if err != nil {
@@ -473,15 +474,15 @@ func (s *PredictKubeScaler) initPredictKubePrometheusConn(ctx context.Context) (
authentication.FastHTTP,
s.metadata.prometheusAuth,
); err != nil {
predictKubeLog.V(1).Error(err, "init Prometheus client http transport")
s.logger.V(1).Error(err, "init Prometheus client http transport")
return err
}

if s.prometheusClient, err = api.NewClient(api.Config{
Address: s.metadata.prometheusAddress,
RoundTripper: roundTripper,
}); err != nil {
predictKubeLog.V(1).Error(err, "init Prometheus client")
s.logger.V(1).Error(err, "init Prometheus client")
return err
}

24 changes: 14 additions & 10 deletions pkg/scalers/prometheus_scaler.go
@@ -10,10 +10,10 @@ import (
"strconv"
"time"

"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kedacore/keda/v2/pkg/scalers/authentication"
kedautil "github.com/kedacore/keda/v2/pkg/util"
@@ -39,6 +39,7 @@ type prometheusScaler struct {
metricType v2beta2.MetricTargetType
metadata *prometheusMetadata
httpClient *http.Client
logger logr.Logger
}

type prometheusMetadata struct {
@@ -70,15 +71,15 @@ type promQueryResult struct {
} `json:"data"`
}

var prometheusLog = logf.Log.WithName("prometheus_scaler")

// NewPrometheusScaler creates a new prometheusScaler
func NewPrometheusScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %s", err)
}

logger := InitializeLogger(config, "prometheus_scaler")

meta, err := parsePrometheusMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing prometheus metadata: %s", err)
@@ -92,7 +93,7 @@ func NewPrometheusScaler(config *ScalerConfig) (Scaler, error) {
authentication.NetHTTP,
meta.prometheusAuth,
); err != nil {
predictKubeLog.V(1).Error(err, "init Prometheus client http transport")
logger.V(1).Error(err, "init Prometheus client http transport")
return nil, err
}
}
@@ -101,6 +102,7 @@ func NewPrometheusScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
httpClient: httpClient,
logger: logger,
}, nil
}

@@ -178,7 +180,7 @@ func parsePrometheusMetadata(config *ScalerConfig) (meta *prometheusMetadata, er
func (s *prometheusScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.ExecutePromQuery(ctx)
if err != nil {
prometheusLog.Error(err, "error executing prometheus query")
s.logger.Error(err, "error executing prometheus query")
return false, err
}

@@ -240,7 +242,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error
_ = r.Body.Close()

if !(r.StatusCode >= 200 && r.StatusCode <= 299) {
return -1, fmt.Errorf("prometheus query api returned error. status: %d response: %s", r.StatusCode, string(b))
err := fmt.Errorf("prometheus query api returned error. status: %d response: %s", r.StatusCode, string(b))
s.logger.Error(err, "prometheus query api returned error")
return -1, err
}

var result promQueryResult
@@ -273,10 +277,10 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error

val := result.Data.Result[0].Value[1]
if val != nil {
s := val.(string)
v, err = strconv.ParseFloat(s, 64)
str := val.(string)
v, err = strconv.ParseFloat(str, 64)
if err != nil {
prometheusLog.Error(err, "Error converting prometheus value", "prometheus_value", s)
s.logger.Error(err, "Error converting prometheus value", "prometheus_value", str)
return -1, err
}
}
@@ -287,7 +291,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error
func (s *prometheusScaler) GetMetrics(ctx context.Context, metricName string, _ labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
val, err := s.ExecutePromQuery(ctx)
if err != nil {
prometheusLog.Error(err, "error executing prometheus query")
s.logger.Error(err, "error executing prometheus query")
return []external_metrics.ExternalMetricValue{}, err
}

2 changes: 2 additions & 0 deletions pkg/scalers/prometheus_scaler_test.go
@@ -7,6 +7,7 @@ import (
"strings"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
)

@@ -234,6 +235,7 @@ func TestPrometheusScalerExecutePromQuery(t *testing.T) {
ignoreNullValues: testData.ignoreNullValues,
},
httpClient: http.DefaultClient,
logger: logr.Discard(),
}

value, err := scaler.ExecutePromQuery(context.TODO())
9 changes: 5 additions & 4 deletions pkg/scalers/rabbitmq_scaler.go
@@ -11,11 +11,11 @@ import (
"strconv"
"time"

"github.com/go-logr/logr"
"github.com/streadway/amqp"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
@@ -57,6 +57,7 @@ type rabbitMQScaler struct {
connection *amqp.Connection
channel *amqp.Channel
httpClient *http.Client
logger logr.Logger
}

type rabbitMQMetadata struct {
@@ -96,8 +97,6 @@ type publishDetail struct {
Rate float64 `json:"rate"`
}

var rabbitmqLog = logf.Log.WithName("rabbitmq_scaler")

// NewRabbitMQScaler creates a new rabbitMQ scaler
func NewRabbitMQScaler(config *ScalerConfig) (Scaler, error) {
s := &rabbitMQScaler{}
@@ -108,6 +107,8 @@ func NewRabbitMQScaler(config *ScalerConfig) (Scaler, error) {
}
s.metricType = metricType

s.logger = InitializeLogger(config, "rabbitmq_scaler")

meta, err := parseRabbitMQMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing rabbitmq metadata: %s", err)
@@ -361,7 +362,7 @@ func (s *rabbitMQScaler) Close(context.Context) error {
if s.connection != nil {
err := s.connection.Close()
if err != nil {
rabbitmqLog.Error(err, "Error closing rabbitmq connection")
s.logger.Error(err, "Error closing rabbitmq connection")
return err
}
}