diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index d7ccee37c5..247032dce8 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -85,11 +85,19 @@ func (a *PipedAPI) Register(server *grpc.Server) { // Ping is periodically sent to report its realtime status/stats to control-plane. // The received stats will be pushed to the metrics collector. +// Note: This rpc is deprecated, use ReportStat instead. func (a *PipedAPI) Ping(ctx context.Context, req *pipedservice.PingRequest) (*pipedservice.PingResponse, error) { return &pipedservice.PingResponse{}, nil // return nil, status.Error(codes.Unimplemented, "") } +// ReportStat is periodically sent to report its realtime status/stats to control-plane. +// The received stats will be pushed to the metrics collector. +func (a *PipedAPI) ReportStat(ctx context.Context, req *pipedservice.ReportStatRequest) (*pipedservice.ReportStatResponse, error) { + return &pipedservice.ReportStatResponse{}, nil + // return nil, status.Error(codes.Unimplemented, "") +} + // ReportPipedMeta is sent by piped while starting up to report its metadata // such as configured cloud providers. func (a *PipedAPI) ReportPipedMeta(ctx context.Context, req *pipedservice.ReportPipedMetaRequest) (*pipedservice.ReportPipedMetaResponse, error) { diff --git a/pkg/app/api/service/pipedservice/pipedclientfake/fakeclient.go b/pkg/app/api/service/pipedservice/pipedclientfake/fakeclient.go index a3d5d2fa52..ebb58c0a17 100644 --- a/pkg/app/api/service/pipedservice/pipedclientfake/fakeclient.go +++ b/pkg/app/api/service/pipedservice/pipedclientfake/fakeclient.go @@ -107,6 +107,13 @@ func (c *fakeClient) Ping(ctx context.Context, req *pipedservice.PingRequest, op return &pipedservice.PingResponse{}, nil } +// ReportStat is periodically sent to report its realtime status/stats to control-plane. +// The received stats will be pushed to the metrics collector. +func (c *fakeClient) ReportStat(ctx context.Context, req *pipedservice.ReportStatRequest, opts ...grpc.CallOption) (*pipedservice.ReportStatResponse, error) { + c.logger.Info("fake client received ReportStat rpc", zap.Any("request", req)) + return &pipedservice.ReportStatResponse{}, nil +} + // ReportPipedMeta is sent by piped while starting up to report its metadata // such as configured cloud providers. func (c *fakeClient) ReportPipedMeta(ctx context.Context, req *pipedservice.ReportPipedMetaRequest, opts ...grpc.CallOption) (*pipedservice.ReportPipedMetaResponse, error) { diff --git a/pkg/app/api/service/pipedservice/service.proto b/pkg/app/api/service/pipedservice/service.proto index c8f08e95c8..3c4e5b9628 100644 --- a/pkg/app/api/service/pipedservice/service.proto +++ b/pkg/app/api/service/pipedservice/service.proto @@ -34,7 +34,14 @@ import "pkg/model/event.proto"; service PipedService { // Ping is periodically sent to report its realtime status/stats to control-plane. // The received stats will be pushed to the metrics collector. - rpc Ping(PingRequest) returns (PingResponse) {} + // Note: This rpc is deprecated, use ReportStat instead. + rpc Ping(PingRequest) returns (PingResponse) { + option deprecated = true; + } + + // ReportStat is periodically sent to report its realtime status/stats to control-plane. + // The received stats will be pushed to the metrics collector. + rpc ReportStat(ReportStatRequest) returns (ReportStatResponse) {} // ReportPipedMeta is sent while starting up to report its metadata // such as configured cloud providers. @@ -149,11 +156,20 @@ enum ListOrder { } message PingRequest { - pipe.model.PipedStats piped_stats = 1 [(validate.rules).message.required = true]; + pipe.model.PipedStats piped_stats = 1 [(validate.rules).message.required = true, deprecated = true]; } message PingResponse { - int64 ping_interval = 1; + int64 ping_interval = 1 [deprecated = true]; +} + +message ReportStatRequest { + // Metrics byte sequence in OpenMetrics format. + bytes piped_stats = 1 [(validate.rules).bytes.min_len = 1]; +} + +message ReportStatResponse { + int64 report_interval = 1; } message ReportPipedMetaRequest { diff --git a/pkg/app/piped/cmd/piped/piped.go b/pkg/app/piped/cmd/piped/piped.go index ce2bbe271a..7536d9d3ba 100644 --- a/pkg/app/piped/cmd/piped/piped.go +++ b/pkg/app/piped/cmd/piped/piped.go @@ -119,9 +119,6 @@ func NewCommand() *cobra.Command { } func (p *piped) run(ctx context.Context, t cli.Telemetry) (runErr error) { - // Register all metrics. - registerMetrics() - group, ctx := errgroup.WithContext(ctx) if p.addLoginUserToPasswd { if err := p.insertLoginUserToPasswd(ctx); err != nil { @@ -136,6 +133,9 @@ func (p *piped) run(ctx context.Context, t cli.Telemetry) (runErr error) { return err } + // Register all metrics. + registerMetrics(cfg.PipedID) + // Initialize notifier and add piped events. notifier, err := notifier.NewNotifier(cfg, t.Logger) if err != nil { @@ -616,8 +616,14 @@ func (p *piped) insertLoginUserToPasswd(ctx context.Context) error { return nil } -func registerMetrics() { +func registerMetrics(pipedID string) { r := prometheus.DefaultRegisterer - k8scloudprovidermetrics.Register(r) - k8slivestatestoremetrics.Register(r) + // TODO: Add piped version as label. + wrapped := prometheus.WrapRegistererWith( + prometheus.Labels{"piped": pipedID}, + r, + ) + + k8scloudprovidermetrics.Register(wrapped) + k8slivestatestoremetrics.Register(wrapped) } diff --git a/pkg/app/piped/statsreporter/BUILD.bazel b/pkg/app/piped/statsreporter/BUILD.bazel index c18200650d..94e84013a8 100644 --- a/pkg/app/piped/statsreporter/BUILD.bazel +++ b/pkg/app/piped/statsreporter/BUILD.bazel @@ -7,10 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/app/api/service/pipedservice:go_default_library", - "//pkg/model:go_default_library", - "//pkg/version:go_default_library", - "@com_github_prometheus_client_model//go:go_default_library", - "@com_github_prometheus_common//expfmt:go_default_library", "@org_golang_google_grpc//:go_default_library", "@org_uber_go_zap//:go_default_library", ], @@ -22,8 +18,4 @@ go_test( srcs = ["reporter_test.go"], data = glob(["testdata/**"]), embed = [":go_default_library"], - deps = [ - "@com_github_stretchr_testify//assert:go_default_library", - "@com_github_stretchr_testify//require:go_default_library", - ], ) diff --git a/pkg/app/piped/statsreporter/reporter.go b/pkg/app/piped/statsreporter/reporter.go index f9671ebe22..2d1e0177c4 100644 --- a/pkg/app/piped/statsreporter/reporter.go +++ b/pkg/app/piped/statsreporter/reporter.go @@ -22,18 +22,15 @@ import ( "net/http" "time" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" "go.uber.org/zap" "google.golang.org/grpc" "github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice" - "github.com/pipe-cd/pipe/pkg/model" - "github.com/pipe-cd/pipe/pkg/version" ) type apiClient interface { Ping(ctx context.Context, req *pipedservice.PingRequest, opts ...grpc.CallOption) (*pipedservice.PingResponse, error) + ReportStat(ctx context.Context, req *pipedservice.ReportStatRequest, opts ...grpc.CallOption) (*pipedservice.ReportStatResponse, error) } type Reporter interface { @@ -71,19 +68,10 @@ L: break L case now := <-ticker.C: - metrics, err := r.collect() - if err != nil { - continue - } - if len(metrics) == 0 { - r.logger.Info("there are no metrics to report") - continue - } - if err := r.report(ctx, metrics, now); err != nil { + if err := r.report(ctx); err != nil { continue } r.logger.Info("successfully collected and reported metrics", - zap.Int("num", len(metrics)), zap.Duration("duration", time.Since(now)), ) } @@ -93,92 +81,26 @@ L: return nil } -func (r *reporter) collect() ([]*model.PrometheusMetrics, error) { +func (r *reporter) report(ctx context.Context) error { resp, err := r.httpClient.Get(r.metricsURL) if err != nil { - r.logger.Error("failed to collect prometheus metrics", zap.Error(err)) - return nil, err + r.logger.Error("failed to fetch prometheus metrics", zap.Error(err)) + return err } defer resp.Body.Close() - metrics, err := parsePrometheusMetrics(resp.Body) + b, err := io.ReadAll(resp.Body) if err != nil { - r.logger.Error("failed to parse prometheus metrics", zap.Error(err)) - return nil, err + r.logger.Error("failed to load prometheus metrics", zap.Error(err)) + return err } - return metrics, nil -} - -func (r *reporter) report(ctx context.Context, metrics []*model.PrometheusMetrics, now time.Time) error { - req := &pipedservice.PingRequest{ - PipedStats: &model.PipedStats{ - Version: version.Get().Version, - Timestamp: now.Unix(), - PrometheusMetrics: metrics, - }, + req := &pipedservice.ReportStatRequest{ + PipedStats: b, } - if _, err := r.apiClient.Ping(ctx, req); err != nil { + if _, err := r.apiClient.ReportStat(ctx, req); err != nil { r.logger.Error("failed to report stats", zap.Error(err)) return err } return nil } - -var parser expfmt.TextParser - -// TODO: Add a metrics whitelist and fiter out not needed ones. -func parsePrometheusMetrics(reader io.Reader) ([]*model.PrometheusMetrics, error) { - metricFamily, err := parser.TextToMetricFamilies(reader) - if err != nil { - return nil, err - } - - metrics := make([]*model.PrometheusMetrics, 0, len(metricFamily)) - -L: - for _, mf := range metricFamily { - var metricType model.PrometheusMetrics_Type - - switch mf.GetType() { - case dto.MetricType_COUNTER: - metricType = model.PrometheusMetrics_COUNTER - case dto.MetricType_GAUGE: - metricType = model.PrometheusMetrics_GAUGE - default: - continue L - } - - metric := &model.PrometheusMetrics{ - Name: *mf.Name, - Type: metricType, - } - - for _, m := range mf.Metric { - sample := &model.PrometheusMetrics_Sample{ - Labels: make([]*model.PrometheusMetrics_LabelPair, 0, len(m.Label)), - } - metric.Samples = append(metric.Samples, sample) - - for _, l := range m.Label { - sample.Labels = append(sample.Labels, &model.PrometheusMetrics_LabelPair{ - Name: l.GetName(), - Value: l.GetValue(), - }) - } - - switch metric.Type { - case model.PrometheusMetrics_COUNTER: - sample.Value = m.Counter.GetValue() - case model.PrometheusMetrics_GAUGE: - sample.Value = m.Gauge.GetValue() - } - } - - if len(metric.Samples) > 0 { - metrics = append(metrics, metric) - } - } - - return metrics, nil -} diff --git a/pkg/app/piped/statsreporter/reporter_test.go b/pkg/app/piped/statsreporter/reporter_test.go index aabb2ee329..1ef9afc9d5 100644 --- a/pkg/app/piped/statsreporter/reporter_test.go +++ b/pkg/app/piped/statsreporter/reporter_test.go @@ -13,22 +13,3 @@ // limitations under the License. package statsreporter - -import ( - "os" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestParsePrometheusMetrics(t *testing.T) { - f, err := os.Open("testdata/metrics.txt") - require.NoError(t, err) - defer f.Close() - - metrics, err := parsePrometheusMetrics(f) - require.NoError(t, err) - - assert.Equal(t, 30, len(metrics)) -}