diff --git a/cmd/root.go b/cmd/root.go index 82d3e88..72b82a6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -297,10 +297,11 @@ func mustInitializeQueueBackend() { queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: cmdOpts.Backend, Redis: &backendconfig.RedisConfig{ - KeyPrefix: cmdOpts.Redis.KeyPrefix, - Client: cmdOpts.Redis.NewClient(), - Backoff: cmdOpts.Redis.Backoff, - ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + KeyPrefix: cmdOpts.Redis.KeyPrefix, + Client: cmdOpts.Redis.NewClient(), + Backoff: cmdOpts.Redis.Backoff, + ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete, }, }) diff --git a/pkg/backend/config/config.go b/pkg/backend/config/config.go index 1d148ff..66e18ba 100644 --- a/pkg/backend/config/config.go +++ b/pkg/backend/config/config.go @@ -30,10 +30,11 @@ type Config struct { } type RedisConfig struct { - KeyPrefix string - Client *redis.Client - Backoff BackoffConfig - ChunkSizeInGet int + KeyPrefix string + Client *redis.Client + Backoff BackoffConfig + ChunkSizeInGet int + ChunkSizeInDelete int } // TODO: support UniversalOptions @@ -52,7 +53,8 @@ type RedisClientConfig struct { IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"` IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"` - ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInGet" default:"1000"` } func (c RedisClientConfig) NewClient() *redis.Client { diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 44ea134..f9873b3 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -218,7 +218,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { // .. task_keys = collect task keys // WATCh task_keys // MULTI - // DEL {queue_key} worker_keys task_keys + // UNLINK {queue_key} worker_keys task_keys // HDEL {all_queues_key} {queueName} // EXEC txf := func(tx *redis.Tx) error { @@ -240,8 +240,16 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { tx.Watch(taskKeysToDelete...) keysToDelete = append(keysToDelete, taskKeysToDelete...) + chunkSize := b.ChunkSizeInGet + numOfKeysToDelete := len(keysToDelete) _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { - pipe.Del(keysToDelete...) + for begin := 0; begin < numOfKeysToDelete; begin += chunkSize { + end := begin + chunkSize + if end > numOfKeysToDelete { + end = numOfKeysToDelete + } + pipe.Unlink(keysToDelete[begin:end]...) + } pipe.HDel(b.allQueuesKey(), queue.Spec.Name) return nil }) diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go index 36ce2c5..d097d55 100644 --- a/pkg/backend/redis/redis_test.go +++ b/pkg/backend/redis/redis_test.go @@ -107,10 +107,11 @@ var _ = Describe("Backend", func() { ibackend, err := NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - KeyPrefix: "test", - Client: client, - Backoff: backoffConfig, - ChunkSizeInGet: 1000, + KeyPrefix: "test", + Client: client, + Backoff: backoffConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index 295598a..e90a6a2 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -38,11 +38,11 @@ import ( ) const ( - KB = 1 << 10 - PayloadMaxSizeInKB = 1 - MessageMaxSizeInKB = 1 - HistoryLengthMax = 10 - MaxNameLength = 1024 + KB = 1 << 10 + PayloadMaxSizeInKB = 1 + MessageMaxSizeInKB = 1 + HistoryLengthMax = 10 + MaxNameLength = 1024 ) func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) { diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go index e3b67fc..5f32855 100644 --- a/pkg/worker/worker_test.go +++ b/pkg/worker/worker_test.go @@ -74,9 +74,10 @@ var _ = Describe("Worker", func() { bcknd, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - Client: client, - Backoff: backendConfig, - ChunkSizeInGet: 1000, + Client: client, + Backoff: backendConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred())