[feature] add the scaling metric support #36

Open · wants to merge 3 commits into master
Binary file added .DS_Store
Binary file not shown.
Binary file added pkg/.DS_Store
Binary file not shown.
Binary file added pkg/metrics/.DS_Store
Binary file not shown.
6 changes: 3 additions & 3 deletions pkg/metrics/sls/ingress.go
@@ -62,9 +62,9 @@ func (ss *SLSMetricSource) getSLSIngressQuery(params *SLSIngressParams, metricNa
 
 func (ss *SLSMetricSource) getSLSIngressMetrics(namespace string, requirements labels.Requirements, metricName string) (values []external_metrics.ExternalMetricValue, err error) {
 
-	params, err := getSLSParams(requirements)
+	params, err := getSLSIngressParams(requirements)
 	if err != nil {
-		return values, fmt.Errorf("failed to get sls params,because of %v", err)
+		return values, fmt.Errorf("failed to get sls params, because of %v", err)
 	}
 
 	client, err := ss.Client(params.Internal)
@@ -122,5 +122,5 @@ func (ss *SLSMetricSource) getSLSIngressMetrics(namespace string, requirements l
 
 		return values, err
 	}
-	return values, errors.New("Query sls timeout,it might because of too many logs.")
+	return values, fmt.Errorf("query sls timeout, it might because of too many logs")
 }
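The reworded messages above keep plain %v formatting. A minimal sketch of an alternative, in case callers ever want to unwrap the underlying SLS error with errors.Is / errors.As (nothing in this diff requires it; wrapParamsErr is a hypothetical helper, not part of the PR):

// Sketch only: %w preserves the wrapped error for errors.Is / errors.As,
// whereas %v flattens it into a string.
func wrapParamsErr(cause error) error {
	return fmt.Errorf("failed to get sls params: %w", cause)
}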
5 changes: 3 additions & 2 deletions pkg/metrics/sls/ingress_test.go
@@ -2,14 +2,15 @@ package sls
 
 import (
 	"fmt"
-	"k8s.io/apimachinery/pkg/labels"
 	"testing"
+
+	"k8s.io/apimachinery/pkg/labels"
 )
 
 func TestInvalidGetSLSParams(t *testing.T) {
 	r := make([]labels.Requirement, 0)
 
-	_, e := getSLSParams(r)
+	_, e := getSLSIngressParams(r)
 	if e != nil {
 		t.Log("pass TestInvalidGetSLSParams")
 		return
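This test only covers the empty-selector error path. A hypothetical happy-path sketch follows; the label key "sls.project" and value "my-project" are placeholders, since the real keys parsed by getSLSIngressParams are not visible in this diff (it also needs the k8s.io/apimachinery/pkg/selection import):

// Hypothetical sketch; the label key and value are placeholders.
func sketchValidGetSLSIngressParams() error {
	req, err := labels.NewRequirement("sls.project", selection.Equals, []string{"my-project"})
	if err != nil {
		return err
	}
	_, err = getSLSIngressParams([]labels.Requirement{*req})
	return err
}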
93 changes: 93 additions & 0 deletions pkg/metrics/sls/scaling.go
@@ -0,0 +1,93 @@
package sls

import (
"fmt"
"strconv"
"time"

slssdk "github.com/aliyun/aliyun-log-go-sdk"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
log "k8s.io/klog/v2"
"k8s.io/metrics/pkg/apis/external_metrics"
)

const MLLogstore string = "internal-ml-log"
const MLPredQuery string = `
* and series_prediction and "__tag__:__job_name__":%s | select time, value from (
(select max(__time__) as pred_time, max_by("__tag__:__batch_id__", __time__) as batch_id from log where cast(json_extract(result, '$.entity') as varchar)='%s' and cast(json_extract(result, '$.metric') as varchar)='%s' and not result_type='prediction_error') t1
join
(select "__tag__:__batch_id__" as batch_id, cast(json_extract(result, '$.time') as bigint) as time, cast(json_extract(result, '$.expect_value') as double) as value
from log where cast(json_extract(result, '$.entity') as varchar)='%s' and cast(json_extract(result, '$.metric') as varchar)='%s' limit 100000) t2
on t1.batch_id=t2.batch_id) where time > %d order by time
`

type SLSScalingParams struct {
SLSGlobalParams
}

func (ss *SLSMetricSource) getSLSScalingQuery(params *SLSScalingParams, metricName string) (begin int64, end int64, query string, err error) {
now := time.Now().Unix()
begin = now - int64(params.Interval)
end = now

switch metricName {
case SLS_SCALING:
query = fmt.Sprintf(MLPredQuery, params.JobName, params.Entity, params.Metric, params.Entity, params.Metric, now)
default:
err = fmt.Errorf("failed to get ml prediction query: unsupported metric %s", metricName)
log.Errorf("%v", err)
}
return begin, end, query, err
}

func (ss *SLSMetricSource) getSLSScalingMetrics(namespace string, requirements labels.Requirements, metricName string) (values []external_metrics.ExternalMetricValue, err error) {

params, err := getSLSScalingParams(requirements)
if err != nil {
log.Errorf("failed to get scaling metrics for sls scaling params error: %v", err)
return values, err
}

client, err := ss.Client(params.Internal)
if err != nil {
log.Errorf("failed to get scaling metrics for sls client error: %v", err)
return values, err
}

begin, end, query, err := ss.getSLSScalingQuery(params, metricName)
if err != nil {
return values, err
}

var resp *slssdk.GetLogsResponse
for i := 0; i < params.MaxRetry; i++ {
resp, err = client.GetLogs(params.Project, params.LogStore, "", begin, end, query, 10000, 0, false)
if err != nil || len(resp.Logs) == 0 {
return values, err
}

if !resp.IsComplete() {
continue
}

// only return the nearest upcoming predicted value (the query orders logs by time)
entry := resp.Logs[0]
ts, err := strconv.ParseInt(entry["time"], 10, 64)
if err != nil {
return values, err
}
val, err := strconv.ParseFloat(entry["value"], 64)
if err != nil {
return values, err
}
values = append(values, external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(val), resource.DecimalSI),
Timestamp: metav1.Unix(ts, 0),
})
return values, nil
}
return values, fmt.Errorf("query sls timeout, it might because of too many logs")
}
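For orientation, this is roughly how getSLSScalingQuery fills the six MLPredQuery placeholders. The job name, entity and metric literals are taken from the test below; the final %d is the current Unix time, so only predictions later than "now" come back. buildExamplePredQuery is a hypothetical helper, shown only as a sketch:

// Illustration only, using the parameter values from TestGetSLSPredResult below.
func buildExamplePredQuery() string {
	return fmt.Sprintf(MLPredQuery,
		"etl-1648450179377-917195",             // JobName -> "__tag__:__job_name__"
		"service.ali.com-default-new-nginx-80", // Entity  (t1 subquery)
		"metric",                               // Metric  (t1 subquery)
		"service.ali.com-default-new-nginx-80", // Entity  (t2 subquery)
		"metric",                               // Metric  (t2 subquery)
		time.Now().Unix(),                      // only keep predictions with time > now
	)
}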
78 changes: 78 additions & 0 deletions pkg/metrics/sls/scaling_test.go
@@ -0,0 +1,78 @@
package sls

import (
"fmt"
"os"
"strconv"
"testing"

sls "github.com/aliyun/aliyun-log-go-sdk"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)

func TestGetSLSPredResult(t *testing.T) {
ss := &SLSMetricSource{}
param := &SLSScalingParams{
SLSGlobalParams{
Project: "k8s-log-c0ae5df15fbf34b47ba3a9684e6ee2bee",
LogStore: "internal-ml-log",
Internal: false,
MaxRetry: 3,
Interval: 24 * 60 * 60,

JobName: "etl-1648450179377-917195",
Entity: "service.ali.com-default-new-nginx-80",
Metric: "metric",
},
}
client := sls.CreateNormalInterface(
"cn-beijing.log.aliyuncs.com",
os.Getenv("shiji_test_sub_ak_id"),
os.Getenv("shiji_test_sub_ak_key"),
"",
)

begin, end, query, err := ss.getSLSScalingQuery(param, SLS_SCALING)
if err != nil {
t.Error(err)
return
}
fmt.Printf("prediction query:\n%s\n", query)

resp, err := client.GetLogs(param.Project, param.LogStore, "", begin, end, query, 10000, 0, false)
if err != nil || len(resp.Logs) == 0 {
t.Errorf("failed to get sls response: err info %v", err)
return
}
if !resp.IsComplete() {
t.Errorf("sls response is not complete")
return
}

var values []external_metrics.ExternalMetricValue
for _, log := range resp.Logs {
ts, err := strconv.ParseInt(log["time"], 10, 64)
if err != nil {
t.Error(err)
return
}
val, err := strconv.ParseFloat(log["value"], 64)
if err != nil {
t.Error(err)
return
}
values = append(values, external_metrics.ExternalMetricValue{
MetricName: SLS_SCALING,
// TODO: values format need to be decided
Value: *resource.NewScaledQuantity(int64(val), resource.Scale(-6)),
Timestamp: metav1.Unix(ts, 0),
})
}
if len(values) == 0 {
t.Errorf("sls prediction response has empty result")
return
}
fmt.Printf("sls prediction response has %d values\n", len(values))
}
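On the TODO about the value format: this test scales the predicted float by 10^-6, while scaling.go truncates it to an integer quantity. One possible middle ground, shown only as a sketch and not what either file currently does, is a milli-quantity that keeps three decimal places:

// Sketch of one option for the value-format TODO: store 12.345 as "12345m".
func toMilliQuantity(val float64) resource.Quantity {
	return *resource.NewMilliQuantity(int64(val*1000), resource.DecimalSI)
}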