Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ genrule(
# gazelle:exclude pkg/model/notificationevent.pb.validate.go
# gazelle:exclude pkg/model/piped.pb.validate.go
# gazelle:exclude pkg/model/piped_stats.pb.validate.go
# gazelle:exclude pkg/model/planpreview.pb.validate.go
# gazelle:exclude pkg/model/project.pb.validate.go
# gazelle:exclude pkg/model/role.pb.validate.go
# gazelle:exclude pkg/model/user.pb.validate.go
Expand Down
2 changes: 1 addition & 1 deletion cmd/pipecd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
datastore.NewAPIKeyStore(ds),
t.Logger,
)
service = grpcapi.NewAPI(ds, cmds, cmdOutputStore, t.Logger)
service = grpcapi.NewAPI(ds, cmds, cmdOutputStore, cfg.Address, t.Logger)
opts = []rpc.Option{
rpc.WithPort(s.apiPort),
rpc.WithGracePeriod(s.gracePeriod),
Expand Down
46 changes: 40 additions & 6 deletions pkg/app/api/grpcapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ type API struct {
commandStore commandstore.Store
commandOutputGetter commandOutputGetter

logger *zap.Logger
webBaseURL string
logger *zap.Logger
}

// NewAPI creates a new API instance.
func NewAPI(
ds datastore.DataStore,
cmds commandstore.Store,
cog commandOutputGetter,
webBaseURL string,
logger *zap.Logger,
) *API {
a := &API{
Expand All @@ -62,6 +64,7 @@ func NewAPI(
eventStore: datastore.NewEventStore(ds),
commandStore: cmds,
commandOutputGetter: cog,
webBaseURL: webBaseURL,
logger: logger.Named("api"),
}
return a
Expand Down Expand Up @@ -437,7 +440,20 @@ func (a *API) GetPlanPreviewResults(ctx context.Context, req *apiservice.GetPlan
return nil, err
}

const freshDuration = 24 * time.Hour
const (
freshDuration = 24 * time.Hour
defaultCommandHandleTimeout = 5 * time.Minute
)

var (
handledCommands = make([]string, 0, len(req.Commands))
results = make([]*model.PlanPreviewCommandResult, 0, len(req.Commands))
)

commandHandleTimeout := time.Duration(req.CommandHandleTimeout) * time.Second
if commandHandleTimeout == 0 {
commandHandleTimeout = defaultCommandHandleTimeout
}

// Validate based on command model stored in datastore.
for _, commandID := range req.Commands {
Expand All @@ -456,24 +472,35 @@ func (a *API) GetPlanPreviewResults(ctx context.Context, req *apiservice.GetPlan
if cmd.Type != model.Command_BUILD_PLAN_PREVIEW {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprint("Command %s is not a plan preview command", commandID))
}

if !cmd.IsHandled() {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("Command %s is not completed yet", commandID))
if time.Since(time.Unix(cmd.CreatedAt, 0)) <= commandHandleTimeout {
return nil, status.Error(codes.NotFound, fmt.Sprintf("No command ouput for command %d because it is not completed yet", commandID))
}
results = append(results, &model.PlanPreviewCommandResult{
CommandId: cmd.Id,
PipedId: cmd.PipedId,
Error: fmt.Sprintf("Timed out, maybe the Piped is offline currently."),
})
continue
}

// There is no reason to fetch output data of command that has been completed a long time ago.
// So in order to prevent unintended actions, we disallow that ability.
if time.Since(time.Unix(cmd.HandledAt, 0)) > freshDuration {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("The output data for command %s is too old for access", commandID))
}
}

results := make([]*model.PlanPreviewCommandResult, 0, len(req.Commands))
handledCommands = append(handledCommands, commandID)
}

// Fetch ouput data to build results.
for _, commandID := range req.Commands {
for _, commandID := range handledCommands {
data, err := a.commandOutputGetter.Get(ctx, commandID)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to retrieve output data of command %s", commandID))
}

var result model.PlanPreviewCommandResult
if err := json.Unmarshal(data, &result); err != nil {
a.logger.Error("failed to unmarshal planpreview command result",
Expand All @@ -482,9 +509,16 @@ func (a *API) GetPlanPreviewResults(ctx context.Context, req *apiservice.GetPlan
)
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to decode output data of command %s", commandID))
}

results = append(results, &result)
}

// All URL fields inside the result model are empty.
// So we fill them before sending to the client.
for _, r := range results {
r.FillURLs(a.webBaseURL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I thought putting this inside right after the loop is enough (like you did before commit 😄 ). But if you intend to be more clear, it's on you

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will keep this as it is because there are 2 loops (L480, L512) that are updating this list.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true 👍

}

return &apiservice.GetPlanPreviewResultsResponse{
Results: results,
}, nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/app/api/service/apiservice/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ message RequestPlanPreviewResponse {

message GetPlanPreviewResultsRequest {
repeated string commands = 1;
// Maximum number of seconds a Piped can take to handle a command.
int64 command_handle_timeout = 2;
}

message GetPlanPreviewResultsResponse {
Expand Down
13 changes: 12 additions & 1 deletion pkg/app/pipectl/cmd/planpreview/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -15,3 +15,14 @@ go_library(
"@org_golang_google_grpc//status:go_default_library",
],
)

go_test(
name = "go_default_test",
size = "small",
srcs = ["planpreview_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/model:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
)
Loading