Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Generally, you have two ways of using `objstore` module:

First is to import the provider you want e.g. [`github.com/thanos-io/objstore/providers/s3`](providers/s3) and instantiate it with available constructor (e.g. `NewBucket`).

The second option is to use the factory `NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string)` that will instantiate the object storage client based on YAML file provided. The YAML file has generally the format like this:
The second option is to use the factory `NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string,rt http.RoundTripper)` that will instantiate the object storage client based on YAML file provided. The YAML file has generally the format like this:

```yaml
type: <PROVIDER_TYPE>
Expand All @@ -114,6 +114,8 @@ config:

The exact option depends on provider and are in sections below.

`NewBucket` function now accepts an `http.RoundTripper` parameter allows clients to provide a custom transport for HTTP requests, making the function more flexible , this change facilitates the use of various HTTP client configurations, including hedged HTTP transports.

> NOTE: All code snippets are auto-generated from code and up-to-date.

Check out the [Thanos documentation](https://thanos.io/tip/thanos/storage.md/) to see how Thanos uses this module.
Expand Down
13 changes: 7 additions & 6 deletions client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client
import (
"context"
"fmt"
"net/http"
"strings"

"github.com/thanos-io/objstore"
Expand Down Expand Up @@ -49,7 +50,7 @@ type BucketConfig struct {

// NewBucket initializes and returns new object storage clients.
// NOTE: confContentYaml can contain secrets.
func NewBucket(logger log.Logger, confContentYaml []byte, component string) (objstore.Bucket, error) {
func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt http.RoundTripper) (objstore.Bucket, error) {
level.Info(logger).Log("msg", "loading bucket configuration")
bucketConf := &BucketConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
Expand All @@ -64,23 +65,23 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string) (obj
var bucket objstore.Bucket
switch strings.ToUpper(string(bucketConf.Type)) {
case string(GCS):
bucket, err = gcs.NewBucket(context.Background(), logger, config, component)
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, rt)
case string(S3):
bucket, err = s3.NewBucket(logger, config, component)
bucket, err = s3.NewBucket(logger, config, component, rt)
case string(AZURE):
bucket, err = azure.NewBucket(logger, config, component)
bucket, err = azure.NewBucket(logger, config, component, rt)
case string(SWIFT):
bucket, err = swift.NewContainer(logger, config)
Copy link
Member

@GiedriusS GiedriusS May 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not supported by all providers? 🤔 I checked a few like swift/bos and I don't see a reason why this is not implemented there.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not supported by all providers? 🤔 I checked a few like swift/bos and I don't see a reason why this is not implemented there.

i added this parameter to the providers which had http.Client , swift does not uses NewBucket but bos/obs can have this parameter i think also i see that obs makes new ObsClient and bos uses NewClient which makes a BOS client...wdyt?

case string(COS):
bucket, err = cos.NewBucket(logger, config, component)
bucket, err = cos.NewBucket(logger, config, component, rt)
case string(ALIYUNOSS):
bucket, err = oss.NewBucket(logger, config, component)
case string(FILESYSTEM):
bucket, err = filesystem.NewBucketFromConfig(config)
case string(BOS):
bucket, err = bos.NewBucket(logger, config, component)
case string(OCI):
bucket, err = oci.NewBucket(logger, config)
bucket, err = oci.NewBucket(logger, config, rt)
case string(OBS):
bucket, err = obs.NewBucket(logger, config)
default:
Expand Down
7 changes: 4 additions & 3 deletions client/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client
import (
"context"
"fmt"
"net/http"
"os"

"github.com/go-kit/log"
Expand All @@ -23,7 +24,7 @@ func ExampleBucket() {
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", http.DefaultTransport)
if err != nil {
panic(err)
}
Expand All @@ -46,7 +47,7 @@ func ExampleTracingBucketUsingOpenTracing() { //nolint:govet
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", http.DefaultTransport)
if err != nil {
panic(err)
}
Expand All @@ -72,7 +73,7 @@ func ExampleTracingBucketUsingOpenTelemetry() { //nolint:govet
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", http.DefaultTransport)
if err != nil {
panic(err)
}
Expand Down
11 changes: 6 additions & 5 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azure
import (
"context"
"io"
"net/http"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -143,7 +144,7 @@ type Bucket struct {
}

// NewBucket returns a new Bucket using the provided Azure config.
func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.RoundTripper) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component)
conf, err := parseConfig(azureConfig)
if err != nil {
Expand All @@ -152,16 +153,16 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket
if conf.MSIResource != "" {
level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set")
}
return NewBucketWithConfig(logger, conf, component)
return NewBucketWithConfig(logger, conf, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided Azure config struct.
func NewBucketWithConfig(logger log.Logger, conf Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) {
if err := conf.validate(); err != nil {
return nil, err
}

containerClient, err := getContainerClient(conf)
containerClient, err := getContainerClient(conf, rt)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -352,7 +353,7 @@ func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), err
if err != nil {
return nil, nil, err
}
bkt, err := NewBucket(log.NewNopLogger(), bc, component)
bkt, err := NewBucket(log.NewNopLogger(), bc, component, http.DefaultTransport)
if err != nil {
t.Errorf("Cannot create Azure storage container:")
return nil, nil, err
Expand Down
16 changes: 12 additions & 4 deletions providers/azure/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@ import (
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

func getContainerClient(conf Config) (*container.Client, error) {
dt, err := exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
return nil, err
func getContainerClient(conf Config, rt http.RoundTripper) (*container.Client, error) {
var dt http.RoundTripper
var err error

if rt != nil {
dt = rt
} else {
dt, err = exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
return nil, err
}
}

opt := &container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
Expand Down
17 changes: 11 additions & 6 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket using the provided cos configuration.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -105,11 +105,11 @@ func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error
return nil, errors.Wrap(err, "parsing cos configuration")
}

return NewBucketWithConfig(logger, config, component)
return NewBucketWithConfig(logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided cos config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
if err := config.validate(); err != nil {
return nil, errors.Wrap(err, "validate cos configuration")
}
Expand All @@ -128,7 +128,12 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
}
}
b := &cos.BaseURL{BucketURL: bucketURL}
tpt, _ := exthttp.DefaultTransport(config.HTTPConfig)
var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
tpt, _ = exthttp.DefaultTransport(config.HTTPConfig)
}
client := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: config.SecretId,
Expand Down Expand Up @@ -485,7 +490,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", http.DefaultTransport)
if err != nil {
return nil, nil, err
}
Expand All @@ -506,7 +511,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", http.DefaultTransport)
if err != nil {
return nil, nil, err
}
Expand Down
23 changes: 11 additions & 12 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,16 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) {
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
return nil, err
}
return NewBucketWithConfig(ctx, logger, config, component)
return NewBucketWithConfig(ctx, logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket with gcs Config struct.
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string) (*Bucket, error) {
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) {
if gc.Bucket == "" {
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
}
Expand All @@ -103,7 +103,7 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp

if !gc.UseGRPC {
var err error
opts, err = appendHttpOptions(gc, opts)
opts, err = appendHttpOptions(gc, opts, rt)
if err != nil {
return nil, err
}
Expand All @@ -112,25 +112,24 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp
return newBucket(ctx, logger, gc, opts)
}

func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOption, error) {
func appendHttpOptions(gc Config, opts []option.ClientOption, rt http.RoundTripper) ([]option.ClientOption, error) {
// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var rt http.RoundTripper
if gc.HTTPConfig.Transport != nil {
rt = gc.HTTPConfig.Transport
var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
var err error
rt, err = exthttp.DefaultTransport(gc.HTTPConfig)
tpt, err = exthttp.DefaultTransport(gc.HTTPConfig)
if err != nil {
return nil, err
}
}

// GCS uses some defaults when "options.WithHTTPClient" is not used that are important when we call
// htransport.NewTransport namely the scopes that are then used for OAth authentication. So to build our own
// http client we need to se those defaults
opts = append(opts, option.WithScopes(storage.ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"))
gRT, err := htransport.NewTransport(context.Background(), rt, opts...)
gRT, err := htransport.NewTransport(context.Background(), tpt, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -302,7 +301,7 @@ func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error
return nil, nil, err
}

b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test", http.DefaultTransport)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion providers/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestNewBucketWithConfig_ShouldCreateGRPC(t *testing.T) {
err = os.Setenv("STORAGE_EMULATOR_HOST_GRPC", svr.Addr)
testutil.Ok(t, err)

bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket")
bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", http.DefaultTransport)
testutil.Ok(t, err)

// Check if the bucket is created.
Expand Down
13 changes: 10 additions & 3 deletions providers/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (b *Bucket) deleteBucket(ctx context.Context) (err error) {
}

// NewBucket returns a new Bucket using the provided oci config values.
func NewBucket(logger log.Logger, ociConfig []byte) (*Bucket, error) {
func NewBucket(logger log.Logger, ociConfig []byte, rt http.RoundTripper) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new oci bucket connection")
var config = DefaultConfig
var configurationProvider common.ConfigurationProvider
Expand Down Expand Up @@ -335,8 +335,15 @@ func NewBucket(logger log.Logger, ociConfig []byte) (*Bucket, error) {
return nil, errors.Wrapf(err, "unable to create ObjectStorage client with the given oci configurations")
}

var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
tpt = CustomTransport(config)
}

httpClient := http.Client{
Transport: CustomTransport(config),
Transport: tpt,
Timeout: config.HTTPConfig.ClientTimeout,
}
client.HTTPClient = &httpClient
Expand Down Expand Up @@ -375,7 +382,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

bkt, err := NewBucket(log.NewNopLogger(), ociConfig)
bkt, err := NewBucket(log.NewNopLogger(), ociConfig, http.DefaultTransport)
if err != nil {
return nil, nil, err
}
Expand Down
18 changes: 9 additions & 9 deletions providers/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket using the provided s3 config values.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
return nil, err
}

return NewBucketWithConfig(logger, config, component)
return NewBucketWithConfig(logger, config, component, rt)
}

type overrideSignerType struct {
Expand All @@ -201,7 +201,7 @@ func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
}

// NewBucketWithConfig returns a new Bucket using the provided s3 config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
var chain []credentials.Provider

// TODO(bwplotka): Don't do flags as they won't scale, use actual params like v2, v4 instead
Expand Down Expand Up @@ -244,12 +244,12 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B

// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var rt http.RoundTripper
if config.HTTPConfig.Transport != nil {
rt = config.HTTPConfig.Transport
var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
var err error
rt, err = exthttp.DefaultTransport(config.HTTPConfig)
tpt, err = exthttp.DefaultTransport(config.HTTPConfig)
if err != nil {
return nil, err
}
Expand All @@ -259,7 +259,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
Creds: credentials.NewChainCredentials(chain),
Secure: !config.Insecure,
Region: config.Region,
Transport: rt,
Transport: tpt,
BucketLookup: config.BucketLookupType.MinioType(),
})
if err != nil {
Expand Down Expand Up @@ -605,7 +605,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke
if err != nil {
return nil, nil, err
}
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", http.DefaultTransport)
if err != nil {
return nil, nil, err
}
Expand Down
Loading