Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize parameters in seed peer DownloadTask #2947

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.62
d7y.io/api/v2 v2.0.64
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.62 h1:q4/r24DxWT+4zsGGMe8HqbjC3cw+B/s2+gwI2oKC7Og=
d7y.io/api/v2 v2.0.62/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
d7y.io/api/v2 v2.0.64 h1:2mdQ0maJZZgogfQHoCyzi1TBczyby1WeyFau13ywmDw=
d7y.io/api/v2 v2.0.64/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
67 changes: 16 additions & 51 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"

commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
Expand Down Expand Up @@ -61,96 +60,62 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
}

return &v2{
DfdaemonClient: dfdaemonv2.NewDfdaemonClient(conn),
ClientConn: conn,
DfdaemonUploadClient: dfdaemonv2.NewDfdaemonUploadClient(conn),
ClientConn: conn,
}, nil
}

// V2 is the interface for v2 version of the grpc client.
type V2 interface {
// SyncPieces syncs pieces from the other peers.
SyncPieces(context.Context, *dfdaemonv2.SyncPiecesRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_SyncPiecesClient, error)

// DownloadPiece downloads piece from the other peer.
DownloadPiece(context.Context, *dfdaemonv2.DownloadPieceRequest, ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error)

// DownloadTask downloads task back-to-source.
// DownloadTask downloads task from the other peer.
DownloadTask(context.Context, *dfdaemonv2.DownloadTaskRequest, ...grpc.CallOption) error

// UploadTask uploads task to p2p network.
UploadTask(context.Context, *dfdaemonv2.UploadTaskRequest, ...grpc.CallOption) error

// StatTask stats task information.
StatTask(context.Context, *dfdaemonv2.StatTaskRequest, ...grpc.CallOption) (*commonv2.Task, error)

// DeleteTask deletes task from p2p network.
DeleteTask(context.Context, *dfdaemonv2.DeleteTaskRequest, ...grpc.CallOption) error

// Close tears down the ClientConn and all underlying connections.
Close() error
}

// v2 provides v2 version of the dfdaemon grpc function.
type v2 struct {
dfdaemonv2.DfdaemonClient
dfdaemonv2.DfdaemonUploadClient
*grpc.ClientConn
}

// Trigger client to download file.
func (v *v2) SyncPieces(ctx context.Context, req *dfdaemonv2.DownloadPieceRequest, opts ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.DfdaemonClient.DownloadPiece(
ctx,
req,
opts...,
)
}

// DownloadTask downloads task back-to-source.
func (v *v2) DownloadTask(ctx context.Context, req *dfdaemonv2.DownloadTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonClient.DownloadTask(
ctx,
req,
opts...,
)

return err
}

// UploadTask uploads task to p2p network.
func (v *v2) UploadTask(ctx context.Context, req *dfdaemonv2.UploadTaskRequest, opts ...grpc.CallOption) error {
// SyncPieces syncs pieces from the other peers.
func (v *v2) SyncPieces(ctx context.Context, req *dfdaemonv2.SyncPiecesRequest, opts ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_SyncPiecesClient, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonClient.UploadTask(
return v.DfdaemonUploadClient.SyncPieces(
ctx,
req,
opts...,
)

return err
}

// StatTask stats task information.
func (v *v2) StatTask(ctx context.Context, req *dfdaemonv2.StatTaskRequest, opts ...grpc.CallOption) (*commonv2.Task, error) {
// DownloadPiece downloads piece from the other peer.
func (v *v2) DownloadPiece(ctx context.Context, req *dfdaemonv2.DownloadPieceRequest, opts ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.DfdaemonClient.StatTask(
return v.DfdaemonUploadClient.DownloadPiece(
ctx,
req,
opts...,
)
}

// DeleteTask deletes task from p2p network.
func (v *v2) DeleteTask(ctx context.Context, req *dfdaemonv2.DeleteTaskRequest, opts ...grpc.CallOption) error {
// DownloadTask downloads task from the other peer.
func (v *v2) DownloadTask(ctx context.Context, req *dfdaemonv2.DownloadTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonClient.DeleteTask(
_, err := v.DfdaemonUploadClient.DownloadTask(
ctx,
req,
opts...,
Expand Down
53 changes: 7 additions & 46 deletions pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions scheduler/resource/seed_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/rpc/common"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
)
Expand All @@ -47,7 +46,7 @@ const (
type SeedPeer interface {
// DownloadTask downloads task back-to-source.
// Used only in v2 version of the grpc.
DownloadTask(context.Context, *Task, types.HostType) error
DownloadTask(context.Context, *commonv2.Download) error

// TriggerTask triggers the seed peer to download task.
// Used only in v1 version of the grpc.
Expand Down Expand Up @@ -88,7 +87,7 @@ func newSeedPeer(cfg *config.ResourceConfig, client SeedPeerClient, peerManager
// TODO Implement DownloadTask
// DownloadTask downloads task back-to-source.
// Used only in v2 version of the grpc.
func (s *seedPeer) DownloadTask(ctx context.Context, task *Task, hostType types.HostType) error {
func (s *seedPeer) DownloadTask(ctx context.Context, download *commonv2.Download) error {
// ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
// defer cancel()

Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/seed_peer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
blocklist := set.NewSafeSet[string]()
blocklist.Add(peer.ID)
if task.FSM.Is(resource.TaskStateFailed) || !task.HasAvailablePeer(blocklist) {
if err := v.downloadTaskBySeedPeer(ctx, peer); err != nil {
if err := v.downloadTaskBySeedPeer(ctx, req.GetDownload(), peer); err != nil {
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
}

// downloadTaskBySeedPeer downloads task by seed peer.
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error {
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Download, peer *resource.Peer) error {
// Trigger the first download task based on different priority levels,
// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
priority := peer.CalculatePriority(v.dynconfig)
Expand All @@ -1312,7 +1312,7 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) er
// Super peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
Expand All @@ -1325,7 +1325,7 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) er
// Strong peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
Expand All @@ -1338,7 +1338,7 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) er
// Weak peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
Expand Down
Loading
Loading