Skip to content

Commit

Permalink
feat: optimize parameters in seed peer DownloadTask (#2947)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Dec 19, 2023
1 parent a58fa9c commit 003376c
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 133 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.62
d7y.io/api/v2 v2.0.64
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.62 h1:q4/r24DxWT+4zsGGMe8HqbjC3cw+B/s2+gwI2oKC7Og=
d7y.io/api/v2 v2.0.62/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
d7y.io/api/v2 v2.0.64 h1:2mdQ0maJZZgogfQHoCyzi1TBczyby1WeyFau13ywmDw=
d7y.io/api/v2 v2.0.64/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
67 changes: 16 additions & 51 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"

commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
Expand Down Expand Up @@ -61,96 +60,62 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
}

return &v2{
DfdaemonClient: dfdaemonv2.NewDfdaemonClient(conn),
ClientConn: conn,
DfdaemonUploadClient: dfdaemonv2.NewDfdaemonUploadClient(conn),
ClientConn: conn,
}, nil
}

// V2 is the interface for v2 version of the grpc client.
type V2 interface {
// SyncPieces syncs pieces from the other peers.
SyncPieces(context.Context, *dfdaemonv2.SyncPiecesRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_SyncPiecesClient, error)

// DownloadPiece downloads piece from the other peer.
DownloadPiece(context.Context, *dfdaemonv2.DownloadPieceRequest, ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error)

// DownloadTask downloads task back-to-source.
// DownloadTask downloads task from the other peer.
DownloadTask(context.Context, *dfdaemonv2.DownloadTaskRequest, ...grpc.CallOption) error

// UploadTask uploads task to p2p network.
UploadTask(context.Context, *dfdaemonv2.UploadTaskRequest, ...grpc.CallOption) error

// StatTask stats task information.
StatTask(context.Context, *dfdaemonv2.StatTaskRequest, ...grpc.CallOption) (*commonv2.Task, error)

// DeleteTask deletes task from p2p network.
DeleteTask(context.Context, *dfdaemonv2.DeleteTaskRequest, ...grpc.CallOption) error

// Close tears down the ClientConn and all underlying connections.
Close() error
}

// v2 provides v2 version of the dfdaemon grpc function.
type v2 struct {
dfdaemonv2.DfdaemonClient
dfdaemonv2.DfdaemonUploadClient
*grpc.ClientConn
}

// Trigger client to download file.
func (v *v2) SyncPieces(ctx context.Context, req *dfdaemonv2.DownloadPieceRequest, opts ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.DfdaemonClient.DownloadPiece(
ctx,
req,
opts...,
)
}

// DownloadTask downloads task back-to-source.
func (v *v2) DownloadTask(ctx context.Context, req *dfdaemonv2.DownloadTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonClient.DownloadTask(
ctx,
req,
opts...,
)

return err
}

// UploadTask uploads task to p2p network.
func (v *v2) UploadTask(ctx context.Context, req *dfdaemonv2.UploadTaskRequest, opts ...grpc.CallOption) error {
// SyncPieces syncs pieces from the other peers.
func (v *v2) SyncPieces(ctx context.Context, req *dfdaemonv2.SyncPiecesRequest, opts ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_SyncPiecesClient, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonClient.UploadTask(
return v.DfdaemonUploadClient.SyncPieces(
ctx,
req,
opts...,
)

return err
}

// StatTask stats task information.
func (v *v2) StatTask(ctx context.Context, req *dfdaemonv2.StatTaskRequest, opts ...grpc.CallOption) (*commonv2.Task, error) {
// DownloadPiece downloads piece from the other peer.
func (v *v2) DownloadPiece(ctx context.Context, req *dfdaemonv2.DownloadPieceRequest, opts ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.DfdaemonClient.StatTask(
return v.DfdaemonUploadClient.DownloadPiece(
ctx,
req,
opts...,
)
}

// DeleteTask deletes task from p2p network.
func (v *v2) DeleteTask(ctx context.Context, req *dfdaemonv2.DeleteTaskRequest, opts ...grpc.CallOption) error {
// DownloadTask downloads task from the other peer.
func (v *v2) DownloadTask(ctx context.Context, req *dfdaemonv2.DownloadTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonClient.DeleteTask(
_, err := v.DfdaemonUploadClient.DownloadTask(
ctx,
req,
opts...,
Expand Down
53 changes: 7 additions & 46 deletions pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions scheduler/resource/seed_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/rpc/common"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
)
Expand All @@ -47,7 +46,7 @@ const (
type SeedPeer interface {
// DownloadTask downloads task back-to-source.
// Used only in v2 version of the grpc.
DownloadTask(context.Context, *Task, types.HostType) error
DownloadTask(context.Context, *commonv2.Download) error

// TriggerTask triggers the seed peer to download task.
// Used only in v1 version of the grpc.
Expand Down Expand Up @@ -88,7 +87,7 @@ func newSeedPeer(cfg *config.ResourceConfig, client SeedPeerClient, peerManager
// TODO Implement DownloadTask
// DownloadTask downloads task back-to-source.
// Used only in v2 version of the grpc.
func (s *seedPeer) DownloadTask(ctx context.Context, task *Task, hostType types.HostType) error {
func (s *seedPeer) DownloadTask(ctx context.Context, download *commonv2.Download) error {
// ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
// defer cancel()

Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/seed_peer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
blocklist := set.NewSafeSet[string]()
blocklist.Add(peer.ID)
if task.FSM.Is(resource.TaskStateFailed) || !task.HasAvailablePeer(blocklist) {
if err := v.downloadTaskBySeedPeer(ctx, peer); err != nil {
if err := v.downloadTaskBySeedPeer(ctx, req.GetDownload(), peer); err != nil {
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
}

// downloadTaskBySeedPeer downloads task by seed peer.
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error {
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Download, peer *resource.Peer) error {
// Trigger the first download task based on different priority levels,
// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
priority := peer.CalculatePriority(v.dynconfig)
Expand All @@ -1312,7 +1312,7 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) er
// Super peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
Expand All @@ -1325,7 +1325,7 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) er
// Strong peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
Expand All @@ -1338,7 +1338,7 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) er
// Weak peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
Expand Down
Loading

0 comments on commit 003376c

Please sign in to comment.