diff --git a/integrations/event-handler/cli.go b/integrations/event-handler/cli.go index 446158abbad91..dfad7eca35603 100644 --- a/integrations/event-handler/cli.go +++ b/integrations/event-handler/cli.go @@ -45,6 +45,10 @@ type FluentdConfig struct { // FluentdCA is a path to fluentd CA FluentdCA string `help:"fluentd TLS CA file" type:"existingfile" env:"FDWRD_FLUENTD_CA"` + + // FluentdMaxConnections caps the number of connections to fluentd. Defaults to a dynamic value + // calculated relative to app-level concurrency. + FluentdMaxConnections int `help:"Maximum number of connections to fluentd" env:"FDWRD_MAX_CONNECTIONS"` } // TeleportConfig is Teleport instance configuration @@ -237,6 +241,11 @@ func (c *StartCmdConfig) Validate() error { c.SkipSessionTypes = lib.SliceToAnonymousMap(c.SkipSessionTypesRaw) c.SkipEventTypes = lib.SliceToAnonymousMap(c.SkipEventTypesRaw) + if c.FluentdMaxConnections < 1 { + // 2x concurrency is effectively uncapped. + c.FluentdMaxConnections = c.Concurrency * 2 + } + return nil } @@ -256,6 +265,7 @@ func (c *StartCmdConfig) Dump(ctx context.Context) { log.WithField("ca", c.FluentdCA).Info("Using Fluentd ca") log.WithField("cert", c.FluentdCert).Info("Using Fluentd cert") log.WithField("key", c.FluentdKey).Info("Using Fluentd key") + log.WithField("max_connections", c.FluentdMaxConnections).Info("Using Fluentd max connections") log.WithField("window-size", c.WindowSize).Info("Using window size") if c.TeleportIdentityFile != "" { diff --git a/integrations/event-handler/cli_test.go b/integrations/event-handler/cli_test.go index 7668688880a80..b11b1af294bc1 100644 --- a/integrations/event-handler/cli_test.go +++ b/integrations/event-handler/cli_test.go @@ -45,11 +45,12 @@ func TestStartCmdConfig(t *testing.T) { Debug: false, Start: StartCmdConfig{ FluentdConfig: FluentdConfig{ - FluentdURL: "https://localhost:8888/test.log", - FluentdSessionURL: "https://localhost:8888/session", - FluentdCert: path.Join(wd, "testdata", "fake-file"), - FluentdKey: path.Join(wd, "testdata", "fake-file"), - FluentdCA: path.Join(wd, "testdata", "fake-file"), + FluentdURL: "https://localhost:8888/test.log", + FluentdSessionURL: "https://localhost:8888/session", + FluentdCert: path.Join(wd, "testdata", "fake-file"), + FluentdKey: path.Join(wd, "testdata", "fake-file"), + FluentdCA: path.Join(wd, "testdata", "fake-file"), + FluentdMaxConnections: 10, }, TeleportConfig: TeleportConfig{ TeleportAddr: "localhost:3025", @@ -83,11 +84,12 @@ func TestStartCmdConfig(t *testing.T) { Debug: true, Start: StartCmdConfig{ FluentdConfig: FluentdConfig{ - FluentdURL: "https://localhost:8888/test.log", - FluentdSessionURL: "https://localhost:8888/session", - FluentdCert: path.Join(wd, "testdata", "fake-file"), - FluentdKey: path.Join(wd, "testdata", "fake-file"), - FluentdCA: path.Join(wd, "testdata", "fake-file"), + FluentdURL: "https://localhost:8888/test.log", + FluentdSessionURL: "https://localhost:8888/session", + FluentdCert: path.Join(wd, "testdata", "fake-file"), + FluentdKey: path.Join(wd, "testdata", "fake-file"), + FluentdCA: path.Join(wd, "testdata", "fake-file"), + FluentdMaxConnections: 10, }, TeleportConfig: TeleportConfig{ TeleportAddr: "localhost:3025", @@ -121,11 +123,12 @@ func TestStartCmdConfig(t *testing.T) { Debug: true, Start: StartCmdConfig{ FluentdConfig: FluentdConfig{ - FluentdURL: "https://localhost:8888/test.log", - FluentdSessionURL: "https://localhost:8888/session", - FluentdCert: path.Join(wd, "testdata", "fake-file"), - FluentdKey: path.Join(wd, "testdata", "fake-file"), - FluentdCA: path.Join(wd, "testdata", "fake-file"), + FluentdURL: "https://localhost:8888/test.log", + FluentdSessionURL: "https://localhost:8888/session", + FluentdCert: path.Join(wd, "testdata", "fake-file"), + FluentdKey: path.Join(wd, "testdata", "fake-file"), + FluentdCA: path.Join(wd, "testdata", "fake-file"), + FluentdMaxConnections: 10, }, TeleportConfig: TeleportConfig{ TeleportAddr: "localhost:3025", diff --git a/integrations/event-handler/fake_fluentd_test.go b/integrations/event-handler/fake_fluentd_test.go index 070265398ee4d..019a063262582 100644 --- a/integrations/event-handler/fake_fluentd_test.go +++ b/integrations/event-handler/fake_fluentd_test.go @@ -123,9 +123,10 @@ func (f *FakeFluentd) createServer() error { // GetClientConfig returns FlientdConfig to connect to this fake fluentd server instance func (f *FakeFluentd) GetClientConfig() FluentdConfig { return FluentdConfig{ - FluentdCA: f.caCertPath, - FluentdCert: f.clientCertPath, - FluentdKey: f.clientKeyPath, + FluentdCA: f.caCertPath, + FluentdCert: f.clientCertPath, + FluentdKey: f.clientKeyPath, + FluentdMaxConnections: 3, } } diff --git a/integrations/event-handler/fluentd_client.go b/integrations/event-handler/fluentd_client.go index 92015bef4d312..32d18c31755af 100644 --- a/integrations/event-handler/fluentd_client.go +++ b/integrations/event-handler/fluentd_client.go @@ -27,6 +27,7 @@ import ( "github.com/gravitational/trace" log "github.com/sirupsen/logrus" + "golang.org/x/net/http2" tlib "github.com/gravitational/teleport/integrations/lib" ) @@ -40,6 +41,7 @@ const ( type FluentdClient struct { // client HTTP client to send requests client *http.Client + sem chan struct{} } // NewFluentdClient creates new FluentdClient @@ -55,22 +57,34 @@ func NewFluentdClient(c *FluentdConfig) (*FluentdClient, error) { return nil, trace.BadParameter("both fluentd_cert and fluentd_key should be specified") } + if c.FluentdMaxConnections <= 0 { + return nil, trace.BadParameter("fluentd_max_connections should be greater than 0") + } + ca, err := getCertPool(c) if err != nil { return nil, trace.Wrap(err) } - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - RootCAs: ca, - Certificates: certs, - }, + transport := &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: ca, + Certificates: certs, }, - Timeout: httpTimeout, + MaxIdleConnsPerHost: c.FluentdMaxConnections, + IdleConnTimeout: httpTimeout, + } + + if err := http2.ConfigureTransport(transport); err != nil { + return nil, trace.Wrap(err) } - return &FluentdClient{client: client}, nil + client := &http.Client{ + Transport: transport, + Timeout: httpTimeout, + } + + return &FluentdClient{client: client, sem: make(chan struct{}, c.FluentdMaxConnections)}, nil } // getCertPool reads CA certificate and returns CA cert pool if passed @@ -91,6 +105,11 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) { // Send sends event to fluentd func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error { + f.sem <- struct{}{} + defer func() { + <-f.sem + }() + log.WithField("payload", string(b)).Debug("Sending event to Fluentd") req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b)) diff --git a/integrations/event-handler/go.mod b/integrations/event-handler/go.mod index c18e7d6d34798..0a1f127c409b3 100644 --- a/integrations/event-handler/go.mod +++ b/integrations/event-handler/go.mod @@ -18,6 +18,7 @@ require ( github.com/sethvargo/go-limiter v0.7.2 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 + golang.org/x/net v0.30.0 golang.org/x/time v0.5.0 google.golang.org/protobuf v1.34.2 ) @@ -282,15 +283,14 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect - golang.org/x/crypto v0.24.0 // indirect + golang.org/x/crypto v0.28.0 // indirect golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 // indirect golang.org/x/mod v0.17.0 // indirect - golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.19.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.21.0 // indirect - golang.org/x/term v0.21.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/term v0.25.0 // indirect + golang.org/x/text v0.19.0 // indirect google.golang.org/api v0.177.0 // indirect google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6 // indirect diff --git a/integrations/event-handler/go.sum b/integrations/event-handler/go.sum index 88ce097eb7833..7e61ea6cfc746 100644 --- a/integrations/event-handler/go.sum +++ b/integrations/event-handler/go.sum @@ -997,8 +997,8 @@ golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 h1:mchzmB1XO2pMaKFRqk/+MV3mgGG96aqaPXaMifQU47w= golang.org/x/exp v0.0.0-20231108232855-2478ac86f678/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= @@ -1041,8 +1041,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= @@ -1056,8 +1056,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1098,8 +1098,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1111,8 +1111,8 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= +golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -1125,8 +1125,8 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=