Skip to content
1 change: 1 addition & 0 deletions cmd/pipecd/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/app/api/stagelogstore:go_default_library",
"//pkg/app/ops/handler:go_default_library",
"//pkg/app/ops/insightcollector:go_default_library",
"//pkg/app/ops/orphancommandcleaner:go_default_library",
"//pkg/backoff:go_default_library",
"//pkg/cache/rediscache:go_default_library",
"//pkg/cli:go_default_library",
Expand Down
7 changes: 7 additions & 0 deletions cmd/pipecd/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pipe-cd/pipe/pkg/admin"
"github.com/pipe-cd/pipe/pkg/app/ops/handler"
"github.com/pipe-cd/pipe/pkg/app/ops/insightcollector"
"github.com/pipe-cd/pipe/pkg/app/ops/orphancommandcleaner"
"github.com/pipe-cd/pipe/pkg/backoff"
"github.com/pipe-cd/pipe/pkg/cli"
"github.com/pipe-cd/pipe/pkg/datastore"
Expand Down Expand Up @@ -98,6 +99,12 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
}
}()

// Starting orphan commands cleaner
cleaner := orphancommandcleaner.NewOrphanCommandCleaner(ds, t.Logger)
group.Go(func() error {
return cleaner.Run(ctx)
})

// Starting a cron job for insight collector.
if s.enableInsightCollector {
collector := insightcollector.NewInsightCollector(ds, fs, t.Logger)
Expand Down
13 changes: 13 additions & 0 deletions pkg/app/ops/orphancommandcleaner/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["orphancommandcleaner.go"],
importpath = "github.com/pipe-cd/pipe/pkg/app/ops/orphancommandcleaner",
visibility = ["//visibility:public"],
deps = [
"//pkg/datastore:go_default_library",
"//pkg/model:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
)
79 changes: 79 additions & 0 deletions pkg/app/ops/orphancommandcleaner/orphancommandcleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package orphancommandcleaner

import (
"context"
"time"

"go.uber.org/zap"

"github.com/pipe-cd/pipe/pkg/datastore"
"github.com/pipe-cd/pipe/pkg/model"
)

var (
commandTimeOut = 24 * time.Hour
interval = 6 * time.Hour
)

type OrphanCommandCleaner struct {
commandstore datastore.CommandStore
logger *zap.Logger
}

func NewOrphanCommandCleaner(
ds datastore.DataStore,
logger *zap.Logger,
) *OrphanCommandCleaner {
return &OrphanCommandCleaner{
commandstore: datastore.NewCommandStore(ds),
logger: logger.Named("orphan-command-cleaner"),
}
}

func (c *OrphanCommandCleaner) Run(ctx context.Context) error {
t := time.NewTicker(interval)
for {
select {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the context.Done to exit when ops was terminated.

for {
    select {
          case <- ctx.Done():
                 return nil
         case <- t.C:
             ...
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, thanks

case <-ctx.Done():
return nil
case <-t.C:
start := time.Now()
if err := c.updateOrphanCommandsStatus(ctx); err == nil {
c.logger.Info("successfully update orphan commands status", zap.Duration("duration", time.Since(start)))
}
}
}
}

func (c *OrphanCommandCleaner) updateOrphanCommandsStatus(ctx context.Context) error {
timeout := time.Now().Add(-commandTimeOut).Unix()
opts := datastore.ListOptions{
Filters: []datastore.ListFilter{
{
Field: "Status",
Operator: "==",
Value: model.CommandStatus_COMMAND_NOT_HANDLED_YET,
},
{
Field: "CreatedAt",
Operator: "<=",
Value: timeout,
},
},
}
commands, err := c.commandstore.ListCommands(ctx, opts)
if err != nil {
return err
}

for _, command := range commands {
if err := c.commandstore.UpdateCommand(ctx, command.Id, func(cmd *model.Command) error {
cmd.Status = model.CommandStatus_COMMAND_TIMEOUT
return nil
}); err != nil {
c.logger.Error("failed to update orphan command", zap.Error(err), zap.String("id", command.Id), zap.String("type", command.Type.String()))
}
}

return nil
}