Skip to content

Commit

Permalink
chore: Move Swift client to new ObjectClientAdapter
Browse files Browse the repository at this point in the history
  • Loading branch information
JoaoBraveCoding committed Oct 30, 2024
1 parent 5a511b9 commit bc2eeb8
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 351 deletions.
26 changes: 24 additions & 2 deletions pkg/storage/bucket/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"flag"
"net/http"
"time"
)

Expand All @@ -10,12 +11,25 @@ type Config struct {
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
InsecureSkipVerify bool `yaml:"insecure_skip_verify"`

TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout"`
ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout"`
MaxIdleConns int `yaml:"max_idle_connections"`
MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host"`
MaxConnsPerHost int `yaml:"max_connections_per_host"`
CAFile string `yaml:"ca_file"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`

TLSConfig TLSConfig `yaml:",inline"`
}

// TLSConfig configures the options for TLS connections.
type TLSConfig struct {
CAPath string `yaml:"tls_ca_path" category:"advanced"`
CertPath string `yaml:"tls_cert_path" category:"advanced"`
KeyPath string `yaml:"tls_key_path" category:"advanced"`
ServerName string `yaml:"tls_server_name" category:"advanced"`
}

// RegisterFlags registers the flags for the storage HTTP client.
Expand All @@ -33,5 +47,13 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxIdleConns, prefix+"max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.")
f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.")
f.IntVar(&cfg.MaxConnsPerHost, prefix+"max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.")
f.StringVar(&cfg.CAFile, prefix+"ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the object storage endpoint.")
cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f)
}

// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix.
func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.CAPath, prefix+"http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.")
f.StringVar(&cfg.CertPath, prefix+"http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.")
f.StringVar(&cfg.KeyPath, prefix+"http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.")
f.StringVar(&cfg.ServerName, prefix+"http.tls-server-name", "", "Override the expected name on the server certificate.")
}
44 changes: 25 additions & 19 deletions pkg/storage/bucket/swift/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,27 @@ import (
// NewBucketClient creates a new Swift bucket client
func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, error) {
bucketConfig := swift.Config{
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Username: cfg.Username,
UserDomainName: cfg.UserDomainName,
UserDomainID: cfg.UserDomainID,
UserId: cfg.UserID,
Password: cfg.Password,
DomainId: cfg.DomainID,
DomainName: cfg.DomainName,
ProjectID: cfg.ProjectID,
ProjectName: cfg.ProjectName,
ProjectDomainID: cfg.ProjectDomainID,
ProjectDomainName: cfg.ProjectDomainName,
RegionName: cfg.RegionName,
ContainerName: cfg.ContainerName,
Retries: cfg.MaxRetries,
ConnectTimeout: model.Duration(cfg.ConnectTimeout),
Timeout: model.Duration(cfg.RequestTimeout),
ApplicationCredentialID: cfg.ApplicationCredentialID,
ApplicationCredentialName: cfg.ApplicationCredentialName,
ApplicationCredentialSecret: cfg.ApplicationCredentialSecret.String(),
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Username: cfg.Username,
UserDomainName: cfg.UserDomainName,
UserDomainID: cfg.UserDomainID,
UserId: cfg.UserID,
Password: cfg.Password.String(),
DomainId: cfg.DomainID,
DomainName: cfg.DomainName,
ProjectID: cfg.ProjectID,
ProjectName: cfg.ProjectName,
ProjectDomainID: cfg.ProjectDomainID,
ProjectDomainName: cfg.ProjectDomainName,
RegionName: cfg.RegionName,
ContainerName: cfg.ContainerName,
Retries: cfg.MaxRetries,
ConnectTimeout: model.Duration(cfg.ConnectTimeout),
Timeout: model.Duration(cfg.RequestTimeout),
HTTPConfig: exthttp.HTTPConfig{
IdleConnTimeout: model.Duration(cfg.HTTP.IdleConnTimeout),
ResponseHeaderTimeout: model.Duration(cfg.HTTP.ResponseHeaderTimeout),
Expand All @@ -41,7 +44,10 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket,
MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost,
Transport: cfg.HTTP.Transport,
TLSConfig: exthttp.TLSConfig{
CAFile: cfg.HTTP.CAFile,
CAFile: cfg.HTTP.TLSConfig.CAPath,
CertFile: cfg.HTTP.TLSConfig.CertPath,
KeyFile: cfg.HTTP.TLSConfig.KeyPath,
ServerName: cfg.HTTP.TLSConfig.ServerName,
},
},

Expand Down
70 changes: 30 additions & 40 deletions pkg/storage/bucket/swift/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,37 @@ package swift

import (
"flag"
"net/http"
"time"

bucket_http "github.com/grafana/loki/pkg/storage/bucket/http"
)
"github.com/grafana/dskit/flagext"

// HTTPConfig stores the http.Transport configuration for the swift minio client.
type HTTPConfig struct {
bucket_http.Config `yaml:",inline"`
Transport http.RoundTripper `yaml:"-"`
}
"github.com/grafana/loki/v3/pkg/storage/bucket/http"
)

// Config holds the config options for Swift backend
type Config struct {
AuthVersion int `yaml:"auth_version"`
AuthURL string `yaml:"auth_url"`
Internal bool `yaml:"internal"`
Username string `yaml:"username"`
UserDomainName string `yaml:"user_domain_name"`
UserDomainID string `yaml:"user_domain_id"`
UserID string `yaml:"user_id"`
Password string `yaml:"password"`
DomainID string `yaml:"domain_id"`
DomainName string `yaml:"domain_name"`
ProjectID string `yaml:"project_id"`
ProjectName string `yaml:"project_name"`
ProjectDomainID string `yaml:"project_domain_id"`
ProjectDomainName string `yaml:"project_domain_name"`
RegionName string `yaml:"region_name"`
ContainerName string `yaml:"container_name"`
MaxRetries int `yaml:"max_retries"`
ConnectTimeout time.Duration `yaml:"connect_timeout"`
RequestTimeout time.Duration `yaml:"request_timeout"`
HTTP HTTPConfig `yaml:"http"`
}

// RegisterFlagsWithPrefix registers the flags for swift storage with the provided prefix
func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(prefix+"swift.", f)
ApplicationCredentialID string `yaml:"application_credential_id"`
ApplicationCredentialName string `yaml:"application_credential_name"`
ApplicationCredentialSecret flagext.Secret `yaml:"application_credential_secret"`
AuthVersion int `yaml:"auth_version"`
AuthURL string `yaml:"auth_url"`
Username string `yaml:"username"`
UserDomainName string `yaml:"user_domain_name"`
UserDomainID string `yaml:"user_domain_id"`
UserID string `yaml:"user_id"`
Password flagext.Secret `yaml:"password"`
DomainID string `yaml:"domain_id"`
DomainName string `yaml:"domain_name"`
ProjectID string `yaml:"project_id"`
ProjectName string `yaml:"project_name"`
ProjectDomainID string `yaml:"project_domain_id"`
ProjectDomainName string `yaml:"project_domain_name"`
RegionName string `yaml:"region_name"`
ContainerName string `yaml:"container_name"`
MaxRetries int `yaml:"max_retries" category:"advanced"`
ConnectTimeout time.Duration `yaml:"connect_timeout" category:"advanced"`
RequestTimeout time.Duration `yaml:"request_timeout" category:"advanced"`
HTTP http.Config `yaml:"http_config"`
}

// RegisterFlags registers the flags for Swift storage
Expand All @@ -50,14 +42,16 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// RegisterFlagsWithPrefix registers the flags for Swift storage with the provided prefix
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.ApplicationCredentialID, prefix+"swift.application-credential-id", "", "OpenStack Swift application credential id")
f.StringVar(&cfg.ApplicationCredentialName, prefix+"swift.application-credential-name", "", "OpenStack Swift application credential name")
f.Var(&cfg.ApplicationCredentialSecret, prefix+"swift.application-credential-secret", "OpenStack Swift application credential secret")
f.IntVar(&cfg.AuthVersion, prefix+"swift.auth-version", 0, "OpenStack Swift authentication API version. 0 to autodetect.")
f.StringVar(&cfg.AuthURL, prefix+"swift.auth-url", "", "OpenStack Swift authentication URL")
f.BoolVar(&cfg.Internal, prefix+"swift.internal", false, "Set this to true to use the internal OpenStack Swift endpoint URL")
f.StringVar(&cfg.Username, prefix+"swift.username", "", "OpenStack Swift username.")
f.StringVar(&cfg.UserDomainName, prefix+"swift.user-domain-name", "", "OpenStack Swift user's domain name.")
f.StringVar(&cfg.UserDomainID, prefix+"swift.user-domain-id", "", "OpenStack Swift user's domain ID.")
f.StringVar(&cfg.UserID, prefix+"swift.user-id", "", "OpenStack Swift user ID.")
f.StringVar(&cfg.Password, prefix+"swift.password", "", "OpenStack Swift API key.")
f.Var(&cfg.Password, prefix+"swift.password", "OpenStack Swift API key.")
f.StringVar(&cfg.DomainID, prefix+"swift.domain-id", "", "OpenStack Swift user's domain ID.")
f.StringVar(&cfg.DomainName, prefix+"swift.domain-name", "", "OpenStack Swift user's domain name.")
f.StringVar(&cfg.ProjectID, prefix+"swift.project-id", "", "OpenStack Swift project ID (v2,v3 auth only).")
Expand All @@ -69,9 +63,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, prefix+"swift.max-retries", 3, "Max retries on requests error.")
f.DurationVar(&cfg.ConnectTimeout, prefix+"swift.connect-timeout", 10*time.Second, "Time after which a connection attempt is aborted.")
f.DurationVar(&cfg.RequestTimeout, prefix+"swift.request-timeout", 5*time.Second, "Time after which an idle request is aborted. The timeout watchdog is reset each time some data is received, so the timeout triggers after X time no data is received on a request.")
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
}

func (cfg *Config) Validate() error {
return nil
cfg.HTTP.RegisterFlagsWithPrefix(prefix+"swift.", f)
}
86 changes: 46 additions & 40 deletions pkg/storage/chunk/client/openstack/swift_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,44 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

bucket_http "github.com/grafana/loki/v3/pkg/storage/bucket/http"
bucket_swift "github.com/grafana/loki/v3/pkg/storage/bucket/swift"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
"github.com/grafana/loki/v3/pkg/util/log"
)

var defaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConnsPerHost: 200,
MaxIdleConns: 200,
ExpectContinueTimeout: 5 * time.Second,
}
func defaultTransport(config bucket_http.Config) (http.RoundTripper, error) {
tlsConfig := &tls.Config{}
if len(config.TLSConfig.CAPath) > 0 {
caPath := config.TLSConfig.CAPath
data, err := os.ReadFile(caPath)
if err != nil {
return nil, fmt.Errorf("unable to load specified CA cert %s: %s", caPath, err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(data) {
return nil, fmt.Errorf("unable to use specified CA cert %s", caPath)
}
tlsConfig.RootCAs = caCertPool
}

// HTTPConfig stores the http.Transport configuration
type HTTPConfig struct {
Timeout time.Duration `yaml:"timeout"`
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
CAFile string `yaml:"ca_file"`
if config.Transport != nil {
return config.Transport, nil
}

return &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConns: config.MaxIdleConns,
MaxIdleConnsPerHost: config.MaxIdleConnsPerHost,
ExpectContinueTimeout: 5 * time.Second,
// Set this value so that the underlying transport round-tripper
// doesn't try to auto decode the body of objects with
// content-encoding set to `gzip`.
//
// Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
TLSClientConfig: tlsConfig,
}, nil
}

type SwiftObjectClient struct {
Expand All @@ -46,8 +64,8 @@ type SwiftObjectClient struct {

// SwiftConfig is config for the Swift Chunk Client.
type SwiftConfig struct {
Internal bool `yaml:"internal"`
bucket_swift.Config `yaml:",inline"`
HTTPConfig HTTPConfig `yaml:"http_config"`
}

// RegisterFlags registers flags.
Expand All @@ -62,9 +80,8 @@ func (cfg *SwiftConfig) Validate() error {

// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *SwiftConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.Internal, prefix+"swift.internal", false, "Set this to true to use the internal OpenStack Swift endpoint URL")
cfg.Config.RegisterFlagsWithPrefix(prefix, f)
f.DurationVar(&cfg.HTTPConfig.Timeout, prefix+"swift.http.timeout", 0, "Timeout specifies a time limit for requests made by swift Client.")
f.StringVar(&cfg.HTTPConfig.CAFile, prefix+"swift.http.ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the Swift endpoint.")
}

// NewSwiftObjectClient makes a new chunk.Client that writes chunks to OpenStack Swift.
Expand All @@ -91,24 +108,16 @@ func NewSwiftObjectClient(cfg SwiftConfig, hedgingCfg hedging.Config) (*SwiftObj
}

func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool) (*swift.Connection, error) {
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.HTTP.InsecureSkipVerify,
}
if cfg.HTTPConfig.CAFile != "" {
tlsConfig.RootCAs = x509.NewCertPool()
data, err := os.ReadFile(cfg.HTTPConfig.CAFile)
if err != nil {
return nil, err
}
tlsConfig.RootCAs.AppendCertsFromPEM(data)
defaultTransport := defaultTransport.(*http.Transport)
defaultTransport.TLSClientConfig = tlsConfig
defaultTransport, err := defaultTransport(cfg.Config.HTTP)
if err != nil {
return nil, err
}

c := &swift.Connection{
AuthVersion: cfg.Config.AuthVersion,
AuthUrl: cfg.Config.AuthURL,
Internal: cfg.Config.Internal,
ApiKey: cfg.Config.Password,
Internal: cfg.Internal,
ApiKey: cfg.Config.Password.String(),
UserName: cfg.Config.Username,
UserId: cfg.Config.UserID,
Retries: cfg.Config.MaxRetries,
Expand All @@ -124,14 +133,6 @@ func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool)
Transport: defaultTransport,
}

// Create a connection

switch {
case cfg.Config.UserDomainName != "":
c.Domain = cfg.Config.UserDomainName
case cfg.Config.UserDomainID != "":
c.DomainId = cfg.Config.UserDomainID
}
if hedging {
var err error
c.Transport, err = hedgingCfg.RoundTripperWithRegisterer(c.Transport, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer))
Expand All @@ -140,7 +141,8 @@ func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool)
}
}

err := c.Authenticate(context.TODO())
// Create a connection
err = c.Authenticate(context.TODO())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -252,4 +254,8 @@ func (s *SwiftObjectClient) IsObjectNotFoundErr(err error) bool {
}

// TODO(dannyk): implement for client
func (s *SwiftObjectClient) IsRetryableErr(error) bool { return false }
func IsRetryableErr(error) bool { return false }

func (s *SwiftObjectClient) IsRetryableErr(err error) bool {
return IsRetryableErr(err)
}
10 changes: 8 additions & 2 deletions pkg/storage/chunk/client/openstack/swift_object_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/dskit/flagext"

bucket_http "github.com/grafana/loki/v3/pkg/storage/bucket/http"
"github.com/grafana/loki/v3/pkg/storage/bucket/swift"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)
Expand Down Expand Up @@ -61,7 +64,7 @@ func Test_Hedging(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
count := atomic.NewInt32(0)
// hijack the transport to count the number of calls
defaultTransport = RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
transportCounter := RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
// fake auth
if req.Header.Get("X-Auth-Key") == "passwd" {
return &http.Response{
Expand Down Expand Up @@ -93,9 +96,12 @@ func Test_Hedging(t *testing.T) {
MaxRetries: 1,
ContainerName: "foo",
AuthVersion: 1,
Password: "passwd",
Password: flagext.SecretWithValue("passwd"),
ConnectTimeout: 10 * time.Second,
RequestTimeout: 10 * time.Second,
HTTP: bucket_http.Config{
Transport: transportCounter,
},
},
}, hedging.Config{
At: tc.hedgeAt,
Expand Down
Loading

0 comments on commit bc2eeb8

Please sign in to comment.