Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions changelog/fragments/1761377900-input-auth-method-aws.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# REQUIRED for all kinds
# Change summary; a 80ish characters long description of the change.
summary: Add AWS auth method for CEL and HTTP JSON inputs.
Copy link
Copy Markdown
Member

@andrewkroh andrewkroh Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are missing the CEL changes from #47260 (on my first review I assumed those changes were going to be back-ported on a separate PR).


# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: filebeat

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link the PR that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
15 changes: 10 additions & 5 deletions x-pack/filebeat/input/cel/config_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"golang.org/x/oauth2/google"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
)

type authConfig struct {
Basic *basicAuthConfig `config:"basic"`
Token *tokenAuthConfig `config:"token"`
Digest *digestAuthConfig `config:"digest"`
OAuth2 *oAuth2Config `config:"oauth2"`
Basic *basicAuthConfig `config:"basic"`
Token *tokenAuthConfig `config:"token"`
Digest *digestAuthConfig `config:"digest"`
OAuth2 *oAuth2Config `config:"oauth2"`
AWS *aws.SignerInputConfig `config:"aws"`
}

func (c authConfig) Validate() error {
Expand All @@ -44,6 +46,9 @@ func (c authConfig) Validate() error {
if c.OAuth2.isEnabled() {
n++
}
if c.AWS.IsEnabled() {
n++
}
if n > 1 {
return errors.New("only one kind of auth can be enabled")
}
Expand Down Expand Up @@ -229,7 +234,7 @@ func (o *oAuth2Config) client(ctx context.Context, client *http.Client) (*http.C
var creds *google.Credentials
var err error
if len(o.GoogleCredentialsJSON) != 0 {
creds, err = google.CredentialsFromJSON(ctx, o.GoogleCredentialsJSON, o.Scopes...)
creds, err = google.CredentialsFromJSON(ctx, o.GoogleCredentialsJSON, o.Scopes...) //nolint:staticcheck // deprecated but no suitable replacement available
if err != nil {
return nil, fmt.Errorf("oauth2 client: error loading credentials: %w", err)
}
Expand Down
23 changes: 17 additions & 6 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon"
"github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand Down Expand Up @@ -100,7 +101,7 @@ func (i input) now() time.Time {
func (input) Name() string { return inputName }

func (input) Test(src inputcursor.Source, _ v2.TestContext) error {
cfg := src.(*source).cfg
cfg := src.(*source).cfg //nolint:errcheck // src is always *source in this input implementation
if !wantClient(cfg) {
return nil
}
Expand All @@ -110,7 +111,7 @@ func (input) Test(src inputcursor.Source, _ v2.TestContext) error {
// Run starts the input and blocks until it ends completes. It will return on
// context cancellation or type invalidity errors, any other error will be retried.
func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error {
dataStreamName := src.(*source).cfg.DataStream // May be empty.
dataStreamName := src.(*source).cfg.DataStream //nolint:errcheck // src is always *source in this input implementation

var cursor map[string]interface{}
env.UpdateStatus(status.Starting, dataStreamName)
Expand All @@ -128,7 +129,7 @@ func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor
parent: &env,
}
}
err := input{}.run(env, src.(*source), cursor, pub, health)
err := input{}.run(env, src.(*source), cursor, pub, health) //nolint:errcheck // src is always *source in this input implementation
if err != nil {
msg := "failed to run: " + err.Error()
if dataStreamName != "" {
Expand Down Expand Up @@ -182,7 +183,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
return err
}
if !ok {
return fmt.Errorf("request tracer path %q must be within %q path", path, paths.Resolve(paths.Logs, inputName))
return fmt.Errorf("request tracer path %q must be within %q path", path, paths.Resolve(paths.Logs, inputName)) //nolint:forbidigo // no per-beat path instance available here
}
cfg.Resource.Tracer.Filename = resolved
}
Expand Down Expand Up @@ -213,6 +214,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
Value: cfg.Auth.Token.Value,
}
}

wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != ""
doCov := cfg.RecordCoverage && log.IsDebug()
httpOptions := lib.HTTPOptions{
Expand Down Expand Up @@ -864,6 +866,15 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin
Password: cfg.Auth.Digest.Password,
NoReuse: noReuse,
}
} else if cfg.Auth.AWS.IsEnabled() {
// this transport runs after the other ones (the other ones wrap this one); just to be on the safe side.
// If any of the other transports add any header, it must happen before the signing.
tr, err := aws.InitializeSignerTransport(*cfg.Auth.AWS, log, c.Transport)
if err != nil {
log.Errorw("failed to initialize aws config failed for signer", "error", err)
return nil, nil, err
}
c.Transport = tr
}

var trace *httplog.LoggingRoundTripper
Expand Down Expand Up @@ -1012,7 +1023,7 @@ type socketDialer struct {
}

func (d socketDialer) Dial(_, _ string) (net.Conn, error) {
return net.Dial("unix", d.path)
return net.Dial("unix", d.path) //nolint:noctx // unix socket dial; no context propagation needed
}

func (d socketDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
Expand Down Expand Up @@ -1334,7 +1345,7 @@ func test(url *url.URL) error {
return "80"
}()

_, err := net.DialTimeout("tcp", net.JoinHostPort(url.Hostname(), port), time.Second)
_, err := net.DialTimeout("tcp", net.JoinHostPort(url.Hostname(), port), time.Second) //nolint:noctx // connectivity test; explicit timeout used instead of context
if err != nil {
return fmt.Errorf("url %q is unreachable: %w", url, err)
}
Expand Down
78 changes: 67 additions & 11 deletions x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -615,9 +616,9 @@ var inputTests = []struct {
</item>
</order>
`
io.ReadAll(r.Body)
io.ReadAll(r.Body) //nolint:errcheck // No point checking errors in test server.
r.Body.Close()
w.Write([]byte(text))
w.Write([]byte(text)) //nolint:errcheck // No point checking errors in test server.
})
server := httptest.NewServer(r)
config["resource.url"] = server.URL
Expand Down Expand Up @@ -749,7 +750,7 @@ var inputTests = []struct {
msg = fmt.Sprintf(`{"error":"expected method was %#q"}`, http.MethodGet)
}

w.Write([]byte(msg))
w.Write([]byte(msg)) //nolint:errcheck // No point checking errors in test server.
},
want: []map[string]interface{}{
{
Expand Down Expand Up @@ -797,7 +798,7 @@ var inputTests = []struct {
msg = fmt.Sprintf(`{"error":"expected method was %#q"}`, http.MethodGet)
}

w.Write([]byte(msg))
w.Write([]byte(msg)) //nolint:errcheck // No point checking errors in test server.
},
want: []map[string]interface{}{
{
Expand Down Expand Up @@ -830,7 +831,7 @@ var inputTests = []struct {
},
handler: func(w http.ResponseWriter, r *http.Request) {
enc := json.NewEncoder(w)
enc.Encode(map[string][]any{"events": {r.Header.Get("foo")}})
enc.Encode(map[string][]any{"events": {r.Header.Get("foo")}}) //nolint:errcheck // No point checking errors in test server.
},
want: []map[string]interface{}{
{
Expand All @@ -851,7 +852,7 @@ var inputTests = []struct {
`,
},
handler: func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("hello"))
w.Write([]byte("hello")) //nolint:errcheck // No point checking errors in test server.
},
want: []map[string]interface{}{
{
Expand All @@ -872,7 +873,7 @@ var inputTests = []struct {
`,
},
handler: func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("hello"))
w.Write([]byte("hello")) //nolint:errcheck // No point checking errors in test server.
},
want: []map[string]interface{}{
{
Expand Down Expand Up @@ -911,7 +912,7 @@ var inputTests = []struct {
msg = fmt.Sprintf(`{"error":"expected method was %#q"}`, http.MethodGet)
}

w.Write([]byte(msg))
w.Write([]byte(msg)) //nolint:errcheck // No point checking errors in test server.
},
want: []map[string]interface{}{
{
Expand Down Expand Up @@ -1890,6 +1891,44 @@ var inputTests = []struct {
},
},

{
name: "Auth AWS V4 Signer",
server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
s := httptest.NewServer(h)
config["resource.url"] = s.URL
t.Cleanup(s.Close)
},
config: map[string]interface{}{
"interval": 1,
"auth.aws.access_key_id": "AKIAIOSFODNN7EXAMPLE",
"auth.aws.secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"auth.aws.default_region": "us-east-1",
"auth.aws.service_name": "guardduty",
"program": `
bytes(get(state.url).Body).as(body, {
"events": [body.decode_json()]
})
`,
},
handler: awsAuthHandler("AKIAIOSFODNN7EXAMPLE", defaultHandler(http.MethodGet, "")),
want: []map[string]interface{}{
{
"hello": []interface{}{
map[string]interface{}{
"world": "moon",
},
map[string]interface{}{
"space": []interface{}{
map[string]interface{}{
"cake": "pumpkin",
},
},
},
},
},
},
},

// Multi-step requests.
{
name: "simple_multistep_GET_request",
Expand Down Expand Up @@ -2270,7 +2309,7 @@ func TestInput(t *testing.T) {

id := "test_id:" + test.name
v2Ctx := v2.Context{
Logger: logp.NewLogger("cel_test"),
Logger: logp.NewLogger("cel_test"), //nolint:forbidigo // test helper; no logp.Logger parameter available
ID: id,
IDWithoutName: id,
Cancelation: ctx,
Expand Down Expand Up @@ -2405,7 +2444,7 @@ func newChainTestServer(serve func(http.Handler) *httptest.Server) func(*testing
func newV2Context() (v2.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
return v2.Context{
Logger: logp.NewLogger("httpjson_test"),
Logger: logp.NewLogger("httpjson_test"), //nolint:forbidigo // test helper; no logp.Logger parameter available
ID: "test_id",
Cancelation: ctx,
}, cancel
Expand Down Expand Up @@ -2482,7 +2521,6 @@ func retryHandler() http.HandlerFunc {
}
}

//nolint:errcheck // No point checking errors in test server.
func tokenAuthHandler(want string, handle http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
auth := r.Header.Get("Authorization")
Expand All @@ -2495,6 +2533,24 @@ func tokenAuthHandler(want string, handle http.HandlerFunc) http.HandlerFunc {
}
}

func awsAuthHandler(expectedTokenID string, handle http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if !strings.HasPrefix(authHeader, fmt.Sprintf("AWS4-HMAC-SHA256 Credential=%s/", expectedTokenID)) {
http.Error(w, `{"error":"not authorized"}`, http.StatusBadRequest)
return
}

amzDate := r.Header.Get("X-Amz-Date")
if amzDate == "" {
http.Error(w, `{"error":"not authorized"}`, http.StatusBadRequest)
return
}

handle(w, r)
}
}

//nolint:errcheck // No point checking errors in test server.
func digestAuthHandler(user, pass, realm, nonce string, handle http.HandlerFunc) http.HandlerFunc {
chal := &digest.Challenge{
Expand Down
20 changes: 16 additions & 4 deletions x-pack/filebeat/input/httpjson/config_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,27 @@ import (
"golang.org/x/oauth2/google"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
)

type authConfig struct {
Basic *basicAuthConfig `config:"basic"`
OAuth2 *oAuth2Config `config:"oauth2"`
Basic *basicAuthConfig `config:"basic"`
OAuth2 *oAuth2Config `config:"oauth2"`
AWS *aws.SignerInputConfig `config:"aws"`
}

func (c authConfig) Validate() error {
if c.Basic.isEnabled() && c.OAuth2.isEnabled() {
var n int
if c.Basic.isEnabled() {
n++
}
if c.OAuth2.isEnabled() {
n++
}
if c.AWS.IsEnabled() {
n++
}
if n > 1 {
return errors.New("only one kind of auth can be enabled")
}
return nil
Expand Down Expand Up @@ -165,7 +177,7 @@ func (o *oAuth2Config) client(ctx context.Context, client *http.Client) (*http.C
var creds *google.Credentials
var err error
if len(o.GoogleCredentialsJSON) != 0 {
creds, err = google.CredentialsFromJSON(ctx, o.GoogleCredentialsJSON, o.Scopes...)
creds, err = google.CredentialsFromJSON(ctx, o.GoogleCredentialsJSON, o.Scopes...) //nolint:staticcheck // deprecated but no suitable replacement available
if err != nil {
return nil, fmt.Errorf("oauth2 client: error loading credentials: %w", err)
}
Expand Down
Loading
Loading