From 62abfb0077ba84fdda9a12a4cf38fe440999246e Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 14 Nov 2023 21:38:21 +0800 Subject: [PATCH] feat: add reschedule handler for schduler v2 Signed-off-by: Gaius --- go.mod | 2 +- go.sum | 4 +- scheduler/scheduling/scheduling.go | 6 +- scheduler/scheduling/scheduling_test.go | 11 ++- scheduler/service/service_v2.go | 24 +++++- scheduler/service/service_v2_test.go | 109 +++++++++++++++++------- 6 files changed, 112 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index 6c30c1793be..fed1ee86eff 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.45 + d7y.io/api/v2 v2.0.46 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index 565ed6fc1f3..5d54adb2d88 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api/v2 v2.0.45 h1:a39URUlu6SpkFeeGxDTnl9QQTn4bHaEdex1ARpZfmAo= -d7y.io/api/v2 v2.0.45/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E= +d7y.io/api/v2 v2.0.46 h1:oPPjp3eKUDAWX9VnCdKG3Mpdwdp57a4gRfnLAZHyMiw= +d7y.io/api/v2 v2.0.46/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index d717aeeb589..cab36f8f931 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -107,10 +107,11 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc // Send NeedBackToSourceResponse to peer. peer.Log.Infof("send NeedBackToSourceResponse, because of peer's NeedBackToSource is %t and peer's schedule count is %d", peer.NeedBackToSource.Load(), peer.ScheduleCount.Load()) + description := fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()) if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Description: fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()), + Description: &description, }, }, }); err != nil { @@ -132,10 +133,11 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc // Send NeedBackToSourceResponse to peer. peer.Log.Infof("send NeedBackToSourceResponse, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit) + description := "scheduling exceeded RetryBackToSourceLimit" if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Description: "scheduling exceeded RetryBackToSourceLimit", + Description: &description, }, }, }); err != nil { diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 4a38088902b..f905efa45ce 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -232,6 +232,9 @@ func TestScheduling_New(t *testing.T) { } func TestScheduling_ScheduleCandidateParents(t *testing.T) { + needBackToSourceDescription := "peer's NeedBackToSource is true" + exceededLimitDescription := "scheduling exceeded RetryBackToSourceLimit" + tests := []struct { name string mock func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) @@ -294,7 +297,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) { ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Description: "peer's NeedBackToSource is true", + Description: &needBackToSourceDescription, }, }, })).Return(errors.New("foo")).Times(1) @@ -319,7 +322,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) { ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Description: "peer's NeedBackToSource is true", + Description: &needBackToSourceDescription, }, }, })).Return(nil).Times(1) @@ -361,7 +364,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) { ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Description: "scheduling exceeded RetryBackToSourceLimit", + Description: &exceededLimitDescription, }, }, })).Return(errors.New("foo")).Times(1), @@ -388,7 +391,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) { ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Description: "scheduling exceeded RetryBackToSourceLimit", + Description: &exceededLimitDescription, }, }, })).Return(nil).Times(1), diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index dd89a711b4a..a92c6164a31 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -132,6 +132,12 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error logger.Error(err) return err } + case *schedulerv2.AnnouncePeerRequest_RescheduleRequest: + logger.Infof("receive AnnouncePeerRequest_RescheduleRequest: %#v", announcePeerRequest.RescheduleRequest) + if err := v.handleRescheduleRequest(ctx, req.GetPeerId()); err != nil { + logger.Error(err) + return err + } case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest: logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest) if err := v.handleDownloadPeerFinishedRequest(ctx, req.GetPeerId()); err != nil { @@ -949,6 +955,20 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p return nil } +// handleRescheduleRequest handles RescheduleRequest of AnnouncePeerRequest. +func (v *V2) handleRescheduleRequest(ctx context.Context, peerID string) error { + peer, loaded := v.resource.PeerManager().Load(peerID) + if !loaded { + return status.Errorf(codes.NotFound, "peer %s not found", peerID) + } + + if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil { + return status.Error(codes.FailedPrecondition, err.Error()) + } + + return nil +} + // handleDownloadPeerFinishedRequest handles DownloadPeerFinishedRequest of AnnouncePeerRequest. func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, peerID string) error { peer, loaded := v.resource.PeerManager().Load(peerID) @@ -1192,10 +1212,6 @@ func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string // Handle peer with piece temporary failed request. peer.UpdatedAt.Store(time.Now()) peer.BlockParents.Add(req.Piece.GetParentId()) - if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil { - return status.Error(codes.FailedPrecondition, err.Error()) - } - if parent, loaded := v.resource.PeerManager().Load(req.Piece.GetParentId()); loaded { parent.Host.UploadFailedCount.Inc() } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 25597f6bba2..08dfc8a66ec 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -1104,7 +1104,6 @@ func TestServiceV2_SyncProbes(t *testing.T) { Disk: mockV2Probe.Host.Disk, Build: mockV2Probe.Host.Build, }, - Description: "foo", }, }, }, @@ -2130,6 +2129,78 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { } } +func TestServiceV2_handleRescheduleRequest(t *testing.T) { + tests := []struct { + name string + run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, + mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) + }{ + { + name: "peer can not be loaded", + run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, + mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1), + ) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID)) + }, + }, + { + name: "reschedule failed", + run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, + mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1), + ) + + assert := assert.New(t) + assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID), status.Error(codes.FailedPrecondition, "foo")) + }, + }, + { + name: "reschedule succeeded", + run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, + mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), + ) + + assert := assert.New(t) + assert.NoError(svc.handleRescheduleRequest(context.Background(), peer.ID)) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := schedulingmocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + peerManager := resource.NewMockPeerManager(ctl) + + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) + + tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT()) + }) + } +} + func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) { tests := []struct { name string @@ -2887,7 +2958,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { name string req *schedulerv2.DownloadPieceFailedRequest run func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, - mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) + mp *resource.MockPeerManagerMockRecorder) }{ { name: "peer can not be loaded", @@ -2898,7 +2969,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { Temporary: true, }, run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, - mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1), @@ -2917,7 +2988,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { Temporary: false, }, run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, - mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), @@ -2927,28 +2998,6 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "download piece failed")) }, }, - { - name: "schedule failed", - req: &schedulerv2.DownloadPieceFailedRequest{ - Piece: &commonv2.Piece{ - ParentId: &mockSeedPeerID, - }, - Temporary: true, - }, - run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, - mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { - gomock.InOrder( - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1), - ) - - assert := assert.New(t) - assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "foo")) - assert.NotEqual(peer.UpdatedAt.Load(), 0) - assert.True(peer.BlockParents.Contains(req.Piece.GetParentId())) - }, - }, { name: "parent can not be loaded", req: &schedulerv2.DownloadPieceFailedRequest{ @@ -2958,11 +3007,10 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { Temporary: true, }, run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, - mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(nil, false).Times(1), ) @@ -2983,11 +3031,10 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { Temporary: true, }, run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, - mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) { + mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), - ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1), mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(peer, true).Times(1), ) @@ -3020,7 +3067,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) - tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT()) + tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT()) }) } }