Skip to content

feat: delete jobs in batches #3682

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ type GCConfig struct {

// TTL is the ttl for job.
TTL time.Duration `yaml:"ttl" mapstructure:"ttl"`

// BatchSize is the batch size when operating gorm database.
BatchSize int `yaml:"batchSize" mapstructure:"batchSize"`
}

type PreheatConfig struct {
Expand Down Expand Up @@ -442,8 +445,9 @@ func New() *Config {
},
Job: JobConfig{
GC: GCConfig{
Interval: DefaultJobGCInterval,
TTL: DefaultJobGCTTL,
Interval: DefaultJobGCInterval,
TTL: DefaultJobGCTTL,
BatchSize: DefaultJobGCBatchSize,
},
Preheat: PreheatConfig{
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
Expand Down Expand Up @@ -629,6 +633,10 @@ func (cfg *Config) Validate() error {
return errors.New("gc requires parameter ttl")
}

if cfg.Job.GC.BatchSize == 0 {
return errors.New("gc requires parameter batchSize")
}

if cfg.Job.Preheat.RegistryTimeout == 0 {
return errors.New("preheat requires parameter registryTimeout")
}
Expand Down
20 changes: 18 additions & 2 deletions manager/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,9 @@ func TestConfig_Load(t *testing.T) {
},
Job: JobConfig{
GC: GCConfig{
Interval: 1 * time.Second,
TTL: 1 * time.Second,
Interval: 1 * time.Second,
TTL: 1 * time.Second,
BatchSize: 100,
},
Preheat: PreheatConfig{
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
Expand Down Expand Up @@ -765,6 +766,21 @@ func TestConfig_Validate(t *testing.T) {
assert.EqualError(err, "gc requires parameter ttl")
},
},
{
name: "gc requires parameter batchSize",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.GC.BatchSize = 0
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "gc requires parameter batchSize")
},
},
{
name: "preheat requires parameter registryTimeout",
config: New(),
Expand Down
3 changes: 3 additions & 0 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ const (
// DefaultJobGCTTL is the default ttl for job.
DefaultJobGCTTL = 12 * time.Hour

// DefaultJobGCBatchSize is the default batch size for operating on the database in gc job.
DefaultJobGCBatchSize = 5000

// DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest.
DefaultJobPreheatRegistryTimeout = 1 * time.Minute

Expand Down
1 change: 1 addition & 0 deletions manager/config/testdata/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ job:
gc:
interval: 1s
ttl: 1s
batchSize: 100
preheat:
registryTimeout: 1m
tls:
Expand Down
20 changes: 19 additions & 1 deletion manager/job/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (gc *gc) Serve() {
select {
case <-tick.C:
logger.Infof("gc job started")
if err := gc.db.WithContext(context.Background()).Where("created_at < ?", time.Now().Add(-gc.config.Job.GC.TTL)).Unscoped().Delete(&models.Job{}).Error; err != nil {
if err := gc.deleteInBatches(context.Background()); err != nil {
logger.Errorf("gc job failed: %v", err)
}
case <-gc.done:
Expand All @@ -74,3 +74,21 @@ func (gc *gc) Serve() {
func (gc *gc) Stop() {
close(gc.done)
}

// deleteInBatches deletes jobs in batches.
func (gc *gc) deleteInBatches(ctx context.Context) error {
for {
result := gc.db.WithContext(ctx).Where("created_at < ?", time.Now().Add(-gc.config.Job.GC.TTL)).Limit(gc.config.Job.GC.BatchSize).Unscoped().Delete(&models.Job{})
if result.Error != nil {
return result.Error
}

if result.RowsAffected == 0 {
break
}

logger.Infof("gc job deleted %d jobs", result.RowsAffected)
}

return nil
}
Loading