Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 139 additions & 19 deletions aws/table_aws_tagging_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package aws

import (
"context"
"encoding/json"
"errors"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi"
Expand All @@ -10,6 +12,7 @@ import (
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/v5/plugin"
"github.com/turbot/steampipe-plugin-sdk/v5/plugin/transform"
"github.com/turbot/steampipe-plugin-sdk/v5/query_cache"
)

func tableAwsTaggingResource(_ context.Context) *plugin.Table {
Expand All @@ -27,6 +30,14 @@ func tableAwsTaggingResource(_ context.Context) *plugin.Table {
List: &plugin.ListConfig{
Hydrate: listTaggingResources,
Tags: map[string]string{"service": "tag", "action": "GetResources"},
KeyColumns: []*plugin.KeyColumn{
{
Name: "resource_types",
Require: plugin.Optional,
Operators: []string{"="},
CacheMatch: query_cache.CacheMatchExact,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ParthaI As far as I can tell there's no caching being applied for this query, the query times are the almost the same when you rerun a query, so it behaves very differently then when another table returns cached data. Any idea how to debug this further? Or should I just remove this line then?

},
},
},
GetMatrixItemFunc: SupportedRegionMatrix(AWS_TAGGING_SERVICE_ID),
Columns: awsRegionalColumns([]*plugin.Column{
Expand Down Expand Up @@ -66,6 +77,12 @@ func tableAwsTaggingResource(_ context.Context) *plugin.Table {
Type: proto.ColumnType_JSON,
Transform: transform.FromField("Tags"),
},
{
Name: "resource_types",
Description: "The resource types to filter by. Accepts a JSON array of strings in formats: 'service' (e.g., \"ec2\") or 'service:resourceType' (e.g., \"ec2:instance\").",
Type: proto.ColumnType_JSON,
Transform: transform.FromQual("resource_types"),
},

/// Steampipe standard columns
{
Expand Down Expand Up @@ -100,48 +117,149 @@ func listTaggingResources(ctx context.Context, d *plugin.QueryData, _ *plugin.Hy
return nil, err
}

input := &resourcegroupstaggingapi.GetResourcesInput{
ResourcesPerPage: aws.Int32(100),
// Parse resource type filters from query qualifiers
resourceTypes, err := parseResourceTypesFilter(d)
if err != nil {
return nil, err
}

// Reduce the basic request limit down if the user has only requested a small number of rows
if d.QueryContext.Limit != nil {
limit := int32(*d.QueryContext.Limit)
if limit < *input.ResourcesPerPage {
if limit < 1 {
input.ResourcesPerPage = aws.Int32(1)
} else {
input.ResourcesPerPage = aws.Int32(limit)
}
// Split resource types into batches to respect API limits (max 100 per request)
batches := createResourceTypeBatches(resourceTypes)

// Track seen resources to avoid duplicates across batches
seenResources := make(map[string]bool)

// Process each batch of resource types
for _, batch := range batches {
if err := fetchResourcesForBatch(ctx, d, svc, batch, seenResources); err != nil {
return nil, err
}
// Check if context has been cancelled or if the limit has been hit
if d.RowsRemaining(ctx) == 0 {
break
}
}

return nil, nil
}

// parseResourceTypesFilter extracts and validates resource types from query data
func parseResourceTypesFilter(d *plugin.QueryData) ([]string, error) {
resourceTypesValue := d.EqualsQuals["resource_types"].GetJsonbValue()
if resourceTypesValue == "" {
return nil, nil
}

var resourceTypes []string
if err := json.Unmarshal([]byte(resourceTypesValue), &resourceTypes); err != nil {
return nil, errors.New("failed to parse 'resource_types' qualifier: value must be a JSON array of strings, e.g. [\"ec2:instance\", \"s3:bucket\", \"rds\"]")
}

return resourceTypes, nil
}

// createResourceTypeBatches splits resource types into batches for API requests
func createResourceTypeBatches(resourceTypes []string) [][]string {
const maxBatchSize = 100 // AWS Resource Groups Tagging API limit

// If no resource types specified, make a single request without filters
if len(resourceTypes) == 0 {
return [][]string{{}} // Single empty batch for unfiltered requests
}

// Split resource types into batches that don't exceed API limit
var batches [][]string
for i := 0; i < len(resourceTypes); i += maxBatchSize {
end := i + maxBatchSize
if end > len(resourceTypes) {
end = len(resourceTypes)
}
batches = append(batches, resourceTypes[i:end])
}

return batches
}

// fetchResourcesForBatch fetches resources for a specific batch of resource types
func fetchResourcesForBatch(ctx context.Context, d *plugin.QueryData, svc *resourcegroupstaggingapi.Client, resourceTypes []string, seenResources map[string]bool) error {
// Build API input with pagination settings and resource type filters
input := buildGetResourcesInput(d, resourceTypes)

// Create paginator to handle large result sets
paginator := resourcegroupstaggingapi.NewGetResourcesPaginator(svc, input, func(o *resourcegroupstaggingapi.GetResourcesPaginatorOptions) {
o.Limit = *input.ResourcesPerPage
o.StopOnDuplicateToken = true
})

// List call - iterate through all pages
for paginator.HasMorePages() {
// apply rate limiting
// Apply rate limiting
d.WaitForListRateLimit(ctx)

output, err := paginator.NextPage(ctx)
if err != nil {
plugin.Logger(ctx).Error("aws_tagging_resource.listTaggingResources", "api_error", err)
return nil, err
return err
}

for _, resource := range output.ResourceTagMappingList {
d.StreamListItem(ctx, resource)
// Process the resources from this page
if err := processResourceBatch(ctx, d, output.ResourceTagMappingList, seenResources); err != nil {
return err
}

// Context can be cancelled due to manual cancellation or the limit has been hit
if d.RowsRemaining(ctx) == 0 {
return nil, nil
// Check if context has been cancelled or if the limit has been hit
if d.RowsRemaining(ctx) == 0 {
return nil
}
}

return nil
}

// buildGetResourcesInput creates the API input with appropriate pagination settings
func buildGetResourcesInput(d *plugin.QueryData, resourceTypes []string) *resourcegroupstaggingapi.GetResourcesInput {
input := &resourcegroupstaggingapi.GetResourcesInput{
ResourcesPerPage: aws.Int32(100),
}

// Add resource type filters if specified
if len(resourceTypes) > 0 {
input.ResourceTypeFilters = resourceTypes
}

// Reduce the basic request limit down if the user has only requested a small number of rows
if d.QueryContext.Limit != nil {
limit := int32(*d.QueryContext.Limit)
if limit < *input.ResourcesPerPage {
if limit < 1 {
input.ResourcesPerPage = aws.Int32(1)
} else {
input.ResourcesPerPage = aws.Int32(limit)
}
}
}

return nil, err
return input
}

// processResourceBatch handles deduplication and streaming of resources
func processResourceBatch(ctx context.Context, d *plugin.QueryData, resources []types.ResourceTagMapping, seenResources map[string]bool) error {
for _, resource := range resources {
// Deduplicate based on ARN
arn := aws.ToString(resource.ResourceARN)
if seenResources[arn] {
continue // Skip duplicate
}
seenResources[arn] = true

d.StreamListItem(ctx, resource)

// Context can be cancelled due to manual cancellation or the limit has been hit
if d.RowsRemaining(ctx) == 0 {
return nil
}
}
return nil
}

//// HYDRATE FUNCTIONS
Expand All @@ -156,6 +274,7 @@ func getTaggingResource(ctx context.Context, d *plugin.QueryData, _ *plugin.Hydr
return nil, err
}

// Build request for specific resource ARN
param := &resourcegroupstaggingapi.GetResourcesInput{
ResourceARNList: []string{arn},
}
Expand All @@ -166,6 +285,7 @@ func getTaggingResource(ctx context.Context, d *plugin.QueryData, _ *plugin.Hydr
return nil, err
}

// Return the first resource if found
if op != nil && len(op.ResourceTagMappingList) > 0 {
return op.ResourceTagMappingList[0], nil
}
Expand Down
Loading
Loading