Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2739,11 +2739,22 @@ func (c *Client) UploadEncryptedRecording(ctx context.Context, sessionID string,
return trace.Wrap(err)
}

next, stop := iter.Pull2(parts)
defer stop()

part, err, ok := next()
if err != nil {
return trace.Wrap(err)
} else if !ok {
return trace.BadParameter("unexpected empty upload")
}

var uploadedParts []*recordingencryptionv1pb.Part
// S3 requires that part numbers start at 1, so we do that by default regardless of which uploader is
// configured for the auth service
var partNumber int64 = 1
for part, err := range parts {
for {
nextPart, err, hasNext := next()
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -2752,11 +2763,18 @@ func (c *Client) UploadEncryptedRecording(ctx context.Context, sessionID string,
Upload: createRes.Upload,
PartNumber: partNumber,
Part: part,
IsLast: !hasNext,
})
if err != nil {
return trace.Wrap(err)
}
uploadedParts = append(uploadedParts, uploadRes.Part)

if !hasNext {
break
}

part = nextPart
partNumber++
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ message UploadPartRequest {
int64 part_number = 2;
// The encrypted part of session recording data being uploaded.
bytes part = 3;
// Whether this is the last upload part in the upload.
bool is_last = 4;
}

// The resulting metadata about an uploaded part.
Expand Down
10 changes: 8 additions & 2 deletions lib/auth/recordingencryption/recordingencryptionv1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,14 @@ func (s *Service) UploadPart(ctx context.Context, req *recordingencryptionv1.Upl
return nil, trace.Wrap(err)
}

part := bytes.NewReader(req.Part)
streamPart, err := s.uploader.UploadPart(ctx, upload, req.PartNumber, part)
// If upload part is not at least the minimum upload part size, append an empty part
// to pad up to the minimum upload size.
part := req.Part
if !req.IsLast && len(part) < events.MinUploadPartSizeBytes {
part = events.PadUploadPart(part, events.MinUploadPartSizeBytes)
}

streamPart, err := s.uploader.UploadPart(ctx, upload, req.PartNumber, bytes.NewReader(part))
if err != nil {
return nil, trace.Wrap(err, "uploading encrypted recording part")
}
Expand Down
6 changes: 5 additions & 1 deletion lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,11 @@ type MultipartUploader interface {
// ReserveUploadPart reserves an upload part. Reserve is used to identify
// upload errors beforehand.
ReserveUploadPart(ctx context.Context, upload StreamUpload, partNumber int64) error
// UploadPart uploads part and returns the part
// UploadPart uploads part and returns the part.
//
// The part must be greater than [MinUploadPartSizeBytes]. It is the responsibility
// of the caller to add padding if needed, or else the upload may fail depending on
// storage provider.
UploadPart(ctx context.Context, upload StreamUpload, partNumber int64, partBody io.ReadSeeker) (*StreamPart, error)
// ListParts returns all uploaded parts for the completed upload in sorted order
ListParts(ctx context.Context, upload StreamUpload) ([]StreamPart, error)
Expand Down
46 changes: 43 additions & 3 deletions lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,24 +631,47 @@ func (l *AuditLog) UploadEncryptedRecording(ctx context.Context, sessionID strin
return trace.Wrap(err, "creating upload")
}

next, stop := iter.Pull2(parts)
defer stop()

part, err, ok := next()
if err != nil {
return trace.Wrap(err)
} else if !ok {
return trace.BadParameter("unexpected empty upload")
}

var streamParts []StreamPart
// S3 requires that part numbers start at 1, so we do that by default regardless of which uploader is
// configured for the auth service
var partNumber int64 = 1
for part, err := range parts {
for {
if err := l.UploadHandler.ReserveUploadPart(ctx, *upload, partNumber); err != nil {
return trace.Wrap(err, "reserving upload part")
}

nextPart, err, hasNext := next()
if err != nil {
return trace.Wrap(err)
}

if err := l.UploadHandler.ReserveUploadPart(ctx, *upload, partNumber); err != nil {
return trace.Wrap(err, "reserving upload part")
// If the upload part is not at least the minimum upload part size, and this isn't
// the last part, add padding to meet the minimum upload size.
if hasNext && len(part) < MinUploadPartSizeBytes {
part = PadUploadPart(part, MinUploadPartSizeBytes)
}

streamPart, err := l.UploadHandler.UploadPart(ctx, *upload, partNumber, bytes.NewReader(part))
if err != nil {
return trace.Wrap(err, "uploading part")
}
streamParts = append(streamParts, *streamPart)

if !hasNext {
break
}

part = nextPart
partNumber++
}

Expand Down Expand Up @@ -771,3 +794,20 @@ func sessionStartCallbackFromContext(ctx context.Context) (SessionStartCallback,

return cb, nil
}

// PadUploadPart adds padding to the given upload part to reach the minimum size.
func PadUploadPart(uploadPart []byte, minSize int) []byte {
// Create padding to reach the target size. Note that the padding cannot
// be shorter than the header size.
paddingBytes := max(minSize-len(uploadPart), ProtoStreamV2PartHeaderSize)
paddedPart := make([]byte, paddingBytes)

paddedPartHeader := PartHeader{
ProtoVersion: ProtoStreamV2,
PaddingSize: uint64(paddingBytes - ProtoStreamV2PartHeaderSize),
PartSize: 0,
}
copy(paddedPart, paddedPartHeader.Bytes())

return append(uploadPart, paddedPart...)
}
49 changes: 49 additions & 0 deletions lib/events/auditlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package events_test

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -372,3 +373,51 @@ func makeLog(t *testing.T, clock clockwork.Clock) *events.AuditLog {

return alog
}

func TestPadUploadPart(t *testing.T) {
partData := bytes.Repeat([]byte{1, 2, 3}, 10)
partHeader := events.PartHeader{
ProtoVersion: events.V2,
PartSize: uint64(len(partData)),
PaddingSize: 0,
}
headerBytes := partHeader.Bytes()
part := append(headerBytes, partData...)

// Pad the upload part to double the size.
minSize := len(part) * 2
paddedPart := events.PadUploadPart(part, minSize)
require.Len(t, paddedPart, minSize)

// Padding the upload part again with the same minimum should add a single header in size.
paddedPart = events.PadUploadPart(paddedPart, minSize)
require.Len(t, paddedPart, minSize+events.ProtoStreamV2PartHeaderSize)

// Ensure we can read out each part.
r := bytes.NewReader(paddedPart)
h1, err := events.ParsePartHeader(r)
require.NoError(t, err)
require.Equal(t, partHeader, h1)
gotData, err := io.ReadAll(io.LimitReader(r, int64(h1.PartSize)))
require.NoError(t, err)
require.Equal(t, partData, gotData)
io.Copy(io.Discard, io.LimitReader(r, int64(h1.PaddingSize)))

h2, err := events.ParsePartHeader(r)
require.NoError(t, err)
require.Equal(t, events.PartHeader{
ProtoVersion: events.V2,
PaddingSize: uint64(len(part) - events.ProtoStreamV2PartHeaderSize),
}, h2)
io.Copy(io.Discard, io.LimitReader(r, int64(h2.PaddingSize)))

h3, err := events.ParsePartHeader(r)
require.NoError(t, err)
require.Equal(t, events.PartHeader{
ProtoVersion: events.V2,
PaddingSize: 0,
}, h3)

_, err = r.Read(nil)
require.ErrorIs(t, err, io.EOF)
}
2 changes: 1 addition & 1 deletion lib/events/azsessions/azsessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (*Handler) ReserveUploadPart(ctx context.Context, upload events.StreamUploa
func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error) {
partBlob := h.partBlob(upload, partNumber)

// our parts are just over 5 MiB (events.MinUploadPartSizeBytes) so we can
// our parts are just over 5 MiB [events.MinUploadPartSizeBytes] so we can
// upload them in one shot
response, err := cErr(partBlob.Upload(ctx, streaming.NopCloser(partBody), nil))
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions lib/events/eventstest/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
// for generated session
type SessionParams struct {
// PrintEvents sets up print events count. Ignored if PrintData is set.
// The size of the resulting event stream varies due to compression, but with
// a sufficiently large number of events results in approximately 64 bytes per event.
PrintEvents int64
// PrintData is optional data to use for print events. Each element of the
// slice represents data for one print event.
Expand Down
Loading
Loading