Skip to content

Commit

Permalink
feat: implement ListSeedPeers feature in manager (#2865)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Nov 8, 2023
1 parent 334eb2e commit 5f4fe57
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 27 deletions.
4 changes: 3 additions & 1 deletion api/manager/docs.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Code generated by swaggo/swag. DO NOT EDIT
// Package manager Code generated by swaggo/swag. DO NOT EDIT
package manager

import "github.com/swaggo/swag"
Expand Down Expand Up @@ -5278,6 +5278,8 @@ var SwaggerInfo = &swag.Spec{
Description: "Dragonfly Manager Server",
InfoInstanceName: "swagger",
SwaggerTemplate: docTemplate,
LeftDelim: "{{",
RightDelim: "}}",
}

func init() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.36
d7y.io/api/v2 v2.0.43
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.36 h1:H8jVBN1ydo158Zd/I+XWUy/ymaYyAn+bnSswm3Q3WGU=
d7y.io/api/v2 v2.0.36/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
d7y.io/api/v2 v2.0.43 h1:4IdL+j1CAJp4QIs71YeItKZV/lzqya98ksxoGVnaQUQ=
d7y.io/api/v2 v2.0.43/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
72 changes: 72 additions & 0 deletions manager/rpcserver/manager_server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,78 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee
return &pbSeedPeer, nil
}

// List acitve seed peers configuration.
func (s *managerServerV1) ListSeedPeers(ctx context.Context, req *managerv1.ListSeedPeersRequest) (*managerv1.ListSeedPeersResponse, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
log.Debugf("list seed peers, version %s, commit %s", req.Version, req.Commit)

// Cache hit.
var pbListSeedPeersResponse managerv1.ListSeedPeersResponse
cacheKey := pkgredis.MakeSeedPeersKeyForPeerInManager(req.Hostname, req.Ip)

if err := s.cache.Get(ctx, cacheKey, &pbListSeedPeersResponse); err != nil {
log.Warnf("%s cache miss because of %s", cacheKey, err.Error())
} else {
log.Debugf("%s cache hit", cacheKey)
return &pbListSeedPeersResponse, nil
}

// Cache miss and search seed peer.
seedPeer := models.SeedPeer{}
if err := s.db.WithContext(ctx).Preload("SeedPeerCluster").First(&seedPeer, models.SeedPeer{
Hostname: req.Hostname,
IP: req.Ip,
}).Error; err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

var seedPeers []models.SeedPeer
if err := s.db.WithContext(ctx).Find(&seedPeers, models.SeedPeer{
State: models.SeedPeerStateActive,
SeedPeerClusterID: uint(seedPeer.SeedPeerClusterID),
}).Error; err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if len(seedPeers) == 0 {
return nil, status.Error(codes.NotFound, "seed peer not found")
}

// Construct seed peers.
for _, seedPeer := range seedPeers {
pbListSeedPeersResponse.SeedPeers = append(pbListSeedPeersResponse.SeedPeers, &managerv1.SeedPeer{
Id: uint64(seedPeer.ID),
Hostname: seedPeer.Hostname,
Type: seedPeer.Type,
Idc: seedPeer.IDC,
Location: seedPeer.Location,
Ip: seedPeer.IP,
Port: seedPeer.Port,
DownloadPort: seedPeer.DownloadPort,
ObjectStoragePort: seedPeer.ObjectStoragePort,
State: seedPeer.State,
SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID),
SeedPeerCluster: &managerv1.SeedPeerCluster{
Id: uint64(seedPeer.SeedPeerCluster.ID),
Name: seedPeer.SeedPeerCluster.Name,
Bio: seedPeer.SeedPeerCluster.BIO,
},
})
}

// Cache data.
if err := s.cache.Once(&cachev8.Item{
Ctx: ctx,
Key: cacheKey,
Value: &pbListSeedPeersResponse,
TTL: s.cache.TTL,
}); err != nil {
log.Error(err)
}

return &pbListSeedPeersResponse, nil
}

// Update SeedPeer configuration.
func (s *managerServerV1) UpdateSeedPeer(ctx context.Context, req *managerv1.UpdateSeedPeerRequest) (*managerv1.SeedPeer, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
Expand Down
72 changes: 72 additions & 0 deletions manager/rpcserver/manager_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,78 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee
return &pbSeedPeer, nil
}

// List acitve seed peers configuration.
func (s *managerServerV2) ListSeedPeers(ctx context.Context, req *managerv2.ListSeedPeersRequest) (*managerv2.ListSeedPeersResponse, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
log.Debugf("list seed peers, version %s, commit %s", req.Version, req.Commit)

// Cache hit.
var pbListSeedPeersResponse managerv2.ListSeedPeersResponse
cacheKey := pkgredis.MakeSeedPeersKeyForPeerInManager(req.Hostname, req.Ip)

if err := s.cache.Get(ctx, cacheKey, &pbListSeedPeersResponse); err != nil {
log.Warnf("%s cache miss because of %s", cacheKey, err.Error())
} else {
log.Debugf("%s cache hit", cacheKey)
return &pbListSeedPeersResponse, nil
}

// Cache miss and search seed peer.
seedPeer := models.SeedPeer{}
if err := s.db.WithContext(ctx).Preload("SeedPeerCluster").First(&seedPeer, models.SeedPeer{
Hostname: req.Hostname,
IP: req.Ip,
}).Error; err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

var seedPeers []models.SeedPeer
if err := s.db.WithContext(ctx).Find(&seedPeers, models.SeedPeer{
State: models.SeedPeerStateActive,
SeedPeerClusterID: uint(seedPeer.SeedPeerClusterID),
}).Error; err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if len(seedPeers) == 0 {
return nil, status.Error(codes.NotFound, "seed peer not found")
}

// Construct seed peers.
for _, seedPeer := range seedPeers {
pbListSeedPeersResponse.SeedPeers = append(pbListSeedPeersResponse.SeedPeers, &managerv2.SeedPeer{
Id: uint64(seedPeer.ID),
Hostname: seedPeer.Hostname,
Type: seedPeer.Type,
Idc: &seedPeer.IDC,
Location: &seedPeer.Location,
Ip: seedPeer.IP,
Port: seedPeer.Port,
DownloadPort: seedPeer.DownloadPort,
ObjectStoragePort: seedPeer.ObjectStoragePort,
State: seedPeer.State,
SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID),
SeedPeerCluster: &managerv2.SeedPeerCluster{
Id: uint64(seedPeer.SeedPeerCluster.ID),
Name: seedPeer.SeedPeerCluster.Name,
Bio: seedPeer.SeedPeerCluster.BIO,
},
})
}

// Cache data.
if err := s.cache.Once(&cachev8.Item{
Ctx: ctx,
Key: cacheKey,
Value: &pbListSeedPeersResponse,
TTL: s.cache.TTL,
}); err != nil {
log.Error(err)
}

return &pbListSeedPeersResponse, nil
}

// Update SeedPeer configuration.
func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.UpdateSeedPeerRequest) (*managerv2.SeedPeer, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
Expand Down
5 changes: 5 additions & 0 deletions pkg/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func MakePeerKeyInManager(hostname, ip string) string {
return MakeKeyInManager(PeersNamespace, fmt.Sprintf("%s-%s", hostname, ip))
}

// MakeSeedPeersKeyForPeerInManager make seed peers key for peer in manager.
func MakeSeedPeersKeyForPeerInManager(hostname, ip string) string {
return MakeKeyInManager(PeersNamespace, fmt.Sprintf("%s-%s:seed-peers", hostname, ip))
}

// MakeSchedulersKeyForPeerInManager make schedulers key for peer in manager.
func MakeSchedulersKeyForPeerInManager(hostname, ip string) string {
return MakeKeyInManager(PeersNamespace, fmt.Sprintf("%s-%s:schedulers", hostname, ip))
Expand Down
51 changes: 51 additions & 0 deletions pkg/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,57 @@ func Test_MakePeerKeyInManager(t *testing.T) {
}
}

func Test_MakeSeedPeersKeyForPeerInManager(t *testing.T) {
tests := []struct {
name string
hostname string
ip string
expect func(t *testing.T, s string)
}{
{
name: "make seed peer key for peer in manager",
hostname: "bar",
ip: "127.0.0.1",
expect: func(t *testing.T, s string) {
assert := assert.New(t)
assert.Equal(s, "manager:peers:bar-127.0.0.1:seed-peers")
},
},
{
name: "hostname is empty",
hostname: "",
ip: "127.0.0.1",
expect: func(t *testing.T, s string) {
assert := assert.New(t)
assert.Equal(s, "manager:peers:-127.0.0.1:seed-peers")
},
},
{
name: "ip is empty",
hostname: "bar",
ip: "",
expect: func(t *testing.T, s string) {
assert := assert.New(t)
assert.Equal(s, "manager:peers:bar-:seed-peers")
},
},
{
name: "hostname and ip are empty",
hostname: "",
ip: "",
expect: func(t *testing.T, s string) {
assert := assert.New(t)
assert.Equal(s, "manager:peers:-:seed-peers")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, MakeSeedPeersKeyForPeerInManager(tc.hostname, tc.ip))
})
}
}

func Test_MakeSchedulersKeyForPeerInManager(t *testing.T) {
tests := []struct {
name string
Expand Down
8 changes: 4 additions & 4 deletions scheduler/scheduling/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule
ParentId: &candidateParentPiece.ParentID,
Offset: candidateParentPiece.Offset,
Length: candidateParentPiece.Length,
TrafficType: candidateParentPiece.TrafficType,
TrafficType: &candidateParentPiece.TrafficType,
Cost: durationpb.New(candidateParentPiece.Cost),
CreatedAt: timestamppb.New(candidateParentPiece.CreatedAt),
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule
ParentId: &taskPiece.ParentID,
Offset: taskPiece.Offset,
Length: taskPiece.Length,
TrafficType: taskPiece.TrafficType,
TrafficType: &taskPiece.TrafficType,
Cost: durationpb.New(taskPiece.Cost),
CreatedAt: timestamppb.New(taskPiece.CreatedAt),
}
Expand Down Expand Up @@ -741,7 +741,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can
ParentId: &candidateParentPiece.ParentID,
Offset: candidateParentPiece.Offset,
Length: candidateParentPiece.Length,
TrafficType: candidateParentPiece.TrafficType,
TrafficType: &candidateParentPiece.TrafficType,
Cost: durationpb.New(candidateParentPiece.Cost),
CreatedAt: timestamppb.New(candidateParentPiece.CreatedAt),
}
Expand Down Expand Up @@ -792,7 +792,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can
ParentId: &taskPiece.ParentID,
Offset: taskPiece.Offset,
Length: taskPiece.Length,
TrafficType: taskPiece.TrafficType,
TrafficType: &taskPiece.TrafficType,
Cost: durationpb.New(taskPiece.Cost),
CreatedAt: timestamppb.New(taskPiece.CreatedAt),
}
Expand Down
12 changes: 6 additions & 6 deletions scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,7 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: mockPiece.TrafficType,
TrafficType: &mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
Expand All @@ -1313,7 +1313,7 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: mockPiece.TrafficType,
TrafficType: &mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
Expand Down Expand Up @@ -1446,7 +1446,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: mockPiece.TrafficType,
TrafficType: &mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
Expand All @@ -1473,7 +1473,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: mockPiece.TrafficType,
TrafficType: &mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
Expand Down Expand Up @@ -1580,7 +1580,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: mockPiece.TrafficType,
TrafficType: &mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
Expand All @@ -1607,7 +1607,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: mockPiece.TrafficType,
TrafficType: &mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
Expand Down
6 changes: 3 additions & 3 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c
ParentId: &piece.ParentID,
Offset: piece.Offset,
Length: piece.Length,
TrafficType: piece.TrafficType,
TrafficType: &piece.TrafficType,
Cost: durationpb.New(piece.Cost),
CreatedAt: timestamppb.New(piece.CreatedAt),
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c
ParentId: &piece.ParentID,
Offset: piece.Offset,
Length: piece.Length,
TrafficType: piece.TrafficType,
TrafficType: &piece.TrafficType,
Cost: durationpb.New(piece.Cost),
CreatedAt: timestamppb.New(piece.CreatedAt),
}
Expand Down Expand Up @@ -435,7 +435,7 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c
ParentId: &piece.ParentID,
Offset: piece.Offset,
Length: piece.Length,
TrafficType: piece.TrafficType,
TrafficType: &piece.TrafficType,
Cost: durationpb.New(piece.Cost),
CreatedAt: timestamppb.New(piece.CreatedAt),
}
Expand Down
Loading

0 comments on commit 5f4fe57

Please sign in to comment.