Skip to content

Commit

Permalink
Add retry when patching ApplicationProfile (AP) and NetworkNeighborhood (NN) in case of timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <[email protected]>
  • Loading branch information
matthyx committed Feb 13, 2025
1 parent ad64250 commit d42bffb
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 10 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func main() {

// Create clients
k8sClient := k8sinterface.NewKubernetesApi()
storageClient, err := storage.CreateStorage(clusterData.Namespace)
storageClient, err := storage.CreateStorage(clusterData.Namespace, cfg.UpdateDataPeriod)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating the storage client", helpers.Error(err))
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/applicationprofilemanager/v1/applicationprofile_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,16 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
// 1. try to patch object
var gotErr error
if err := am.storageClient.PatchApplicationProfile(slug, namespace, operations, watchedContainer.SyncChannel); err != nil {
if apierrors.IsNotFound(err) {
switch {
case apierrors.IsTimeout(err):
// backoff timeout, we have already retried for maxElapsedTime
gotErr = err
logger.L().Ctx(ctx).Debug("ApplicationProfileManager - failed to patch application profile", helpers.Error(err),
helpers.String("slug", slug),
helpers.Int("container index", watchedContainer.ContainerIndex),
helpers.String("container ID", watchedContainer.ContainerID),
helpers.String("k8s workload", watchedContainer.K8sContainerID))
case apierrors.IsNotFound(err):
// 2a. new object
newObject := &v1beta1.ApplicationProfile{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -389,7 +398,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
helpers.String("container ID", watchedContainer.ContainerID),
helpers.String("k8s workload", watchedContainer.K8sContainerID))
}
} else {
default:
logger.L().Debug("ApplicationProfileManager - failed to patch application profile, will get existing one and adjust patch", helpers.Error(err),
helpers.String("slug", slug),
helpers.Int("container index", watchedContainer.ContainerIndex),
Expand Down
13 changes: 11 additions & 2 deletions pkg/networkmanager/v2/network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,16 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine
// 1. try to patch object
var gotErr error
if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, operations, watchedContainer.SyncChannel); err != nil {
if apierrors.IsNotFound(err) {
switch {
case apierrors.IsTimeout(err):
// backoff timeout, we have already retried for maxElapsedTime
gotErr = err
logger.L().Ctx(ctx).Debug("NetworkManager - failed to patch network neighborhood", helpers.Error(err),
helpers.String("slug", slug),
helpers.Int("container index", watchedContainer.ContainerIndex),
helpers.String("container ID", watchedContainer.ContainerID),
helpers.String("k8s workload", watchedContainer.K8sContainerID))
case apierrors.IsNotFound(err):
// 2a. new object
newObject := &v1beta1.NetworkNeighborhood{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -278,7 +287,7 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine
helpers.String("container ID", watchedContainer.ContainerID),
helpers.String("k8s workload", watchedContainer.K8sContainerID))
}
} else {
default:
logger.L().Debug("NetworkManager - failed to patch network neighborhood, will get existing one and adjust patch", helpers.Error(err),
helpers.String("slug", slug),
helpers.Int("container index", watchedContainer.ContainerIndex),
Expand Down
17 changes: 16 additions & 1 deletion pkg/storage/v1/applicationprofile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/cenkalti/backoff/v5"
"github.com/kubescape/go-logger"
loggerhelpers "github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -52,7 +55,19 @@ func (sc Storage) patchApplicationProfile(name, namespace string, operations []u
return fmt.Errorf("marshal patch: %w", err)
}

profile, err := sc.StorageClient.ApplicationProfiles(namespace).Patch(context.Background(), sc.modifyName(name), types.JSONPatchType, patch, v1.PatchOptions{})
backOff := backoff.NewExponentialBackOff()
backOff.MaxInterval = 10 * time.Second
profile, err := backoff.Retry(context.Background(), func() (*v1beta1.ApplicationProfile, error) {
profile, err := sc.StorageClient.ApplicationProfiles(namespace).Patch(context.Background(), sc.modifyName(name), types.JSONPatchType, patch, v1.PatchOptions{})
switch {
case apierrors.IsTimeout(err), apierrors.IsServerTimeout(err), apierrors.IsTooManyRequests(err):
return nil, apierrors.NewTimeoutError("backoff timeout", 0)
case err != nil:
return nil, backoff.Permanent(err)
default:
return profile, nil
}
}, backoff.WithBackOff(backOff), backoff.WithMaxElapsedTime(sc.maxElapsedTime))
if err != nil {
return fmt.Errorf("patch application profile: %w", err)
}
Expand Down
22 changes: 19 additions & 3 deletions pkg/storage/v1/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/cenkalti/backoff/v5"
"github.com/kubescape/go-logger"
loggerhelpers "github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -51,9 +54,22 @@ func (sc Storage) patchNetworkNeighborhood(name, namespace string, operations []
if err != nil {
return fmt.Errorf("marshal patch: %w", err)
}
neighborhood, err := sc.StorageClient.NetworkNeighborhoods(namespace).Patch(context.Background(), sc.modifyName(name), types.JSONPatchType, patch, v1.PatchOptions{})

backOff := backoff.NewExponentialBackOff()
backOff.MaxInterval = 10 * time.Second
neighborhood, err := backoff.Retry(context.Background(), func() (*v1beta1.NetworkNeighborhood, error) {
neighborhood, err := sc.StorageClient.NetworkNeighborhoods(namespace).Patch(context.Background(), sc.modifyName(name), types.JSONPatchType, patch, v1.PatchOptions{})
switch {
case apierrors.IsTimeout(err), apierrors.IsServerTimeout(err), apierrors.IsTooManyRequests(err):
return nil, apierrors.NewTimeoutError("backoff timeout", 0)
case err != nil:
return nil, backoff.Permanent(err)
default:
return neighborhood, nil
}
}, backoff.WithBackOff(backOff), backoff.WithMaxElapsedTime(sc.maxElapsedTime))
if err != nil {
return fmt.Errorf("patch application neighborhood: %w", err)
return fmt.Errorf("patch network neighborhood: %w", err)
}
// check if returned neighborhood is full
if status, ok := neighborhood.Annotations[helpers.StatusMetadataKey]; ok && status == helpers.TooLarge {
Expand All @@ -62,7 +78,7 @@ func (sc Storage) patchNetworkNeighborhood(name, namespace string, operations []
}
return nil
}
// check if returned profile is completed
// check if returned neighborhood is completed
if c, ok := neighborhood.Annotations[helpers.CompletionMetadataKey]; ok {
if s, ok := neighborhood.Annotations[helpers.StatusMetadataKey]; ok && s == helpers.Complete && c == helpers.Completed {
if channel != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/v1/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ const (

// Storage bundles the typed storage API client with the settings used when
// talking to it. Instances are built by CreateStorage.
type Storage struct {
// StorageClient is the typed SPDX v1beta1 client; the patch helpers call
// ApplicationProfiles(ns).Patch and NetworkNeighborhoods(ns).Patch on it.
StorageClient spdxv1beta1.SpdxV1beta1Interface
// maxElapsedTime is handed to backoff.WithMaxElapsedTime when patch calls
// are retried, bounding the total time spent retrying a single patch.
// CreateStorage copies it from its maxElapsedTime argument.
maxElapsedTime time.Duration
// maxJsonPatchOperations is initialised to 9999 by CreateStorage.
// NOTE(review): presumably the cap on JSON patch operations sent in one
// request — its use is not visible here; confirm.
maxJsonPatchOperations int
// namespace is copied from CreateStorage's namespace argument.
// NOTE(review): how it is consumed is not visible here; confirm.
namespace string
multiplier *int // used for testing to multiply the resources by this
}

var _ storage.StorageClient = (*Storage)(nil)

func CreateStorage(namespace string) (*Storage, error) {
func CreateStorage(namespace string, maxElapsedTime time.Duration) (*Storage, error) {
var cfg *rest.Config
kubeconfig := os.Getenv(KubeConfig)
// use the current context in kubeconfig
Expand Down Expand Up @@ -70,6 +71,7 @@ func CreateStorage(namespace string) (*Storage, error) {

return &Storage{
StorageClient: clientset.SpdxV1beta1(),
maxElapsedTime: maxElapsedTime,
maxJsonPatchOperations: 9999,
namespace: namespace,
multiplier: getMultiplier(),
Expand Down

0 comments on commit d42bffb

Please sign in to comment.