Skip to content

Commit

Permalink
[workloadmeta/collectors/containerd] Collect image metadata (#14592)
Browse files Browse the repository at this point in the history
* [util/containerd] Rename Image to ImageOfContainer

To be able to introduce a new Image func that gets an image just by image ID,
regardless of whether it's being used in container.

* [util/containerd] Add Image func

* [workloadmeta] Add GetImage func

* [config] Add option to enable image collection in workloadmeta

* [workloadmeta/collectors/containerd] Collect image metadata
  • Loading branch information
davidor authored Jan 9, 2023
1 parent 2d3e22f commit 02e7189
Show file tree
Hide file tree
Showing 12 changed files with 560 additions and 16 deletions.
3 changes: 3 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,9 @@ func InitConfig(config Config) {
config.BindEnvAndSetDefault("inventories_max_interval", DefaultInventoriesMaxInterval) // integer seconds
config.BindEnvAndSetDefault("inventories_min_interval", DefaultInventoriesMinInterval) // integer seconds

// workloadmeta
config.BindEnvAndSetDefault("workloadmeta.image_metadata_collection.enabled", false)

// Datadog security agent (common)
config.BindEnvAndSetDefault("security_agent.cmd_port", 5010)
config.BindEnvAndSetDefault("security_agent.expvar_port", 5011)
Expand Down
14 changes: 12 additions & 2 deletions pkg/util/containerd/containerd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type ContainerdItf interface {
Labels(namespace string, ctn containerd.Container) (map[string]string, error)
LabelsWithContext(ctx context.Context, namespace string, ctn containerd.Container) (map[string]string, error)
ListImages(namespace string) ([]containerd.Image, error)
Image(namespace string, ctn containerd.Container) (containerd.Image, error)
Image(namespace string, name string) (containerd.Image, error)
ImageOfContainer(namespace string, ctn containerd.Container) (containerd.Image, error)
ImageSize(namespace string, ctn containerd.Container) (int64, error)
Spec(namespace string, ctn containerd.Container) (*oci.Spec, error)
SpecWithContext(ctx context.Context, namespace string, ctn containerd.Container) (*oci.Spec, error)
Expand Down Expand Up @@ -231,7 +232,16 @@ func (c *ContainerdUtil) ListImages(namespace string) ([]containerd.Image, error
}

// Image interfaces with the containerd api to get an image
func (c *ContainerdUtil) Image(namespace string, ctn containerd.Container) (containerd.Image, error) {
func (c *ContainerdUtil) Image(namespace string, name string) (containerd.Image, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.queryTimeout)
defer cancel()
ctxNamespace := namespaces.WithNamespace(ctx, namespace)

return c.cl.GetImage(ctxNamespace, name)
}

// ImageOfContainer interfaces with the containerd api to get an image
func (c *ContainerdUtil) ImageOfContainer(namespace string, ctn containerd.Container) (containerd.Image, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.queryTimeout)
defer cancel()
ctxNamespace := namespaces.WithNamespace(ctx, namespace)
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/containerd/containerd_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestInfo(t *testing.T) {
require.Equal(t, "foo", c.Image)
}

func TestImage(t *testing.T) {
func TestImageOfContainer(t *testing.T) {
mockUtil := ContainerdUtil{}

image := &mockImage{
Expand All @@ -160,7 +160,7 @@ func TestImage(t *testing.T) {
},
}

resultImage, err := mockUtil.Image(TestNamespace, container)
resultImage, err := mockUtil.ImageOfContainer(TestNamespace, container)
require.NoError(t, err)
require.Equal(t, resultImage, image)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/util/containerd/fake/containerd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type MockedContainerdClient struct {
MockEnvVars func(namespace string, ctn containerd.Container) (map[string]string, error)
MockMetadata func() (containerd.Version, error)
MockListImages func(namespace string) ([]containerd.Image, error)
MockImage func(namespace string, ctn containerd.Container) (containerd.Image, error)
MockImage func(namespace string, name string) (containerd.Image, error)
MockImageOfContainer func(namespace string, ctn containerd.Container) (containerd.Image, error)
MockImageSize func(namespace string, ctn containerd.Container) (int64, error)
MockTaskMetrics func(namespace string, ctn containerd.Container) (*types.Metric, error)
MockTaskPids func(namespace string, ctn containerd.Container) ([]containerd.ProcessInfo, error)
Expand Down Expand Up @@ -63,8 +64,13 @@ func (client *MockedContainerdClient) ListImages(namespace string) ([]containerd
}

// Image is a mock method
func (client *MockedContainerdClient) Image(namespace string, ctn containerd.Container) (containerd.Image, error) {
return client.MockImage(namespace, ctn)
func (client *MockedContainerdClient) Image(namespace string, name string) (containerd.Image, error) {
return client.MockImage(namespace, name)
}

// ImageOfContainer is a mock method
func (client *MockedContainerdClient) ImageOfContainer(namespace string, ctn containerd.Container) (containerd.Image, error) {
return client.MockImageOfContainer(namespace, ctn)
}

// ImageSize is a mock method
Expand Down
62 changes: 54 additions & 8 deletions pkg/workloadmeta/collectors/internal/containerd/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
containerUpdateTopic = "/containers/update"
containerDeletionTopic = "/containers/delete"

imageCreationTopic = "/images/create"
imageUpdateTopic = "/images/update"
imageDeletionTopic = "/images/delete"

// These are not all the task-related topics, but enough to detect changes
// in the state of the container (only need to know if it's running or not).

Expand All @@ -56,6 +60,9 @@ var containerdTopics = []string{
containerCreationTopic,
containerUpdateTopic,
containerDeletionTopic,
imageCreationTopic,
imageUpdateTopic,
imageDeletionTopic,
TaskStartTopic,
TaskOOMTopic,
TaskExitTopic,
Expand All @@ -79,12 +86,19 @@ type collector struct {
// Container exit info (mainly exit code and exit timestamp) are attached to the corresponding task events.
// contToExitInfo caches the exit info of a task to enrich the container deletion event when it's received later.
contToExitInfo map[string]*exitInfo

knownImages *knownImages

// Map of image ID => array of repo tags
repoTags map[string][]string
}

func init() {
workloadmeta.RegisterCollector(collectorID, func() workloadmeta.Collector {
return &collector{
contToExitInfo: make(map[string]*exitInfo),
knownImages: newKnownImages(),
repoTags: make(map[string][]string),
}
})
}
Expand All @@ -110,7 +124,7 @@ func (c *collector) Start(ctx context.Context, store workloadmeta.Store) error {
eventsCtx, cancelEvents := context.WithCancel(ctx)
c.eventsChan, c.errorsChan = c.containerdClient.GetEvents().Subscribe(eventsCtx, subscribeFilters()...)

err = c.generateEventsFromContainerList(ctx)
err = c.notifyInitialEvents(ctx)
if err != nil {
cancelEvents()
return err
Expand Down Expand Up @@ -165,31 +179,36 @@ func (c *collector) stream(ctx context.Context) {
}
}

func (c *collector) generateEventsFromContainerList(ctx context.Context) error {
var events []workloadmeta.CollectorEvent
func (c *collector) notifyInitialEvents(ctx context.Context) error {
var containerEvents []workloadmeta.CollectorEvent

namespaces, err := cutil.NamespacesToWatch(ctx, c.containerdClient)
if err != nil {
return err
}

for _, namespace := range namespaces {
nsEvents, err := c.generateInitialEvents(namespace)
nsContainerEvents, err := c.generateInitialContainerEvents(namespace)
if err != nil {
return err
}
containerEvents = append(containerEvents, nsContainerEvents...)

events = append(events, nsEvents...)
if config.Datadog.GetBool("workloadmeta.image_metadata_collection.enabled") {
if err := c.notifyInitialImageEvents(ctx, namespace); err != nil {
return err
}
}
}

if len(events) > 0 {
c.store.Notify(events)
if len(containerEvents) > 0 {
c.store.Notify(containerEvents)
}

return nil
}

func (c *collector) generateInitialEvents(namespace string) ([]workloadmeta.CollectorEvent, error) {
func (c *collector) generateInitialContainerEvents(namespace string) ([]workloadmeta.CollectorEvent, error) {
var events []workloadmeta.CollectorEvent

existingContainers, err := c.containerdClient.Containers(namespace)
Expand Down Expand Up @@ -221,7 +240,30 @@ func (c *collector) generateInitialEvents(namespace string) ([]workloadmeta.Coll
return events, nil
}

func (c *collector) notifyInitialImageEvents(ctx context.Context, namespace string) error {
existingImages, err := c.containerdClient.ListImages(namespace)
if err != nil {
return err
}

for _, image := range existingImages {
if err := c.notifyEventForImage(ctx, namespace, image); err != nil {
return err
}
}

return nil
}

func (c *collector) handleEvent(ctx context.Context, containerdEvent *containerdevents.Envelope) error {
if isImageTopic(containerdEvent.Topic) {
return c.handleImageEvent(ctx, containerdEvent)
}

return c.handleContainerEvent(ctx, containerdEvent)
}

func (c *collector) handleContainerEvent(ctx context.Context, containerdEvent *containerdevents.Envelope) error {
containerID, container, err := c.extractContainerFromEvent(ctx, containerdEvent)
if err != nil {
return fmt.Errorf("cannot extract container from event: %w", err)
Expand Down Expand Up @@ -313,6 +355,10 @@ func subscribeFilters() []string {
var filters []string

for _, topic := range containerdTopics {
if isImageTopic(topic) && !config.Datadog.GetBool("workloadmeta.image_metadata_collection.enabled") {
continue
}

filters = append(filters, fmt.Sprintf(`topic==%q`, topic))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func containerdClient(container containerd.Container) fake.MockedContainerdClien
MockLabels: func(namespace string, ctn containerd.Container) (map[string]string, error) {
return labels, nil
},
MockImage: func(namespace string, ctn containerd.Container) (containerd.Image, error) {
MockImageOfContainer: func(namespace string, ctn containerd.Container) (containerd.Image, error) {
return &mockedImage{
mockName: func() string {
return imgName
Expand Down
Loading

0 comments on commit 02e7189

Please sign in to comment.