Skip to content

Commit

Permalink
feat: remove TinyTaskResponse and SmallTaskResponse message (#2881)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Nov 14, 2023
1 parent d24c20b commit 86ce09f
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 1,075 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.43
d7y.io/api/v2 v2.0.45
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.43 h1:4IdL+j1CAJp4QIs71YeItKZV/lzqya98ksxoGVnaQUQ=
d7y.io/api/v2 v2.0.43/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
d7y.io/api/v2 v2.0.45 h1:a39URUlu6SpkFeeGxDTnl9QQTn4bHaEdex1ARpZfmAo=
d7y.io/api/v2 v2.0.45/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
168 changes: 0 additions & 168 deletions scheduler/scheduling/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,174 +544,6 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S
return candidateParents
}

// ConstructSuccessSmallTaskResponse constructs scheduling successful response of the small task.
// Used only in v2 version of the grpc.
func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedulerv2.AnnouncePeerResponse_SmallTaskResponse {
parent := &commonv2.Peer{
Id: candidateParent.ID,
Priority: candidateParent.Priority,
Cost: durationpb.New(candidateParent.Cost.Load()),
State: candidateParent.FSM.Current(),
NeedBackToSource: candidateParent.NeedBackToSource.Load(),
CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()),
UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()),
}

// Set range to parent.
if candidateParent.Range != nil {
parent.Range = &commonv2.Range{
Start: candidateParent.Range.Start,
Length: candidateParent.Range.Length,
}
}

// Set pieces to parent.
candidateParent.Pieces.Range(func(key, value any) bool {
candidateParentPiece, ok := value.(*resource.Piece)
if !ok {
candidateParent.Log.Errorf("invalid piece %s %#v", key, value)
return true
}

piece := &commonv2.Piece{
Number: candidateParentPiece.Number,
ParentId: &candidateParentPiece.ParentID,
Offset: candidateParentPiece.Offset,
Length: candidateParentPiece.Length,
TrafficType: &candidateParentPiece.TrafficType,
Cost: durationpb.New(candidateParentPiece.Cost),
CreatedAt: timestamppb.New(candidateParentPiece.CreatedAt),
}

if candidateParentPiece.Digest != nil {
piece.Digest = candidateParentPiece.Digest.String()
}

parent.Pieces = append(parent.Pieces, piece)
return true
})

// Set task to parent.
parent.Task = &commonv2.Task{
Id: candidateParent.Task.ID,
Type: candidateParent.Task.Type,
Url: candidateParent.Task.URL,
Tag: &candidateParent.Task.Tag,
Application: &candidateParent.Task.Application,
Filters: candidateParent.Task.Filters,
Header: candidateParent.Task.Header,
PieceLength: candidateParent.Task.PieceLength,
ContentLength: candidateParent.Task.ContentLength.Load(),
PieceCount: candidateParent.Task.TotalPieceCount.Load(),
SizeScope: candidateParent.Task.SizeScope(),
State: candidateParent.Task.FSM.Current(),
PeerCount: int32(candidateParent.Task.PeerCount()),
CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()),
UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()),
}

// Set digest to parent task.
if candidateParent.Task.Digest != nil {
dgst := candidateParent.Task.Digest.String()
parent.Task.Digest = &dgst
}

// Set pieces to parent task.
candidateParent.Task.Pieces.Range(func(key, value any) bool {
taskPiece, ok := value.(*resource.Piece)
if !ok {
candidateParent.Task.Log.Errorf("invalid piece %s %#v", key, value)
return true
}

piece := &commonv2.Piece{
Number: taskPiece.Number,
ParentId: &taskPiece.ParentID,
Offset: taskPiece.Offset,
Length: taskPiece.Length,
TrafficType: &taskPiece.TrafficType,
Cost: durationpb.New(taskPiece.Cost),
CreatedAt: timestamppb.New(taskPiece.CreatedAt),
}

if taskPiece.Digest != nil {
piece.Digest = taskPiece.Digest.String()
}

parent.Task.Pieces = append(parent.Task.Pieces, piece)
return true
})

// Set host to parent.
parent.Host = &commonv2.Host{
Id: candidateParent.Host.ID,
Type: uint32(candidateParent.Host.Type),
Hostname: candidateParent.Host.Hostname,
Ip: candidateParent.Host.IP,
Port: candidateParent.Host.Port,
DownloadPort: candidateParent.Host.DownloadPort,
Os: candidateParent.Host.OS,
Platform: candidateParent.Host.Platform,
PlatformFamily: candidateParent.Host.PlatformFamily,
PlatformVersion: candidateParent.Host.PlatformVersion,
KernelVersion: candidateParent.Host.KernelVersion,
Cpu: &commonv2.CPU{
LogicalCount: candidateParent.Host.CPU.LogicalCount,
PhysicalCount: candidateParent.Host.CPU.PhysicalCount,
Percent: candidateParent.Host.CPU.Percent,
ProcessPercent: candidateParent.Host.CPU.ProcessPercent,
Times: &commonv2.CPUTimes{
User: candidateParent.Host.CPU.Times.User,
System: candidateParent.Host.CPU.Times.System,
Idle: candidateParent.Host.CPU.Times.Idle,
Nice: candidateParent.Host.CPU.Times.Nice,
Iowait: candidateParent.Host.CPU.Times.Iowait,
Irq: candidateParent.Host.CPU.Times.Irq,
Softirq: candidateParent.Host.CPU.Times.Softirq,
Steal: candidateParent.Host.CPU.Times.Steal,
Guest: candidateParent.Host.CPU.Times.Guest,
GuestNice: candidateParent.Host.CPU.Times.GuestNice,
},
},
Memory: &commonv2.Memory{
Total: candidateParent.Host.Memory.Total,
Available: candidateParent.Host.Memory.Available,
Used: candidateParent.Host.Memory.Used,
UsedPercent: candidateParent.Host.Memory.UsedPercent,
ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent,
Free: candidateParent.Host.Memory.Free,
},
Network: &commonv2.Network{
TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount,
Location: &candidateParent.Host.Network.Location,
Idc: &candidateParent.Host.Network.IDC,
},
Disk: &commonv2.Disk{
Total: candidateParent.Host.Disk.Total,
Free: candidateParent.Host.Disk.Free,
Used: candidateParent.Host.Disk.Used,
UsedPercent: candidateParent.Host.Disk.UsedPercent,
InodesTotal: candidateParent.Host.Disk.InodesTotal,
InodesUsed: candidateParent.Host.Disk.InodesUsed,
InodesFree: candidateParent.Host.Disk.InodesFree,
InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent,
},
Build: &commonv2.Build{
GitVersion: candidateParent.Host.Build.GitVersion,
GitCommit: &candidateParent.Host.Build.GitCommit,
GoVersion: &candidateParent.Host.Build.GoVersion,
Platform: &candidateParent.Host.Build.Platform,
},
}

return &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{
SmallTaskResponse: &schedulerv2.SmallTaskResponse{
CandidateParent: parent,
},
}
}

// ConstructSuccessNormalTaskResponse constructs scheduling successful response of the normal task.
// Used only in v2 version of the grpc.
func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse {
Expand Down
153 changes: 0 additions & 153 deletions scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,159 +1291,6 @@ func TestScheduling_FindSuccessParent(t *testing.T) {
}
}

func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer)
}{
{
name: "construct success",
expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer) {
dgst := candidateParent.Task.Digest.String()

assert := assert.New(t)
assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{
SmallTaskResponse: &schedulerv2.SmallTaskResponse{
CandidateParent: &commonv2.Peer{
Id: candidateParent.ID,
Range: &commonv2.Range{
Start: candidateParent.Range.Start,
Length: candidateParent.Range.Length,
},
Priority: candidateParent.Priority,
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: &mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: &mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
},
Cost: durationpb.New(candidateParent.Cost.Load()),
State: candidateParent.FSM.Current(),
Task: &commonv2.Task{
Id: candidateParent.Task.ID,
Type: candidateParent.Task.Type,
Url: candidateParent.Task.URL,
Digest: &dgst,
Tag: &candidateParent.Task.Tag,
Application: &candidateParent.Task.Application,
Filters: candidateParent.Task.Filters,
Header: candidateParent.Task.Header,
PieceLength: candidateParent.Task.PieceLength,
ContentLength: candidateParent.Task.ContentLength.Load(),
PieceCount: candidateParent.Task.TotalPieceCount.Load(),
SizeScope: candidateParent.Task.SizeScope(),
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: &mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: &mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
},
State: candidateParent.Task.FSM.Current(),
PeerCount: int32(candidateParent.Task.PeerCount()),
CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()),
UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()),
},
Host: &commonv2.Host{
Id: candidateParent.Host.ID,
Type: uint32(candidateParent.Host.Type),
Hostname: candidateParent.Host.Hostname,
Ip: candidateParent.Host.IP,
Port: candidateParent.Host.Port,
DownloadPort: candidateParent.Host.DownloadPort,
Os: candidateParent.Host.OS,
Platform: candidateParent.Host.Platform,
PlatformFamily: candidateParent.Host.PlatformFamily,
PlatformVersion: candidateParent.Host.PlatformVersion,
KernelVersion: candidateParent.Host.KernelVersion,
Cpu: &commonv2.CPU{
LogicalCount: candidateParent.Host.CPU.LogicalCount,
PhysicalCount: candidateParent.Host.CPU.PhysicalCount,
Percent: candidateParent.Host.CPU.Percent,
ProcessPercent: candidateParent.Host.CPU.ProcessPercent,
Times: &commonv2.CPUTimes{
User: candidateParent.Host.CPU.Times.User,
System: candidateParent.Host.CPU.Times.System,
Idle: candidateParent.Host.CPU.Times.Idle,
Nice: candidateParent.Host.CPU.Times.Nice,
Iowait: candidateParent.Host.CPU.Times.Iowait,
Irq: candidateParent.Host.CPU.Times.Irq,
Softirq: candidateParent.Host.CPU.Times.Softirq,
Steal: candidateParent.Host.CPU.Times.Steal,
Guest: candidateParent.Host.CPU.Times.Guest,
GuestNice: candidateParent.Host.CPU.Times.GuestNice,
},
},
Memory: &commonv2.Memory{
Total: candidateParent.Host.Memory.Total,
Available: candidateParent.Host.Memory.Available,
Used: candidateParent.Host.Memory.Used,
UsedPercent: candidateParent.Host.Memory.UsedPercent,
ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent,
Free: candidateParent.Host.Memory.Free,
},
Network: &commonv2.Network{
TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount,
Location: &candidateParent.Host.Network.Location,
Idc: &candidateParent.Host.Network.IDC,
},
Disk: &commonv2.Disk{
Total: candidateParent.Host.Disk.Total,
Free: candidateParent.Host.Disk.Free,
Used: candidateParent.Host.Disk.Used,
UsedPercent: candidateParent.Host.Disk.UsedPercent,
InodesTotal: candidateParent.Host.Disk.InodesTotal,
InodesUsed: candidateParent.Host.Disk.InodesUsed,
InodesFree: candidateParent.Host.Disk.InodesFree,
InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent,
},
Build: &commonv2.Build{
GitVersion: candidateParent.Host.Build.GitVersion,
GitCommit: &candidateParent.Host.Build.GitCommit,
GoVersion: &candidateParent.Host.Build.GoVersion,
Platform: &candidateParent.Host.Build.Platform,
},
},
NeedBackToSource: candidateParent.NeedBackToSource.Load(),
CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()),
UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()),
},
},
})
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
candidateParent := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost, resource.WithRange(nethttp.Range{
Start: 1,
Length: 10,
}))
candidateParent.StorePiece(&mockPiece)
candidateParent.Task.StorePiece(&mockPiece)

tc.expect(t, ConstructSuccessSmallTaskResponse(candidateParent), candidateParent)
})
}
}

func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading

0 comments on commit 86ce09f

Please sign in to comment.