From 0c2b16891e1ef6f8d674fbdd10e63b4ad9264420 Mon Sep 17 00:00:00 2001
From: Gaius <gaius.qi@gmail.com>
Date: Tue, 14 Nov 2023 21:38:21 +0800
Subject: [PATCH] feat: add reschedule handler for schduler v2

Signed-off-by: Gaius <gaius.qi@gmail.com>
---
 go.mod                               |   2 +-
 go.sum                               |   4 +-
 scheduler/scheduling/scheduling.go   |   6 +-
 scheduler/service/service_v2.go      |  24 +++++-
 scheduler/service/service_v2_test.go | 109 +++++++++++++++++++--------
 5 files changed, 105 insertions(+), 40 deletions(-)

diff --git a/go.mod b/go.mod
index 6c30c1793be..fed1ee86eff 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
 go 1.21
 
 require (
-	d7y.io/api/v2 v2.0.45
+	d7y.io/api/v2 v2.0.46
 	github.com/MysteriousPotato/go-lockable v1.0.0
 	github.com/RichardKnop/machinery v1.10.6
 	github.com/Showmax/go-fqdn v1.0.0
diff --git a/go.sum b/go.sum
index 565ed6fc1f3..5d54adb2d88 100644
--- a/go.sum
+++ b/go.sum
@@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
 cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
-d7y.io/api/v2 v2.0.45 h1:a39URUlu6SpkFeeGxDTnl9QQTn4bHaEdex1ARpZfmAo=
-d7y.io/api/v2 v2.0.45/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
+d7y.io/api/v2 v2.0.46 h1:oPPjp3eKUDAWX9VnCdKG3Mpdwdp57a4gRfnLAZHyMiw=
+d7y.io/api/v2 v2.0.46/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go
index d717aeeb589..cab36f8f931 100644
--- a/scheduler/scheduling/scheduling.go
+++ b/scheduler/scheduling/scheduling.go
@@ -107,10 +107,11 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc
 				// Send NeedBackToSourceResponse to peer.
 				peer.Log.Infof("send NeedBackToSourceResponse, because of peer's NeedBackToSource is %t and peer's schedule count is %d",
 					peer.NeedBackToSource.Load(), peer.ScheduleCount.Load())
+				description := fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load())
 				if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
 					Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
 						NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
-							Description: fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()),
+							Description: &description,
 						},
 					},
 				}); err != nil {
@@ -132,10 +133,11 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc
 
 				// Send NeedBackToSourceResponse to peer.
 				peer.Log.Infof("send NeedBackToSourceResponse, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit)
+				description := "scheduling exceeded RetryBackToSourceLimit"
 				if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
 					Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
 						NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
-							Description: "scheduling exceeded RetryBackToSourceLimit",
+							Description: &description,
 						},
 					},
 				}); err != nil {
diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go
index dd89a711b4a..a92c6164a31 100644
--- a/scheduler/service/service_v2.go
+++ b/scheduler/service/service_v2.go
@@ -132,6 +132,12 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
 				logger.Error(err)
 				return err
 			}
+		case *schedulerv2.AnnouncePeerRequest_RescheduleRequest:
+			logger.Infof("receive AnnouncePeerRequest_RescheduleRequest: %#v", announcePeerRequest.RescheduleRequest)
+			if err := v.handleRescheduleRequest(ctx, req.GetPeerId()); err != nil {
+				logger.Error(err)
+				return err
+			}
 		case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
 			logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest)
 			if err := v.handleDownloadPeerFinishedRequest(ctx, req.GetPeerId()); err != nil {
@@ -949,6 +955,20 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p
 	return nil
 }
 
+// handleRescheduleRequest handles RescheduleRequest of AnnouncePeerRequest.
+func (v *V2) handleRescheduleRequest(ctx context.Context, peerID string) error {
+	peer, loaded := v.resource.PeerManager().Load(peerID)
+	if !loaded {
+		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
+	}
+
+	if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
+		return status.Error(codes.FailedPrecondition, err.Error())
+	}
+
+	return nil
+}
+
 // handleDownloadPeerFinishedRequest handles DownloadPeerFinishedRequest of AnnouncePeerRequest.
 func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, peerID string) error {
 	peer, loaded := v.resource.PeerManager().Load(peerID)
@@ -1192,10 +1212,6 @@ func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string
 		// Handle peer with piece temporary failed request.
 		peer.UpdatedAt.Store(time.Now())
 		peer.BlockParents.Add(req.Piece.GetParentId())
-		if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
-			return status.Error(codes.FailedPrecondition, err.Error())
-		}
-
 		if parent, loaded := v.resource.PeerManager().Load(req.Piece.GetParentId()); loaded {
 			parent.Host.UploadFailedCount.Inc()
 		}
diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go
index 25597f6bba2..08dfc8a66ec 100644
--- a/scheduler/service/service_v2_test.go
+++ b/scheduler/service/service_v2_test.go
@@ -1104,7 +1104,6 @@ func TestServiceV2_SyncProbes(t *testing.T) {
 											Disk:            mockV2Probe.Host.Disk,
 											Build:           mockV2Probe.Host.Build,
 										},
-										Description: "foo",
 									},
 								},
 							},
@@ -2130,6 +2129,78 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
 	}
 }
 
+func TestServiceV2_handleRescheduleRequest(t *testing.T) {
+	tests := []struct {
+		name string
+		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+			mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
+	}{
+		{
+			name: "peer can not be loaded",
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
+				gomock.InOrder(
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
+				)
+
+				assert := assert.New(t)
+				assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
+			},
+		},
+		{
+			name: "reschedule failed",
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
+				gomock.InOrder(
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
+				)
+
+				assert := assert.New(t)
+				assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID), status.Error(codes.FailedPrecondition, "foo"))
+			},
+		},
+		{
+			name: "reschedule succeeded",
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
+				gomock.InOrder(
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
+				)
+
+				assert := assert.New(t)
+				assert.NoError(svc.handleRescheduleRequest(context.Background(), peer.ID))
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctl := gomock.NewController(t)
+			defer ctl.Finish()
+			scheduling := schedulingmocks.NewMockScheduling(ctl)
+			res := resource.NewMockResource(ctl)
+			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
+			storage := storagemocks.NewMockStorage(ctl)
+			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
+			peerManager := resource.NewMockPeerManager(ctl)
+
+			mockHost := resource.NewHost(
+				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
+				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
+			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
+			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
+			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
+
+			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
+		})
+	}
+}
+
 func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
 	tests := []struct {
 		name string
@@ -2887,7 +2958,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 		name string
 		req  *schedulerv2.DownloadPieceFailedRequest
 		run  func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
-			mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
+			mp *resource.MockPeerManagerMockRecorder)
 	}{
 		{
 			name: "peer can not be loaded",
@@ -2898,7 +2969,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 				Temporary: true,
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
-				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
+				mp *resource.MockPeerManagerMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
@@ -2917,7 +2988,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 				Temporary: false,
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
-				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
+				mp *resource.MockPeerManagerMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2927,28 +2998,6 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 				assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "download piece failed"))
 			},
 		},
-		{
-			name: "schedule failed",
-			req: &schedulerv2.DownloadPieceFailedRequest{
-				Piece: &commonv2.Piece{
-					ParentId: &mockSeedPeerID,
-				},
-				Temporary: true,
-			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
-				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
-				gomock.InOrder(
-					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
-				)
-
-				assert := assert.New(t)
-				assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "foo"))
-				assert.NotEqual(peer.UpdatedAt.Load(), 0)
-				assert.True(peer.BlockParents.Contains(req.Piece.GetParentId()))
-			},
-		},
 		{
 			name: "parent can not be loaded",
 			req: &schedulerv2.DownloadPieceFailedRequest{
@@ -2958,11 +3007,10 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 				Temporary: true,
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
-				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
+				mp *resource.MockPeerManagerMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(nil, false).Times(1),
 				)
@@ -2983,11 +3031,10 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 				Temporary: true,
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
-				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
+				mp *resource.MockPeerManagerMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
-					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(peer, true).Times(1),
 				)
@@ -3020,7 +3067,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
 			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
 
-			tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
+			tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
 		})
 	}
 }