diff --git a/integrations/event-handler/cli.go b/integrations/event-handler/cli.go index 93b2cfcf78924..d5f89a4ad92a0 100644 --- a/integrations/event-handler/cli.go +++ b/integrations/event-handler/cli.go @@ -46,6 +46,10 @@ type FluentdConfig struct { // FluentdCA is a path to fluentd CA FluentdCA string `help:"fluentd TLS CA file" type:"existingfile" env:"FDWRD_FLUENTD_CA"` + + // FluentdMaxConnections caps the number of connections to fluentd. Defaults to a dynamic value + // calculated relative to app-level concurrency. + FluentdMaxConnections int `help:"Maximum number of connections to fluentd" env:"FDWRD_MAX_CONNECTIONS"` } // TeleportConfig is Teleport instance configuration @@ -238,6 +242,11 @@ func (c *StartCmdConfig) Validate() error { c.SkipSessionTypes = lib.SliceToAnonymousMap(c.SkipSessionTypesRaw) c.SkipEventTypes = lib.SliceToAnonymousMap(c.SkipEventTypesRaw) + if c.FluentdMaxConnections < 1 { + // 2x concurrency is effectively uncapped. + c.FluentdMaxConnections = c.Concurrency * 2 + } + return nil } @@ -245,6 +254,7 @@ func (c *StartCmdConfig) Validate() error { func (c *StartCmdConfig) Dump(ctx context.Context, log *slog.Logger) { // Log configuration variables log.InfoContext(ctx, "Using batch size", "batch", c.BatchSize) + log.InfoContext(ctx, "Using concurrency", "concurrency", c.Concurrency) log.InfoContext(ctx, "Using type filter", "types", c.Types) log.InfoContext(ctx, "Using type exclude filter", "skip_event_types", c.SkipEventTypes) log.InfoContext(ctx, "Skipping session events of type", "types", c.SkipSessionTypes) @@ -255,6 +265,7 @@ func (c *StartCmdConfig) Dump(ctx context.Context, log *slog.Logger) { log.InfoContext(ctx, "Using Fluentd ca", "ca", c.FluentdCA) log.InfoContext(ctx, "Using Fluentd cert", "cert", c.FluentdCert) log.InfoContext(ctx, "Using Fluentd key", "key", c.FluentdKey) + log.InfoContext(ctx, "Using Fluentd max connections", "max_connections", c.FluentdMaxConnections) log.InfoContext(ctx, "Using window size", "window_size", c.WindowSize) if c.TeleportIdentityFile != "" { diff --git a/integrations/event-handler/cli_test.go b/integrations/event-handler/cli_test.go index 70eef6eea9c06..b0b408a89ddfd 100644 --- a/integrations/event-handler/cli_test.go +++ b/integrations/event-handler/cli_test.go @@ -45,11 +45,12 @@ func TestStartCmdConfig(t *testing.T) { Debug: false, Start: StartCmdConfig{ FluentdConfig: FluentdConfig{ - FluentdURL: "https://localhost:8888/test.log", - FluentdSessionURL: "https://localhost:8888/session", - FluentdCert: filepath.Join(wd, "testdata", "fake-file"), - FluentdKey: filepath.Join(wd, "testdata", "fake-file"), - FluentdCA: filepath.Join(wd, "testdata", "fake-file"), + FluentdURL: "https://localhost:8888/test.log", + FluentdSessionURL: "https://localhost:8888/session", + FluentdCert: filepath.Join(wd, "testdata", "fake-file"), + FluentdKey: filepath.Join(wd, "testdata", "fake-file"), + FluentdCA: filepath.Join(wd, "testdata", "fake-file"), + FluentdMaxConnections: 10, }, TeleportConfig: TeleportConfig{ TeleportAddr: "localhost:3025", @@ -83,11 +84,12 @@ func TestStartCmdConfig(t *testing.T) { Debug: true, Start: StartCmdConfig{ FluentdConfig: FluentdConfig{ - FluentdURL: "https://localhost:8888/test.log", - FluentdSessionURL: "https://localhost:8888/session", - FluentdCert: filepath.Join(wd, "testdata", "fake-file"), - FluentdKey: filepath.Join(wd, "testdata", "fake-file"), - FluentdCA: filepath.Join(wd, "testdata", "fake-file"), + FluentdURL: "https://localhost:8888/test.log", + FluentdSessionURL: "https://localhost:8888/session", + FluentdCert: filepath.Join(wd, "testdata", "fake-file"), + FluentdKey: filepath.Join(wd, "testdata", "fake-file"), + FluentdCA: filepath.Join(wd, "testdata", "fake-file"), + FluentdMaxConnections: 10, }, TeleportConfig: TeleportConfig{ TeleportAddr: "localhost:3025", @@ -121,11 +123,12 @@ func TestStartCmdConfig(t *testing.T) { Debug: true, Start: StartCmdConfig{ FluentdConfig: FluentdConfig{ - FluentdURL: "https://localhost:8888/test.log", - FluentdSessionURL: "https://localhost:8888/session", - FluentdCert: filepath.Join(wd, "testdata", "fake-file"), - FluentdKey: filepath.Join(wd, "testdata", "fake-file"), - FluentdCA: filepath.Join(wd, "testdata", "fake-file"), + FluentdURL: "https://localhost:8888/test.log", + FluentdSessionURL: "https://localhost:8888/session", + FluentdCert: filepath.Join(wd, "testdata", "fake-file"), + FluentdKey: filepath.Join(wd, "testdata", "fake-file"), + FluentdCA: filepath.Join(wd, "testdata", "fake-file"), + FluentdMaxConnections: 10, }, TeleportConfig: TeleportConfig{ TeleportAddr: "localhost:3025", diff --git a/integrations/event-handler/fake_fluentd_test.go b/integrations/event-handler/fake_fluentd_test.go index 2d9293b7925aa..ecf286569f12d 100644 --- a/integrations/event-handler/fake_fluentd_test.go +++ b/integrations/event-handler/fake_fluentd_test.go @@ -123,9 +123,10 @@ func (f *FakeFluentd) createServer() error { // GetClientConfig returns FlientdConfig to connect to this fake fluentd server instance func (f *FakeFluentd) GetClientConfig() FluentdConfig { return FluentdConfig{ - FluentdCA: f.caCertPath, - FluentdCert: f.clientCertPath, - FluentdKey: f.clientKeyPath, + FluentdCA: f.caCertPath, + FluentdCert: f.clientCertPath, + FluentdKey: f.clientKeyPath, + FluentdMaxConnections: 3, } } diff --git a/integrations/event-handler/fluentd_client.go b/integrations/event-handler/fluentd_client.go index c3bea94ce97ee..e1369adc8c661 100644 --- a/integrations/event-handler/fluentd_client.go +++ b/integrations/event-handler/fluentd_client.go @@ -27,6 +27,7 @@ import ( "time" "github.com/gravitational/trace" + "golang.org/x/net/http2" tlib "github.com/gravitational/teleport/integrations/lib" ) @@ -41,6 +42,7 @@ type FluentdClient struct { // client HTTP client to send requests client *http.Client log *slog.Logger + sem chan struct{} } // NewFluentdClient creates new FluentdClient @@ -56,22 +58,34 @@ func NewFluentdClient(c *FluentdConfig, log *slog.Logger) (*FluentdClient, error return nil, trace.BadParameter("both fluentd_cert and fluentd_key should be specified") } + if c.FluentdMaxConnections <= 0 { + return nil, trace.BadParameter("fluentd_max_connections should be greater than 0") + } + ca, err := getCertPool(c) if err != nil { return nil, trace.Wrap(err) } - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - RootCAs: ca, - Certificates: certs, - }, + transport := &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: ca, + Certificates: certs, }, - Timeout: httpTimeout, + MaxIdleConnsPerHost: c.FluentdMaxConnections, + IdleConnTimeout: httpTimeout, + } + + if err := http2.ConfigureTransport(transport); err != nil { + return nil, trace.Wrap(err) } - return &FluentdClient{client: client, log: log}, nil + client := &http.Client{ + Transport: transport, + Timeout: httpTimeout, + } + + return &FluentdClient{client: client, log: log, sem: make(chan struct{}, c.FluentdMaxConnections)}, nil } // getCertPool reads CA certificate and returns CA cert pool if passed @@ -92,6 +106,11 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) { // Send sends event to fluentd func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error { + f.sem <- struct{}{} + defer func() { + <-f.sem + }() + f.log.DebugContext(ctx, "Sending event to Fluentd", "payload", string(b)) req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b)) diff --git a/integrations/event-handler/go.mod b/integrations/event-handler/go.mod index c8f708495cf18..8878d0502ae73 100644 --- a/integrations/event-handler/go.mod +++ b/integrations/event-handler/go.mod @@ -15,6 +15,7 @@ require ( github.com/peterbourgon/diskv/v3 v3.0.1 github.com/sethvargo/go-limiter v1.0.0 github.com/stretchr/testify v1.9.0 + golang.org/x/net v0.30.0 golang.org/x/time v0.6.0 google.golang.org/protobuf v1.35.1 ) @@ -280,14 +281,13 @@ require ( go.opentelemetry.io/otel/trace v1.30.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect - golang.org/x/crypto v0.27.0 // indirect + golang.org/x/crypto v0.28.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.29.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/term v0.24.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/term v0.25.0 // indirect golang.org/x/text v0.19.0 // indirect google.golang.org/api v0.197.0 // indirect google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/integrations/event-handler/go.sum b/integrations/event-handler/go.sum index 53d4fb062f01e..1d9b000a4f7a0 100644 --- a/integrations/event-handler/go.sum +++ b/integrations/event-handler/go.sum @@ -1650,8 +1650,8 @@ golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1777,8 +1777,8 @@ golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1920,8 +1920,8 @@ golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1938,8 +1938,8 @@ golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= -golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= -golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= +golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= +golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=