Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/app/api/grpcapi/piped_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,19 @@ func (a *PipedAPI) Register(server *grpc.Server) {

// Ping is periodically sent to report its realtime status/stats to control-plane.
// The received stats will be pushed to the metrics collector.
// Note: This rpc is deprecated, use ReportStat instead.
func (a *PipedAPI) Ping(ctx context.Context, req *pipedservice.PingRequest) (*pipedservice.PingResponse, error) {
return &pipedservice.PingResponse{}, nil
// return nil, status.Error(codes.Unimplemented, "")
}

// ReportStat is periodically sent to report its realtime status/stats to control-plane.
// The received stats will be pushed to the metrics collector.
func (a *PipedAPI) ReportStat(ctx context.Context, req *pipedservice.ReportStatRequest) (*pipedservice.ReportStatResponse, error) {
return &pipedservice.ReportStatResponse{}, nil
// return nil, status.Error(codes.Unimplemented, "")
}

// ReportPipedMeta is sent by piped while starting up to report its metadata
// such as configured cloud providers.
func (a *PipedAPI) ReportPipedMeta(ctx context.Context, req *pipedservice.ReportPipedMetaRequest) (*pipedservice.ReportPipedMetaResponse, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ func (c *fakeClient) Ping(ctx context.Context, req *pipedservice.PingRequest, op
return &pipedservice.PingResponse{}, nil
}

// ReportStat is periodically sent to report its realtime status/stats to control-plane.
// The received stats will be pushed to the metrics collector.
func (c *fakeClient) ReportStat(ctx context.Context, req *pipedservice.ReportStatRequest, opts ...grpc.CallOption) (*pipedservice.ReportStatResponse, error) {
c.logger.Info("fake client received ReportStat rpc", zap.Any("request", req))
return &pipedservice.ReportStatResponse{}, nil
}

// ReportPipedMeta is sent by piped while starting up to report its metadata
// such as configured cloud providers.
func (c *fakeClient) ReportPipedMeta(ctx context.Context, req *pipedservice.ReportPipedMetaRequest, opts ...grpc.CallOption) (*pipedservice.ReportPipedMetaResponse, error) {
Expand Down
22 changes: 19 additions & 3 deletions pkg/app/api/service/pipedservice/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ import "pkg/model/event.proto";
service PipedService {
// Ping is periodically sent to report its realtime status/stats to control-plane.
// The received stats will be pushed to the metrics collector.
rpc Ping(PingRequest) returns (PingResponse) {}
// Note: This rpc is deprecated, use ReportStat instead.
rpc Ping(PingRequest) returns (PingResponse) {
option deprecated = true;
}

// ReportStat is periodically sent to report its realtime status/stats to control-plane.
// The received stats will be pushed to the metrics collector.
rpc ReportStat(ReportStatRequest) returns (ReportStatResponse) {}

// ReportPipedMeta is sent while starting up to report its metadata
// such as configured cloud providers.
Expand Down Expand Up @@ -149,11 +156,20 @@ enum ListOrder {
}

message PingRequest {
pipe.model.PipedStats piped_stats = 1 [(validate.rules).message.required = true];
pipe.model.PipedStats piped_stats = 1 [(validate.rules).message.required = true, deprecated = true];
}

message PingResponse {
int64 ping_interval = 1;
int64 ping_interval = 1 [deprecated = true];
}

message ReportStatRequest {
// Metrics byte sequence in OpenMetrics format.
bytes piped_stats = 1 [(validate.rules).bytes.min_len = 1];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. For those who look at this first will be more likely to wonder why it's represented as a byte sequence. I feel like it should be commented like: "Metrics byte sequence in OpenMetrics format"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Lets me adopt your idea, thx 🙆‍♂️

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed by 2bb3bcb 🙏

}

message ReportStatResponse {
int64 report_interval = 1;
}

message ReportPipedMetaRequest {
Expand Down
18 changes: 12 additions & 6 deletions pkg/app/piped/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ func NewCommand() *cobra.Command {
}

func (p *piped) run(ctx context.Context, t cli.Telemetry) (runErr error) {
// Register all metrics.
registerMetrics()

group, ctx := errgroup.WithContext(ctx)
if p.addLoginUserToPasswd {
if err := p.insertLoginUserToPasswd(ctx); err != nil {
Expand All @@ -136,6 +133,9 @@ func (p *piped) run(ctx context.Context, t cli.Telemetry) (runErr error) {
return err
}

// Register all metrics.
registerMetrics(cfg.PipedID)

// Initialize notifier and add piped events.
notifier, err := notifier.NewNotifier(cfg, t.Logger)
if err != nil {
Expand Down Expand Up @@ -616,8 +616,14 @@ func (p *piped) insertLoginUserToPasswd(ctx context.Context) error {
return nil
}

func registerMetrics() {
func registerMetrics(pipedID string) {
r := prometheus.DefaultRegisterer
k8scloudprovidermetrics.Register(r)
k8slivestatestoremetrics.Register(r)
// TODO: Add piped version as label.
wrapped := prometheus.WrapRegistererWith(
prometheus.Labels{"piped": pipedID},
r,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. One thing we have to update is fixing cli to use this wrapped register instead of the default one.
https://github.com/pipe-cd/pipe/blob/master/pkg/cli/cmd.go#L121

But that can be done in another PR.

Copy link
Member Author

@khanhtc1202 khanhtc1202 Jul 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, lets me address another missing in this PR 🙏
ref: 0eb829f

)

k8scloudprovidermetrics.Register(wrapped)
k8slivestatestoremetrics.Register(wrapped)
}
8 changes: 0 additions & 8 deletions pkg/app/piped/statsreporter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/app/api/service/pipedservice:go_default_library",
"//pkg/model:go_default_library",
"//pkg/version:go_default_library",
"@com_github_prometheus_client_model//go:go_default_library",
"@com_github_prometheus_common//expfmt:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
Expand All @@ -22,8 +18,4 @@ go_test(
srcs = ["reporter_test.go"],
data = glob(["testdata/**"]),
embed = [":go_default_library"],
deps = [
"@com_github_stretchr_testify//assert:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
],
)
100 changes: 11 additions & 89 deletions pkg/app/piped/statsreporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,15 @@ import (
"net/http"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice"
"github.com/pipe-cd/pipe/pkg/model"
"github.com/pipe-cd/pipe/pkg/version"
)

type apiClient interface {
Ping(ctx context.Context, req *pipedservice.PingRequest, opts ...grpc.CallOption) (*pipedservice.PingResponse, error)
ReportStat(ctx context.Context, req *pipedservice.ReportStatRequest, opts ...grpc.CallOption) (*pipedservice.ReportStatResponse, error)
}

type Reporter interface {
Expand Down Expand Up @@ -71,19 +68,10 @@ L:
break L

case now := <-ticker.C:
metrics, err := r.collect()
if err != nil {
continue
}
if len(metrics) == 0 {
r.logger.Info("there are no metrics to report")
continue
}
if err := r.report(ctx, metrics, now); err != nil {
if err := r.report(ctx); err != nil {
continue
}
r.logger.Info("successfully collected and reported metrics",
zap.Int("num", len(metrics)),
zap.Duration("duration", time.Since(now)),
)
}
Expand All @@ -93,92 +81,26 @@ L:
return nil
}

func (r *reporter) collect() ([]*model.PrometheusMetrics, error) {
func (r *reporter) report(ctx context.Context) error {
resp, err := r.httpClient.Get(r.metricsURL)
if err != nil {
r.logger.Error("failed to collect prometheus metrics", zap.Error(err))
return nil, err
r.logger.Error("failed to fetch prometheus metrics", zap.Error(err))
return err
}
defer resp.Body.Close()

metrics, err := parsePrometheusMetrics(resp.Body)
b, err := io.ReadAll(resp.Body)
if err != nil {
r.logger.Error("failed to parse prometheus metrics", zap.Error(err))
return nil, err
r.logger.Error("failed to load prometheus metrics", zap.Error(err))
return err
}

return metrics, nil
}

func (r *reporter) report(ctx context.Context, metrics []*model.PrometheusMetrics, now time.Time) error {
req := &pipedservice.PingRequest{
PipedStats: &model.PipedStats{
Version: version.Get().Version,
Timestamp: now.Unix(),
PrometheusMetrics: metrics,
},
req := &pipedservice.ReportStatRequest{
PipedStats: b,
}
if _, err := r.apiClient.Ping(ctx, req); err != nil {
if _, err := r.apiClient.ReportStat(ctx, req); err != nil {
r.logger.Error("failed to report stats", zap.Error(err))
return err
}
return nil
}

var parser expfmt.TextParser

// TODO: Add a metrics whitelist and fiter out not needed ones.
func parsePrometheusMetrics(reader io.Reader) ([]*model.PrometheusMetrics, error) {
metricFamily, err := parser.TextToMetricFamilies(reader)
if err != nil {
return nil, err
}

metrics := make([]*model.PrometheusMetrics, 0, len(metricFamily))

L:
for _, mf := range metricFamily {
var metricType model.PrometheusMetrics_Type

switch mf.GetType() {
case dto.MetricType_COUNTER:
metricType = model.PrometheusMetrics_COUNTER
case dto.MetricType_GAUGE:
metricType = model.PrometheusMetrics_GAUGE
default:
continue L
}

metric := &model.PrometheusMetrics{
Name: *mf.Name,
Type: metricType,
}

for _, m := range mf.Metric {
sample := &model.PrometheusMetrics_Sample{
Labels: make([]*model.PrometheusMetrics_LabelPair, 0, len(m.Label)),
}
metric.Samples = append(metric.Samples, sample)

for _, l := range m.Label {
sample.Labels = append(sample.Labels, &model.PrometheusMetrics_LabelPair{
Name: l.GetName(),
Value: l.GetValue(),
})
}

switch metric.Type {
case model.PrometheusMetrics_COUNTER:
sample.Value = m.Counter.GetValue()
case model.PrometheusMetrics_GAUGE:
sample.Value = m.Gauge.GetValue()
}
}

if len(metric.Samples) > 0 {
metrics = append(metrics, metric)
}
}

return metrics, nil
}
19 changes: 0 additions & 19 deletions pkg/app/piped/statsreporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,3 @@
// limitations under the License.

package statsreporter

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestParsePrometheusMetrics(t *testing.T) {
f, err := os.Open("testdata/metrics.txt")
require.NoError(t, err)
defer f.Close()

metrics, err := parsePrometheusMetrics(f)
require.NoError(t, err)

assert.Equal(t, 30, len(metrics))
}