Skip to content

Commit

Permalink
feat: add metrics for grpc api of the cache task (#3539)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Sep 25, 2024
1 parent 61c3cf4 commit 1afe79e
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.158
d7y.io/api/v2 v2.0.159
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.158 h1:uX5Lg8AWX2Pc69bBka3AErH4vtHk6xbntTj4jhCABgo=
d7y.io/api/v2 v2.0.158/go.mod h1:VOnTWgLrGtivgyyofZCfiSDTAKDJ9ohVqM6l3S8EPCE=
d7y.io/api/v2 v2.0.159 h1:xSLq0GjqV0F8TgfZ13EDJa+eqaWcqhrEepybAoT9OnI=
d7y.io/api/v2 v2.0.159/go.mod h1:VOnTWgLrGtivgyyofZCfiSDTAKDJ9ohVqM6l3S8EPCE=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
112 changes: 112 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,118 @@ var (
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
})

AnnounceCachePeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_cache_peer_total",
Help: "Counter of the number of the announcing cache peer.",
})

AnnounceCachePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_cache_peer_failure_total",
Help: "Counter of the number of failed of the announcing cache peer.",
})

StatCachePeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_cache_peer_total",
Help: "Counter of the number of the stat cache peer.",
})

StatCachePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_cache_peer_failure_total",
Help: "Counter of the number of failed of the stat cache peer.",
})

DeleteCachePeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "delete_cache_peer_total",
Help: "Counter of the number of the deleting cache peer.",
})

DeleteCachePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "delete_cache_peer_failure_total",
Help: "Counter of the number of failed of the deleting cache peer.",
})

UploadCacheTaskStartedCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_started_total",
Help: "Counter of the number of the started uploading cache peer.",
})

UploadCacheTaskStartedFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_started_failure_total",
Help: "Counter of the number of failed of the started uploading cache peer.",
})

UploadCacheTaskFinishedCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_finished_total",
Help: "Counter of the number of the finished uploading cache peer.",
})

UploadCacheTaskFinishedFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_finished_failure_total",
Help: "Counter of the number of failed of the finished uploading cache peer.",
})

UploadCacheTaskFailedCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_failed_total",
Help: "Counter of the number of the failed uploading cache peer.",
})

UploadCacheTaskFailedFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_failed_failure_total",
Help: "Counter of the number of failed of the failed uploading cache peer.",
})

StatCacheTaskCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_cache_task_total",
Help: "Counter of the number of the stat cache task.",
})

StatCacheTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_cache_task_failure_total",
Help: "Counter of the number of failed of the stat cache task.",
})

DeleteCacheTaskCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "delete_cache_task_total",
Help: "Counter of the number of the delete cache task.",
})

DeleteCacheTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "delete_cache_task_failure_total",
Help: "Counter of the number of failed of the delete cache task.",
})

VersionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
97 changes: 75 additions & 22 deletions scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,56 +170,109 @@ func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesSe
return nil
}

// TODO Implement the following methods.
// AnnouncePeers announces peers to scheduler.
func (s *schedulerServerV2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
return nil
}

// TODO Implement the following methods.
// AnnounceCachePeer announces cache peer to scheduler.
func (s *schedulerServerV2) AnnounceCachePeer(stream schedulerv2.Scheduler_AnnounceCachePeerServer) error {
// Collect AnnounceCachePeerCount metrics.
metrics.AnnounceCachePeerCount.Inc()
if err := s.service.AnnounceCachePeer(stream); err != nil {
// Collect AnnounceCachePeerFailureCount metrics.
metrics.AnnounceCachePeerFailureCount.Inc()
return err
}

return nil
}

// TODO Implement the following methods.
// StatCachePeer checks information of cache peer.
func (s *schedulerServerV2) StatCachePeer(ctx context.Context, req *schedulerv2.StatCachePeerRequest) (*commonv2.CachePeer, error) {
return nil, nil
// Collect StatCachePeerCount metrics.
metrics.StatCachePeerCount.Inc()
resp, err := s.service.StatCachePeer(ctx, req)
if err != nil {
// Collect StatCachePeerFailureCount metrics.
metrics.StatCachePeerFailureCount.Inc()
return nil, err
}

return resp, nil
}

// TODO Implement the following methods.
// DeleteCachePeer releases cache peer in scheduler.
func (s *schedulerServerV2) DeleteCachePeer(ctx context.Context, req *schedulerv2.DeleteCachePeerRequest) (*emptypb.Empty, error) {
// Collect DeleteCachePeerCount metrics.
metrics.DeleteCachePeerCount.Inc()
if err := s.service.DeleteCachePeer(ctx, req); err != nil {
// Collect DeleteCachePeerFailureCount metrics.
metrics.DeleteCachePeerFailureCount.Inc()
return nil, err
}

return new(emptypb.Empty), nil
}

// TODO Implement the following methods.
// UploadCacheTaskStarted uploads the metadata of the cache task started.
func (s *schedulerServerV2) UploadCacheTaskStarted(ctx context.Context, request *schedulerv2.UploadCacheTaskStartedRequest) (*emptypb.Empty, error) {
return nil, nil
func (s *schedulerServerV2) UploadCacheTaskStarted(ctx context.Context, req *schedulerv2.UploadCacheTaskStartedRequest) (*emptypb.Empty, error) {
// Collect UploadCacheTaskStartedCount metrics.
metrics.UploadCacheTaskStartedCount.Inc()
if err := s.service.UploadCacheTaskStarted(ctx, req); err != nil {
// Collect UploadCacheTaskStartedFailureCount metrics.
metrics.UploadCacheTaskStartedFailureCount.Inc()
return nil, err
}

return new(emptypb.Empty), nil
}

// TODO Implement the following methods.
// UploadCacheTaskFinished uploads the metadata of the cache task finished.
func (s *schedulerServerV2) UploadCacheTaskFinished(ctx context.Context, request *schedulerv2.UploadCacheTaskFinishedRequest) (*commonv2.CacheTask, error) {
return nil, nil
func (s *schedulerServerV2) UploadCacheTaskFinished(ctx context.Context, req *schedulerv2.UploadCacheTaskFinishedRequest) (*commonv2.CacheTask, error) {
// Collect UploadCacheTaskFinishedCount metrics.
metrics.UploadCacheTaskFinishedCount.Inc()
resp, err := s.service.UploadCacheTaskFinished(ctx, req)
if err != nil {
// Collect UploadCacheTaskFinishedFailureCount metrics.
metrics.UploadCacheTaskFinishedFailureCount.Inc()
return nil, err
}

return resp, nil
}

// TODO Implement the following methods.
// UploadCacheTaskFailed uploads the metadata of the cache task failed.
func (s *schedulerServerV2) UploadCacheTaskFailed(ctx context.Context, request *schedulerv2.UploadCacheTaskFailedRequest) (*emptypb.Empty, error) {
return nil, nil
func (s *schedulerServerV2) UploadCacheTaskFailed(ctx context.Context, req *schedulerv2.UploadCacheTaskFailedRequest) (*emptypb.Empty, error) {
// Collect UploadCacheTaskFailedCount metrics.
metrics.UploadCacheTaskFailedCount.Inc()
if err := s.service.UploadCacheTaskFailed(ctx, req); err != nil {
// Collect UploadCacheTaskFailedFailureCount metrics.
metrics.UploadCacheTaskFailedFailureCount.Inc()
return nil, err
}

return new(emptypb.Empty), nil
}

// TODO Implement the following methods.
// StatCacheTask checks information of cache task.
func (s *schedulerServerV2) StatCacheTask(ctx context.Context, req *schedulerv2.StatCacheTaskRequest) (*commonv2.CacheTask, error) {
return nil, nil
// Collect StatCacheTaskCount metrics.
metrics.StatCacheTaskCount.Inc()
resp, err := s.service.StatCacheTask(ctx, req)
if err != nil {
// Collect StatCacheTaskFailureCount metrics.
metrics.StatCacheTaskFailureCount.Inc()
return nil, err
}

return resp, nil
}

// TODO Implement the following methods.
// DeleteCacheTask releases cache task in scheduler.
func (s *schedulerServerV2) DeleteCacheTask(ctx context.Context, req *schedulerv2.DeleteCacheTaskRequest) (*emptypb.Empty, error) {
// Collect DeleteCacheTaskCount metrics.
metrics.DeleteCacheTaskCount.Inc()
if err := s.service.DeleteCacheTask(ctx, req); err != nil {
// Collect DeleteCacheTaskFailureCount metrics.
metrics.DeleteCacheTaskFailureCount.Inc()
return nil, err
}

return new(emptypb.Empty), nil
}
48 changes: 48 additions & 0 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,3 +1451,51 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download

return nil
}

// TODO Implement the following methods.
// AnnounceCachePeer announces cache peer to scheduler.
func (v *V2) AnnounceCachePeer(stream schedulerv2.Scheduler_AnnounceCachePeerServer) error {
return nil
}

// TODO Implement the following methods.
// StatCachePeer checks information of cache peer.
func (v *V2) StatCachePeer(ctx context.Context, req *schedulerv2.StatCachePeerRequest) (*commonv2.CachePeer, error) {
return nil, nil
}

// TODO Implement the following methods.
// DeleteCachePeer releases cache peer in scheduler.
func (v *V2) DeleteCachePeer(ctx context.Context, req *schedulerv2.DeleteCachePeerRequest) error {
return nil
}

// TODO Implement the following methods.
// UploadCacheTaskStarted uploads the metadata of the cache task started.
func (v *V2) UploadCacheTaskStarted(ctx context.Context, req *schedulerv2.UploadCacheTaskStartedRequest) error {
return nil
}

// TODO Implement the following methods.
// UploadCacheTaskFinished uploads the metadata of the cache task finished.
func (v *V2) UploadCacheTaskFinished(ctx context.Context, req *schedulerv2.UploadCacheTaskFinishedRequest) (*commonv2.CacheTask, error) {
return nil, nil
}

// TODO Implement the following methods.
// UploadCacheTaskFailed uploads the metadata of the cache task failed.
func (v *V2) UploadCacheTaskFailed(ctx context.Context, req *schedulerv2.UploadCacheTaskFailedRequest) error {
return nil
}

// TODO Implement the following methods.
// StatCacheTask checks information of cache task.
func (v *V2) StatCacheTask(ctx context.Context, req *schedulerv2.StatCacheTaskRequest) (*commonv2.CacheTask, error) {
return nil, nil
}

// TODO Implement the following methods.
// DeleteCacheTask releases cache task in scheduler.
func (v *V2) DeleteCacheTask(ctx context.Context, req *schedulerv2.DeleteCacheTaskRequest) error {
return nil
}

0 comments on commit 1afe79e

Please sign in to comment.