diff --git a/main.go b/main.go index 845fb5cc..a72fc769 100644 --- a/main.go +++ b/main.go @@ -144,7 +144,7 @@ func main() { // Create clients k8sClient := k8sinterface.NewKubernetesApi() - storageClient, err := storage.CreateStorage(clusterData.Namespace) + storageClient, err := storage.CreateStorage(clusterData.Namespace, cfg.UpdateDataPeriod) if err != nil { logger.L().Ctx(ctx).Fatal("error creating the storage client", helpers.Error(err)) } diff --git a/pkg/applicationprofilemanager/v1/applicationprofile_manager.go b/pkg/applicationprofilemanager/v1/applicationprofile_manager.go index 593618f8..9424f94b 100644 --- a/pkg/applicationprofilemanager/v1/applicationprofile_manager.go +++ b/pkg/applicationprofilemanager/v1/applicationprofile_manager.go @@ -333,7 +333,16 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon // 1. try to patch object var gotErr error if err := am.storageClient.PatchApplicationProfile(slug, namespace, operations, watchedContainer.SyncChannel); err != nil { - if apierrors.IsNotFound(err) { + switch { + case apierrors.IsTimeout(err): + // backoff timeout, we have already retried for maxElapsedTime + gotErr = err + logger.L().Ctx(ctx).Debug("ApplicationProfileManager - failed to patch application profile", helpers.Error(err), + helpers.String("slug", slug), + helpers.Int("container index", watchedContainer.ContainerIndex), + helpers.String("container ID", watchedContainer.ContainerID), + helpers.String("k8s workload", watchedContainer.K8sContainerID)) + case apierrors.IsNotFound(err): // 2a. new object newObject := &v1beta1.ApplicationProfile{ ObjectMeta: metav1.ObjectMeta{ @@ -389,7 +398,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon helpers.String("container ID", watchedContainer.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID)) } - } else { + default: logger.L().Debug("ApplicationProfileManager - failed to patch application profile, will get existing one and adjust patch", helpers.Error(err), helpers.String("slug", slug), helpers.Int("container index", watchedContainer.ContainerIndex), diff --git a/pkg/networkmanager/v2/network_manager.go b/pkg/networkmanager/v2/network_manager.go index a1a3f4d2..f92c0a0b 100644 --- a/pkg/networkmanager/v2/network_manager.go +++ b/pkg/networkmanager/v2/network_manager.go @@ -238,7 +238,16 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine // 1. try to patch object var gotErr error if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, operations, watchedContainer.SyncChannel); err != nil { - if apierrors.IsNotFound(err) { + switch { + case apierrors.IsTimeout(err): + // backoff timeout, we have already retried for maxElapsedTime + gotErr = err + logger.L().Ctx(ctx).Debug("NetworkManager - failed to patch network neighborhood", helpers.Error(err), + helpers.String("slug", slug), + helpers.Int("container index", watchedContainer.ContainerIndex), + helpers.String("container ID", watchedContainer.ContainerID), + helpers.String("k8s workload", watchedContainer.K8sContainerID)) + case apierrors.IsNotFound(err): // 2a. new object newObject := &v1beta1.NetworkNeighborhood{ ObjectMeta: metav1.ObjectMeta{ @@ -278,7 +287,7 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine helpers.String("container ID", watchedContainer.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID)) } - } else { + default: logger.L().Debug("NetworkManager - failed to patch network neighborhood, will get existing one and adjust patch", helpers.Error(err), helpers.String("slug", slug), helpers.Int("container index", watchedContainer.ContainerIndex), diff --git a/pkg/storage/v1/applicationprofile.go b/pkg/storage/v1/applicationprofile.go index 4674947f..900cdaed 100644 --- a/pkg/storage/v1/applicationprofile.go +++ b/pkg/storage/v1/applicationprofile.go @@ -4,12 +4,15 @@ import ( "context" "encoding/json" "fmt" + "time" + "github.com/cenkalti/backoff/v5" "github.com/kubescape/go-logger" loggerhelpers "github.com/kubescape/go-logger/helpers" "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers" "github.com/kubescape/node-agent/pkg/utils" "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -52,7 +55,19 @@ func (sc Storage) patchApplicationProfile(name, namespace string, operations []u return fmt.Errorf("marshal patch: %w", err) } - profile, err := sc.StorageClient.ApplicationProfiles(namespace).Patch(context.Background(), sc.modifyName(name), types.JSONPatchType, patch, v1.PatchOptions{}) + backOff := backoff.NewExponentialBackOff() + backOff.MaxInterval = 10 * time.Second + profile, err := backoff.Retry(context.Background(), func() (*v1beta1.ApplicationProfile, error) { + profile, err := sc.StorageClient.ApplicationProfiles(namespace).Patch(context.Background(), sc.modifyName(name), types.JSONPatchType, patch, v1.PatchOptions{}) + switch { + case apierrors.IsTimeout(err), apierrors.IsServerTimeout(err), apierrors.IsTooManyRequests(err): + return nil, apierrors.NewTimeoutError("backoff timeout", 0) + case err != nil: + return nil, backoff.Permanent(err) + default: + return profile, nil + } + }, backoff.WithBackOff(backOff), backoff.WithMaxElapsedTime(sc.maxElapsedTime)) if err != nil { return fmt.Errorf("patch application profile: %w", err) } diff --git a/pkg/storage/v1/network.go b/pkg/storage/v1/network.go index 4cfcb3ed..6add79cf 100644 --- a/pkg/storage/v1/network.go +++ b/pkg/storage/v1/network.go @@ -4,12 +4,15 @@ import ( "context" "encoding/json" "fmt" + "time" + "github.com/cenkalti/backoff/v5" "github.com/kubescape/go-logger" loggerhelpers "github.com/kubescape/go-logger/helpers" "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers" "github.com/kubescape/node-agent/pkg/utils" "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -51,9 +54,22 @@ func (sc Storage) patchNetworkNeighborhood(name, namespace string, operations [] if err != nil { return fmt.Errorf("marshal patch: %w", err) } - neighborhood, err := sc.StorageClient.NetworkNeighborhoods(namespace).Patch(context.Background(), sc.modifyName(name), types.JSONPatchType, patch, v1.PatchOptions{}) + + backOff := backoff.NewExponentialBackOff() + backOff.MaxInterval = 10 * time.Second + neighborhood, err := backoff.Retry(context.Background(), func() (*v1beta1.NetworkNeighborhood, error) { + neighborhood, err := sc.StorageClient.NetworkNeighborhoods(namespace).Patch(context.Background(), sc.modifyName(name), types.JSONPatchType, patch, v1.PatchOptions{}) + switch { + case apierrors.IsTimeout(err), apierrors.IsServerTimeout(err), apierrors.IsTooManyRequests(err): + return nil, apierrors.NewTimeoutError("backoff timeout", 0) + case err != nil: + return nil, backoff.Permanent(err) + default: + return neighborhood, nil + } + }, backoff.WithBackOff(backOff), backoff.WithMaxElapsedTime(sc.maxElapsedTime)) if err != nil { - return fmt.Errorf("patch application neighborhood: %w", err) + return fmt.Errorf("patch network neighborhood: %w", err) } // check if returned neighborhood is full if status, ok := neighborhood.Annotations[helpers.StatusMetadataKey]; ok && status == helpers.TooLarge { @@ -62,7 +78,7 @@ func (sc Storage) patchNetworkNeighborhood(name, namespace string, operations [] } return nil } - // check if returned profile is completed + // check if returned neighborhood is completed if c, ok := neighborhood.Annotations[helpers.CompletionMetadataKey]; ok { if s, ok := neighborhood.Annotations[helpers.StatusMetadataKey]; ok && s == helpers.Complete && c == helpers.Completed { if channel != nil { diff --git a/pkg/storage/v1/storage.go b/pkg/storage/v1/storage.go index 42039c00..5b68dbbd 100644 --- a/pkg/storage/v1/storage.go +++ b/pkg/storage/v1/storage.go @@ -31,6 +31,7 @@ const ( type Storage struct { StorageClient spdxv1beta1.SpdxV1beta1Interface + maxElapsedTime time.Duration maxJsonPatchOperations int namespace string multiplier *int // used for testing to multiply the resources by this @@ -38,7 +39,7 @@ type Storage struct { var _ storage.StorageClient = (*Storage)(nil) -func CreateStorage(namespace string) (*Storage, error) { +func CreateStorage(namespace string, maxElapsedTime time.Duration) (*Storage, error) { var cfg *rest.Config kubeconfig := os.Getenv(KubeConfig) // use the current context in kubeconfig @@ -70,6 +71,7 @@ func CreateStorage(namespace string) (*Storage, error) { return &Storage{ StorageClient: clientset.SpdxV1beta1(), + maxElapsedTime: maxElapsedTime, maxJsonPatchOperations: 9999, namespace: namespace, multiplier: getMultiplier(),