From 5f4fe5753afe3593720a1078c45767d32c7d4870 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 8 Nov 2023 20:45:03 +0800 Subject: [PATCH] feat: implement ListSeedPeers feature in manager (#2865) Signed-off-by: Gaius --- api/manager/docs.go | 4 +- go.mod | 2 +- go.sum | 4 +- manager/rpcserver/manager_server_v1.go | 72 +++++++++++++++++++++++++ manager/rpcserver/manager_server_v2.go | 72 +++++++++++++++++++++++++ pkg/redis/redis.go | 5 ++ pkg/redis/redis_test.go | 51 ++++++++++++++++++ scheduler/scheduling/scheduling.go | 8 +-- scheduler/scheduling/scheduling_test.go | 12 ++--- scheduler/service/service_v2.go | 6 +-- scheduler/service/service_v2_test.go | 20 +++---- 11 files changed, 229 insertions(+), 27 deletions(-) diff --git a/api/manager/docs.go b/api/manager/docs.go index b1d9c684fa5..c32d51a0abf 100644 --- a/api/manager/docs.go +++ b/api/manager/docs.go @@ -1,4 +1,4 @@ -// Code generated by swaggo/swag. DO NOT EDIT +// Package manager Code generated by swaggo/swag. DO NOT EDIT package manager import "github.com/swaggo/swag" @@ -5278,6 +5278,8 @@ var SwaggerInfo = &swag.Spec{ Description: "Dragonfly Manager Server", InfoInstanceName: "swagger", SwaggerTemplate: docTemplate, + LeftDelim: "{{", + RightDelim: "}}", } func init() { diff --git a/go.mod b/go.mod index 67a37b14b63..d9818a9da45 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.36 + d7y.io/api/v2 v2.0.43 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index bfb050f02e5..6649c34d3e6 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api/v2 v2.0.36 h1:H8jVBN1ydo158Zd/I+XWUy/ymaYyAn+bnSswm3Q3WGU= -d7y.io/api/v2 v2.0.36/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E= +d7y.io/api/v2 v2.0.43 h1:4IdL+j1CAJp4QIs71YeItKZV/lzqya98ksxoGVnaQUQ= +d7y.io/api/v2 v2.0.43/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/manager/rpcserver/manager_server_v1.go b/manager/rpcserver/manager_server_v1.go index 9d47d42109b..30990fbf30c 100644 --- a/manager/rpcserver/manager_server_v1.go +++ b/manager/rpcserver/manager_server_v1.go @@ -177,6 +177,78 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee return &pbSeedPeer, nil } +// List acitve seed peers configuration. +func (s *managerServerV1) ListSeedPeers(ctx context.Context, req *managerv1.ListSeedPeersRequest) (*managerv1.ListSeedPeersResponse, error) { + log := logger.WithHostnameAndIP(req.Hostname, req.Ip) + log.Debugf("list seed peers, version %s, commit %s", req.Version, req.Commit) + + // Cache hit. + var pbListSeedPeersResponse managerv1.ListSeedPeersResponse + cacheKey := pkgredis.MakeSeedPeersKeyForPeerInManager(req.Hostname, req.Ip) + + if err := s.cache.Get(ctx, cacheKey, &pbListSeedPeersResponse); err != nil { + log.Warnf("%s cache miss because of %s", cacheKey, err.Error()) + } else { + log.Debugf("%s cache hit", cacheKey) + return &pbListSeedPeersResponse, nil + } + + // Cache miss and search seed peer. + seedPeer := models.SeedPeer{} + if err := s.db.WithContext(ctx).Preload("SeedPeerCluster").First(&seedPeer, models.SeedPeer{ + Hostname: req.Hostname, + IP: req.Ip, + }).Error; err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + var seedPeers []models.SeedPeer + if err := s.db.WithContext(ctx).Find(&seedPeers, models.SeedPeer{ + State: models.SeedPeerStateActive, + SeedPeerClusterID: uint(seedPeer.SeedPeerClusterID), + }).Error; err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + if len(seedPeers) == 0 { + return nil, status.Error(codes.NotFound, "seed peer not found") + } + + // Construct seed peers. + for _, seedPeer := range seedPeers { + pbListSeedPeersResponse.SeedPeers = append(pbListSeedPeersResponse.SeedPeers, &managerv1.SeedPeer{ + Id: uint64(seedPeer.ID), + Hostname: seedPeer.Hostname, + Type: seedPeer.Type, + Idc: seedPeer.IDC, + Location: seedPeer.Location, + Ip: seedPeer.IP, + Port: seedPeer.Port, + DownloadPort: seedPeer.DownloadPort, + ObjectStoragePort: seedPeer.ObjectStoragePort, + State: seedPeer.State, + SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID), + SeedPeerCluster: &managerv1.SeedPeerCluster{ + Id: uint64(seedPeer.SeedPeerCluster.ID), + Name: seedPeer.SeedPeerCluster.Name, + Bio: seedPeer.SeedPeerCluster.BIO, + }, + }) + } + + // Cache data. + if err := s.cache.Once(&cachev8.Item{ + Ctx: ctx, + Key: cacheKey, + Value: &pbListSeedPeersResponse, + TTL: s.cache.TTL, + }); err != nil { + log.Error(err) + } + + return &pbListSeedPeersResponse, nil +} + // Update SeedPeer configuration. func (s *managerServerV1) UpdateSeedPeer(ctx context.Context, req *managerv1.UpdateSeedPeerRequest) (*managerv1.SeedPeer, error) { log := logger.WithHostnameAndIP(req.Hostname, req.Ip) diff --git a/manager/rpcserver/manager_server_v2.go b/manager/rpcserver/manager_server_v2.go index df059485b0b..c7c9b23857e 100644 --- a/manager/rpcserver/manager_server_v2.go +++ b/manager/rpcserver/manager_server_v2.go @@ -177,6 +177,78 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee return &pbSeedPeer, nil } +// List acitve seed peers configuration. +func (s *managerServerV2) ListSeedPeers(ctx context.Context, req *managerv2.ListSeedPeersRequest) (*managerv2.ListSeedPeersResponse, error) { + log := logger.WithHostnameAndIP(req.Hostname, req.Ip) + log.Debugf("list seed peers, version %s, commit %s", req.Version, req.Commit) + + // Cache hit. + var pbListSeedPeersResponse managerv2.ListSeedPeersResponse + cacheKey := pkgredis.MakeSeedPeersKeyForPeerInManager(req.Hostname, req.Ip) + + if err := s.cache.Get(ctx, cacheKey, &pbListSeedPeersResponse); err != nil { + log.Warnf("%s cache miss because of %s", cacheKey, err.Error()) + } else { + log.Debugf("%s cache hit", cacheKey) + return &pbListSeedPeersResponse, nil + } + + // Cache miss and search seed peer. + seedPeer := models.SeedPeer{} + if err := s.db.WithContext(ctx).Preload("SeedPeerCluster").First(&seedPeer, models.SeedPeer{ + Hostname: req.Hostname, + IP: req.Ip, + }).Error; err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + var seedPeers []models.SeedPeer + if err := s.db.WithContext(ctx).Find(&seedPeers, models.SeedPeer{ + State: models.SeedPeerStateActive, + SeedPeerClusterID: uint(seedPeer.SeedPeerClusterID), + }).Error; err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + if len(seedPeers) == 0 { + return nil, status.Error(codes.NotFound, "seed peer not found") + } + + // Construct seed peers. + for _, seedPeer := range seedPeers { + pbListSeedPeersResponse.SeedPeers = append(pbListSeedPeersResponse.SeedPeers, &managerv2.SeedPeer{ + Id: uint64(seedPeer.ID), + Hostname: seedPeer.Hostname, + Type: seedPeer.Type, + Idc: &seedPeer.IDC, + Location: &seedPeer.Location, + Ip: seedPeer.IP, + Port: seedPeer.Port, + DownloadPort: seedPeer.DownloadPort, + ObjectStoragePort: seedPeer.ObjectStoragePort, + State: seedPeer.State, + SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID), + SeedPeerCluster: &managerv2.SeedPeerCluster{ + Id: uint64(seedPeer.SeedPeerCluster.ID), + Name: seedPeer.SeedPeerCluster.Name, + Bio: seedPeer.SeedPeerCluster.BIO, + }, + }) + } + + // Cache data. + if err := s.cache.Once(&cachev8.Item{ + Ctx: ctx, + Key: cacheKey, + Value: &pbListSeedPeersResponse, + TTL: s.cache.TTL, + }); err != nil { + log.Error(err) + } + + return &pbListSeedPeersResponse, nil +} + // Update SeedPeer configuration. func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.UpdateSeedPeerRequest) (*managerv2.SeedPeer, error) { log := logger.WithHostnameAndIP(req.Hostname, req.Ip) diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index f01ad3bfd7f..3b1e248c122 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -105,6 +105,11 @@ func MakePeerKeyInManager(hostname, ip string) string { return MakeKeyInManager(PeersNamespace, fmt.Sprintf("%s-%s", hostname, ip)) } +// MakeSeedPeersKeyForPeerInManager make seed peers key for peer in manager. +func MakeSeedPeersKeyForPeerInManager(hostname, ip string) string { + return MakeKeyInManager(PeersNamespace, fmt.Sprintf("%s-%s:seed-peers", hostname, ip)) +} + // MakeSchedulersKeyForPeerInManager make schedulers key for peer in manager. func MakeSchedulersKeyForPeerInManager(hostname, ip string) string { return MakeKeyInManager(PeersNamespace, fmt.Sprintf("%s-%s:schedulers", hostname, ip)) diff --git a/pkg/redis/redis_test.go b/pkg/redis/redis_test.go index b6833e71e83..68c38f8ee48 100644 --- a/pkg/redis/redis_test.go +++ b/pkg/redis/redis_test.go @@ -280,6 +280,57 @@ func Test_MakePeerKeyInManager(t *testing.T) { } } +func Test_MakeSeedPeersKeyForPeerInManager(t *testing.T) { + tests := []struct { + name string + hostname string + ip string + expect func(t *testing.T, s string) + }{ + { + name: "make seed peer key for peer in manager", + hostname: "bar", + ip: "127.0.0.1", + expect: func(t *testing.T, s string) { + assert := assert.New(t) + assert.Equal(s, "manager:peers:bar-127.0.0.1:seed-peers") + }, + }, + { + name: "hostname is empty", + hostname: "", + ip: "127.0.0.1", + expect: func(t *testing.T, s string) { + assert := assert.New(t) + assert.Equal(s, "manager:peers:-127.0.0.1:seed-peers") + }, + }, + { + name: "ip is empty", + hostname: "bar", + ip: "", + expect: func(t *testing.T, s string) { + assert := assert.New(t) + assert.Equal(s, "manager:peers:bar-:seed-peers") + }, + }, + { + name: "hostname and ip are empty", + hostname: "", + ip: "", + expect: func(t *testing.T, s string) { + assert := assert.New(t) + assert.Equal(s, "manager:peers:-:seed-peers") + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.expect(t, MakeSeedPeersKeyForPeerInManager(tc.hostname, tc.ip)) + }) + } +} + func Test_MakeSchedulersKeyForPeerInManager(t *testing.T) { tests := []struct { name string diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index a5a06bbef08..19d88180156 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -566,7 +566,7 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule ParentId: &candidateParentPiece.ParentID, Offset: candidateParentPiece.Offset, Length: candidateParentPiece.Length, - TrafficType: candidateParentPiece.TrafficType, + TrafficType: &candidateParentPiece.TrafficType, Cost: durationpb.New(candidateParentPiece.Cost), CreatedAt: timestamppb.New(candidateParentPiece.CreatedAt), } @@ -617,7 +617,7 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule ParentId: &taskPiece.ParentID, Offset: taskPiece.Offset, Length: taskPiece.Length, - TrafficType: taskPiece.TrafficType, + TrafficType: &taskPiece.TrafficType, Cost: durationpb.New(taskPiece.Cost), CreatedAt: timestamppb.New(taskPiece.CreatedAt), } @@ -741,7 +741,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can ParentId: &candidateParentPiece.ParentID, Offset: candidateParentPiece.Offset, Length: candidateParentPiece.Length, - TrafficType: candidateParentPiece.TrafficType, + TrafficType: &candidateParentPiece.TrafficType, Cost: durationpb.New(candidateParentPiece.Cost), CreatedAt: timestamppb.New(candidateParentPiece.CreatedAt), } @@ -792,7 +792,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can ParentId: &taskPiece.ParentID, Offset: taskPiece.Offset, Length: taskPiece.Length, - TrafficType: taskPiece.TrafficType, + TrafficType: &taskPiece.TrafficType, Cost: durationpb.New(taskPiece.Cost), CreatedAt: timestamppb.New(taskPiece.CreatedAt), } diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 07f5b2b2769..6bce951e65d 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -1286,7 +1286,7 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -1313,7 +1313,7 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -1446,7 +1446,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -1473,7 +1473,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -1580,7 +1580,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -1607,7 +1607,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 59aef694c6d..87a16354f26 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -231,7 +231,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c ParentId: &piece.ParentID, Offset: piece.Offset, Length: piece.Length, - TrafficType: piece.TrafficType, + TrafficType: &piece.TrafficType, Cost: durationpb.New(piece.Cost), CreatedAt: timestamppb.New(piece.CreatedAt), } @@ -282,7 +282,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c ParentId: &piece.ParentID, Offset: piece.Offset, Length: piece.Length, - TrafficType: piece.TrafficType, + TrafficType: &piece.TrafficType, Cost: durationpb.New(piece.Cost), CreatedAt: timestamppb.New(piece.CreatedAt), } @@ -435,7 +435,7 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c ParentId: &piece.ParentID, Offset: piece.Offset, Length: piece.Length, - TrafficType: piece.TrafficType, + TrafficType: &piece.TrafficType, Cost: durationpb.New(piece.Cost), CreatedAt: timestamppb.New(piece.CreatedAt), } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 1583d6631f5..b1aa005512d 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -131,7 +131,7 @@ func TestServiceV2_StatPeer(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -158,7 +158,7 @@ func TestServiceV2_StatPeer(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -384,7 +384,7 @@ func TestServiceV2_StatTask(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -3190,7 +3190,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: "foo", - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -3209,7 +3209,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -3233,7 +3233,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -3275,7 +3275,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -3349,7 +3349,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: "foo", - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -3368,7 +3368,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), }, @@ -3392,7 +3392,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) Offset: mockPiece.Offset, Length: mockPiece.Length, Digest: mockPiece.Digest.String(), - TrafficType: mockPiece.TrafficType, + TrafficType: &mockPiece.TrafficType, Cost: durationpb.New(mockPiece.Cost), CreatedAt: timestamppb.New(mockPiece.CreatedAt), },