Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: when the redis is disabled, AnnounceHost need to skip store redis #3712

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,12 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}

// Initialize persistent cache resource.
s.persistentCacheResource, err = persistentcache.New(cfg, s.gc, rdb, peerClientTransportCredentials)
if err != nil {
logger.Errorf("failed to create persistent cache resource: %v", err)
return nil, err
if rdb != nil {
s.persistentCacheResource, err = persistentcache.New(cfg, s.gc, rdb, peerClientTransportCredentials)
if err != nil {
logger.Errorf("failed to create persistent cache resource: %v", err)
return nil, err
}
}

// Initialize job service.
Expand Down
39 changes: 38 additions & 1 deletion scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,12 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
}

// Handle the persistent cache host.
// Handle the persistent cache host. If redis is not enabled,
// it will not support the persistent cache feature.
if v.persistentCacheResource == nil {
return nil
}

persistentCacheHost, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.Host.GetId())
if !loaded {
options := []persistentcache.HostOption{}
Expand Down Expand Up @@ -1573,11 +1578,19 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download
// TODO Implement the following methods.
// AnnouncePersistentCachePeer announces persistent cache peer to scheduler.
func (v *V2) AnnouncePersistentCachePeer(stream schedulerv2.Scheduler_AnnouncePersistentCachePeerServer) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

return nil
}

// StatPersistentCachePeer checks information of persistent cache peer.
func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatPersistentCachePeerRequest) (*commonv2.PersistentCachePeer, error) {
if v.persistentCacheResource == nil {
return nil, status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.HostId, req.TaskId, req.PeerId)
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !loaded {
Expand Down Expand Up @@ -1694,6 +1707,10 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP

// DeletePersistentCachePeer releases persistent cache peer in scheduler.
func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.DeletePersistentCachePeerRequest) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
if err := v.persistentCacheResource.PeerManager().Delete(ctx, req.GetPeerId()); err != nil {
log.Errorf("delete persistent cache peer %s error %s", req.GetPeerId(), err)
Expand All @@ -1707,6 +1724,10 @@ func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.Del

// UploadPersistentCacheTaskStarted uploads the metadata of the persistent cache task started.
func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskStartedRequest) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
host, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.GetHostId())
if !loaded {
Expand Down Expand Up @@ -1764,6 +1785,10 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule

// UploadPersistentCacheTaskFinished uploads the metadata of the persistent cache task finished.
func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskFinishedRequest) (*commonv2.PersistentCacheTask, error) {
if v.persistentCacheResource == nil {
return nil, status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
// Handle peer with task finished request, load peer and update it.
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
Expand Down Expand Up @@ -1830,6 +1855,10 @@ func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedul

// UploadPersistentCacheTaskFailed uploads the metadata of the persistent cache task failed.
func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskFailedRequest) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
// Handle peer with task failed request, load peer and update it.
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
Expand Down Expand Up @@ -1866,6 +1895,10 @@ func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *scheduler

// StatPersistentCacheTask checks information of persistent cache task.
func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatPersistentCacheTaskRequest) (*commonv2.PersistentCacheTask, error) {
if v.persistentCacheResource == nil {
return nil, status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
if !loaded {
Expand Down Expand Up @@ -1904,6 +1937,10 @@ func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatP

// DeletePersistentCacheTask releases persistent cache task in scheduler.
func (v *V2) DeletePersistentCacheTask(ctx context.Context, req *schedulerv2.DeletePersistentCacheTaskRequest) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
if err := v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache peers by task %s error %s", req.GetTaskId(), err)
Expand Down
Loading