diff --git a/api/proto/buf.yaml b/api/proto/buf.yaml index 36fb08bdf7aa4..fd40a9a76d836 100644 --- a/api/proto/buf.yaml +++ b/api/proto/buf.yaml @@ -23,6 +23,7 @@ lint: - teleport/legacy/client/proto/certs.proto - teleport/legacy/client/proto/proxyservice.proto - teleport/legacy/types/events/events.proto + - teleport/legacy/types/events/athena.proto - teleport/legacy/types/types.proto - teleport/legacy/types/wrappers/wrappers.proto ignore_only: diff --git a/api/proto/teleport/legacy/types/events/athena.proto b/api/proto/teleport/legacy/types/events/athena.proto new file mode 100644 index 0000000000000..c36736a2f9cc0 --- /dev/null +++ b/api/proto/teleport/legacy/types/events/athena.proto @@ -0,0 +1,29 @@ +// Copyright 2023 Gravitational, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package events; + +option go_package = "github.com/gravitational/teleport/api/types/events"; + +// AthenaS3EventPayload is used as payload for sending large events to SQS. +message AthenaS3EventPayload { + // Path on S3. + string path = 1; + // VersionID is versionID of file on s3, if versioning is enabled. + string version_id = 2; + // Custom KMS key for server-side encryption. + string ckms = 3; +} diff --git a/api/types/events/athena.pb.go b/api/types/events/athena.pb.go new file mode 100644 index 0000000000000..8adc4e16b70e8 --- /dev/null +++ b/api/types/events/athena.pb.go @@ -0,0 +1,436 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: teleport/legacy/types/events/athena.proto + +package events + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// AthenaS3EventPayload is used as payload for sending large events to SQS. +type AthenaS3EventPayload struct { + // Path on S3. + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + // VersionID is versionID of file on s3, if versioning is enabled. + VersionId string `protobuf:"bytes,2,opt,name=version_id,json=versionId,proto3" json:"version_id,omitempty"` + // Custom KMS key for server-side encryption. + Ckms string `protobuf:"bytes,3,opt,name=ckms,proto3" json:"ckms,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *AthenaS3EventPayload) Reset() { *m = AthenaS3EventPayload{} } +func (m *AthenaS3EventPayload) String() string { return proto.CompactTextString(m) } +func (*AthenaS3EventPayload) ProtoMessage() {} +func (*AthenaS3EventPayload) Descriptor() ([]byte, []int) { + return fileDescriptor_c0d45ba0499f9acf, []int{0} +} +func (m *AthenaS3EventPayload) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AthenaS3EventPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AthenaS3EventPayload.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AthenaS3EventPayload) XXX_Merge(src proto.Message) { + xxx_messageInfo_AthenaS3EventPayload.Merge(m, src) +} +func (m *AthenaS3EventPayload) XXX_Size() int { + return m.Size() +} +func (m *AthenaS3EventPayload) XXX_DiscardUnknown() { + xxx_messageInfo_AthenaS3EventPayload.DiscardUnknown(m) +} + +var xxx_messageInfo_AthenaS3EventPayload proto.InternalMessageInfo + +func (m *AthenaS3EventPayload) GetPath() string { + if m != nil { + return m.Path + } + return "" +} + +func (m *AthenaS3EventPayload) GetVersionId() string { + if m != nil { + return m.VersionId + } + return "" +} + +func (m *AthenaS3EventPayload) GetCkms() string { + if m != nil { + return m.Ckms + } + return "" +} + +func init() { + proto.RegisterType((*AthenaS3EventPayload)(nil), "events.AthenaS3EventPayload") +} + +func init() { + proto.RegisterFile("teleport/legacy/types/events/athena.proto", fileDescriptor_c0d45ba0499f9acf) +} + +var fileDescriptor_c0d45ba0499f9acf = []byte{ + // 195 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x2c, 0x49, 0xcd, 0x49, + 0x2d, 0xc8, 0x2f, 0x2a, 0xd1, 0xcf, 0x49, 0x4d, 0x4f, 0x4c, 0xae, 0xd4, 0x2f, 0xa9, 0x2c, 0x48, + 0x2d, 0xd6, 0x4f, 0x2d, 0x4b, 0xcd, 0x2b, 0x29, 0xd6, 0x4f, 0x2c, 0xc9, 0x48, 0xcd, 0x4b, 0xd4, + 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x83, 0x08, 0x2a, 0xc5, 0x72, 0x89, 0x38, 0x82, 0xc5, + 0x83, 0x8d, 0x5d, 0x41, 0x22, 0x01, 0x89, 0x95, 0x39, 0xf9, 0x89, 0x29, 0x42, 0x42, 0x5c, 0x2c, + 0x05, 0x89, 0x25, 0x19, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0xb6, 0x90, 0x2c, 0x17, + 0x57, 0x59, 0x6a, 0x51, 0x71, 0x66, 0x7e, 0x5e, 0x7c, 0x66, 0x8a, 0x04, 0x13, 0x58, 0x86, 0x13, + 0x2a, 0xe2, 0x09, 0xd6, 0x92, 0x9c, 0x9d, 0x5b, 0x2c, 0xc1, 0x0c, 0xd1, 0x02, 0x62, 0x3b, 0x39, + 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x51, 0x46, 0xe9, + 0x99, 0x25, 0x19, 0xa5, 0x49, 0x7a, 0xc9, 0xf9, 0xb9, 0xfa, 0xe9, 0x45, 0x89, 0x65, 0x99, 0x25, + 0x89, 0x25, 0x99, 0xf9, 0x79, 0x89, 0x39, 0xfa, 0x70, 0x87, 0x27, 0x16, 0x64, 0xa2, 0xb8, 0x3a, + 0x89, 0x0d, 0xec, 0x5e, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6d, 0xfe, 0x60, 0xcf, 0xdc, + 0x00, 0x00, 0x00, +} + +func (m *AthenaS3EventPayload) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AthenaS3EventPayload) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *AthenaS3EventPayload) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Ckms) > 0 { + i -= len(m.Ckms) + copy(dAtA[i:], m.Ckms) + i = encodeVarintAthena(dAtA, i, uint64(len(m.Ckms))) + i-- + dAtA[i] = 0x1a + } + if len(m.VersionId) > 0 { + i -= len(m.VersionId) + copy(dAtA[i:], m.VersionId) + i = encodeVarintAthena(dAtA, i, uint64(len(m.VersionId))) + i-- + dAtA[i] = 0x12 + } + if len(m.Path) > 0 { + i -= len(m.Path) + copy(dAtA[i:], m.Path) + i = encodeVarintAthena(dAtA, i, uint64(len(m.Path))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintAthena(dAtA []byte, offset int, v uint64) int { + offset -= sovAthena(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *AthenaS3EventPayload) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Path) + if l > 0 { + n += 1 + l + sovAthena(uint64(l)) + } + l = len(m.VersionId) + if l > 0 { + n += 1 + l + sovAthena(uint64(l)) + } + l = len(m.Ckms) + if l > 0 { + n += 1 + l + sovAthena(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovAthena(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozAthena(x uint64) (n int) { + return sovAthena(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *AthenaS3EventPayload) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAthena + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AthenaS3EventPayload: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AthenaS3EventPayload: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Path", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAthena + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAthena + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthAthena + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Path = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field VersionId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAthena + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAthena + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthAthena + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.VersionId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Ckms", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAthena + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAthena + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthAthena + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Ckms = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipAthena(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthAthena + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipAthena(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAthena + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAthena + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAthena + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthAthena + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupAthena + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthAthena + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthAthena = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowAthena = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupAthena = fmt.Errorf("proto: unexpected end of group") +) diff --git a/go.mod b/go.mod index 666de37b92e04..a610f7ba22f41 100644 --- a/go.mod +++ b/go.mod @@ -29,10 +29,13 @@ require ( github.com/armon/go-radix v1.0.0 github.com/aws/aws-sdk-go v1.44.234 github.com/aws/aws-sdk-go-v2 v1.17.8 - github.com/aws/aws-sdk-go-v2/config v1.18.20 + github.com/aws/aws-sdk-go-v2/config v1.18.21 github.com/aws/aws-sdk-go-v2/credentials v1.13.20 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.62 github.com/aws/aws-sdk-go-v2/service/ec2 v1.93.2 + github.com/aws/aws-sdk-go-v2/service/s3 v1.31.3 + github.com/aws/aws-sdk-go-v2/service/sns v1.20.8 github.com/aws/aws-sdk-go-v2/service/sts v1.18.9 github.com/aws/aws-sigv4-auth-cassandra-gocql-driver-plugin v0.0.0-20220331165046-e4d000c0d6a6 github.com/beevik/etree v1.1.0 @@ -197,16 +200,14 @@ require ( github.com/apache/arrow/go/v10 v10.0.1 // indirect github.com/apache/thrift v0.16.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect - github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.45 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.32 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.26 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.3.33 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.18 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.24 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.22 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.27 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.26 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.21 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.29.6 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.1 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.12.8 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.8 // indirect github.com/aws/smithy-go v1.13.5 // indirect diff --git a/go.sum b/go.sum index e465474c79a0a..716409620d48e 100644 --- a/go.sum +++ b/go.sum @@ -183,20 +183,17 @@ github.com/aws/aws-sdk-go-v2 v1.17.8 h1:GMupCNNI7FARX27L7GjCJM8NgivWbRgpjNI/hOQj github.com/aws/aws-sdk-go-v2 v1.17.8/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10/go.mod h1:VeTZetY5KRJLuD/7fkQXMU6Mw7H5m/KP2J5Iy9osMno= -github.com/aws/aws-sdk-go-v2/config v1.18.6/go.mod h1:qyjgnyqpKnNGT+C62zMsrZ/Mn2OodYqwIH0DpXiW8f8= github.com/aws/aws-sdk-go-v2/config v1.18.8/go.mod h1:5XCmmyutmzzgkpk/6NYTjeWb6lgo9N170m1j6pQkIBs= -github.com/aws/aws-sdk-go-v2/config v1.18.20 h1:yYy+onqmLmDVZtx0mkqbx8aJPl+58V6ivLbLDZ2Qztc= -github.com/aws/aws-sdk-go-v2/config v1.18.20/go.mod h1:RWjF39RiDevmHw/+VaD8F0A36OPIPTHQQyRx0eZohnw= -github.com/aws/aws-sdk-go-v2/credentials v1.13.6/go.mod h1:VbnUvhw31DUu6aiubViixQwWCBNO/st84dhPeOkmdls= +github.com/aws/aws-sdk-go-v2/config v1.18.21 h1:ENTXWKwE8b9YXgQCsruGLhvA9bhg+RqAsL9XEMEsa2c= +github.com/aws/aws-sdk-go-v2/config v1.18.21/go.mod h1:+jPQiVPz1diRnjj6VGqWcLK6EzNmQ42l7J3OqGTLsSY= github.com/aws/aws-sdk-go-v2/credentials v1.13.8/go.mod h1:lVa4OHbvgjVot4gmh1uouF1ubgexSCN92P6CJQpT0t8= -github.com/aws/aws-sdk-go-v2/credentials v1.13.19/go.mod h1:2m4uvLvl5hvQezVkLeBBUGMEDm5GcUNc3016W6d3NGg= github.com/aws/aws-sdk-go-v2/credentials v1.13.20 h1:oZCEFcrMppP/CNiS8myzv9JgOzq2s0d3v3MXYil/mxQ= github.com/aws/aws-sdk-go-v2/credentials v1.13.20/go.mod h1:xtZnXErtbZ8YGXC3+8WfajpMBn5Ga/3ojZdxHq6iI8o= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21/go.mod h1:ugwW57Z5Z48bpvUyZuaPy4Kv+vEfJWnIrky7RmkBvJg= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2 h1:jOzQAesnBFDmz93feqKnsTHsXrlwWORNZMFHMV+WLFU= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2/go.mod h1:cDh1p6XkSGSwSRIArWRc6+UqAQ7x4alQ0QfpVR6f+co= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.45 h1:ckFtXy51PT613d/KLKPxFiwRqgGIxDhVbNLof6x/XLo= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.45/go.mod h1:xar61xizdVU4pQygvQrNdZY1VCLNcOIvm87KzdZmWrE= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.62 h1:LhVbe/UDWvBT/jp5LYAweFVH8s+DNtT07Qp2riWEovU= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.62/go.mod h1:4xCuu1TSwhW5UH6WOdtS4/x/9UfMr2XplzKc86Ffj78= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27/go.mod h1:a1/UpzeyBBerajpnP5nGZa9mGzsBn5cOKxm6NWQsvoI= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.32 h1:dpbVNUjczQ8Ae3QKHbpHBpfvaVkRdesxpTOe9pTouhU= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.32/go.mod h1:RudqOgadTWdcS3t/erPQo24pcVEoYyqj/kKW5Vya21I= @@ -206,34 +203,30 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.26/go.mod h1:vq86l7956Vg github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28/go.mod h1:yRZVr/iT0AqyHeep00SZ4YfBAKojXz08w3XMBscdi0c= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.33 h1:HbH1VjUgrCdLJ+4lnnuLI4iVNRvBbBELGaJ5f69ClA8= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.33/go.mod h1:zG2FcwjQarWaqXSCGpgcr3RSjZ6dHGguZSppUL0XR7Q= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.18 h1:H/mF2LNWwX00lD6FlYfKpLLZgUW7oIzCBkig78x4Xok= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.18/go.mod h1:T2Ku+STrYQ1zIkL1wMvj8P3wWQaaCMKNdz70MT2FLfE= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.24 h1:zsg+5ouVLLbePknVZlUMm1ptwyQLkjjLMWnN+kVs5dA= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.24/go.mod h1:+fFaIjycTmpV6hjmPTbyU9Kp5MI/lA+bbibcAtmlhYA= github.com/aws/aws-sdk-go-v2/service/ec2 v1.93.2 h1:c6a19AjfhEXKlEX63cnlWtSQ4nzENihHZOG0I3wH6BE= github.com/aws/aws-sdk-go-v2/service/ec2 v1.93.2/go.mod h1:VX22JN3HQXDtQ3uS4h4TtM+K11vydq58tpHTlsm8TL8= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 h1:y2+VQzC6Zh2ojtV2LoC0MNwHWc6qXv/j2vrQtlftkdA= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11/go.mod h1:iV4q2hsqtNECrfmlXyord9u4zyuFEJX9eLgLpSPzWA8= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.22 h1:kv5vRAl00tozRxSnI0IszPWGXsJOyA7hmEUHFYqsyvw= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.22/go.mod h1:Od+GU5+Yx41gryN/ZGZzAJMZ9R1yn6lgA0fD5Lo5SkQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.27 h1:qIw7Hg5eJEc1uSxg3hRwAthPAO7NeOd4dPxhaTi0yB0= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.27/go.mod h1:Zz0kvhcSlu3NX4XJkaGgdjaa+u7a9LYuy8JKxA5v3RM= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.21/go.mod h1:lRToEJsn+DRA9lW4O9L9+/3hjTkUzlzyzHqn8MTds5k= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.26 h1:uUt4XctZLhl9wBE1L8lobU3bVN8SNUP7T+olb0bWBO4= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.26/go.mod h1:Bd4C/4PkVGubtNe5iMXu5BNnaBi/9t/UsFspPt4ram8= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.21 h1:vY5siRXvW5TrOKm2qKEf9tliBfdLxdfy0i02LOcmqUo= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.21/go.mod h1:WZvNXT1XuH8dnJM0HvOlvk+RNn7NbAPvA/ACO0QarSc= -github.com/aws/aws-sdk-go-v2/service/s3 v1.29.6 h1:W8pLcSn6Uy0eXgDBUUl8M8Kxv7JCoP68ZKTD04OXLEA= -github.com/aws/aws-sdk-go-v2/service/s3 v1.29.6/go.mod h1:L2l2/q76teehcW7YEsgsDjqdsDTERJeX3nOMIFlgGUE= -github.com/aws/aws-sdk-go-v2/service/sso v1.11.27/go.mod h1:wo/B7uUm/7zw/dWhBJ4FXuw1sySU5lyIhVg1Bu2yL9A= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.1 h1:lRWp3bNu5wy0X3a8GS42JvZFlv++AKsMdzEnoiVJrkg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.1/go.mod h1:VXBHSxdN46bsJrkniN68psSwbyBKsazQfU2yX/iSDso= +github.com/aws/aws-sdk-go-v2/service/s3 v1.31.3 h1:MG+2UlhyBL3oCOoHbUQh+Sqr3elN0I5PBe0MtVh0xMg= +github.com/aws/aws-sdk-go-v2/service/s3 v1.31.3/go.mod h1:aSl9/LJltSz1cVusiR/Mu8tvI4Sv/5w/WWrJmmkNii0= +github.com/aws/aws-sdk-go-v2/service/sns v1.20.8 h1:wy1jYAot40/Odzpzeq9S3OfSddJJ5RmpaKujvj5Hz7k= +github.com/aws/aws-sdk-go-v2/service/sns v1.20.8/go.mod h1:HmCFGnmh0Tx4Onh9xUklrVhNcCsBTeDx4n53WGhp+oY= github.com/aws/aws-sdk-go-v2/service/sso v1.12.0/go.mod h1:wo/B7uUm/7zw/dWhBJ4FXuw1sySU5lyIhVg1Bu2yL9A= -github.com/aws/aws-sdk-go-v2/service/sso v1.12.7/go.mod h1:GNIveDnP+aE3jujyUSH5aZ/rktsTM5EvtKnCqBZawdw= github.com/aws/aws-sdk-go-v2/service/sso v1.12.8 h1:5cb3D6xb006bPTqEfCNaEA6PPEfBXxxy4NNeX/44kGk= github.com/aws/aws-sdk-go-v2/service/sso v1.12.8/go.mod h1:GNIveDnP+aE3jujyUSH5aZ/rktsTM5EvtKnCqBZawdw= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.11/go.mod h1:TZSH7xLO7+phDtViY/KUp9WGCJMQkLJ/VpgkTFd5gh8= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.0/go.mod h1:TZSH7xLO7+phDtViY/KUp9WGCJMQkLJ/VpgkTFd5gh8= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.7/go.mod h1:44qFP1g7pfd+U+sQHLPalAPKnyfTZjJsYR4xIwsJy5o= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.8 h1:NZaj0ngZMzsubWZbrEFSB4rgSQRbFq38Sd6KBxHuOIU= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.8/go.mod h1:44qFP1g7pfd+U+sQHLPalAPKnyfTZjJsYR4xIwsJy5o= -github.com/aws/aws-sdk-go-v2/service/sts v1.17.7/go.mod h1:+lGbb3+1ugwKrNTWcf2RT05Xmp543B06zDFTwiTLp7I= github.com/aws/aws-sdk-go-v2/service/sts v1.18.0/go.mod h1:+lGbb3+1ugwKrNTWcf2RT05Xmp543B06zDFTwiTLp7I= -github.com/aws/aws-sdk-go-v2/service/sts v1.18.8/go.mod h1:yyW88BEPXA2fGFyI2KCcZC3dNpiT0CZAHaF+i656/tQ= github.com/aws/aws-sdk-go-v2/service/sts v1.18.9 h1:Qf1aWwnsNkyAoqDqmdM3nHwN78XQjec27LjM6b9vyfI= github.com/aws/aws-sdk-go-v2/service/sts v1.18.9/go.mod h1:yyW88BEPXA2fGFyI2KCcZC3dNpiT0CZAHaF+i656/tQ= github.com/aws/aws-sigv4-auth-cassandra-gocql-driver-plugin v0.0.0-20220331165046-e4d000c0d6a6 h1:+AQtpMAj/wOpgdmXSGKSBVozGsYbvaf73gTz4aSK9vM= diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 39b5cfaf42748..95bf3d2d91903 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -32,8 +32,6 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" - "github.com/gravitational/teleport/lib/events" - "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/utils" ) @@ -57,7 +55,9 @@ type Config struct { // LargeEventsS3 is location on S3 where temporary large events (>256KB) // are stored before converting it to Parquet and moving to long term // storage (required). - LargeEventsS3 string + LargeEventsS3 string + largeEventsBucket string + largeEventsPrefix string // Query settings. @@ -96,13 +96,18 @@ type Config struct { Clock clockwork.Clock // UIDGenerator is unique ID generator. UIDGenerator utils.UID + // LogEntry is a log entry. + LogEntry *log.Entry + // AWSConfig is AWS config which can be used to construct varius AWS Clients + // using aws-sdk-go-v2. + AWSConfig *aws.Config // TODO(tobiaszheller): add FIPS config in later phase. } // CheckAndSetDefaults is a helper returns an error if the supplied configuration // is not enough to setup Athena based audit log. -func (cfg *Config) CheckAndSetDefaults() error { +func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error { // AWS restrictions (https://docs.aws.amazon.com/athena/latest/ug/tables-databases-columns-names.html) const glueNameMaxLen = 255 if cfg.Database == "" { @@ -141,9 +146,16 @@ func (cfg *Config) CheckAndSetDefaults() error { if cfg.LargeEventsS3 == "" { return trace.BadParameter("LargeEventsS3 is not specified") } - if scheme, ok := isValidUrlWithScheme(cfg.LargeEventsS3); !ok || scheme != "s3" { - return trace.BadParameter("LargeEventsS3 must be valid url and start with s3") + + largeEventsS3URL, err := url.Parse(cfg.LargeEventsS3) + if err != nil { + return trace.BadParameter("LargeEventsS3 must be valid url") + } + if largeEventsS3URL.Scheme != "s3" { + return trace.BadParameter("LargeEventsS3 must starts with s3://") } + cfg.largeEventsBucket = largeEventsS3URL.Host + cfg.largeEventsPrefix = strings.TrimSuffix(strings.TrimPrefix(largeEventsS3URL.Path, "/"), "/") if cfg.QueueURL == "" { return trace.BadParameter("QueueURL is not specified") @@ -196,6 +208,25 @@ func (cfg *Config) CheckAndSetDefaults() error { cfg.UIDGenerator = utils.NewRealUID() } + if cfg.LogEntry == nil { + cfg.LogEntry = log.WithFields(log.Fields{ + trace.Component: teleport.ComponentAthena, + }) + } + + if cfg.AWSConfig == nil { + awsCfg, err := awsconfig.LoadDefaultConfig(ctx) + if err != nil { + return trace.Wrap(err) + } + // override the default environment (region + credentials) with the values + // from the config. + if cfg.Region != "" { + awsCfg.Region = cfg.Region + } + cfg.AWSConfig = &awsCfg + } + return nil } @@ -285,39 +316,19 @@ func (cfg *Config) SetFromURL(url *url.URL) error { // Parquet and send it to S3 for long term storage. // Athena is used for quering Parquet files on S3. type Log struct { - // Entry is a log entry - *log.Entry - // Config is a backend configuration - Config - - awsConfig aws.Config + publisher *publisher } // New creates an instance of an Athena based audit log. func New(ctx context.Context, cfg Config) (*Log, error) { - err := cfg.CheckAndSetDefaults() + err := cfg.CheckAndSetDefaults(ctx) if err != nil { return nil, trace.Wrap(err) } - logEntry := log.WithFields(log.Fields{ - trace.Component: teleport.ComponentAthena, - }) l := &Log{ - Entry: logEntry, - Config: cfg, + publisher: newPublisher(cfg), } - l.awsConfig, err = awsconfig.LoadDefaultConfig(ctx) - if err != nil { - return nil, trace.Wrap(err) - } - // override the default environment (region + credentials) with the values - // from the config. - if cfg.Region != "" { - l.awsConfig.Region = cfg.Region - } - - // TODO(tobiaszheller): initialize publisher // TODO(tobiaszheller): initialize batcher // TODO(tobiaszheller): initialize querier @@ -325,15 +336,7 @@ func New(ctx context.Context, cfg Config) (*Log, error) { } func (l *Log) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent) error { - return trace.NotImplemented("not implemented") -} - -func (l *Log) GetSessionChunk(namespace string, sid session.ID, offsetBytes, maxBytes int) ([]byte, error) { - return nil, trace.NotImplemented("not implemented") -} - -func (l *Log) GetSessionEvents(namespace string, sid session.ID, after int, includePrintEvents bool) ([]events.EventFields, error) { - return nil, trace.NotImplemented("not implemented") + return trace.Wrap(l.publisher.EmitAuditEvent(ctx, in)) } func (l *Log) SearchEvents(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]apievents.AuditEvent, string, error) { @@ -348,12 +351,6 @@ func (l *Log) Close() error { return nil } -func (l *Log) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) { - c, e := make(chan apievents.AuditEvent), make(chan error, 1) - e <- trace.NotImplemented("not implemented") - return c, e -} - var isAlphanumericOrUnderscoreRe = regexp.MustCompile("^[a-zA-Z0-9_]+$") func isAlphanumericOrUnderscore(s string) bool { diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index 60d084c2df5a0..0ccf6d01b7cb2 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -15,10 +15,12 @@ package athena import ( + "context" "net/url" "testing" "time" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" @@ -93,7 +95,7 @@ func TestConfig_SetFromURL(t *testing.T) { err = cfg.SetFromURL(u) if tt.wantErr == "" { require.NoError(t, err, "SetFromURL return unexpected err") - require.Empty(t, cmp.Diff(tt.want, *cfg, cmpopts.EquateApprox(0, 0.0001))) + require.Empty(t, cmp.Diff(tt.want, *cfg, cmpopts.EquateApprox(0, 0.0001), cmpopts.IgnoreUnexported(Config{}))) } else { require.ErrorContains(t, err, tt.wantErr) } @@ -109,6 +111,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { LargeEventsS3: "s3://large-payloads-bucket", LocationS3: "s3://events-bucket", QueueURL: "https://queue-url", + AWSConfig: &aws.Config{}, } tests := []struct { name string @@ -126,11 +129,13 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { TableName: "tbl", TopicARN: "arn:topic", LargeEventsS3: "s3://large-payloads-bucket", + largeEventsBucket: "large-payloads-bucket", LocationS3: "s3://events-bucket", QueueURL: "https://queue-url", GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, + AWSConfig: &aws.Config{}, }, }, { @@ -220,10 +225,10 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cfg := tt.input() - err := cfg.CheckAndSetDefaults() + err := cfg.CheckAndSetDefaults(context.Background()) if tt.wantErr == "" { require.NoError(t, err, "CheckAndSetDefaults return unexpected err") - require.Empty(t, cmp.Diff(tt.want, cfg, cmpopts.EquateApprox(0, 0.0001), cmpopts.IgnoreFields(Config{}, "Clock", "UIDGenerator"))) + require.Empty(t, cmp.Diff(tt.want, cfg, cmpopts.EquateApprox(0, 0.0001), cmpopts.IgnoreFields(Config{}, "Clock", "UIDGenerator", "LogEntry"), cmp.AllowUnexported(Config{}))) } else { require.ErrorContains(t, err, tt.wantErr) } diff --git a/lib/events/athena/fakequeue_test.go b/lib/events/athena/fakequeue_test.go new file mode 100644 index 0000000000000..24bc57d905ce8 --- /dev/null +++ b/lib/events/athena/fakequeue_test.go @@ -0,0 +1,73 @@ +// Copyright 2023 Gravitational, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package athena + +import ( + "context" + "sync" + + "github.com/aws/aws-sdk-go-v2/service/sns" + snsTypes "github.com/aws/aws-sdk-go-v2/service/sns/types" +) + +// fakeQueue is used to fake SNS+SQS combination on AWS. +type fakeQueue struct { + // publishErrors is chain of error returns on Publish method. + // Errors are returned from start to end and removed, one-by-one, on each + // invocation of the Publish method. + // If the slice is empty, Publish runs normally. + publishErrors []error + mu sync.Mutex + msgs []fakeQueueMessage +} + +type fakeQueueMessage struct { + payload string + attributes map[string]snsTypes.MessageAttributeValue +} + +func newFakeQueue() *fakeQueue { + return &fakeQueue{} +} + +func (f *fakeQueue) Publish(ctx context.Context, params *sns.PublishInput, optFns ...func(*sns.Options)) (*sns.PublishOutput, error) { + f.mu.Lock() + defer f.mu.Unlock() + if len(f.publishErrors) > 0 { + err := f.publishErrors[0] + f.publishErrors = f.publishErrors[1:] + return nil, err + } + f.msgs = append(f.msgs, fakeQueueMessage{ + payload: *params.Message, + attributes: params.MessageAttributes, + }) + return nil, nil +} + +func (f *fakeQueue) getMessages() []fakeQueueMessage { + f.mu.Lock() + defer f.mu.Unlock() + batchSize := 10 + if len(f.msgs) < 1 { + return nil + } + if len(f.msgs) < batchSize { + batchSize = len(f.msgs) + } + items := f.msgs[:batchSize] + f.msgs = f.msgs[batchSize:] + return items +} diff --git a/lib/events/athena/publisher.go b/lib/events/athena/publisher.go new file mode 100644 index 0000000000000..df3aaff8a14b9 --- /dev/null +++ b/lib/events/athena/publisher.go @@ -0,0 +1,162 @@ +// Copyright 2023 Gravitational, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package athena + +import ( + "bytes" + "context" + "encoding/base64" + "path/filepath" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/retry" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sns" + snsTypes "github.com/aws/aws-sdk-go-v2/service/sns/types" + "github.com/gravitational/trace" + + apievents "github.com/gravitational/teleport/api/types/events" +) + +const ( + payloadTypeAttr = "payload_type" + payloadTypeRawProtoEvent = "raw_proto_event" + payloadTypeS3Based = "s3_event" + + // maxSNSMessageSize defines maximum size of SNS message. AWS allows 256KB + // however it counts also headers. We round it to 250KB, just to be sure. + maxSNSMessageSize = 250 * 1024 + // maxS3BasedSize defines some resonable threshold for S3 based messages (2GB). + maxS3BasedSize = 2 * 1024 * 1024 * 1024 +) + +// publisher is a SNS based events publisher. +// It publishes proto events directly to SNS topic, or use S3 bucket +// if payload is too large for SNS. +type publisher struct { + topicARN string + snsPublisher snsPublisher + uploader s3uploader + payloadBucket string + payloadPrefix string +} + +type snsPublisher interface { + Publish(ctx context.Context, params *sns.PublishInput, optFns ...func(*sns.Options)) (*sns.PublishOutput, error) +} + +type s3uploader interface { + Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) +} + +// newPublisher returns new instance of publisher. +func newPublisher(cfg Config) *publisher { + r := retry.NewStandard(func(so *retry.StandardOptions) { + so.MaxAttempts = 20 + so.MaxBackoff = 1 * time.Minute + }) + + // TODO(tobiaszheller): consider reworking lib/observability to work also on s3 sdk-v2. + return &publisher{ + topicARN: cfg.TopicARN, + snsPublisher: sns.NewFromConfig(*cfg.AWSConfig, func(o *sns.Options) { + o.Retryer = r + }), + uploader: manager.NewUploader(s3.NewFromConfig(*cfg.AWSConfig)), + payloadBucket: cfg.largeEventsBucket, + payloadPrefix: cfg.largeEventsPrefix, + } +} + +// EmitAuditEvent emits audit event to SNS topic. Topic should be connected with +// SQS via subscription, so event is persisted on queue. +// Events are marshaled as oneOf from apievents and encoded using base64. +// For large events, payload is publihsed to S3, and on SNS there is only passed +// location on S3. +func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent) error { + // Just double check that audit event has minimum necessary fields for athena + // to works. Teleport emitter layer above makes sure that they are filled. + if in.GetID() == "" { + return trace.BadParameter("missing uid of audit event %s", in.GetType()) + } + if in.GetTime().IsZero() { + return trace.BadParameter("missing time of audit event %s", in.GetType()) + } + + oneOf, err := apievents.ToOneOf(in) + if err != nil { + return trace.Wrap(err) + } + marshaledProto, err := oneOf.Marshal() + if err != nil { + return trace.Wrap(err) + } + + b64Encoded := base64.StdEncoding.EncodeToString(marshaledProto) + if len(b64Encoded) > maxSNSMessageSize { + if len(b64Encoded) > maxS3BasedSize { + return trace.BadParameter("message too large to publish, size %d", len(b64Encoded)) + } + return trace.Wrap(p.emitViaS3(ctx, in.GetID(), marshaledProto)) + } + return trace.Wrap(p.emitViaSNS(ctx, in.GetID(), b64Encoded)) +} + +func (p *publisher) emitViaS3(ctx context.Context, uid string, marshaledEvent []byte) error { + path := filepath.Join(p.payloadPrefix, uid) + out, err := p.uploader.Upload(ctx, &s3.PutObjectInput{ + Bucket: aws.String(p.payloadBucket), + Key: aws.String(path), + Body: bytes.NewBuffer(marshaledEvent), + }) + if err != nil { + return trace.Wrap(err) + } + + var versionID string + if out.VersionID != nil { + versionID = *out.VersionID + } + msg := &apievents.AthenaS3EventPayload{ + Path: path, + VersionId: versionID, + } + buf, err := msg.Marshal() + if err != nil { + return trace.Wrap(err) + } + + _, err = p.snsPublisher.Publish(ctx, &sns.PublishInput{ + TopicArn: aws.String(p.topicARN), + Message: aws.String(string(buf)), + MessageAttributes: map[string]snsTypes.MessageAttributeValue{ + payloadTypeAttr: {DataType: aws.String("String"), StringValue: aws.String(payloadTypeS3Based)}, + }, + }) + return trace.Wrap(err) +} + +func (p *publisher) emitViaSNS(ctx context.Context, uid string, b64Encoded string) error { + _, err := p.snsPublisher.Publish(ctx, &sns.PublishInput{ + TopicArn: aws.String(p.topicARN), + Message: aws.String(b64Encoded), + MessageAttributes: map[string]snsTypes.MessageAttributeValue{ + payloadTypeAttr: {DataType: aws.String("String"), StringValue: aws.String(payloadTypeRawProtoEvent)}, + }, + }) + return trace.Wrap(err) +} diff --git a/lib/events/athena/publisher_test.go b/lib/events/athena/publisher_test.go new file mode 100644 index 0000000000000..9f19399f0469f --- /dev/null +++ b/lib/events/athena/publisher_test.go @@ -0,0 +1,106 @@ +// Copyright 2023 Gravitational, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package athena + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + apievents "github.com/gravitational/teleport/api/types/events" +) + +// TODO(tobiaszheller): Those UT just cover basic stuff. When we will have consumer +// there will be UT which will cover whole flow of message with encoding/decoding. +func Test_EmitAuditEvent(t *testing.T) { + tests := []struct { + name string + in apievents.AuditEvent + publishErrors []error + uploader s3uploader + wantCheck func(t *testing.T, out []fakeQueueMessage) + }{ + { + name: "valid publish", + in: &apievents.AppCreate{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Now().UTC(), + }, + }, + wantCheck: func(t *testing.T, out []fakeQueueMessage) { + require.Len(t, out, 1) + require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeRawProtoEvent) + }, + }, + { + name: "should succeed after some retries", + in: &apievents.AppCreate{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Now().UTC(), + }, + }, + publishErrors: []error{ + errors.New("error1"), errors.New("error2"), + }, + wantCheck: func(t *testing.T, out []fakeQueueMessage) { + require.Len(t, out, 1) + require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeRawProtoEvent) + }, + }, + { + name: "big message via s3", + in: &apievents.AppCreate{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Now().UTC(), + Code: strings.Repeat("d", 2*maxSNSMessageSize), + }, + }, + uploader: mockUploader{}, + wantCheck: func(t *testing.T, out []fakeQueueMessage) { + require.Len(t, out, 1) + require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeS3Based) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fq := newFakeQueue() + p := &publisher{ + snsPublisher: fq, + uploader: tt.uploader, + } + err := p.EmitAuditEvent(context.Background(), tt.in) + require.NoError(t, err) + out := fq.getMessages() + tt.wantCheck(t, out) + }) + } +} + +type mockUploader struct{} + +func (m mockUploader) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) { + return &manager.UploadOutput{}, nil +}