Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement StatPersistentCachePeerRequest and StatPersistentCacheTaskRequest for persistent cache #3603

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions scheduler/resource/persistentcache/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ type Build struct {
// Golang version.
GoVersion string `csv:"goVersion"`

// Rust version.
RustVersion string `csv:"rustVersion"`

// Build platform.
Platform string `csv:"platform"`
}
Expand Down
5 changes: 4 additions & 1 deletion scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type Peer struct {
// ID is persistent cache peer id.
ID string

// Persistent is whether the peer is persistent.
Persistent bool

// Pieces is finished pieces bitset.
FinishedPieces *bitset.BitSet

Expand Down Expand Up @@ -91,7 +94,7 @@ type Peer struct {
}

// New persistent cache peer instance.
func NewPeer(id, state string, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Peer {
p := &Peer{
ID: id,
Expand Down
8 changes: 8 additions & 0 deletions scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
return nil, false
}

persistent, err := strconv.ParseBool(rawPeer["persistent"])
if err != nil {
log.Errorf("parsing persistent failed: %v", err)
return nil, false
}

finishedPieces := &bitset.BitSet{}
if err := finishedPieces.UnmarshalBinary([]byte(rawPeer["finished_pieces"])); err != nil {
log.Errorf("unmarshal finished pieces failed: %v", err)
Expand Down Expand Up @@ -123,6 +129,7 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
return NewPeer(
rawPeer["id"],
rawPeer["state"],
persistent,
finishedPieces,
blockParents,
task,
Expand Down Expand Up @@ -153,6 +160,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
pipe.HSet(ctx,
pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID),
"id", peer.ID,
"persistent", peer.Persistent,
"finished_pieces", finishedPieces,
"state", peer.FSM.Current(),
"block_parents", blockParents,
Expand Down
8 changes: 5 additions & 3 deletions scheduler/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ import (

"d7y.io/dragonfly/v2/pkg/rpc/scheduler/server"
"d7y.io/dragonfly/v2/scheduler/config"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
)

// New returns a new scheduler server from the given options.
func New(
cfg *config.Config,
resource resource.Resource,
resource standard.Resource,
persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
opts ...grpc.ServerOption,
) *grpc.Server {
return server.New(
newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage),
newSchedulerServerV2(cfg, resource, scheduling, dynconfig, storage),
newSchedulerServerV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage),
opts...)
}
8 changes: 5 additions & 3 deletions scheduler/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (

"d7y.io/dragonfly/v2/scheduler/config"
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
)
Expand Down Expand Up @@ -59,11 +60,12 @@ func TestRPCServer_New(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)

svr := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svr := New(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
tc.expect(t, svr)
})
}
Expand Down
9 changes: 5 additions & 4 deletions scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (

"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
"d7y.io/dragonfly/v2/scheduler/storage"
)

// TODO Implement v2 version of the rpc server apis.
// schedulerServerV2 is v2 version of the scheduler grpc server.
type schedulerServerV2 struct {
// Service interface.
Expand All @@ -42,12 +42,13 @@ type schedulerServerV2 struct {
// newSchedulerServerV2 returns v2 version of the scheduler server.
func newSchedulerServerV2(
cfg *config.Config,
resource resource.Resource,
resource standard.Resource,
persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
) schedulerv2.SchedulerServer {
return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage)}
return &schedulerServerV2{service.NewV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage)}
}

// AnnouncePeer announces peer to scheduler.
Expand Down
6 changes: 3 additions & 3 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import (
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/job"
"d7y.io/dragonfly/v2/scheduler/metrics"
persistentcache "d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
standard "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/rpcserver"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
Expand Down Expand Up @@ -212,7 +212,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(rpc.NewInsecureCredentials()))
}

svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, schedulerServerOptions...)
svr := rpcserver.New(cfg, resource, s.persistentCacheResource, scheduling, dynconfig, s.storage, schedulerServerOptions...)
s.grpcServer = svr

// Initialize metrics.
Expand Down
8 changes: 4 additions & 4 deletions scheduler/service/service_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *reso
}

// storeTask stores a new task or reuses a previous task.
func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task {
func (v *V1) storeTask(_ context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task {
filteredQueryParams := strings.Split(req.UrlMeta.GetFilter(), idgen.FilteredQueryParamsSeparator)

task, loaded := v.resource.TaskManager().Load(req.GetTaskId())
Expand All @@ -834,7 +834,7 @@ func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, ty
}

// storeHost stores a new host or reuses a previous host.
func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *resource.Host {
func (v *V1) storeHost(_ context.Context, peerHost *schedulerv1.PeerHost) *resource.Host {
host, loaded := v.resource.HostManager().Load(peerHost.Id)
if !loaded {
options := []resource.HostOption{resource.WithNetwork(resource.Network{
Expand Down Expand Up @@ -866,7 +866,7 @@ func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *res
}

// storePeer stores a new peer or reuses a previous peer.
func (v *V1) storePeer(ctx context.Context, id string, priority commonv1.Priority, rg string, task *resource.Task, host *resource.Host) *resource.Peer {
func (v *V1) storePeer(_ context.Context, id string, priority commonv1.Priority, rg string, task *resource.Task, host *resource.Host) *resource.Peer {
peer, loaded := v.resource.PeerManager().Load(id)
if !loaded {
options := []resource.PeerOption{}
Expand Down Expand Up @@ -1057,7 +1057,7 @@ func (v *V1) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
func (v *V1) handleEndOfPiece(ctx context.Context, peer *resource.Peer) {}

// handlePieceSuccess handles successful piece.
func (v *V1) handlePieceSuccess(ctx context.Context, peer *resource.Peer, pieceResult *schedulerv1.PieceResult) {
func (v *V1) handlePieceSuccess(_ context.Context, peer *resource.Peer, pieceResult *schedulerv1.PieceResult) {
// Distinguish traffic type.
trafficType := commonv2.TrafficType_REMOTE_PEER
if resource.IsPieceBackToSource(pieceResult.DstPid) {
Expand Down
Loading
Loading