diff --git a/go.mod b/go.mod index 38b85bcd58a..09e9612ce5e 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( github.com/prometheus/otlptranslator v0.0.0-20250501145537-53ceaec28820 github.com/prometheus/procfs v0.15.1 github.com/shirou/gopsutil/v4 v4.24.12 - github.com/thanos-io/objstore v0.0.0-20250129163715-ec72e5a88a79 + github.com/thanos-io/objstore v0.0.0-20250317105316-a0136a6f898d github.com/tjhop/slog-gokit v0.1.4 github.com/twmb/franz-go v1.19.1 github.com/twmb/franz-go/pkg/kadm v1.15.0 diff --git a/go.sum b/go.sum index c45e5c64c94..235e5718d3c 100644 --- a/go.sum +++ b/go.sum @@ -1010,8 +1010,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM= github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw= -github.com/thanos-io/objstore v0.0.0-20250129163715-ec72e5a88a79 h1:sVc5fCMlPFEZfhIfdiEJqMmNIP3sEnRZv9K83Nz/mR8= -github.com/thanos-io/objstore v0.0.0-20250129163715-ec72e5a88a79/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM= +github.com/thanos-io/objstore v0.0.0-20250317105316-a0136a6f898d h1:L4k+8i1cl0h3MscslVUAcBpvA5i9UYzE0DybcOxzvlM= +github.com/thanos-io/objstore v0.0.0-20250317105316-a0136a6f898d/go.mod h1:Nmy3+M2UM7wu2sEvg0h5M/c3mu1QaxmdyPvoGUPGlaU= github.com/tinylib/msgp v1.1.8/go.mod h1:qkpG+2ldGg4xRFmx+jfTvZPxfGFhi64BcnL9vkCm/Tw= github.com/tjhop/slog-gokit v0.1.4 h1:uj/vbDt3HaF0Py8bHPV4ti/s0utnO0miRbO277FLBKM= github.com/tjhop/slog-gokit v0.1.4/go.mod h1:Bbu5v2748qpAWH7k6gse/kw3076IJf6owJmh7yArmJs= diff --git a/pkg/ingester/shipper_test.go b/pkg/ingester/shipper_test.go index 329d1aff8d8..79011cb101f 100644 --- a/pkg/ingester/shipper_test.go +++ b/pkg/ingester/shipper_test.go @@ -170,8 +170,8 @@ type deceivingUploadBucket struct { objectBaseName string } -func (b deceivingUploadBucket) Upload(ctx context.Context, name string, r io.Reader) error { - actualErr := b.Bucket.Upload(ctx, name, r) +func (b deceivingUploadBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { + actualErr := b.Bucket.Upload(ctx, name, r, opts...) if actualErr != nil { return actualErr } else if path.Base(name) == b.objectBaseName { diff --git a/pkg/mimirtool/commands/bucket_validation.go b/pkg/mimirtool/commands/bucket_validation.go index 8608b4ec980..cca1ca0f3ee 100644 --- a/pkg/mimirtool/commands/bucket_validation.go +++ b/pkg/mimirtool/commands/bucket_validation.go @@ -65,8 +65,8 @@ func (c *retryingBucketClient) withRetries(f func() error) error { } } -func (c *retryingBucketClient) Upload(ctx context.Context, name string, r io.Reader) error { - return c.withRetries(func() error { return c.Bucket.Upload(ctx, name, r) }) +func (c *retryingBucketClient) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { + return c.withRetries(func() error { return c.Bucket.Upload(ctx, name, r, opts...) }) } func (c *retryingBucketClient) Exists(ctx context.Context, name string) (bool, error) { diff --git a/pkg/storage/bucket/client_mock.go b/pkg/storage/bucket/client_mock.go index a978d79b432..178a660a0d1 100644 --- a/pkg/storage/bucket/client_mock.go +++ b/pkg/storage/bucket/client_mock.go @@ -29,13 +29,13 @@ func (m *ClientMock) Provider() objstore.ObjProvider { } // Upload mocks objstore.Bucket.Upload() -func (m *ClientMock) Upload(ctx context.Context, name string, r io.Reader) error { - args := m.Called(ctx, name, r) +func (m *ClientMock) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { + args := m.Called(ctx, name, r, opts) return args.Error(0) } func (m *ClientMock) MockUpload(name string, err error) { - m.On("Upload", mock.Anything, name, mock.Anything).Return(err) + m.On("Upload", mock.Anything, name, mock.Anything, mock.Anything).Return(err) } // Delete mocks objstore.Bucket.Delete() diff --git a/pkg/storage/bucket/delayed_bucket_client.go b/pkg/storage/bucket/delayed_bucket_client.go index e4b534674f3..606d73187cf 100644 --- a/pkg/storage/bucket/delayed_bucket_client.go +++ b/pkg/storage/bucket/delayed_bucket_client.go @@ -36,11 +36,11 @@ func (m *DelayedBucketClient) Provider() objstore.ObjProvider { return m.wrapped.Provider() } -func (m *DelayedBucketClient) Upload(ctx context.Context, name string, r io.Reader) error { +func (m *DelayedBucketClient) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { m.delay() defer m.delay() - return m.wrapped.Upload(ctx, name, r) + return m.wrapped.Upload(ctx, name, r, opts...) } func (m *DelayedBucketClient) Delete(ctx context.Context, name string) error { diff --git a/pkg/storage/bucket/error_injected_bucket_client.go b/pkg/storage/bucket/error_injected_bucket_client.go index b1373282b55..d93952e15de 100644 --- a/pkg/storage/bucket/error_injected_bucket_client.go +++ b/pkg/storage/bucket/error_injected_bucket_client.go @@ -57,11 +57,11 @@ func (b *ErrorInjectedBucketClient) Exists(ctx context.Context, name string) (bo return b.Bucket.Exists(ctx, name) } -func (b *ErrorInjectedBucketClient) Upload(ctx context.Context, name string, r io.Reader) error { +func (b *ErrorInjectedBucketClient) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { if err := b.injectError(OpUpload, name); err != nil { return err } - return b.Bucket.Upload(ctx, name, r) + return b.Bucket.Upload(ctx, name, r, opts...) } func (b *ErrorInjectedBucketClient) Delete(ctx context.Context, name string) error { diff --git a/pkg/storage/bucket/prefixed_bucket_client.go b/pkg/storage/bucket/prefixed_bucket_client.go index e953c37ba35..94cbfe706ee 100644 --- a/pkg/storage/bucket/prefixed_bucket_client.go +++ b/pkg/storage/bucket/prefixed_bucket_client.go @@ -40,8 +40,8 @@ func (b *PrefixedBucketClient) Provider() objstore.ObjProvider { } // Upload the contents of the reader as an object into the bucket. -func (b *PrefixedBucketClient) Upload(ctx context.Context, name string, r io.Reader) (err error) { - err = b.bucket.Upload(ctx, b.fullName(name), r) +func (b *PrefixedBucketClient) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) (err error) { + err = b.bucket.Upload(ctx, b.fullName(name), r, opts...) return } diff --git a/pkg/storage/bucket/sse_bucket_client.go b/pkg/storage/bucket/sse_bucket_client.go index b19e1eac99b..54330ea23ce 100644 --- a/pkg/storage/bucket/sse_bucket_client.go +++ b/pkg/storage/bucket/sse_bucket_client.go @@ -56,7 +56,7 @@ func (b *SSEBucketClient) Provider() objstore.ObjProvider { } // Upload the contents of the reader as an object into the bucket. -func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader) error { +func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { if sse, err := b.getCustomS3SSEConfig(); err != nil { return err } else if sse != nil { @@ -65,7 +65,7 @@ func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader) ctx = s3.ContextWithSSEConfig(ctx, sse) } - return b.bucket.Upload(ctx, name, r) + return b.bucket.Upload(ctx, name, r, opts...) } // Delete implements objstore.Bucket. diff --git a/pkg/storage/tsdb/block/block_test.go b/pkg/storage/tsdb/block/block_test.go index cf01d494be0..7e61fca3cb8 100644 --- a/pkg/storage/tsdb/block/block_test.go +++ b/pkg/storage/tsdb/block/block_test.go @@ -509,8 +509,8 @@ type errBucket struct { failSuffix string } -func (eb errBucket) Upload(ctx context.Context, name string, r io.Reader) error { - err := eb.Bucket.Upload(ctx, name, r) +func (eb errBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { + err := eb.Bucket.Upload(ctx, name, r, opts...) if err != nil { return err } diff --git a/pkg/storage/tsdb/block/global_markers_bucket_client.go b/pkg/storage/tsdb/block/global_markers_bucket_client.go index 48e32135f6c..f5294d59b4b 100644 --- a/pkg/storage/tsdb/block/global_markers_bucket_client.go +++ b/pkg/storage/tsdb/block/global_markers_bucket_client.go @@ -35,10 +35,10 @@ func (b *globalMarkersBucket) Provider() objstore.ObjProvider { } // Upload implements objstore.Bucket. -func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Reader) error { +func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { globalMarkPath := getGlobalMarkPathFromBlockMark(name) if globalMarkPath == "" { - return b.parent.Upload(ctx, name, r) + return b.parent.Upload(ctx, name, r, opts...) } // Read the marker. @@ -48,12 +48,12 @@ func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Read } // Upload it to the original location. - if err := b.parent.Upload(ctx, name, bytes.NewReader(body)); err != nil { + if err := b.parent.Upload(ctx, name, bytes.NewReader(body), opts...); err != nil { return err } // Upload it to the global markers location too. - return b.parent.Upload(ctx, globalMarkPath, bytes.NewReader(body)) + return b.parent.Upload(ctx, globalMarkPath, bytes.NewReader(body), opts...) } // Delete implements objstore.Bucket. diff --git a/pkg/storage/tsdb/bucketcache/caching_bucket.go b/pkg/storage/tsdb/bucketcache/caching_bucket.go index 0074f605b4b..db4d21c0d2b 100644 --- a/pkg/storage/tsdb/bucketcache/caching_bucket.go +++ b/pkg/storage/tsdb/bucketcache/caching_bucket.go @@ -166,10 +166,10 @@ func NewCachingBucket(bucketID string, bucketClient objstore.Bucket, cfg *Cachin return cb, nil } -func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error { +func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { keyGen := newCacheKeyBuilder(cb.bucketID, name) cb.invalidation.start(ctx, name, keyGen) - err := cb.Bucket.Upload(ctx, name, r) + err := cb.Bucket.Upload(ctx, name, r, opts...) if err == nil { cb.invalidation.finish(ctx, name, keyGen) } diff --git a/vendor/github.com/thanos-io/objstore/CHANGELOG.md b/vendor/github.com/thanos-io/objstore/CHANGELOG.md index b8be4db2a2e..6fee1ccd411 100644 --- a/vendor/github.com/thanos-io/objstore/CHANGELOG.md +++ b/vendor/github.com/thanos-io/objstore/CHANGELOG.md @@ -9,6 +9,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.) ## Unreleased +- [#165](https://github.com/thanos-io/objstore/pull/165) GCS: Upgrade cloud.google.com/go/storage version to `v1.50.0`. - [#38](https://github.com/thanos-io/objstore/pull/38) GCS: Upgrade cloud.google.com/go/storage version to `v1.43.0`. - [#145](https://github.com/thanos-io/objstore/pull/145) Include content length in the response of Get and GetRange. - [#157](https://github.com/thanos-io/objstore/pull/157) Azure: Add `az_tenant_id`, `client_id` and `client_secret` configs. @@ -70,7 +71,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#70](https://github.com/thanos-io/objstore/pull/70) GCS: Update cloud.google.com/go/storage version to `v1.27.0`. - [#71](https://github.com/thanos-io/objstore/pull/71) Replace method `IsCustomerManagedKeyError` for a more generic `IsAccessDeniedErr` on the bucket interface. - [#89](https://github.com/thanos-io/objstore/pull/89) GCS: Upgrade cloud.google.com/go/storage version to `v1.35.1`. -- [#123](https://github.com/thanos-io/objstore/pull/123) *: Upgrade minio-go version to `v7.0.71`. +- [#123](https://github.com/thanos-io/objstore/pull/123) *: Upgrade minio-go version to `v7.0.72`. - [#132](https://github.com/thanos-io/objstore/pull/132) s3: Upgrade aws-sdk-go-v2/config version to `v1.27.30` ### Removed diff --git a/vendor/github.com/thanos-io/objstore/README.md b/vendor/github.com/thanos-io/objstore/README.md index 516d10705be..ad65b538d93 100644 --- a/vendor/github.com/thanos-io/objstore/README.md +++ b/vendor/github.com/thanos-io/objstore/README.md @@ -670,6 +670,7 @@ config: max_conns_per_host: 0 // Optional maximum total number of connections per host. disable_compression: false // Optional. If true, prevents the Transport from requesting compression. client_timeout: 90s // Optional time limit for requests made by the HTTP Client. +prefix: "" ``` #### Instance Principal Provider @@ -682,6 +683,7 @@ config: provider: "instance-principal" bucket: "" compartment_ocid: "" +prefix: "" ``` You can also include any of the optional configuration just like the example in `Default Provider`. @@ -702,6 +704,7 @@ config: fingerprint: "" privatekey: "" passphrase: "" // Optional passphrase to encrypt the private API Signing key +prefix: "" ``` You can also include any of the optional configuration just like the example in `Default Provider`. @@ -716,6 +719,7 @@ config: provider: "oke-workload-identity" bucket: "" region: "" +prefix: "" ``` The `bucket` and `region` fields are required. The `region` field identifies the bucket region. diff --git a/vendor/github.com/thanos-io/objstore/inmem.go b/vendor/github.com/thanos-io/objstore/inmem.go index 6a34406661f..97e5f8b0d79 100644 --- a/vendor/github.com/thanos-io/objstore/inmem.go +++ b/vendor/github.com/thanos-io/objstore/inmem.go @@ -213,7 +213,7 @@ func (b *InMemBucket) Attributes(_ context.Context, name string) (ObjectAttribut } // Upload writes the file specified in src to into the memory. -func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) error { +func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader, _ ...ObjectUploadOption) error { b.mtx.Lock() defer b.mtx.Unlock() body, err := io.ReadAll(r) diff --git a/vendor/github.com/thanos-io/objstore/objstore.go b/vendor/github.com/thanos-io/objstore/objstore.go index 86ecfa26814..bdbb52a390f 100644 --- a/vendor/github.com/thanos-io/objstore/objstore.go +++ b/vendor/github.com/thanos-io/objstore/objstore.go @@ -62,7 +62,7 @@ type Bucket interface { // Upload the contents of the reader as an object into the bucket. // Upload should be idempotent. - Upload(ctx context.Context, name string, r io.Reader) error + Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error // Delete removes the object with the given name. // If object does not exist in the moment of deletion, Delete should throw error. @@ -196,6 +196,26 @@ func ApplyIterOptions(options ...IterOption) IterParams { return out } +type UploadObjectParams struct { + ContentType string +} + +type ObjectUploadOption func(f *UploadObjectParams) + +func WithContentType(contentType string) ObjectUploadOption { + return func(f *UploadObjectParams) { + f.ContentType = contentType + } +} + +func ApplyObjectUploadOptions(opts ...ObjectUploadOption) UploadObjectParams { + out := UploadObjectParams{} + for _, opt := range opts { + opt(&out) + } + return out +} + // DownloadOption configures the provided params. type DownloadOption func(params *downloadParams) @@ -747,7 +767,7 @@ func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) { return ok, nil } -func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error { +func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error { const op = OpUpload b.metrics.ops.WithLabelValues(op).Inc() @@ -765,7 +785,7 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err b.metrics.opsTransferredBytes, ) defer trc.Close() - err := b.bkt.Upload(ctx, name, trc) + err := b.bkt.Upload(ctx, name, trc, opts...) if err != nil { if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled { b.metrics.opsFailures.WithLabelValues(op).Inc() diff --git a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go index a37450ca870..608688551a8 100644 --- a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go +++ b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go @@ -101,8 +101,8 @@ func (p *PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAtt // Upload the contents of the reader as an object into the bucket. // Upload should be idempotent. -func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader) error { - return p.bkt.Upload(ctx, conditionalPrefix(p.prefix, name), r) +func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error { + return p.bkt.Upload(ctx, conditionalPrefix(p.prefix, name), r, opts...) } // Delete removes the object with the given name. diff --git a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go index 05fbdb55c0c..f3b891ecad3 100644 --- a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go +++ b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go @@ -365,12 +365,17 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { } // Upload the contents of the reader as an object into the bucket. -func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { +func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, uploadOpts ...objstore.ObjectUploadOption) error { level.Debug(b.logger).Log("msg", "uploading blob", "blob", name) blobClient := b.containerClient.NewBlockBlobClient(name) + + uploadOptions := objstore.ApplyObjectUploadOptions(uploadOpts...) opts := &blockblob.UploadStreamOptions{ BlockSize: 3 * 1024 * 1024, Concurrency: 4, + HTTPHeaders: &blob.HTTPHeaders{ + BlobContentType: &uploadOptions.ContentType, + }, } if _, err := blobClient.UploadStream(ctx, r, opts); err != nil { return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name) diff --git a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go index df602877be8..3717cf3d516 100644 --- a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go +++ b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go @@ -247,7 +247,7 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { } // Upload writes the file specified in src to into the memory. -func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { +func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, _ ...objstore.ObjectUploadOption) (err error) { if ctx.Err() != nil { return ctx.Err() } diff --git a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go index b89f8735bc5..484e33a9168 100644 --- a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go +++ b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go @@ -332,13 +332,15 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { } // Upload writes the file specified in src to remote GCS location specified as target. -func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { +func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { w := b.bkt.Object(name).NewWriter(ctx) + uploadOpts := objstore.ApplyObjectUploadOptions(opts...) // if `chunkSize` is 0, we don't set any custom value for writer's ChunkSize. // It uses whatever the default value https://pkg.go.dev/google.golang.org/cloud/storage#Writer if b.chunkSize > 0 { w.ChunkSize = b.chunkSize + w.ContentType = uploadOpts.ContentType } if _, err := io.Copy(w, r); err != nil { diff --git a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go index 62107ec4828..5c8cd9531bc 100644 --- a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go +++ b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go @@ -532,7 +532,7 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { } // Upload the contents of the reader as an object into the bucket. -func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { +func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { sse, err := b.getServerSideEncryption(ctx) if err != nil { return err @@ -556,6 +556,8 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { userMetadata[k] = v } + uploadOpts := objstore.ApplyObjectUploadOptions(opts...) + if _, err := b.client.PutObject( ctx, b.name, @@ -572,7 +574,8 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { // 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we // ensure we pin this number to four. // TODO(bwplotka): Consider adjusting this number to GOMAXPROCS or to expose this in config if it becomes bottleneck. - NumThreads: 4, + NumThreads: 4, + ContentType: uploadOpts.ContentType, }, ); err != nil { return errors.Wrap(err, "upload s3 object") diff --git a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go index 19eb0d45455..c5263b25688 100644 --- a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go +++ b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go @@ -338,13 +338,16 @@ func (c *Container) IsAccessDeniedErr(err error) bool { } // Upload writes the contents of the reader as an object into the container. -func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err error) { +func (c *Container) Upload(_ context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) (err error) { size, err := objstore.TryToGetSize(r) if err != nil { level.Warn(c.logger).Log("msg", "could not guess file size, using large object to avoid issues if the file is larger than limit", "name", name, "err", err) // Anything higher or equal to chunk size so the SLO is used. size = c.chunkSize } + + uploadOpts := objstore.ApplyObjectUploadOptions(opts...) + var file io.WriteCloser if size >= c.chunkSize { opts := swift.LargeObjectOpts{ @@ -353,6 +356,7 @@ func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err err ChunkSize: c.chunkSize, SegmentContainer: c.segmentsContainer, CheckHash: true, + ContentType: uploadOpts.ContentType, } if c.useDynamicLargeObjects { if file, err = c.connection.DynamicLargeObjectCreateFile(&opts); err != nil { @@ -364,7 +368,7 @@ func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err err } } } else { - if file, err = c.connection.ObjectCreate(c.name, name, true, "", "", swift.Headers{}); err != nil { + if file, err = c.connection.ObjectCreate(c.name, name, true, "", uploadOpts.ContentType, swift.Headers{}); err != nil { return errors.Wrap(err, "create file") } } diff --git a/vendor/github.com/thanos-io/objstore/testing.go b/vendor/github.com/thanos-io/objstore/testing.go index 80f1e198e0c..ce6ca6b77ac 100644 --- a/vendor/github.com/thanos-io/objstore/testing.go +++ b/vendor/github.com/thanos-io/objstore/testing.go @@ -316,9 +316,9 @@ func (d *delayingBucket) Exists(ctx context.Context, name string) (bool, error) return d.bkt.Exists(ctx, name) } -func (d *delayingBucket) Upload(ctx context.Context, name string, r io.Reader) error { +func (d *delayingBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error { time.Sleep(d.delay) - return d.bkt.Upload(ctx, name, r) + return d.bkt.Upload(ctx, name, r, opts...) } func (d *delayingBucket) Delete(ctx context.Context, name string) error { diff --git a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go index 58bdea0776c..15bbfd20cf5 100644 --- a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go +++ b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go @@ -110,10 +110,10 @@ func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs objst return } -func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { +func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) (err error) { doWithSpan(ctx, "bucket_upload", func(spanCtx context.Context, span opentracing.Span) { span.LogKV("name", name) - err = t.bkt.Upload(spanCtx, name, r) + err = t.bkt.Upload(spanCtx, name, r, opts...) }) return } diff --git a/vendor/modules.txt b/vendor/modules.txt index b03e967c8f9..e9661582f3d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1333,8 +1333,8 @@ github.com/stretchr/testify/assert github.com/stretchr/testify/assert/yaml github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/objstore v0.0.0-20250129163715-ec72e5a88a79 -## explicit; go 1.22 +# github.com/thanos-io/objstore v0.0.0-20250317105316-a0136a6f898d +## explicit; go 1.22.7 github.com/thanos-io/objstore github.com/thanos-io/objstore/exthttp github.com/thanos-io/objstore/providers/azure