Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/app/piped/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pipe-cd/pipecd/pkg/app/piped/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/piped/driftdetector"
"github.com/pipe-cd/pipecd/pkg/app/piped/eventwatcher"
"github.com/pipe-cd/pipecd/pkg/app/piped/livestatereporter"
"github.com/pipe-cd/pipecd/pkg/app/piped/livestatestore"
k8slivestatestoremetrics "github.com/pipe-cd/pipecd/pkg/app/piped/livestatestore/kubernetes/kubernetesmetrics"
"github.com/pipe-cd/pipecd/pkg/app/piped/notifier"
Expand Down Expand Up @@ -355,6 +356,14 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
liveStateGetter = s.Getter()
}

// Start running application live state reporter.
{
r := livestatereporter.NewReporter(applicationLister, liveStateGetter, apiClient, cfg, input.Logger)
group.Go(func() error {
return r.Run(ctx)
})
}

decrypter, err := p.initializeSecretDecrypter(cfg)
if err != nil {
input.Logger.Error("failed to initialize secret decrypter", zap.Error(err))
Expand All @@ -366,6 +375,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
d, err := driftdetector.NewDetector(
applicationLister,
gitClient,
liveStateGetter,
apiClient,
appManifestsCache,
cfg,
Expand Down
59 changes: 59 additions & 0 deletions pkg/app/piped/driftdetector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/pipe-cd/pipecd/pkg/app/piped/driftdetector/cloudrun"
"github.com/pipe-cd/pipecd/pkg/app/piped/driftdetector/kubernetes"
"github.com/pipe-cd/pipecd/pkg/app/piped/driftdetector/terraform"
"github.com/pipe-cd/pipecd/pkg/app/piped/livestatestore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/cache"
"github.com/pipe-cd/pipecd/pkg/config"
Expand Down Expand Up @@ -74,6 +78,7 @@ type providerDetector interface {
func NewDetector(
appLister applicationLister,
gitClient gitClient,
stateGetter livestatestore.Getter,
apiClient apiClient,
appManifestsCache cache.Cache,
cfg *config.PipedSpec,
Expand All @@ -92,6 +97,60 @@ func NewDetector(

for _, cp := range cfg.PlatformProviders {
switch cp.Type {
case model.PlatformProviderKubernetes:
sg, ok := stateGetter.KubernetesGetter(cp.Name)
if !ok {
return nil, fmt.Errorf(format, cp.Name)
}
d.detectors = append(d.detectors, kubernetes.NewDetector(
cp,
appLister,
gitClient,
sg,
d,
appManifestsCache,
cfg,
sd,
logger,
))

case model.PlatformProviderCloudRun:
sg, ok := stateGetter.CloudRunGetter(cp.Name)
if !ok {
return nil, fmt.Errorf(format, cp.Name)
}
d.detectors = append(d.detectors, cloudrun.NewDetector(
cp,
appLister,
gitClient,
sg,
d,
appManifestsCache,
cfg,
sd,
logger,
))

case model.PlatformProviderTerraform:
if !*cp.TerraformConfig.DriftDetectionEnabled {
continue
}
sg, ok := stateGetter.TerraformGetter(cp.Name)
if !ok {
return nil, fmt.Errorf(format, cp.Name)
}
d.detectors = append(d.detectors, terraform.NewDetector(
cp,
appLister,
gitClient,
sg,
d,
appManifestsCache,
cfg,
sd,
logger,
))

default:
}
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/app/piped/livestatereporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/pipe-cd/pipecd/pkg/app/piped/livestatereporter/cloudrun"
"github.com/pipe-cd/pipecd/pkg/app/piped/livestatereporter/kubernetes"
"github.com/pipe-cd/pipecd/pkg/app/piped/livestatestore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/model"
Expand Down Expand Up @@ -53,7 +56,7 @@ type providerReporter interface {
ProviderName() string
}

func NewReporter(appLister applicationLister, apiClient apiClient, cfg *config.PipedSpec, logger *zap.Logger) Reporter {
func NewReporter(appLister applicationLister, stateGetter livestatestore.Getter, apiClient apiClient, cfg *config.PipedSpec, logger *zap.Logger) Reporter {
r := &reporter{
reporters: make([]providerReporter, 0, len(cfg.PlatformProviders)),
logger: logger.Named("live-state-reporter"),
Expand All @@ -62,6 +65,20 @@ func NewReporter(appLister applicationLister, apiClient apiClient, cfg *config.P
const errFmt = "unable to find live state getter for platform provider: %s"
for _, cp := range cfg.PlatformProviders {
switch cp.Type {
case model.PlatformProviderKubernetes:
sg, ok := stateGetter.KubernetesGetter(cp.Name)
if !ok {
r.logger.Error(fmt.Sprintf(errFmt, cp.Name))
continue
}
r.reporters = append(r.reporters, kubernetes.NewReporter(cp, appLister, sg, apiClient, logger))
case model.PlatformProviderCloudRun:
sg, ok := stateGetter.CloudRunGetter(cp.Name)
if !ok {
r.logger.Error(fmt.Sprintf(errFmt, cp.Name))
continue
}
r.reporters = append(r.reporters, cloudrun.NewReporter(cp, appLister, sg, apiClient, logger))
}
}

Expand Down
80 changes: 80 additions & 0 deletions pkg/app/pipedv1/analysisprovider/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package http provides a way to analyze with http requests.
// This allows you to do smoke tests, load tests and so on, at your leisure.
package http

import (
"context"
"fmt"
"net/http"
"time"

"github.com/pipe-cd/pipecd/pkg/config"
)

const (
ProviderType = "HTTP"
defaultTimeout = 30 * time.Second
)

type Provider struct {
client *http.Client
}

func (p *Provider) Type() string {
return ProviderType
}

func NewProvider(timeout time.Duration) *Provider {
if timeout == 0 {
timeout = defaultTimeout
}
return &Provider{
client: &http.Client{Timeout: timeout},
}
}

// Run sends an HTTP request and then evaluate whether the response is expected one.
func (p *Provider) Run(ctx context.Context, cfg *config.AnalysisHTTP) (bool, string, error) {
req, err := p.makeRequest(ctx, cfg)
if err != nil {
return false, "", err
}

res, err := p.client.Do(req)
if err != nil {
return false, "", err
}
defer res.Body.Close()

if res.StatusCode != cfg.ExpectedCode {
return false, "", fmt.Errorf("unexpected status code %d", res.StatusCode)
}
// TODO: Decide how to check if the body is expected one.
return true, "", nil
}

func (p *Provider) makeRequest(ctx context.Context, cfg *config.AnalysisHTTP) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, cfg.Method, cfg.URL, nil)
if err != nil {
return nil, err
}
req.Header = make(http.Header, len(cfg.Headers))
for _, h := range cfg.Headers {
req.Header.Set(h.Key, h.Value)
}
return req, nil
}
47 changes: 47 additions & 0 deletions pkg/app/pipedv1/analysisprovider/log/factory/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package factory

import (
"fmt"
"os"

"go.uber.org/zap"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/analysisprovider/log"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/analysisprovider/log/stackdriver"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/model"
)

// NewProvider generates an appropriate provider according to analysis provider config.
func NewProvider(providerCfg *config.PipedAnalysisProvider, logger *zap.Logger) (provider log.Provider, err error) {
switch providerCfg.Type {
case model.AnalysisProviderStackdriver:
cfg := providerCfg.StackdriverConfig
sa, err := os.ReadFile(cfg.ServiceAccountFile)
if err != nil {
return nil, err
}
provider, err = stackdriver.NewProvider(sa)
if err != nil {
return nil, err
}

default:
return nil, fmt.Errorf("any of providers config not found")
}
return provider, nil
}
28 changes: 28 additions & 0 deletions pkg/app/pipedv1/analysisprovider/log/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package log

import (
"context"
)

// Provider represents a client for log provider which provides logs for analysis.
type Provider interface {
Type() string
// Evaluate runs the given query against the log provider,
// and then checks if there is at least one error log.
// Returns the result reason if non-error occurred.
Evaluate(ctx context.Context, query string) (result bool, reason string, err error)
}
43 changes: 43 additions & 0 deletions pkg/app/pipedv1/analysisprovider/log/stackdriver/stackdriver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stackdriver

import (
"context"
"time"
)

const ProviderType = "StackdriverLogging"

// Provider is a client for stackdriver.
type Provider struct {
serviceAccount []byte

timeout time.Duration
}

func NewProvider(serviceAccount []byte) (*Provider, error) {
return &Provider{
serviceAccount: serviceAccount,
}, nil
}

func (p *Provider) Type() string {
return ProviderType
}

func (p *Provider) Evaluate(ctx context.Context, query string) (bool, string, error) {
return false, "", nil
}
Loading