diff --git a/cmd/pipecd/BUILD.bazel b/cmd/pipecd/BUILD.bazel index 5c8368b9b4..213427af08 100644 --- a/cmd/pipecd/BUILD.bazel +++ b/cmd/pipecd/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/app/api/stagelogstore:go_default_library", "//pkg/app/ops/handler:go_default_library", "//pkg/app/ops/insightcollector:go_default_library", + "//pkg/app/ops/orphancommandcleaner:go_default_library", "//pkg/backoff:go_default_library", "//pkg/cache/rediscache:go_default_library", "//pkg/cli:go_default_library", diff --git a/cmd/pipecd/ops.go b/cmd/pipecd/ops.go index 4e341e3d56..8b991b35f9 100644 --- a/cmd/pipecd/ops.go +++ b/cmd/pipecd/ops.go @@ -27,6 +27,7 @@ import ( "github.com/pipe-cd/pipe/pkg/admin" "github.com/pipe-cd/pipe/pkg/app/ops/handler" "github.com/pipe-cd/pipe/pkg/app/ops/insightcollector" + "github.com/pipe-cd/pipe/pkg/app/ops/orphancommandcleaner" "github.com/pipe-cd/pipe/pkg/backoff" "github.com/pipe-cd/pipe/pkg/cli" "github.com/pipe-cd/pipe/pkg/datastore" @@ -98,6 +99,12 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error { } }() + // Starting orphan commands cleaner + cleaner := orphancommandcleaner.NewOrphanCommandCleaner(ds, t.Logger) + group.Go(func() error { + return cleaner.Run(ctx) + }) + // Starting a cron job for insight collector. if s.enableInsightCollector { collector := insightcollector.NewInsightCollector(ds, fs, t.Logger) diff --git a/pkg/app/ops/orphancommandcleaner/BUILD.bazel b/pkg/app/ops/orphancommandcleaner/BUILD.bazel new file mode 100644 index 0000000000..8f2390615e --- /dev/null +++ b/pkg/app/ops/orphancommandcleaner/BUILD.bazel @@ -0,0 +1,13 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["orphancommandcleaner.go"], + importpath = "github.com/pipe-cd/pipe/pkg/app/ops/orphancommandcleaner", + visibility = ["//visibility:public"], + deps = [ + "//pkg/datastore:go_default_library", + "//pkg/model:go_default_library", + "@org_uber_go_zap//:go_default_library", + ], +) diff --git a/pkg/app/ops/orphancommandcleaner/orphancommandcleaner.go b/pkg/app/ops/orphancommandcleaner/orphancommandcleaner.go new file mode 100644 index 0000000000..1afce08713 --- /dev/null +++ b/pkg/app/ops/orphancommandcleaner/orphancommandcleaner.go @@ -0,0 +1,79 @@ +package orphancommandcleaner + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/pipe-cd/pipe/pkg/datastore" + "github.com/pipe-cd/pipe/pkg/model" +) + +var ( + commandTimeOut = 24 * time.Hour + interval = 6 * time.Hour +) + +type OrphanCommandCleaner struct { + commandstore datastore.CommandStore + logger *zap.Logger +} + +func NewOrphanCommandCleaner( + ds datastore.DataStore, + logger *zap.Logger, +) *OrphanCommandCleaner { + return &OrphanCommandCleaner{ + commandstore: datastore.NewCommandStore(ds), + logger: logger.Named("orphan-command-cleaner"), + } +} + +func (c *OrphanCommandCleaner) Run(ctx context.Context) error { + t := time.NewTicker(interval) + for { + select { + case <-ctx.Done(): + return nil + case <-t.C: + start := time.Now() + if err := c.updateOrphanCommandsStatus(ctx); err == nil { + c.logger.Info("successfully update orphan commands status", zap.Duration("duration", time.Since(start))) + } + } + } +} + +func (c *OrphanCommandCleaner) updateOrphanCommandsStatus(ctx context.Context) error { + timeout := time.Now().Add(-commandTimeOut).Unix() + opts := datastore.ListOptions{ + Filters: []datastore.ListFilter{ + { + Field: "Status", + Operator: "==", + Value: model.CommandStatus_COMMAND_NOT_HANDLED_YET, + }, + { + Field: "CreatedAt", + Operator: "<=", + Value: timeout, + }, + }, + } + commands, err := c.commandstore.ListCommands(ctx, opts) + if err != nil { + return err + } + + for _, command := range commands { + if err := c.commandstore.UpdateCommand(ctx, command.Id, func(cmd *model.Command) error { + cmd.Status = model.CommandStatus_COMMAND_TIMEOUT + return nil + }); err != nil { + c.logger.Error("failed to update orphan command", zap.Error(err), zap.String("id", command.Id), zap.String("type", command.Type.String())) + } + } + + return nil +}