diff --git a/go.mod b/go.mod
index a31b41385c73f..55e4fbbb083c9 100644
--- a/go.mod
+++ b/go.mod
@@ -55,7 +55,7 @@ require (
github.com/andybalholm/brotli v1.2.0
github.com/aquasecurity/libbpfgo v0.5.1-libbpf-1.2
github.com/armon/go-radix v1.0.0
- github.com/aws/aws-sdk-go-v2 v1.39.5
+ github.com/aws/aws-sdk-go-v2 v1.39.6
github.com/aws/aws-sdk-go-v2/config v1.31.16
github.com/aws/aws-sdk-go-v2/credentials v1.18.20
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.20
@@ -66,6 +66,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.41.1
github.com/aws/aws-sdk-go-v2/service/athena v1.55.9
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1
+ github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7
github.com/aws/aws-sdk-go-v2/service/dax v1.29.4
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.1
@@ -313,9 +314,9 @@ require (
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aws/aws-sdk-go v1.55.8 // indirect
- github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect
- github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect
- github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect
+ github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect
github.com/aws/aws-sdk-go-v2/service/ecr v1.45.1 // indirect
diff --git a/go.sum b/go.sum
index 279cb31136885..9601da3febbcc 100644
--- a/go.sum
+++ b/go.sum
@@ -794,10 +794,10 @@ github.com/aws/aws-sdk-go v1.49.12/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3Tj
github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ=
github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
-github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w=
-github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko=
+github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk=
+github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y=
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc=
github.com/aws/aws-sdk-go-v2/config v1.31.16/go.mod h1:2S9hBElpCyGMifv14WxQ7EfPumgoeCPZUpuPX8VtW34=
@@ -816,11 +816,11 @@ github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12/go.mod h1:T/o6k3LG7Ew45+Jz
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc=
@@ -832,6 +832,8 @@ github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 h1:w50cPLPIyWSzh4bqgA/h0nzRw
github.com/aws/aws-sdk-go-v2/service/athena v1.55.9/go.mod h1:jTVF/+wNGjLD94jaJxDqhWexDeH7r4zZkQ7bbboAf1I=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 h1:F/ZU3z+tNCIDhUD8wFEalX1GMdtU0SQlIXXi/hPFFpE=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1/go.mod h1:PfutSAwCVczCH5sBPjuPc1pkjaSokL4DsJNlrLC3kww=
+github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 h1:Yj4NvoEEdSxA90x/uCBskzeF3OxZr72Yaf64n0fIVR4=
+github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7/go.mod h1:9/Q0/HtqBTLMksFse42wZjUq0jJrUuo4XlnXy/uSoeg=
github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 h1:IBtJf1olfwspLm3d20sDFstQTlHgL+dA2NtdXQMDius=
github.com/aws/aws-sdk-go-v2/service/dax v1.29.4/go.mod h1:XciMFO7HCGFzOlvNl42HsWNakjQtt/JHzW26VzlahGI=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 h1:28+obyib2FhFKASJ6qSPbuteiy0nvvcvfItdAAYure0=
diff --git a/integrations/event-handler/go.mod b/integrations/event-handler/go.mod
index 39e955d8ea923..a43384c55485d 100644
--- a/integrations/event-handler/go.mod
+++ b/integrations/event-handler/go.mod
@@ -71,14 +71,14 @@ require (
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
- github.com/aws/aws-sdk-go-v2 v1.39.5 // indirect
- github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect
+ github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect
+ github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect
github.com/aws/aws-sdk-go-v2/config v1.31.16 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.18.20 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 // indirect
- github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect
- github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect
github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 // indirect
diff --git a/integrations/event-handler/go.sum b/integrations/event-handler/go.sum
index 8634c4c88bfa6..041b107eb3246 100644
--- a/integrations/event-handler/go.sum
+++ b/integrations/event-handler/go.sum
@@ -760,10 +760,10 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3d
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ=
github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk=
-github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w=
-github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko=
+github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk=
+github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y=
github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc=
github.com/aws/aws-sdk-go-v2/config v1.31.16/go.mod h1:2S9hBElpCyGMifv14WxQ7EfPumgoeCPZUpuPX8VtW34=
github.com/aws/aws-sdk-go-v2/credentials v1.18.20 h1:KFndAnHd9NUuzikHjQ8D5CfFVO+bgELkmcGY8yAw98Q=
@@ -772,10 +772,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 h1:VO3FIM2TDbm0kqp6sFNR0P
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12/go.mod h1:6C39gB8kg82tx3r72muZSrNhHia9rjGkX7ORaS2GKNE=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 h1:itu4KHu8JK/N6NcLIISlf3LL1LccMqruLUXZ9y7yBZw=
diff --git a/integrations/terraform-mwi/go.mod b/integrations/terraform-mwi/go.mod
index 4ebcad8fde40d..da24a1394eb2b 100644
--- a/integrations/terraform-mwi/go.mod
+++ b/integrations/terraform-mwi/go.mod
@@ -84,8 +84,8 @@ require (
github.com/armon/go-radix v1.0.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go v1.55.8 // indirect
- github.com/aws/aws-sdk-go-v2 v1.39.5 // indirect
- github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect
+ github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect
+ github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect
github.com/aws/aws-sdk-go-v2/config v1.31.16 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.18.20 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.20 // indirect
@@ -93,13 +93,14 @@ require (
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 // indirect
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 // indirect
- github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect
- github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect
github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.41.1 // indirect
github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 // indirect
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 // indirect
+ github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 // indirect
github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.1 // indirect
diff --git a/integrations/terraform-mwi/go.sum b/integrations/terraform-mwi/go.sum
index a60d1b48c9f64..95b7380071fd3 100644
--- a/integrations/terraform-mwi/go.sum
+++ b/integrations/terraform-mwi/go.sum
@@ -796,10 +796,10 @@ github.com/aws/aws-sdk-go v1.49.12/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3Tj
github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ=
github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
-github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w=
-github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko=
+github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk=
+github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y=
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc=
github.com/aws/aws-sdk-go-v2/config v1.31.16/go.mod h1:2S9hBElpCyGMifv14WxQ7EfPumgoeCPZUpuPX8VtW34=
@@ -818,11 +818,11 @@ github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12/go.mod h1:T/o6k3LG7Ew45+Jz
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc=
@@ -834,6 +834,8 @@ github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 h1:w50cPLPIyWSzh4bqgA/h0nzRw
github.com/aws/aws-sdk-go-v2/service/athena v1.55.9/go.mod h1:jTVF/+wNGjLD94jaJxDqhWexDeH7r4zZkQ7bbboAf1I=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 h1:F/ZU3z+tNCIDhUD8wFEalX1GMdtU0SQlIXXi/hPFFpE=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1/go.mod h1:PfutSAwCVczCH5sBPjuPc1pkjaSokL4DsJNlrLC3kww=
+github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 h1:Yj4NvoEEdSxA90x/uCBskzeF3OxZr72Yaf64n0fIVR4=
+github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7/go.mod h1:9/Q0/HtqBTLMksFse42wZjUq0jJrUuo4XlnXy/uSoeg=
github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 h1:IBtJf1olfwspLm3d20sDFstQTlHgL+dA2NtdXQMDius=
github.com/aws/aws-sdk-go-v2/service/dax v1.29.4/go.mod h1:XciMFO7HCGFzOlvNl42HsWNakjQtt/JHzW26VzlahGI=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 h1:28+obyib2FhFKASJ6qSPbuteiy0nvvcvfItdAAYure0=
diff --git a/integrations/terraform/go.mod b/integrations/terraform/go.mod
index 92e8daf896431..12f267c24ff96 100644
--- a/integrations/terraform/go.mod
+++ b/integrations/terraform/go.mod
@@ -88,8 +88,8 @@ require (
github.com/armon/go-radix v1.0.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go v1.55.8 // indirect
- github.com/aws/aws-sdk-go-v2 v1.39.5 // indirect
- github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect
+ github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect
+ github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect
github.com/aws/aws-sdk-go-v2/config v1.31.16 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.18.20 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.20 // indirect
@@ -97,13 +97,14 @@ require (
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 // indirect
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 // indirect
- github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect
- github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect
github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.41.1 // indirect
github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 // indirect
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 // indirect
+ github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 // indirect
github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.1 // indirect
diff --git a/integrations/terraform/go.sum b/integrations/terraform/go.sum
index b0d666a3f86fe..b760f24919b37 100644
--- a/integrations/terraform/go.sum
+++ b/integrations/terraform/go.sum
@@ -816,10 +816,10 @@ github.com/aws/aws-sdk-go v1.49.12/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3Tj
github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ=
github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
-github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w=
-github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko=
+github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk=
+github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y=
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc=
github.com/aws/aws-sdk-go-v2/config v1.31.16/go.mod h1:2S9hBElpCyGMifv14WxQ7EfPumgoeCPZUpuPX8VtW34=
@@ -838,11 +838,11 @@ github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12/go.mod h1:T/o6k3LG7Ew45+Jz
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc=
@@ -854,6 +854,8 @@ github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 h1:w50cPLPIyWSzh4bqgA/h0nzRw
github.com/aws/aws-sdk-go-v2/service/athena v1.55.9/go.mod h1:jTVF/+wNGjLD94jaJxDqhWexDeH7r4zZkQ7bbboAf1I=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 h1:F/ZU3z+tNCIDhUD8wFEalX1GMdtU0SQlIXXi/hPFFpE=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1/go.mod h1:PfutSAwCVczCH5sBPjuPc1pkjaSokL4DsJNlrLC3kww=
+github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 h1:Yj4NvoEEdSxA90x/uCBskzeF3OxZr72Yaf64n0fIVR4=
+github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7/go.mod h1:9/Q0/HtqBTLMksFse42wZjUq0jJrUuo4XlnXy/uSoeg=
github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 h1:IBtJf1olfwspLm3d20sDFstQTlHgL+dA2NtdXQMDius=
github.com/aws/aws-sdk-go-v2/service/dax v1.29.4/go.mod h1:XciMFO7HCGFzOlvNl42HsWNakjQtt/JHzW26VzlahGI=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 h1:28+obyib2FhFKASJ6qSPbuteiy0nvvcvfItdAAYure0=
diff --git a/lib/config/configuration.go b/lib/config/configuration.go
index ab5d7c4d48285..742f7144230aa 100644
--- a/lib/config/configuration.go
+++ b/lib/config/configuration.go
@@ -1709,11 +1709,18 @@ kubernetes matchers are present`)
Region: awsMatcher.CloudTrailLogs.QueueRegion,
}
}
+ var eksAuditLogs *types.AccessGraphAWSSyncEKSAuditLogs
+ if awsMatcher.EKSAuditLogs != nil {
+ eksAuditLogs = &types.AccessGraphAWSSyncEKSAuditLogs{
+ Tags: awsMatcher.EKSAuditLogs.Tags,
+ }
+ }
tMatcher.AWS = append(tMatcher.AWS, &types.AccessGraphAWSSync{
Regions: regions,
AssumeRole: assumeRole,
CloudTrailLogs: cloudTrailLogs,
+ EksAuditLogs: eksAuditLogs,
})
}
for _, azureMatcher := range fc.Discovery.AccessGraph.Azure {
diff --git a/lib/config/fileconf.go b/lib/config/fileconf.go
index 0b928306817b1..4ba0330bc4ed3 100644
--- a/lib/config/fileconf.go
+++ b/lib/config/fileconf.go
@@ -1640,6 +1640,17 @@ type AccessGraphAWSSync struct {
// CloudTrailLogs is the configuration for the SQS queue to poll for
// CloudTrail logs.
CloudTrailLogs *AccessGraphAWSSyncCloudTrailLogs `yaml:"cloud_trail_logs,omitempty"`
+ // EKSAuditLogs is the configuration for fetching audit logs for EKS
+ // clusters discovered.
+ EKSAuditLogs *AccessGraphEKSAuditLogs `yaml:"eks_audit_logs,omitempty"`
+}
+
+// AccessGraphEKSAuditLogs is the configuration for fetching audit logs from
+// clusters discovered for access graph.
+type AccessGraphEKSAuditLogs struct {
+ // Tags are AWS EKS tags to match. Clusters that have tags that match these
+ // will have their audit logs fetched and sent to Access Graph.
+ Tags map[string]apiutils.Strings `yaml:"tags,omitempty"`
}
// AccessGraphAzureSync represents the configuration for the Azure AccessGraph Sync service.
diff --git a/lib/srv/discovery/access_graph_aws.go b/lib/srv/discovery/access_graph_aws.go
index 052df6aaf96a9..f1322c1370a47 100644
--- a/lib/srv/discovery/access_graph_aws.go
+++ b/lib/srv/discovery/access_graph_aws.go
@@ -73,10 +73,17 @@ const (
// errNoAccessGraphFetchers is returned when there are no TAG fetchers.
var errNoAccessGraphFetchers = errors.New("no Access Graph fetchers")
-func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *aws_sync.Resources, stream accessgraphv1alpha.AccessGraphService_AWSEventsStreamClient, features aws_sync.Features) error {
+func (s *Server) reconcileAccessGraph(
+ ctx context.Context,
+ currentTAGResources *aws_sync.Resources,
+ stream accessgraphv1alpha.AccessGraphService_AWSEventsStreamClient,
+ features aws_sync.Features,
+ eksAuditLogWatcher *eksAuditLogWatcher,
+) error {
type fetcherResult struct {
- result *aws_sync.Resources
- err error
+ fetcher *aws_sync.Fetcher
+ result *aws_sync.Resources
+ err error
}
allFetchers := s.getAllAWSSyncFetchers()
@@ -88,6 +95,8 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
if err := push(stream, upsert, toDel); err != nil {
s.Log.ErrorContext(ctx, "Error pushing empty resources to TAGs", "error", err)
}
+ // No clusters to fetch eks audit logs for.
+ eksAuditLogWatcher.Reconcile(ctx, nil)
return trace.Wrap(errNoAccessGraphFetchers)
}
@@ -110,21 +119,39 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
<-tokens
}()
result, err := fetcher.Poll(ctx, features)
- resultsC <- fetcherResult{result, trace.Wrap(err)}
+ resultsC <- fetcherResult{fetcher, result, trace.Wrap(err)}
}()
}
results := make([]*aws_sync.Resources, 0, len(allFetchers))
+ auditLogClusters := make([]eksAuditLogCluster, 0, len(allFetchers))
errs := make([]error, 0, len(allFetchers))
// Collect the results from all fetchers.
// Each fetcher can return an error and a result.
for range allFetchers {
fetcherResult := <-resultsC
- if fetcherResult.err != nil {
- errs = append(errs, fetcherResult.err)
+ fetcher, result, err := fetcherResult.fetcher, fetcherResult.result, fetcherResult.err
+ if err != nil {
+ errs = append(errs, err)
}
- if fetcherResult.result != nil {
- results = append(results, fetcherResult.result)
+ if result == nil {
+ continue
+ }
+ results = append(results, result)
+ // If the fetcher is configured for EKS audit logs, see if any
+ // EKS clusters match the configured tags.
+ if fetcher.EKSAuditLogs == nil {
+ continue
+ }
+ for _, cluster := range result.EKSClusters {
+ clusterTags := make(map[string]string, len(cluster.Tags))
+ for _, tag := range cluster.Tags {
+ clusterTags[tag.GetKey()] = tag.GetValue().GetValue()
+ }
+ match, _, _ := services.MatchLabels(fetcher.EKSAuditLogs.Tags, clusterTags)
+ if match {
+ auditLogClusters = append(auditLogClusters, eksAuditLogCluster{fetcher, cluster})
+ }
}
}
// Aggregate all errors into a single error.
@@ -137,6 +164,10 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
upsert, toDel := aws_sync.ReconcileResults(currentTAGResources, result)
pushErr := push(stream, upsert, toDel)
+ // Send the updated list of clusters requiring audit logs to the fetcher.
+ // The fetcher reconciles this list against the last set sent.
+ eksAuditLogWatcher.Reconcile(ctx, auditLogClusters)
+
for _, fetcher := range allFetchers {
s.tagSyncStatus.syncFinished(fetcher, pushErr, s.clock.Now())
}
@@ -425,11 +456,16 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c
}
s.Log.InfoContext(ctx, "Access graph service poll interval", "poll_interval", tickerInterval)
+ // Start the EKS audit log watcher that keeps track of the EKS audit log
+ // fetchers and updates them when Reconcile is called.
+ eksAuditLogWatcher := newEKSAuditLogWatcher(client, s.Log)
+ go eksAuditLogWatcher.Run(ctx)
+
currentTAGResources := &aws_sync.Resources{}
timer := time.NewTimer(tickerInterval)
defer timer.Stop()
for {
- err := s.reconcileAccessGraph(ctx, currentTAGResources, stream, features)
+ err := s.reconcileAccessGraph(ctx, currentTAGResources, stream, features, eksAuditLogWatcher)
if errors.Is(err, errNoAccessGraphFetchers) {
// no fetchers, no need to continue.
// we will wait for the config to change and re-evaluate the fetchers
@@ -508,7 +544,8 @@ func (s *Server) initTAGAWSWatchers(ctx context.Context, cfg *Config) error {
continue
}
// reset the currentTAGResources to force a full sync
- if err := s.initializeAndWatchAccessGraph(ctx, reloadCh); errors.Is(err, errTAGFeatureNotEnabled) {
+ err := s.initializeAndWatchAccessGraph(ctx, reloadCh)
+ if errors.Is(err, errTAGFeatureNotEnabled) {
s.Log.WarnContext(ctx, "Access Graph specified in config, but the license does not include Teleport Identity Security. Access graph sync will not be enabled.")
break
} else if err != nil {
@@ -576,6 +613,10 @@ func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matcher
ExternalID: awsFetcher.AssumeRole.ExternalID,
}
}
+ var eksAuditLogs *aws_sync.EKSAuditLogs
+ if awsFetcher.EksAuditLogs != nil {
+ eksAuditLogs = &aws_sync.EKSAuditLogs{Tags: awsFetcher.EksAuditLogs.Tags}
+ }
fetcher, err := aws_sync.NewFetcher(
ctx,
aws_sync.Config{
@@ -587,6 +628,7 @@ func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matcher
Integration: awsFetcher.Integration,
DiscoveryConfigName: discoveryConfigName,
Log: s.Log,
+ EKSAuditLogs: eksAuditLogs,
},
)
if err != nil {
@@ -848,7 +890,7 @@ func (s *Server) receiveTAGResumeFromStream(ctx context.Context, stream accessgr
return nil
}
-func consumeTillErr(stream accessgraphv1alpha.AccessGraphService_AWSCloudTrailStreamClient) error {
+func consumeTillErr[Req any, Res any](stream grpc.BidiStreamingClient[Req, Res]) error {
for {
_, err := stream.Recv()
if err != nil {
diff --git a/lib/srv/discovery/eks_audit_log_fetcher.go b/lib/srv/discovery/eks_audit_log_fetcher.go
new file mode 100644
index 0000000000000..83a8f778d6a61
--- /dev/null
+++ b/lib/srv/discovery/eks_audit_log_fetcher.go
@@ -0,0 +1,214 @@
+/*
+ * Teleport
+ * Copyright (C) 2025 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package discovery
+
+import (
+ "context"
+ "errors"
+ "log/slog"
+ "time"
+
+ cwltypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
+ "github.com/gravitational/trace"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/types/known/structpb"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
+)
+
+// cloudwatchlogFetcher fetches cloudwatch logs for a given cluster, starting
+// at the given cursor position. This interface exists so tests can plug in a
+// fake fetcher and not need to stub out deeper AWS interfaces.
+type cloudwatchlogFetcher interface {
+ FetchEKSAuditLogs(
+ ctx context.Context,
+ cluster *accessgraphv1alpha.AWSEKSClusterV1,
+ cursor *accessgraphv1alpha.KubeAuditLogCursor,
+ ) ([]cwltypes.FilteredLogEvent, error)
+}
+
+// eksAuditLogFetcher is a fetcher for EKS audit logs for a single cluster,
+// fetching the logs from AWS Cloud Watch Logs. It uses the grpc stream
+// to initiate the stream and possibly receive a resume state used to
+// synchronize the start point with a previous run fetching the logs.
+type eksAuditLogFetcher struct {
+ fetcher cloudwatchlogFetcher
+ cluster *accessgraphv1alpha.AWSEKSClusterV1
+ stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient
+ log *slog.Logger
+}
+
+func newEKSAuditLogFetcher(
+ fetcher cloudwatchlogFetcher,
+ cluster *accessgraphv1alpha.AWSEKSClusterV1,
+ stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient,
+ log *slog.Logger,
+) *eksAuditLogFetcher {
+ return &eksAuditLogFetcher{
+ fetcher: fetcher,
+ cluster: cluster,
+ stream: stream,
+ log: log,
+ }
+}
+
+// Run continuously polls AWS Cloud Watch Logs for Kubernetes apiserver
+// audit logs for the configured cluster. It feeds the logs retrieved to the
+// configured grpc stream, running until the given context is canceled.
+func (f *eksAuditLogFetcher) Run(ctx context.Context) error {
+ f.log = f.log.With("cluster", f.cluster.Arn)
+
+ cursor := initialCursor(f.cluster)
+ if err := f.sendTAGKubeAuditLogNewStream(ctx, cursor); err != nil {
+ return trace.Wrap(err)
+ }
+
+ cursor, err := f.receiveTAGKubeAuditLogResume(ctx)
+ if err != nil {
+ return trace.Wrap(err)
+ }
+
+ for ctx.Err() == nil {
+ var events []*structpb.Struct
+ events, cursor = f.fetchLogs(ctx, cursor)
+
+ if len(events) == 0 {
+ select {
+ case <-ctx.Done():
+ case <-time.After(logPollInterval):
+ }
+ continue
+ }
+
+ if err := f.sendTAGKubeAuditLogEvents(ctx, events, cursor); err != nil {
+ return trace.Wrap(err)
+ }
+
+ f.log.DebugContext(ctx, "Sent KubeAuditLogEvents", "count", len(events),
+ "cursor_time", cursor.GetLastEventTime().AsTime())
+ }
+ return trace.Wrap(ctx.Err())
+}
+
+// fetchLogs fetches a batch of logs from AWS Cloud Watch Logs after the given
+// cursor position and unmarshals them into the protobuf Struct well-known
+// type.
+//
+// It returns the fetched log entries and a new cursor for the next call. If an
+// error occurs, it is logged, and the function returns nil logs and the
+// original input cursor. This allows the caller to retry the operation.
+func (f *eksAuditLogFetcher) fetchLogs(ctx context.Context, cursor *accessgraphv1alpha.KubeAuditLogCursor) ([]*structpb.Struct, *accessgraphv1alpha.KubeAuditLogCursor) {
+ awsEvents, err := f.fetcher.FetchEKSAuditLogs(ctx, f.cluster, cursor)
+ if err != nil {
+ if !errors.Is(err, context.Canceled) {
+ f.log.ErrorContext(ctx, "Failed to fetch EKS audit logs", "error", err)
+ }
+ return nil, cursor
+ }
+
+ if len(awsEvents) == 0 {
+ return nil, cursor
+ }
+
+ events := []*structpb.Struct{}
+ var awsEvent cwltypes.FilteredLogEvent
+ for _, awsEvent = range awsEvents {
+ // TODO(camscale): Track event sizes and don't go over protobuf message
+ // limit. newAccessGraphClient() sets the limit to 50MB
+ event := &structpb.Struct{}
+ m := protojson.UnmarshalOptions{}
+ err = m.Unmarshal([]byte(*awsEvent.Message), event)
+ if err != nil {
+ f.log.ErrorContext(ctx, "failed to protojson.Unmarshal", "error", err)
+ continue
+ }
+ events = append(events, event)
+ }
+ cursor = cursorFromEvent(f.cluster, awsEvent)
+
+ return events, cursor
+}
+
+func (f *eksAuditLogFetcher) sendTAGKubeAuditLogNewStream(ctx context.Context, cursor *accessgraphv1alpha.KubeAuditLogCursor) error {
+ err := f.stream.Send(
+ &accessgraphv1alpha.KubeAuditLogStreamRequest{
+ Action: &accessgraphv1alpha.KubeAuditLogStreamRequest_NewStream{
+ NewStream: &accessgraphv1alpha.KubeAuditLogNewStream{Initial: cursor},
+ },
+ },
+ )
+ if err != nil {
+ err = consumeTillErr(f.stream)
+ f.log.ErrorContext(ctx, "Failed to send accessgraph.KubeAuditLogNewStream", "error", err)
+ return trace.Wrap(err)
+ }
+ return nil
+}
+
+func (f *eksAuditLogFetcher) receiveTAGKubeAuditLogResume(ctx context.Context) (*accessgraphv1alpha.KubeAuditLogCursor, error) {
+ msg, err := f.stream.Recv()
+ if err != nil {
+ return nil, trace.Wrap(err, "failed to receive KubeAuditLogStream resume state")
+ }
+
+ state := msg.GetResumeState()
+ if state == nil {
+ return nil, trace.BadParameter("AccessGraphService.KubeAuditLogStream did not return KubeAuditLogResumeState message")
+ }
+
+ f.log.InfoContext(ctx, "KubeAuditLogResumeState received", "state", state)
+ return state.Cursor, nil
+}
+
+func (f *eksAuditLogFetcher) sendTAGKubeAuditLogEvents(ctx context.Context, events []*structpb.Struct, cursor *accessgraphv1alpha.KubeAuditLogCursor) error {
+ err := f.stream.Send(
+ &accessgraphv1alpha.KubeAuditLogStreamRequest{
+ Action: &accessgraphv1alpha.KubeAuditLogStreamRequest_Events{
+ Events: &accessgraphv1alpha.KubeAuditLogEvents{Events: events, Cursor: cursor},
+ },
+ },
+ )
+ if err != nil {
+ err = consumeTillErr(f.stream)
+ f.log.ErrorContext(ctx, "Failed to send accessgraph.KubeAuditLogEvents", "error", err)
+ return trace.Wrap(err)
+ }
+ return nil
+}
+
+func cursorFromEvent(cluster *accessgraphv1alpha.AWSEKSClusterV1, event cwltypes.FilteredLogEvent) *accessgraphv1alpha.KubeAuditLogCursor {
+ return &accessgraphv1alpha.KubeAuditLogCursor{
+ LogSource: accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS,
+ ClusterId: cluster.Arn,
+ EventId: *event.EventId,
+ LastEventTime: timestamppb.New(time.UnixMilli(*event.Timestamp)),
+ }
+}
+
+// initialCursor returns a cursor for an EKS cluster that we have not previously
+// retrieved logs from, so there is no resume state. The cursor is set to
+// have logs retrieved back a standard amount of time.
+func initialCursor(cluster *accessgraphv1alpha.AWSEKSClusterV1) *accessgraphv1alpha.KubeAuditLogCursor {
+ return &accessgraphv1alpha.KubeAuditLogCursor{
+ LogSource: accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS,
+ ClusterId: cluster.Arn,
+ LastEventTime: timestamppb.New(time.Now().UTC().Add(-initialLogBacklog)),
+ }
+}
diff --git a/lib/srv/discovery/eks_audit_log_fetcher_test.go b/lib/srv/discovery/eks_audit_log_fetcher_test.go
new file mode 100644
index 0000000000000..bbf792daf4f8f
--- /dev/null
+++ b/lib/srv/discovery/eks_audit_log_fetcher_test.go
@@ -0,0 +1,245 @@
+/*
+ * Teleport
+ * Copyright (C) 2025 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package discovery
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "testing"
+ "testing/synctest"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ cwltypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
+ "github.com/stretchr/testify/require"
+
+ accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
+ "github.com/gravitational/teleport/lib/utils/testutils/grpctest"
+)
+
+type eksAuditLogFetcherFixture struct {
+ ctx context.Context
+ cancel context.CancelFunc
+ server kalsServer
+ fetcherErr error
+ cluster *accessgraphv1alpha.AWSEKSClusterV1
+ fakeLogFetcher *fakeCloudWatchLogFetcher
+}
+
+// Start the fixture. Must be called inside synctest bubble.
+func (f *eksAuditLogFetcherFixture) Start(t *testing.T) {
+ t.Helper()
+
+ f.ctx, f.cancel = context.WithCancel(t.Context())
+ tester := grpctest.NewGRPCTester[kalsRequest, kalsResponse](f.ctx)
+ f.server = tester.NewServerStream()
+ logger := slog.New(slog.DiscardHandler)
+ f.fakeLogFetcher = newFakeCloudWatchLogFetcher()
+ f.cluster = &accessgraphv1alpha.AWSEKSClusterV1{
+ Name: "cluster-name",
+ Arn: "cluster-arn",
+ }
+ logFetcher := newEKSAuditLogFetcher(f.fakeLogFetcher, f.cluster, tester.NewClientStream(), logger)
+ go func() { f.fetcherErr = logFetcher.Run(f.ctx) }()
+}
+
+// End the fixture. Must be called inside synctest bubble.
+func (f *eksAuditLogFetcherFixture) End(t *testing.T) {
+ t.Helper()
+ f.cancel()
+ synctest.Wait()
+ require.ErrorIs(t, f.fetcherErr, context.Canceled)
+}
+
+func (f *eksAuditLogFetcherFixture) testInitializeNewStream(t *testing.T) {
+ t.Helper()
+
+ // Wait for a NewStream action, and verify it contains what we expect
+ msg, err := f.server.Recv()
+ require.NoError(t, err)
+ newStream := msg.GetNewStream()
+ require.NotNil(t, newStream)
+ cursor := newStream.GetInitial()
+ require.NotNil(t, cursor)
+ require.Equal(t, accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, cursor.GetLogSource())
+ require.Equal(t, f.cluster.GetArn(), cursor.GetClusterId())
+
+ // Send back a ResumeState
+ err = f.server.Send(newKubeAuditLogResponseResumeState(cursor))
+ require.NoError(t, err)
+}
+
+// TestEKSAuditLogFetcher_NewStream tests that when a new log stream
+// is set up for a cluster, logs start being fetched from the cursor returned
+// by the grpc service.
+func TestEKSAuditLogFetcher_NewStream(t *testing.T) {
+ synctest.Test(t, func(t *testing.T) {
+ f := &eksAuditLogFetcherFixture{}
+ f.Start(t)
+ f.testInitializeNewStream(t)
+ f.End(t)
+ })
+}
+
+func TestEKSAuditLogFetcher_Batching(t *testing.T) {
+ synctest.Test(t, func(t *testing.T) {
+ startTime := time.Now().UTC()
+ logEpoch := startTime.Add(-7 * 24 * time.Hour)
+ f := &eksAuditLogFetcherFixture{}
+ f.Start(t)
+ f.testInitializeNewStream(t)
+
+ f.fakeLogFetcher.events <- nil
+ // Wait for a polling loop to occur. As there are no logs left,
+ // the time should now be the synctest epoch plus the poll interval
+ time.Sleep(logPollInterval)
+ synctest.Wait()
+ require.Equal(t, startTime.Add(logPollInterval), time.Now().UTC())
+
+ // Wait for an Events action with the log listed. Verify the log and cursor.
+ f.fakeLogFetcher.events <- []cwltypes.FilteredLogEvent{
+ makeEvent(logEpoch, 0, "{}"),
+ makeEvent(logEpoch.Add(time.Second), 1, `{"log": "value"}`),
+ }
+ msg, err := f.server.Recv()
+ require.NoError(t, err)
+ events := msg.GetEvents()
+ require.NotNil(t, events)
+ require.Len(t, events.GetEvents(), 2)
+ require.Empty(t, events.GetEvents()[0].GetFields())
+ require.Len(t, events.GetEvents()[1].GetFields(), 1)
+ cursor := events.GetCursor()
+ require.NotNil(t, cursor)
+ require.Equal(t, accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, cursor.GetLogSource())
+ require.Equal(t, f.cluster.GetArn(), cursor.GetClusterId())
+ require.Equal(t, "event-id-1", cursor.GetEventId())
+ require.Equal(t, logEpoch.Add(time.Second), cursor.GetLastEventTime().AsTime())
+
+ f.fakeLogFetcher.events <- []cwltypes.FilteredLogEvent{
+ makeEvent(logEpoch.Add(time.Second), 2, `{"log": "value2"}`),
+ makeEvent(logEpoch.Add(2*time.Second), 3, `{}`),
+ }
+ msg, err = f.server.Recv()
+ require.NoError(t, err)
+ events = msg.GetEvents()
+ require.NotNil(t, events)
+ require.Len(t, events.GetEvents(), 2)
+ require.Len(t, events.GetEvents()[0].GetFields(), 1)
+ require.Empty(t, events.GetEvents()[1].GetFields())
+ cursor = events.GetCursor()
+ require.NotNil(t, cursor)
+ require.Equal(t, accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, cursor.GetLogSource())
+ require.Equal(t, f.cluster.GetArn(), cursor.GetClusterId())
+ require.Equal(t, "event-id-3", cursor.GetEventId())
+ require.Equal(t, logEpoch.Add(2*time.Second), cursor.GetLastEventTime().AsTime())
+
+ f.End(t)
+ })
+}
+
+func TestEKSAuditLogFetcher_ContinueOnError(t *testing.T) {
+ synctest.Test(t, func(t *testing.T) {
+ startTime := time.Now().UTC()
+ logEpoch := startTime.Add(-7 * 24 * time.Hour)
+ f := &eksAuditLogFetcherFixture{}
+ f.Start(t)
+ f.testInitializeNewStream(t)
+
+ f.fakeLogFetcher.err <- errors.New("oh noes. something went wrong")
+ // Wait for a polling loop to occur. As there are no logs left,
+ // the time should now be the synctest epoch plus the poll interval
+ time.Sleep(logPollInterval)
+ synctest.Wait()
+ require.Equal(t, startTime.Add(logPollInterval), time.Now().UTC())
+
+ // Wait for an Events action with the log listed. Verify the log and cursor.
+ f.fakeLogFetcher.events <- []cwltypes.FilteredLogEvent{
+ makeEvent(logEpoch, 0, "{}"),
+ makeEvent(logEpoch.Add(time.Second), 1, `{"log": "value"}`),
+ }
+ msg, err := f.server.Recv()
+ require.NoError(t, err)
+ events := msg.GetEvents()
+ require.NotNil(t, events)
+ require.Len(t, events.GetEvents(), 2)
+ require.Empty(t, events.GetEvents()[0].GetFields())
+ require.Len(t, events.GetEvents()[1].GetFields(), 1)
+ cursor := events.GetCursor()
+ require.NotNil(t, cursor)
+ require.Equal(t, accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, cursor.GetLogSource())
+ require.Equal(t, f.cluster.GetArn(), cursor.GetClusterId())
+ require.Equal(t, "event-id-1", cursor.GetEventId())
+ require.Equal(t, logEpoch.Add(time.Second), cursor.GetLastEventTime().AsTime())
+
+ f.End(t)
+ })
+}
+
+func newKubeAuditLogResponseResumeState(cursor *accessgraphv1alpha.KubeAuditLogCursor) *kalsResponse {
+ return &kalsResponse{
+ State: &accessgraphv1alpha.KubeAuditLogStreamResponse_ResumeState{
+ ResumeState: &accessgraphv1alpha.KubeAuditLogResumeState{
+ Cursor: cursor,
+ },
+ },
+ }
+}
+
+func makeEvent(t time.Time, id int, msg string) cwltypes.FilteredLogEvent {
+ return cwltypes.FilteredLogEvent{
+ EventId: aws.String(fmt.Sprintf("event-id-%d", id)),
+ IngestionTime: aws.Int64(t.UnixMilli()),
+ Timestamp: aws.Int64(t.UnixMilli()),
+ LogStreamName: aws.String("kube-apiserver-audit-12345678"),
+ Message: aws.String(msg),
+ }
+}
+
+func newFakeCloudWatchLogFetcher() *fakeCloudWatchLogFetcher {
+ return &fakeCloudWatchLogFetcher{
+ events: make(chan []cwltypes.FilteredLogEvent),
+ err: make(chan error),
+ }
+}
+
+// fakeCloudWatchLogFetcher is a cloudwatch log fetcher that waits on channels
+// for the data to return. This allows the unit under test to rendezvous with
+// the tests, allowing the tests to advance the state of the fetcher as it
+// needs.
+type fakeCloudWatchLogFetcher struct {
+ events chan []cwltypes.FilteredLogEvent
+ err chan error
+}
+
+func (f *fakeCloudWatchLogFetcher) FetchEKSAuditLogs(
+ ctx context.Context,
+ cluster *accessgraphv1alpha.AWSEKSClusterV1,
+ cursor *accessgraphv1alpha.KubeAuditLogCursor,
+) ([]cwltypes.FilteredLogEvent, error) {
+ select {
+ case events := <-f.events:
+ return events, nil
+ case err := <-f.err:
+ return nil, err
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+}
diff --git a/lib/srv/discovery/eks_audit_log_watcher.go b/lib/srv/discovery/eks_audit_log_watcher.go
new file mode 100644
index 0000000000000..55e845a48baf3
--- /dev/null
+++ b/lib/srv/discovery/eks_audit_log_watcher.go
@@ -0,0 +1,266 @@
+/*
+ * Teleport
+ * Copyright (C) 2025 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package discovery
+
+import (
+ "context"
+ "errors"
+ "iter"
+ "log/slog"
+ "time"
+
+ "github.com/gravitational/trace"
+
+ accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
+ aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync"
+)
+
+const (
+ // initialLogBacklog is how far back to start retrieving EKS audit logs
+ // for newly discovered clusters.
+ // TODO(camscale): define in config.
+ initialLogBacklog = 7 * 24 * time.Hour
+
+ // logPollInterval is the amount of time to sleep between fetching audit
+ // logs from Cloud Watch Logs for a cluster.
+ // TODO(camscale): perhaps define in config.
+ logPollInterval = 30 * time.Second
+)
+
+// eksAuditLogCluster is a cluster for which audit logs should be fetched and
+// the fetcher to use to do that. It is sent over a channel as a slice from
+// the AWS resource watcher (access_graph_aws.go) to the EKS audit log watcher
+// created here. For each one of these received, an asynchronous log fetcher
+// is spawned to fetch Kubernetes apiserver audit logs from Cloud Watch Logs
+// and sent to the grpc AccessGraphService via the KubeAuditLogStream rpc.
+type eksAuditLogCluster struct {
+ fetcher *aws_sync.Fetcher
+ cluster *accessgraphv1alpha.AWSEKSClusterV1
+}
+
+type eksAuditLogFetcherRunner interface {
+ Run(context.Context) error
+}
+
+// eksAuditLogWatcher is a watcher that waits for notifications on a channel
+// indicating what EKS clusters should have audit logs fetched, and reconciles
+// that against what is currently being fetched. Fetchers are started and
+// stopped in response to this reconciliation.
+type eksAuditLogWatcher struct {
+ client accessgraphv1alpha.AccessGraphServiceClient
+ log *slog.Logger
+ auditLogClustersCh chan []eksAuditLogCluster
+
+ // fetchers tracks the cluster IDs (ARN) of the clusters for which we
+ // have a fetcher running. The value is a CancelFunc that is called to
+ // stop the fetcher.
+ fetchers map[string]context.CancelFunc
+ completedCh chan fetcherCompleted
+
+ // newFetcher is a function used to construct a new fetcher. It exists
+ // so tests can override it to not create real fetchers.
+ newFetcher func(
+ *aws_sync.Fetcher,
+ *accessgraphv1alpha.AWSEKSClusterV1,
+ accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient,
+ *slog.Logger,
+ ) eksAuditLogFetcherRunner
+}
+
+func newEKSAuditLogWatcher(
+ client accessgraphv1alpha.AccessGraphServiceClient,
+ logger *slog.Logger,
+) *eksAuditLogWatcher {
+ return &eksAuditLogWatcher{
+ client: client,
+ log: logger,
+ auditLogClustersCh: make(chan []eksAuditLogCluster),
+ fetchers: make(map[string]context.CancelFunc),
+ completedCh: make(chan fetcherCompleted),
+ }
+}
+
+// fetcherCompleted captures the result of a completed eksAuditLogFetcher.
+type fetcherCompleted struct {
+ clusterID string
+ err error
+}
+
+// Run starts a watcher by creating a KubeAuditLogStream on its grpc client. It
+// negotiates a configuration (currently a no-op) and starts a main loop
+// waiting for a list of clusters that it should run log fetchers for. As these
+// lists of clusters arrives, it reconciles it against the running log fetchers
+// and starts/stops log fetchers as required to match the given list. It
+// completes when the given context is done.
+//
+// If any errors occur initializing the grpc stream, it is returned and the
+// main loop is not run.
+func (w *eksAuditLogWatcher) Run(ctx context.Context) error {
+ w.log.InfoContext(ctx, "EKS Audit Log Watcher started")
+ defer w.log.InfoContext(ctx, "EKS Audit Log Watcher completed")
+
+ stream, err := w.client.KubeAuditLogStream(ctx)
+ if err != nil {
+ w.log.ErrorContext(ctx, "Failed to get access graph service KubeAuditLogStream", "error", err)
+ return trace.Wrap(err)
+ }
+
+ config := &accessgraphv1alpha.KubeAuditLogConfig{}
+ if err := sendTAGKubeAuditLogConfig(ctx, stream, config); err != nil {
+ w.log.ErrorContext(ctx, "Failed to send access graph config", "error", err)
+ return trace.Wrap(err)
+ }
+
+ config, err = receiveTAGKubeAuditLogConfig(ctx, stream)
+ if err != nil {
+ w.log.ErrorContext(ctx, "Failed to receive access graph config", "error", err)
+ return trace.Wrap(err)
+ }
+ w.log.InfoContext(ctx, "KubeAuditLogConfig received", "config", config)
+
+ // Loop waiting for EKS clusters we need to fetch audit logs for on
+ // the w.auditLogClustersCh channel (from the resource syncer).
+ // Reconcile that list of clusters against what we know and start/stop
+ // any log fetchers necessary.
+ for {
+ select {
+ case clusters := <-w.auditLogClustersCh:
+ w.reconcile(ctx, clusters, stream)
+ case completed := <-w.completedCh:
+ w.complete(ctx, completed)
+ case <-ctx.Done():
+ return trace.Wrap(ctx.Err())
+ }
+ }
+}
+
+// Reconcile triggers a reconciliation of currently running fetchers against
+// the given slice of clusters. The reconciliation will stop any fetchers for
+// clusters not in the slice and start any fetchers for clusters in the slice
+// that are not running.
+//
+// If the given context is done before the clusters can be sent to the
+// reconciliation goroutine, the context's error will be returned.
+func (w *eksAuditLogWatcher) Reconcile(ctx context.Context, clusters []eksAuditLogCluster) error {
+ select {
+ case <-ctx.Done():
+ return trace.Wrap(ctx.Err())
+ case w.auditLogClustersCh <- clusters:
+ }
+
+ return nil
+}
+
+// reconcile compares the given slice of clusters against the currently running
+// log fetchers and stops any running fetchers not in the cluster slice and
+// starts a log fetcher for any cluster in the slice that does not have a
+// running log fetcher.
+//
+// Log fetchers that are started are initialized with the given grpc stream
+// over which they should send their audit logs.
+func (w *eksAuditLogWatcher) reconcile(ctx context.Context, clusters []eksAuditLogCluster, stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient) {
+ w.log.DebugContext(ctx, "Reconciling EKS audit log clusters", "new_count", len(clusters))
+
+ // Make a map of the discovered clusters, keyed by ARN so we can compare against
+ // the existing clusters we are fetching audit logs for.
+ discoveredClusters := make(map[string]eksAuditLogCluster)
+ for _, discovered := range clusters {
+ discoveredClusters[discovered.cluster.Arn] = discovered
+ }
+ // Stop any fetchers for clusters we are running a fetcher for that discovery did not return.
+ for arn, fetcherCancel := range mapDifference(w.fetchers, discoveredClusters) {
+ w.log.InfoContext(ctx, "Stopping eksKubeAuditLogFetcher", "cluster", arn)
+ fetcherCancel()
+ // cleanup will happen when the fetcher finishes and is put on the completed channel.
+ }
+ // Start any new fetchers for clusters we are not running that discovery returned.
+ for arn, discovered := range mapDifference(discoveredClusters, w.fetchers) {
+ w.log.InfoContext(ctx, "Starting eksKubeAuditLogFetcher", "cluster", arn)
+ ctx, cancel := context.WithCancel(ctx)
+ var logFetcher eksAuditLogFetcherRunner
+ if w.newFetcher == nil {
+ logFetcher = newEKSAuditLogFetcher(discovered.fetcher, discovered.cluster, stream, w.log)
+ } else {
+ // the pluggable newFetcher is for testing purposes
+ logFetcher = w.newFetcher(discovered.fetcher, discovered.cluster, stream, w.log)
+ }
+ w.fetchers[arn] = cancel
+ go func() {
+ err := logFetcher.Run(ctx)
+ select {
+ case w.completedCh <- fetcherCompleted{arn, err}:
+ case <-ctx.Done():
+ }
+ }()
+ }
+}
+
+// complete cleans up the maintained list of running log fetchers, removing the
+// given completed fetcher, and logs the completion status of the fetcher.
+func (w *eksAuditLogWatcher) complete(ctx context.Context, completed fetcherCompleted) {
+ arn := completed.clusterID
+ if completed.err != nil && !errors.Is(completed.err, context.Canceled) {
+ w.log.ErrorContext(ctx, "eksKubeAuditLogFetcher completed with error", "cluster", arn, "error", completed.err)
+ } else {
+ w.log.InfoContext(ctx, "eksKubeAuditLogFetcher completed", "cluster", arn)
+ }
+ delete(w.fetchers, arn)
+}
+
+// mapDifference yields all keys and values of m1 where the key is not in m2.
+// It can be considered the set difference operation m1 - m2, yielding all
+// elements of m1 not in m2.
+func mapDifference[K comparable, V1 any, V2 any](m1 map[K]V1, m2 map[K]V2) iter.Seq2[K, V1] {
+ return func(yield func(K, V1) bool) {
+ for k, v := range m1 {
+ if _, ok := m2[k]; !ok {
+ if !yield(k, v) {
+ return
+ }
+ }
+ }
+ }
+}
+
+func sendTAGKubeAuditLogConfig(ctx context.Context, stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient, config *accessgraphv1alpha.KubeAuditLogConfig) error {
+ err := stream.Send(
+ &accessgraphv1alpha.KubeAuditLogStreamRequest{
+ Action: &accessgraphv1alpha.KubeAuditLogStreamRequest_Config{Config: config},
+ },
+ )
+ if err != nil {
+ err = consumeTillErr(stream)
+ return trace.Wrap(err)
+ }
+ return nil
+}
+
+func receiveTAGKubeAuditLogConfig(ctx context.Context, stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient) (*accessgraphv1alpha.KubeAuditLogConfig, error) {
+ msg, err := stream.Recv()
+ if err != nil {
+ return nil, trace.Wrap(err, "failed to receive KubeAuditLogStream config")
+ }
+
+ config := msg.GetConfig()
+ if config == nil {
+ return nil, trace.BadParameter("AccessGraphService.KubeAuditLogStream did not return KubeAuditLogConfig message")
+ }
+
+ return config, nil
+}
diff --git a/lib/srv/discovery/eks_audit_log_watcher_test.go b/lib/srv/discovery/eks_audit_log_watcher_test.go
new file mode 100644
index 0000000000000..4d3aa5719641c
--- /dev/null
+++ b/lib/srv/discovery/eks_audit_log_watcher_test.go
@@ -0,0 +1,210 @@
+/*
+ * Teleport
+ * Copyright (C) 2025 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package discovery
+
+import (
+ "context"
+ "log/slog"
+ "testing"
+ "testing/synctest"
+
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+
+ accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
+ aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync"
+ "github.com/gravitational/teleport/lib/utils/testutils/grpctest"
+)
+
+type (
+ kalsRequest = accessgraphv1alpha.KubeAuditLogStreamRequest
+ kalsResponse = accessgraphv1alpha.KubeAuditLogStreamResponse
+ kalsClient = grpc.BidiStreamingClient[kalsRequest, kalsResponse]
+ kalsServer = grpc.BidiStreamingServer[kalsRequest, kalsResponse]
+)
+
+func TestEKSAuditLogWatcher_Init(t *testing.T) {
+ synctest.Test(t, func(t *testing.T) {
+ ctx, cancel := context.WithCancel(t.Context())
+
+ client := newFakeKubeAuditLogClient(ctx)
+ watcher := newEKSAuditLogWatcher(client, slog.New(slog.DiscardHandler))
+ var err error
+ go func() { err = watcher.Run(ctx) }()
+
+ // Receive a config request.
+ req, err := client.serverStream.Recv()
+ require.NoError(t, err)
+ require.NotNil(t, req.GetConfig())
+
+ // Send back a config response with the config we received.
+ err = client.serverStream.Send(newKubeAuditLogResponseConfig(req.GetConfig()))
+ require.NoError(t, err)
+
+ cancel()
+ synctest.Wait()
+ require.ErrorIs(t, err, context.Canceled)
+ })
+}
+
+func TestEKSAuditLogWatcher_Reconcile(t *testing.T) {
+ synctest.Test(t, func(t *testing.T) {
+ ctx, cancel := context.WithCancel(t.Context())
+
+ fetcherTracker := newFakeFetcherTracker()
+ client := newFakeKubeAuditLogClient(ctx)
+ watcher := newEKSAuditLogWatcher(client, slog.New(slog.DiscardHandler))
+ watcher.newFetcher = fetcherTracker.newFetcher
+ var err error
+ go func() { err = watcher.Run(ctx) }()
+
+ // Receive a config request.
+ req, err := client.serverStream.Recv()
+ require.NoError(t, err)
+ require.NotNil(t, req.GetConfig())
+
+ // Send back a config response with the config we received.
+ err = client.serverStream.Send(newKubeAuditLogResponseConfig(req.GetConfig()))
+ require.NoError(t, err)
+
+ // Send a single cluster1 to the watcher to reconcile
+ cluster1 := &accessgraphv1alpha.AWSEKSClusterV1{Arn: "test-arn"}
+ fetcher1 := &aws_sync.Fetcher{}
+ watcher.Reconcile(ctx, []eksAuditLogCluster{{fetcher1, cluster1}})
+ synctest.Wait()
+
+ // Verify that a fetcher was started.
+ f1, ok := fetcherTracker.fetchers["test-arn"]
+ require.True(t, ok, "eksAuditLogFetcher not in fetcherTracker")
+ require.True(t, f1.runCalled, "fetcher Run() was not called")
+ require.Same(t, fetcher1, f1.fetcher)
+ require.Same(t, cluster1, f1.cluster)
+ require.Len(t, fetcherTracker.fetchers, 1)
+ require.Equal(t, 1, fetcherTracker.newCount)
+
+ // Add another cluster
+ cluster2 := &accessgraphv1alpha.AWSEKSClusterV1{Arn: "test-arn2"}
+ fetcher2 := &aws_sync.Fetcher{}
+ watcher.Reconcile(ctx, []eksAuditLogCluster{{fetcher1, cluster1}, {fetcher2, cluster2}})
+ synctest.Wait()
+
+ // Verify that a fetcher was started.
+ f2, ok := fetcherTracker.fetchers["test-arn2"]
+ require.True(t, ok, "eksAuditLogFetcher not in fetcherTracker")
+ require.True(t, f2.runCalled, "fetcher Run() was not called")
+ require.Same(t, fetcher2, f2.fetcher)
+ require.Same(t, cluster2, f2.cluster)
+ require.Len(t, fetcherTracker.fetchers, 2)
+ require.Equal(t, 2, fetcherTracker.newCount)
+
+ // Drop back to a single cluster
+ watcher.Reconcile(ctx, []eksAuditLogCluster{{fetcher1, cluster1}})
+ synctest.Wait()
+ require.Len(t, fetcherTracker.fetchers, 1)
+ require.Equal(t, 2, fetcherTracker.newCount)
+ require.True(t, f2.done)
+
+ // Send an empty cluster list. Should stop last fetcher
+ watcher.Reconcile(ctx, []eksAuditLogCluster{})
+ synctest.Wait()
+ require.Empty(t, fetcherTracker.fetchers)
+ require.Equal(t, 2, fetcherTracker.newCount)
+ require.True(t, f1.done)
+
+ cancel()
+ synctest.Wait()
+ require.ErrorIs(t, err, context.Canceled)
+ })
+}
+
+// fakeFetcherTracker keeps track of the fetchers created by an
+// eksAuditLogWatcher. It has a newFetcher method that can plug into a watcher
+// so that real fetchers are not created, and returns a fake fetcher for
+// testing purposes.
+type fakeFetcherTracker struct {
+ fetchers map[string]*fakeEksAuditLogFetcher
+ newCount int
+}
+
+func newFakeFetcherTracker() *fakeFetcherTracker {
+ return &fakeFetcherTracker{fetchers: make(map[string]*fakeEksAuditLogFetcher)}
+}
+
+// newFetcher plugs into eksAuditLogWatcher.newFetcher so that it creates fake
+// fetchers for testing purposes. We do not need real fetchers to test the
+// watcher.
+func (fft *fakeFetcherTracker) newFetcher(
+ fetcher *aws_sync.Fetcher,
+ cluster *accessgraphv1alpha.AWSEKSClusterV1,
+ stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient,
+ log *slog.Logger,
+) eksAuditLogFetcherRunner {
+ f := &fakeEksAuditLogFetcher{
+ fetcher: fetcher,
+ cluster: cluster,
+ cleanup: func() { delete(fft.fetchers, cluster.Arn) },
+ }
+ fft.fetchers[cluster.Arn] = f
+ fft.newCount++
+ return f
+}
+
+type fakeEksAuditLogFetcher struct {
+ fetcher *aws_sync.Fetcher
+ cluster *accessgraphv1alpha.AWSEKSClusterV1
+ cleanup func()
+ runCalled bool
+ done bool
+}
+
+func (f *fakeEksAuditLogFetcher) Run(ctx context.Context) error {
+ f.runCalled = true // used in synctest bubble, no race
+ <-ctx.Done()
+ f.done = true
+ f.cleanup()
+ return ctx.Err()
+}
+
+func newKubeAuditLogResponseConfig(cfg *accessgraphv1alpha.KubeAuditLogConfig) *kalsResponse {
+ return &kalsResponse{
+ State: &accessgraphv1alpha.KubeAuditLogStreamResponse_Config{
+ Config: cfg,
+ },
+ }
+}
+
+func newFakeKubeAuditLogClient(ctx context.Context) *fakeKubeAuditLogClient {
+ tester := grpctest.NewGRPCTester[kalsRequest, kalsResponse](ctx)
+ return &fakeKubeAuditLogClient{
+ clientStream: tester.NewClientStream(),
+ serverStream: tester.NewServerStream(),
+ }
+}
+
+type fakeKubeAuditLogClient struct {
+ accessgraphv1alpha.AccessGraphServiceClient
+
+ clientStream kalsClient
+ serverStream kalsServer
+}
+
+// Implements KubeAuditLogStream grpc method on the client
+func (c *fakeKubeAuditLogClient) KubeAuditLogStream(ctx context.Context, opts ...grpc.CallOption) (kalsClient, error) {
+ return c.clientStream, nil
+}
diff --git a/lib/srv/discovery/fetchers/aws-sync/aws-sync.go b/lib/srv/discovery/fetchers/aws-sync/aws-sync.go
index c66275dfd7945..369466c2de2bd 100644
--- a/lib/srv/discovery/fetchers/aws-sync/aws-sync.go
+++ b/lib/srv/discovery/fetchers/aws-sync/aws-sync.go
@@ -27,6 +27,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
+ "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/iam"
"github.com/aws/aws-sdk-go-v2/service/kms"
"github.com/aws/aws-sdk-go-v2/service/rds"
@@ -36,6 +37,7 @@ import (
"golang.org/x/sync/errgroup"
usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1"
+ "github.com/gravitational/teleport/api/types"
accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
"github.com/gravitational/teleport/lib/cloud/awsconfig"
"github.com/gravitational/teleport/lib/srv/server"
@@ -67,6 +69,9 @@ type Config struct {
DiscoveryConfigName string
// Log is the logger to use for logging.
Log *slog.Logger
+ // EKSAuditLogs if set specifies the EKS clusters for which apiserver audit logs
+ // should be fetched.
+ EKSAuditLogs *EKSAuditLogs
// awsClients provides AWS SDK clients.
awsClients awsClientProvider
@@ -128,6 +133,8 @@ type awsClientProvider interface {
getSTSClient(cfg aws.Config, optFns ...func(*sts.Options)) stsClient
// getKMSClient provides a [kmsClient].
getKMSClient(cfg aws.Config, optFns ...func(*kms.Options)) kmsClient
+ // getCloudWatchLogsClient provides a [cloudwatchlogs.FilterLogEventsAPIClient].
+ getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cloudwatchlogs.FilterLogEventsAPIClient
}
type defaultAWSClients struct{}
@@ -152,6 +159,10 @@ func (defaultAWSClients) getKMSClient(cfg aws.Config, optFns ...func(*kms.Option
return kms.NewFromConfig(cfg, optFns...)
}
+// getCloudWatchLogsClient returns a real CloudWatch Logs client built from the
+// given AWS config, satisfying the awsClientProvider interface.
+func (defaultAWSClients) getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cloudwatchlogs.FilterLogEventsAPIClient {
+	return cloudwatchlogs.NewFromConfig(cfg, optFns...)
+}
+
// AssumeRole is the configuration for assuming an AWS role.
type AssumeRole struct {
// RoleARN is the ARN of the role to assume.
@@ -160,6 +171,13 @@ type AssumeRole struct {
ExternalID string
}
+// EKSAuditLogs is the configuration of which discovered EKS clusters should have
+// their apiserver audit logs fetched and sent to Access Graph.
+type EKSAuditLogs struct {
+	// Tags is a set of name/value tags that an EKS cluster must have for audit log
+	// fetching to apply to it.
+	Tags types.Labels
+}
+
// Fetcher is a fetcher that fetches AWS resources.
type Fetcher struct {
Config
diff --git a/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go b/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go
new file mode 100644
index 0000000000000..f2fa18c975a1c
--- /dev/null
+++ b/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go
@@ -0,0 +1,102 @@
+/*
+ * Teleport
+ * Copyright (C) 2025 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package aws_sync
+
+import (
+ "context"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
+ cwltypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
+ "github.com/gravitational/trace"
+
+ accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
+)
+
+// FetchEKSAuditLogs returns a slice of audit log events for the given cluster
+// starting from the given cursor. Events are read via CloudWatch Logs
+// FilterLogEvents from the cluster's control-plane log group, restricted to
+// the kube-apiserver audit streams. A nil cursor (or one without a last event
+// time) starts from the epoch.
+func (a *Fetcher) FetchEKSAuditLogs(ctx context.Context, cluster *accessgraphv1alpha.AWSEKSClusterV1, cursor *accessgraphv1alpha.KubeAuditLogCursor) ([]cwltypes.FilteredLogEvent, error) {
+	// Build a config scoped to the cluster's region.
+	cfg, err := a.AWSConfigProvider.GetConfig(ctx, cluster.GetRegion(), a.getAWSOptions()...)
+	if err != nil {
+		return nil, trace.Wrap(err)
+	}
+	client := a.awsClients.getCloudWatchLogsClient(cfg)
+
+	// limit is not a hard limit - we may exceed it but won't get any more pages
+	// once reached.
+	var limit int32 = 500 // TODO(camscale): Consider making this a parameter
+	startTime := cursor.GetLastEventTime().AsTime().UTC()
+	input := &cloudwatchlogs.FilterLogEventsInput{
+		// EKS control-plane logs are delivered to /aws/eks/<name>/cluster;
+		// audit entries live in the kube-apiserver-audit-* streams.
+		LogGroupName:        aws.String("/aws/eks/" + cluster.GetName() + "/cluster"),
+		LogStreamNamePrefix: aws.String("kube-apiserver-audit-"),
+		StartTime:           aws.Int64(startTime.UnixMilli()),
+		Limit:               aws.Int32(limit),
+	}
+
+	var result []cwltypes.FilteredLogEvent
+	for p := cloudwatchlogs.NewFilterLogEventsPaginator(client, input); p.HasMorePages(); {
+		output, err := p.NextPage(ctx)
+		if err != nil {
+			return nil, trace.Wrap(err)
+		}
+
+		// Drop events up to and including the cursor's event ID. A nil
+		// return means the cursor position was not reached on this page;
+		// keep applying the cursor to subsequent pages.
+		eventsAfterCursor := cwlEventsAfterCursor(output.Events, cursor)
+		if eventsAfterCursor != nil {
+			cursor = nil // cursor consumed; take later pages whole
+			result = append(result, eventsAfterCursor...)
+			if len(result) >= int(limit) {
+				break
+			}
+		}
+	}
+
+	return result, nil
+}
+
+// cwlEventsAfterCursor returns the events from the given events after the
+// cursor. If the cursor was not found, but the timestamp of the cursor
+// was not passed, then nil is returned. In this case, the cursor is still
+// valid and should continue to be used to find the next event. Otherwise
+// a slice (possibly empty) is returned which means the cursor was consumed;
+// either the event ID in the cursor was found, or we passed the timestamp
+// of the cursor.
+// If the cursor is nil, the events are returned unfiltered.
+func cwlEventsAfterCursor(events []cwltypes.FilteredLogEvent, cursor *accessgraphv1alpha.KubeAuditLogCursor) []cwltypes.FilteredLogEvent {
+	// If we're not looking for events from a cursor position, just return all events.
+	if cursor == nil || cursor.GetEventId() == "" {
+		return events
+	}
+
+	startTime := cursor.GetLastEventTime().AsTime().UTC()
+	for i, event := range events {
+		// The SDK models Timestamp and EventId as pointers; use the
+		// nil-safe aws.To* helpers rather than dereferencing directly so a
+		// missing field cannot panic.
+		// If we never saw cursor.EventId with the given timestamp,
+		// just return all the events.
+		if time.UnixMilli(aws.ToInt64(event.Timestamp)).UTC().After(startTime) {
+			return events
+		}
+		if aws.ToString(event.EventId) == cursor.GetEventId() {
+			return events[i+1:]
+		}
+	}
+	// The cursor was not found in the events, but it was not discarded as
+	// the timestamp on the events did not move past the cursor timestamp.
+	// A nil slice (as opposed to an empty slice) indicates this.
+	return nil
+}
diff --git a/lib/srv/discovery/fetchers/aws-sync/rds_test.go b/lib/srv/discovery/fetchers/aws-sync/rds_test.go
index ce1ae0ed43e1a..6211212c6f002 100644
--- a/lib/srv/discovery/fetchers/aws-sync/rds_test.go
+++ b/lib/srv/discovery/fetchers/aws-sync/rds_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/iam"
"github.com/aws/aws-sdk-go-v2/service/kms"
"github.com/aws/aws-sdk-go-v2/service/rds"
@@ -224,6 +225,7 @@ type fakeAWSClients struct {
s3Client s3Client
stsClient stsClient
kmsClient kmsClient
+ cwlClient cloudwatchlogs.FilterLogEventsAPIClient
}
func (f fakeAWSClients) getIAMClient(cfg aws.Config, optFns ...func(*iam.Options)) iamClient {
@@ -245,3 +247,7 @@ func (f fakeAWSClients) getSTSClient(cfg aws.Config, optFns ...func(*sts.Options
func (f fakeAWSClients) getKMSClient(cfg aws.Config, optFns ...func(*kms.Options)) kmsClient {
return f.kmsClient
}
+
+// getCloudWatchLogsClient returns the canned CloudWatch Logs client, ignoring
+// the supplied config and options.
+func (f fakeAWSClients) getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cloudwatchlogs.FilterLogEventsAPIClient {
+	return f.cwlClient
+}
diff --git a/lib/utils/testutils/grpctest/grpc.go b/lib/utils/testutils/grpctest/grpc.go
new file mode 100644
index 0000000000000..5664b2c7dc1d8
--- /dev/null
+++ b/lib/utils/testutils/grpctest/grpc.go
@@ -0,0 +1,146 @@
+/*
+ * Teleport
+ * Copyright (C) 2025 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package grpctest
+
+import (
+ "context"
+ "io"
+
+ "google.golang.org/grpc"
+)
+
+// NewGRPCTester creates a new bidirectional streaming gRPC pair, a bidirectional
+// streaming client and server, for the given request type T1 and response type
+// T2.
+//
+// The streams are directly connected without the use of network and are
+// therefore suitable to be used with synctest.
+//
+// The client sends its requests on the clientStream which are directly received
+// by the server via a channel with buffer size 1. The server sends its
+// responses on its serverStream via another channel with buffer size 1.
+//
+// Private fields are purposefully written in a *not* concurrency-safe manner to
+// simulate non-concurrency safety of a real over-the-network gRPC stream. It
+// will be caught when executing the test with the race detector enabled.
+func NewGRPCTester[T1, T2 any](ctx context.Context) *GRPCTester[T1, T2] {
+	return &GRPCTester[T1, T2]{
+		ctx:      ctx,
+		toServer: make(chan *T1, 1),
+		toClient: make(chan *T2, 1),
+	}
+}
+
+// GRPCTester holds the context and the two directly-connected channels shared
+// by the client and server stream halves. Create one with NewGRPCTester.
+type GRPCTester[T1, T2 any] struct {
+	ctx      context.Context
+	toServer chan *T1 // client requests -> server
+	toClient chan *T2 // server responses -> client
+}
+
+// NewClientStream returns the client half of the pair, wired to the tester's
+// shared channels. Each call constructs a fresh value over the same channels.
+func (t *GRPCTester[T1, T2]) NewClientStream() grpc.BidiStreamingClient[T1, T2] {
+	return &client[T1, T2]{
+		ctx:      t.ctx,
+		toServer: t.toServer,
+		toClient: t.toClient,
+	}
+}
+
+// NewServerStream returns the server half of the pair, wired to the tester's
+// shared channels. Each call constructs a fresh value over the same channels.
+func (t *GRPCTester[T1, T2]) NewServerStream() grpc.BidiStreamingServer[T1, T2] {
+	return &server[T1, T2]{
+		ctx:      t.ctx,
+		toServer: t.toServer,
+		toClient: t.toClient,
+	}
+}
+
+// client is the client half of the in-memory stream pair. It embeds
+// grpc.ClientStream only to satisfy the interface; methods not overridden
+// here go through the embedded value, which is nil, and panic if called.
+type client[T1, T2 any] struct {
+	grpc.ClientStream
+	ctx      context.Context
+	toServer chan *T1
+	toClient chan *T2
+	// simulate non-concurrency safety
+	sendRaceDetector    bool
+	receiveRaceDetector bool
+}
+
+// Context returns the context the tester was created with.
+func (c *client[T1, T2]) Context() context.Context {
+	return c.ctx
+}
+
+// Send delivers msg to the server half, or returns the context's error if the
+// context is done first.
+func (c *client[T1, T2]) Send(msg *T1) error {
+	c.sendRaceDetector = true // deliberately unsynchronized write to simulate non-concurrency safety
+	select {
+	case <-c.ctx.Done():
+		return c.ctx.Err()
+	case c.toServer <- msg:
+	}
+	return nil
+}
+
+// Recv returns the next response from the server half, or the context's error
+// if the context is done first.
+func (c *client[T1, T2]) Recv() (*T2, error) {
+	c.receiveRaceDetector = true // deliberately unsynchronized write to simulate non-concurrency safety
+	select {
+	case <-c.ctx.Done():
+		return nil, c.ctx.Err()
+	case msg := <-c.toClient:
+		return msg, nil
+	}
+}
+
+// CloseSend half-closes the client side by closing the request channel, so the
+// server's Recv returns io.EOF once drained.
+// NOTE(review): unlike a real gRPC stream, a second CloseSend call panics on
+// the double close — confirm no test calls it twice.
+func (c *client[T1, T2]) CloseSend() error {
+	close(c.toServer)
+	return nil
+}
+
+// server is the server half of the in-memory stream pair. It embeds
+// grpc.ServerStream only to satisfy the interface; methods not overridden
+// here go through the embedded value, which is nil, and panic if called.
+type server[T1, T2 any] struct {
+	grpc.ServerStream
+	ctx      context.Context
+	toServer chan *T1
+	toClient chan *T2
+	// simulate non-concurrency safety
+	sendRaceDetector    bool
+	receiveRaceDetector bool
+}
+
+// Context returns the context the tester was created with.
+func (s *server[T1, T2]) Context() context.Context {
+	return s.ctx
+}
+
+// Send delivers resp to the client half, or returns the context's error if the
+// context is done first.
+func (s *server[T1, T2]) Send(msg *T2) error {
+	s.sendRaceDetector = true // deliberately unsynchronized write to simulate non-concurrency safety
+	select {
+	case <-s.ctx.Done():
+		return s.ctx.Err()
+	case s.toClient <- msg:
+	}
+	return nil
+}
+
+// Recv returns the next request from the client half. It returns io.EOF once
+// the client has called CloseSend and the channel is drained, or the context's
+// error if the context is done first.
+func (s *server[T1, T2]) Recv() (*T1, error) {
+	s.receiveRaceDetector = true // deliberately unsynchronized write to simulate non-concurrency safety
+	select {
+	case <-s.ctx.Done():
+		return nil, s.ctx.Err()
+	case msg, ok := <-s.toServer:
+		if ok {
+			return msg, nil
+		}
+		return nil, io.EOF
+	}
+}