Skip to content

Commit

Permalink
Handle files using workerpool
Browse files Browse the repository at this point in the history
Signed-off-by: David Wertenteil <[email protected]>
  • Loading branch information
David Wertenteil committed Jul 26, 2023
1 parent b5aaf17 commit 51d8cef
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 46 deletions.
1 change: 0 additions & 1 deletion pkg/containerwatcher/v1/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type IGContainerWatcher struct {
tracerExec *tracerexec.Tracer
tracerOpen *traceropen.Tracer
eventWorkerPool *workerpool.WorkerPool
storageWorkerPool *workerpool.WorkerPool
}

var _ containerwatcher.ContainerWatcher = (*IGContainerWatcher)(nil)
Expand Down
108 changes: 66 additions & 42 deletions pkg/relevancymanager/v1/relevancy_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
"time"

"github.com/armosec/utils-k8s-go/wlid"
"github.com/gammazero/workerpool"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/instanceidhandler"
instanceidhandlerV1 "github.com/kubescape/k8s-interface/instanceidhandler/v1"
"github.com/kubescape/k8s-interface/k8sinterface"
"github.com/kubescape/k8s-interface/names"
"github.com/kubescape/k8s-interface/workloadinterface"
"github.com/spf13/afero"
"go.opentelemetry.io/otel"
Expand All @@ -36,6 +38,9 @@ const (
StepEventAggregator = "StepEventAggregator"
)

// Number of workers for handling list of files and submitting to the storage. This number should not be too high so the storage wont get overwhelmed.
const fileWorkersConcurrency = 4

var (
containerHasTerminatedError = errors.New("container has terminated")
)
Expand All @@ -51,6 +56,7 @@ type RelevancyManager struct {
sbomFs afero.Fs
storageClient storageclient.StorageClient
watchedContainers sync.Map
fileWorkerPool *workerpool.WorkerPool
}

var _ relevancymanager.RelevancyManagerClient = (*RelevancyManager)(nil)
Expand All @@ -65,9 +71,56 @@ func CreateRelevancyManager(cfg config.Config, clusterName string, fileHandler f
sbomFs: sbomFs,
storageClient: storageClient,
watchedContainers: sync.Map{},
fileWorkerPool: workerpool.New(fileWorkersConcurrency),
}, nil
}

// Handle relevant data
func (rm *RelevancyManager) handleRelevancy(ctx context.Context, containerData watchedContainerData, containerID string) {

ctxPostSBOM, spanPostSBOM := otel.Tracer("").Start(ctx, "PostFilterSBOM")
defer spanPostSBOM.End()

if err := containerData.sbomClient.ValidateSBOM(ctx); err != nil {
logger.L().Debug("SBOM is incomplete", helpers.String("container ID", containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
containerData.syncChannel[StepValidateSBOM] <- err
}

fileList, err := rm.fileHandler.GetFiles(containerData.k8sContainerID)
if err != nil {
logger.L().Debug("failed to get file list", helpers.String("container ID", containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
return
}
logger.L().Debug("fileList generated", helpers.String("container ID", containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.String("file list", fmt.Sprintf("%v", fileList)))

if err = containerData.sbomClient.FilterSBOM(ctx, fileList); err != nil {
rm.fileHandler.AddFiles(containerData.k8sContainerID, fileList)
ctx, span := otel.Tracer("").Start(ctxPostSBOM, "FilterSBOM")
defer span.End()
logger.L().Ctx(ctx).Warning("failed to filter SBOM", helpers.String("container ID", containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
return
}
filterSBOMKey, err := containerData.instanceID.GetSlug()
if err != nil {
rm.fileHandler.AddFiles(containerData.k8sContainerID, fileList)
ctx, span := otel.Tracer("").Start(ctxPostSBOM, "filterSBOMKey")
defer span.End()
logger.L().Ctx(ctx).Warning("failed to get filterSBOMKey for store filter SBOM", helpers.String("container ID", containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
return
}
// it is safe to use containerData.imageID directly since we needed it to retrieve the SBOM
if err = containerData.sbomClient.StoreFilterSBOM(ctx, containerData.imageID, filterSBOMKey); err != nil {
if !errors.Is(err, sbom.IsAlreadyExist()) {
rm.fileHandler.AddFiles(containerData.k8sContainerID, fileList)
ctx, span := otel.Tracer("").Start(ctxPostSBOM, "StoreFilterSBOM")
defer span.End()
logger.L().Ctx(ctx).Error("failed to store filtered SBOM", helpers.String("container ID", containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
}
return
}

logger.L().Info("filtered SBOM has been stored successfully", helpers.String("containerID", containerID), helpers.String("k8s workload", containerData.k8sContainerID))
}
func (rm *RelevancyManager) afterTimerActions(ctx context.Context) error {
for {
afterTimerActionsData := <-rm.afterTimerActionsChannel
Expand All @@ -80,7 +133,7 @@ func (rm *RelevancyManager) afterTimerActions(ctx context.Context) error {

if rm.cfg.EnableRelevancy && afterTimerActionsData.service == RelevantCVEsService {

ctxPostSBOM, spanPostSBOM := otel.Tracer("").Start(ctx, "PostFilterSBOM")
// ctxPostSBOM, spanPostSBOM := otel.Tracer("").Start(ctx, "PostFilterSBOM")
if err := <-containerData.syncChannel[StepGetSBOM]; err != nil {
logger.L().Debug("failed to get SBOM", helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
continue
Expand All @@ -90,46 +143,11 @@ func (rm *RelevancyManager) afterTimerActions(ctx context.Context) error {
logger.L().Debug("sbom client not yet created", helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s workload", containerData.k8sContainerID))
continue
}
if err := containerData.sbomClient.ValidateSBOM(ctx); err != nil {
logger.L().Warning("SBOM is incomplete", helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
containerData.syncChannel[StepValidateSBOM] <- err
}

fileList, err := rm.fileHandler.GetFiles(containerData.k8sContainerID)
if err != nil {
logger.L().Debug("failed to get file list", helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
continue
}
logger.L().Debug("fileList generated", helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.String("file list", fmt.Sprintf("%v", fileList)))

if err = containerData.sbomClient.FilterSBOM(ctx, fileList); err != nil {
rm.fileHandler.AddFiles(containerData.k8sContainerID, fileList)
ctx, span := otel.Tracer("").Start(ctxPostSBOM, "FilterSBOM")
logger.L().Ctx(ctx).Warning("failed to filter SBOM", helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
span.End()
continue
}
filterSBOMKey, err := containerData.instanceID.GetSlug()
if err != nil {
rm.fileHandler.AddFiles(containerData.k8sContainerID, fileList)
ctx, span := otel.Tracer("").Start(ctxPostSBOM, "filterSBOMKey")
logger.L().Ctx(ctx).Warning("failed to get filterSBOMKey for store filter SBOM", helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
span.End()
continue
}
// it is safe to use containerData.imageID directly since we needed it to retrieve the SBOM
if err = containerData.sbomClient.StoreFilterSBOM(ctx, containerData.imageID, filterSBOMKey); err != nil {
if !errors.Is(err, sbom.IsAlreadyExist()) {
rm.fileHandler.AddFiles(containerData.k8sContainerID, fileList)
ctx, span := otel.Tracer("").Start(ctxPostSBOM, "StoreFilterSBOM")
logger.L().Ctx(ctx).Error("failed to store filtered SBOM", helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s workload", containerData.k8sContainerID), helpers.Error(err))
span.End()
}
continue
}

logger.L().Info("filtered SBOM has been stored successfully", helpers.String("containerID", afterTimerActionsData.containerID), helpers.String("k8s workload", containerData.k8sContainerID))
spanPostSBOM.End()
// handle collection of relevant data
rm.fileWorkerPool.Submit(func() {
rm.handleRelevancy(ctx, containerData, afterTimerActionsData.containerID)
})
}
}
}
Expand Down Expand Up @@ -191,15 +209,21 @@ func (rm *RelevancyManager) getSBOM(ctx context.Context, container *containercol
}
// create sbomClient
sbomClient := sbom.CreateSBOMStorageClient(rm.storageClient, parentWlid, instanceID, rm.sbomFs)

// get SBOM
err = sbomClient.GetSBOM(ctx, imageTag, imageID)
if err := sbomClient.GetSBOM(ctx, imageTag, imageID); err != nil {
if errors.Is(err, names.ErrInvalidSlug) {
logger.L().Ctx(ctx).Error("failed to get SBOM", helpers.Error(err), helpers.String("wlid", parentWlid), helpers.String("pod name", container.Podname), helpers.String("imageTag", imageTag), helpers.String("imageID", imageID))
}
watchedContainer.syncChannel[StepGetSBOM] <- err
}

// save watchedContainer with new fields
watchedContainer.imageID = imageID
watchedContainer.instanceID = instanceID
watchedContainer.sbomClient = sbomClient
rm.watchedContainers.Store(container.ID, watchedContainer)
// notify the channel
watchedContainer.syncChannel[StepGetSBOM] <- err
}

func (rm *RelevancyManager) parsePodData(ctx context.Context, pod *workloadinterface.Workload, container *containercollection.Container) (string, string, string, instanceidhandler.IInstanceID, error) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/sbom/sbom.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
v1 "node-agent/pkg/sbom/v1"
"node-agent/pkg/storageclient"

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/instanceidhandler"
"github.com/kubescape/k8s-interface/names"
"github.com/spf13/afero"
Expand Down Expand Up @@ -58,7 +56,6 @@ func (sc *SBOMStructure) GetSBOM(ctx context.Context, imageTag, imageID string)

SBOMKey, err := names.ImageInfoToSlug(imageTag, imageID)
if err != nil {
logger.L().Ctx(ctx).Error("Failed to create SBOM key", helpers.Error(err), helpers.String("imageTag", imageTag), helpers.String("imageID", imageID))
return err
}

Expand Down

0 comments on commit 51d8cef

Please sign in to comment.