feat: add reschedule handler for scheduler v2
Signed-off-by: Gaius <[email protected]>
gaius-qi committed Nov 14, 2023
1 parent 86ce09f commit 0c2b168
Showing 5 changed files with 105 additions and 40 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.45
d7y.io/api/v2 v2.0.46
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
4 changes: 2 additions & 2 deletions go.sum
@@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.45 h1:a39URUlu6SpkFeeGxDTnl9QQTn4bHaEdex1ARpZfmAo=
d7y.io/api/v2 v2.0.45/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
d7y.io/api/v2 v2.0.46 h1:oPPjp3eKUDAWX9VnCdKG3Mpdwdp57a4gRfnLAZHyMiw=
d7y.io/api/v2 v2.0.46/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
6 changes: 4 additions & 2 deletions scheduler/scheduling/scheduling.go
@@ -107,10 +107,11 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc
// Send NeedBackToSourceResponse to peer.
peer.Log.Infof("send NeedBackToSourceResponse, because of peer's NeedBackToSource is %t and peer's schedule count is %d",
peer.NeedBackToSource.Load(), peer.ScheduleCount.Load())
description := fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load())
if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Description: fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()),
Description: &description,
},
},
}); err != nil {
@@ -132,10 +133,11 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc

// Send NeedBackToSourceResponse to peer.
peer.Log.Infof("send NeedBackToSourceResponse, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit)
description := "scheduling exceeded RetryBackToSourceLimit"
if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Description: "scheduling exceeded RetryBackToSourceLimit",
Description: &description,
},
},
}); err != nil {
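
Both hunks in scheduling.go make the same change: with d7y.io/api/v2 bumped to v2.0.46, the Description field of NeedBackToSourceResponse is evidently generated as a *string (an optional proto3 field), so the string literal can no longer be assigned directly and is first bound to a local variable whose address is passed. A minimal standalone sketch of that Go pattern, using a stand-in struct rather than the real generated type:

```go
package main

import "fmt"

// response stands in for a generated protobuf message whose
// `optional string description` field maps to *string in Go.
type response struct {
	Description *string
}

func main() {
	// The result of fmt.Sprintf is not addressable, so the value is bound
	// to a local variable first and its address is taken, as in the diff.
	description := fmt.Sprintf("peer's NeedBackToSource is %t", true)
	resp := response{Description: &description}
	fmt.Println(*resp.Description)
}
```
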
24 changes: 20 additions & 4 deletions scheduler/service/service_v2.go
@@ -132,6 +132,12 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_RescheduleRequest:
logger.Infof("receive AnnouncePeerRequest_RescheduleRequest: %#v", announcePeerRequest.RescheduleRequest)
if err := v.handleRescheduleRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest)
if err := v.handleDownloadPeerFinishedRequest(ctx, req.GetPeerId()); err != nil {
@@ -949,6 +955,20 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p
return nil
}

// handleRescheduleRequest handles RescheduleRequest of AnnouncePeerRequest.
func (v *V2) handleRescheduleRequest(ctx context.Context, peerID string) error {
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
return status.Errorf(codes.NotFound, "peer %s not found", peerID)
}

if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
return status.Error(codes.FailedPrecondition, err.Error())
}

return nil
}

// handleDownloadPeerFinishedRequest handles DownloadPeerFinishedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, peerID string) error {
peer, loaded := v.resource.PeerManager().Load(peerID)
@@ -1192,10 +1212,6 @@ func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string
// Handle peer with piece temporary failed request.
peer.UpdatedAt.Store(time.Now())
peer.BlockParents.Add(req.Piece.GetParentId())
if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
return status.Error(codes.FailedPrecondition, err.Error())
}

if parent, loaded := v.resource.PeerManager().Load(req.Piece.GetParentId()); loaded {
parent.Host.UploadFailedCount.Inc()
}
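
The new case in AnnouncePeer wires RescheduleRequest into the scheduler v2 stream: when a peer reports that its current parents are unusable, handleRescheduleRequest reloads the peer and re-runs ScheduleCandidateParents against its BlockParents, instead of doing that inline in handleDownloadPieceFailedRequest as before. Below is a hypothetical client-side sketch of triggering a reschedule over an established announce stream; the import path and the TaskId field are assumptions, and the fields of RescheduleRequest itself are left empty rather than guessed. Only the oneof wrapper AnnouncePeerRequest_RescheduleRequest and GetPeerId are confirmed by the diff above.

```go
package client

import (
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
)

// sendReschedule pushes a RescheduleRequest over an already-established
// AnnouncePeer stream, asking the scheduler to pick new candidate parents
// for the given peer. This is a sketch, not the dfdaemon implementation.
func sendReschedule(stream schedulerv2.Scheduler_AnnouncePeerClient, taskID, peerID string) error {
	return stream.Send(&schedulerv2.AnnouncePeerRequest{
		TaskId: taskID, // assumed field; the handler only relies on GetPeerId()
		PeerId: peerID, // matches req.GetPeerId() used by handleRescheduleRequest
		Request: &schedulerv2.AnnouncePeerRequest_RescheduleRequest{
			RescheduleRequest: &schedulerv2.RescheduleRequest{}, // fields intentionally omitted
		},
	})
}
```
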
109 changes: 78 additions & 31 deletions scheduler/service/service_v2_test.go
@@ -1104,7 +1104,6 @@ func TestServiceV2_SyncProbes(t *testing.T) {
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Description: "foo",
},
},
},
@@ -2130,6 +2129,78 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
}
}

func TestServiceV2_handleRescheduleRequest(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
}{
{
name: "peer can not be loaded",
run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
)

assert := assert.New(t)
assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
},
},
{
name: "reschedule failed",
run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
)

assert := assert.New(t)
assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID), status.Error(codes.FailedPrecondition, "foo"))
},
},
{
name: "reschedule succeeded",
run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
)

assert := assert.New(t)
assert.NoError(svc.handleRescheduleRequest(context.Background(), peer.ID))
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduling := schedulingmocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)

mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)

tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
})
}
}

func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
tests := []struct {
name string
@@ -2887,7 +2958,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
name string
req *schedulerv2.DownloadPieceFailedRequest
run func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
mp *resource.MockPeerManagerMockRecorder)
}{
{
name: "peer can not be loaded",
@@ -2898,7 +2969,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
Temporary: true,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
@@ -2917,7 +2988,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
Temporary: false,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2927,28 +2998,6 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "download piece failed"))
},
},
{
name: "schedule failed",
req: &schedulerv2.DownloadPieceFailedRequest{
Piece: &commonv2.Piece{
ParentId: &mockSeedPeerID,
},
Temporary: true,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
)

assert := assert.New(t)
assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "foo"))
assert.NotEqual(peer.UpdatedAt.Load(), 0)
assert.True(peer.BlockParents.Contains(req.Piece.GetParentId()))
},
},
{
name: "parent can not be loaded",
req: &schedulerv2.DownloadPieceFailedRequest{
@@ -2958,11 +3007,10 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
Temporary: true,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(nil, false).Times(1),
)
@@ -2983,11 +3031,10 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
Temporary: true,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(peer, true).Times(1),
)
@@ -3020,7 +3067,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)

tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
})
}
}
Expand Down
