
feat(kubernetes): add kubernetes_watch input for real-time resource monitoring #624

Merged
jem-davies merged 23 commits into warpstreamlabs:main from aronchick:feature/kubernetes-watch-upstream
Jan 12, 2026

@aronchick
Contributor

Summary

Add a new kubernetes_watch input that uses the Kubernetes Watch API to stream real-time events for cluster resources. This enables event-driven pipelines that react to Kubernetes state changes.

Features

  • Watch any resource type via dynamic discovery (pods, deployments, CRDs, etc.)
  • Per-namespace or cluster-wide monitoring
  • Label and field selector filtering
  • Automatic reconnection with exponential backoff
  • Proper handling of 410 Gone errors (expected watch API behavior)

Implementation Highlights

  • Uses DeferredDiscoveryRESTMapper for dynamic GVR resolution (no static resource list)
  • Supports in-cluster auth, kubeconfig, or explicit credentials
  • Resource version tracking for resumable watches

Example Configuration

input:
  kubernetes_watch:
    resource: pods
    namespaces:
      - default
    label_selector: "app=myapp"

Test Plan

  • Unit tests for config parsing
  • Unit tests for auth configuration
  • Lint passes (0 issues)
  • All tests pass
  • Manual testing with local cluster

Files Changed (~1,060 lines)

  • internal/impl/kubernetes/ - New kubernetes input package
  • go.mod, go.sum - k8s client-go dependencies

@aronchick aronchick marked this pull request as ready for review December 20, 2025 18:19
@aronchick aronchick mentioned this pull request Dec 20, 2025
@aronchick
Contributor Author

I've built some simple testing scripts as well.

https://github.com/aronchick/bento/tree/kubernetes-integration-tests/testing/kubernetes

Contributor Author

@aronchick aronchick left a comment


I think I got to everything!

@aronchick aronchick requested a review from gregfurman December 23, 2025 04:03
@aronchick
Contributor Author

aronchick commented Dec 24, 2025

@gregfurman I've addressed your feedback - I think I got to everything. Just LMK if you need anything else!

Collaborator

@gregfurman gregfurman left a comment


Thanks for addressing all my feedback. The only blocking things left to address IMO are the Close method and the use of caching. Looking forward to getting this in!

@aronchick
Contributor Author

@gregfurman All feedback addressed and tests passing locally - ready for re-review!

…onitoring

Add a new kubernetes_watch input that uses the Kubernetes Watch API to stream
real-time events (ADDED, MODIFIED, DELETED) for cluster resources. This enables
event-driven pipelines that react to Kubernetes state changes.

Features:
- Watch standard resources (pods, services, deployments, etc.)
- Support for Custom Resource Definitions (CRDs) via dynamic client
- Per-namespace or cluster-wide monitoring
- Label and field selector filtering
- Automatic reconnection with exponential backoff on watch expiration
- Proper handling of 410 Gone errors (expected watch API behavior)

The implementation includes:
- Flexible authentication (in-cluster, kubeconfig, explicit credentials)
- Resource version tracking for resumable watches
- Graceful shutdown coordination
- Comprehensive test coverage

Replace the static standardResources map with the client-go RESTMapper
for dynamic GVR (Group/Version/Resource) resolution. This approach:

- Automatically supports any resource type the cluster knows about
- Handles CRDs without special configuration needed
- Correctly resolves plural/singular resource names
- Uses cached discovery to minimize API calls

The RESTMapper queries the cluster's discovery API once and caches the
results, then resolves resource names to their full GVR on demand.

Use the internal filesystem abstraction (ifs) for reading token files
in buildExplicitClient. This allows for proper isolation and testing
of filesystem operations.

Move InClusterNamespace to auth_unix.go with Unix-only build tag since
it reads /var/run/secrets/kubernetes.io/serviceaccount/namespace which
doesn't exist on Windows. Add auth_windows.go stub that returns default.

Change label_selector from string to map for better YAML ergonomics.
Users can now write:
  label_selector:
    app: myapp
    env: prod

Instead of:
  label_selector: "app=myapp,env=prod"

Add LabelSelectorFromMap helper to convert map to Kubernetes format.
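A helper of this kind could look roughly like the sketch below. Only the name LabelSelectorFromMap comes from the commit message; the signature, the sorting of keys, and the equality-only `key=value` output are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// LabelSelectorFromMap joins key/value pairs into the "k1=v1,k2=v2" form
// accepted by Kubernetes label selectors. Keys are sorted so the output is
// deterministic (an assumption of this sketch, since Go map order is random).
func LabelSelectorFromMap(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+"="+labels[k])
	}
	return strings.Join(pairs, ",")
}

func main() {
	selector := LabelSelectorFromMap(map[string]string{"env": "prod", "app": "myapp"})
	fmt.Println(selector) // prints: app=myapp,env=prod
}
```

The same conversion would apply to a field_selector map, since field selectors use the same comma-separated `key=value` syntax.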
Change field_selector from string to map for consistency with
label_selector. Users can now write:
  field_selector:
    "status.phase": Running

Instead of:
  field_selector: "status.phase=Running"

Add a LintRule to validate that event_types values are one of
ADDED, MODIFIED, or DELETED. Update description to document
valid values.
…tion

Replace custom calculateBackoff function with the standard retries
package. This adds configurable max_retries and backoff fields with
exponential backoff behavior from cenkalti/backoff library.

Remove unused backoff constants and custom implementation.
Replace custom requestContext helper with the built-in SoftStopCtx
method from shutdown.Signaller. This eliminates the need for spawning
goroutines to monitor the stop channel.

Remove now-unused request_context.go file.

Use comma-ok idiom when reading from eventChan to properly detect
when the channel is closed and return ErrEndOfInput.
…HasStopped

Add WaitGroup to track watch goroutines and ensure they complete before
closing the eventChan. Remove unnecessary TriggerHasStopped call which
is typically managed by the framework.
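The shutdown ordering described above (wait for all producers, then close the channel) can be sketched as follows. `runWatchers` and its string events are hypothetical simplifications; the real input runs long-lived watch loops and coordinates shutdown through the framework's signaller.

```go
package main

import (
	"fmt"
	"sync"
)

// runWatchers starts one producer goroutine per namespace and closes the
// returned channel only after every producer has returned.
func runWatchers(namespaces []string) <-chan string {
	eventChan := make(chan string)
	var wg sync.WaitGroup

	for _, ns := range namespaces {
		wg.Add(1)
		go func(ns string) {
			defer wg.Done()
			// Stand-in for a real watch loop that emits many events.
			eventChan <- "ADDED in " + ns
		}(ns)
	}

	// Closing only after Wait returns guarantees that no producer can send
	// on a closed channel, which would panic.
	go func() {
		wg.Wait()
		close(eventChan)
	}()
	return eventChan
}

func main() {
	count := 0
	for ev := range runWatchers([]string{"default", "kube-system"}) {
		fmt.Println(ev)
		count++
	}
	fmt.Println("events received:", count)
}
```

The order in which the two events print is not deterministic, but the range loop always terminates because the channel is closed exactly once, after both producers finish.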
Internal packages don't need extensive package-level docs. The
component documentation lives in the ConfigSpec descriptions.

Update test case to reflect the label_selector field change from
string to map format.

Add kubeconfig_yaml field to allow passing kubeconfig content directly
as a string instead of a file path. This enables reading kubeconfig
from secrets or environment variables.

Uses client-go's clientcmd.NewClientConfigFromBytes() to parse the
raw YAML content, with support for context override when specified.

Reverts cosmetic changes to input_file.go and metrics_json_api.go
to keep the PR diff focused on kubernetes_watch changes only.

Eliminate the metaKeyCache type and its usage in favor of direct string concatenation for generating metadata keys. Update the metadataDescription function to follow Go naming conventions. This change streamlines the code and improves clarity in how metadata keys are constructed for Kubernetes resources.

- TestCloseTriggersEndOfInput: verifies Close causes Read to return ErrEndOfInput
- TestCloseDrainsEventsBeforeShutdown: verifies events can be read before close
- TestReadReturnsErrEndOfInputOnClosedChannel: verifies closed channel handling
- TestReadRespectsContextCancellation: verifies context cancellation works
- TestConcurrentReadsAndClose: verifies thread-safety with multiple readers
@aronchick aronchick force-pushed the feature/kubernetes-watch-upstream branch from ba0309f to bccd629 Compare December 31, 2025 23:01
@aronchick
Contributor Author

@gregfurman tag, you're it. (I think I got everything)

@gregfurman
Collaborator

@aronchick Will give this a final look today! Thanks for addressing all my feedback 🙌

Collaborator

@gregfurman gregfurman left a comment


Looks good! Last, you just need to add the kubernetes implementation to public/components in order to register this plugin to the global bento environment.

Here's an example of this for the mqtt component:

package mqtt

import (
	// Bring in the internal plugin definitions.
	_ "github.com/warpstreamlabs/bento/internal/impl/mqtt"
)

Then we can add this to public/components/all so that importing github.com/warpstreamlabs/bento/public/components/all automatically registers the component with the global environment, i.e.:

_ "github.com/warpstreamlabs/bento/public/components/mqtt"

Lastly, run make docs and commit the generated .md files for the kubernetes component that will be used in the bento doc site.

Once that's done, I think it's good to go!

@gregfurman
Collaborator

@jem-davies The K8s API adds an extra 24.4 MB to the bento binary size (at least on Darwin), which ends up constituting about 11% of the total. Should we add this package behind the x_bento_extra build tag? Or perhaps a custom kubernetes build tag could work as well.

See gsa analysis output below of bento binary 👇

┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ bento                                                                                                              │
├─────────┬─────────────────────────────────────────────────────────────────────────────────────┬────────┬───────────┤
│ PERCENT │ NAME                                                                                │ SIZE   │ TYPE      │
├─────────┼─────────────────────────────────────────────────────────────────────────────────────┼────────┼───────────┤
│ 12.10%  │ __rodata __TEXT                                                                     │ 27 MB  │ section   │
│ 12.03%  │ __rodata __DATA_CONST                                                               │ 27 MB  │ section   │
│ 6.91%   │ k8s.io/api                                                                          │ 15 MB  │ vendor    │
│ 5.85%   │ github.com/aws/aws-sdk-go-v2                                                        │ 13 MB  │ vendor    │
│ 5.30%   │ github.com/elastic/go-elasticsearch/v9                                              │ 12 MB  │ vendor    │
│ 5.14%   │ github.com/apache/arrow-go/v18                                                      │ 11 MB  │ vendor    │
│ 4.25%   │ k8s.io/client-go                                                                    │ 9.4 MB │ vendor    │
├─────────┼─────────────────────────────────────────────────────────────────────────────────────┼────────┼───────────┤
│ 100.00% │ Known                                                                               │ 222 MB │           │
│ 100%    │ Total                                                                               │ 222 MB │           │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────┴────────┴───────────┘

@jem-davies
Collaborator

> @jem-davies The K8s API adds an extra 24.4 MB to the bento binary size (at least on Darwin), which ends up constituting about 11% of the total. Should we add this package behind the x_bento_extra build tag? Or perhaps a custom kubernetes build tag could work as well.

My take is that if a user is worried about binary size or, say, CVEs in components they don't use, then we should advise them to create their own distribution where they import select components.

Therefore, I don't think we should use a build tag requiring users to specifically opt in to a k8s component.

@jem-davies jem-davies merged commit 6ffe8a7 into warpstreamlabs:main Jan 12, 2026
3 checks passed
jem-davies pushed a commit to jem-davies/bento that referenced this pull request Feb 2, 2026
…onitoring (warpstreamlabs#624)


Co-authored-by: Greg Furman <gregfurman99@gmail.com>

3 participants