Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func runCompact(
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.String(), nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func RunDownsample(
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Downsample.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Downsample.String(), nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func runReceive(
}
// The background shipper continuously scans the data directory and uploads
// new blocks to object storage service.
bkt, err = client.NewBucket(logger, confContentYaml, comp.String())
bkt, err = client.NewBucket(logger, confContentYaml, comp.String(), nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ func runRule(
if len(confContentYaml) > 0 {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, component.Rule.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Rule.String(), nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func runSidecar(
if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, component.Sidecar.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Sidecar.String(), nil)
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
commonmodel "github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
hidden "github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/exthttp"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
Expand Down Expand Up @@ -308,8 +310,11 @@ func runStore(
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, conf.component.String())
customBktConfig := exthttp.DefaultCustomBucketConfig()
if err := yaml.Unmarshal(confContentYaml, &customBktConfig); err != nil {
return errors.Wrap(err, "parsing config YAML file")
}
bkt, err := client.NewBucket(logger, confContentYaml, conf.component.String(), exthttp.CreateHedgedTransportWithConfig(customBktConfig))
if err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String(), nil)
if err != nil {
return err
}
Expand All @@ -346,7 +346,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
}
} else {
// nil Prometheus registerer: don't create conflicting metrics.
backupBkt, err = client.NewBucket(logger, backupconfContentYaml, component.Bucket.String())
backupBkt, err = client.NewBucket(logger, backupconfContentYaml, component.Bucket.String(), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -411,7 +411,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String(), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -519,7 +519,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String(), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -629,7 +629,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String(), nil)
if err != nil {
return errors.Wrap(err, "bucket client")
}
Expand Down Expand Up @@ -826,7 +826,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Cleanup.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Cleanup.String(), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1084,7 +1084,7 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Mark.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Mark.String(), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1164,7 +1164,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Rewrite.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Rewrite.String(), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1372,7 +1372,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Retention.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Retention.String(), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1462,7 +1462,7 @@ func registerBucketUploadBlocks(app extkingpin.AppClause, objStoreConfig *extfla
return errors.Wrap(err, "unable to parse objstore config")
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Upload.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Upload.String(), nil)
if err != nil {
return errors.Wrap(err, "unable to create bucket")
}
Expand Down
25 changes: 25 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,31 @@ Note that there must be no trailing slash in the `peers` configuration i.e. one

If timeout is set to zero then there is no timeout for fetching and fetching's lifetime is equal to the lifetime to the original request's lifetime. It is recommended to keep it higher than zero. It is generally preferred to keep this value higher because the fetching operation potentially includes loading of data from remote object storage.

## Hedged Requests

Thanos Store Gateway supports `hedged requests` to improve performance and reliability, especially useful in high-latency environments.

The configuration options for hedged requests allow for tuning based on latency tolerance and cost considerations, as some providers may charge per request.

In the `bucket.yml` file, you can specify the following fields under `hedging_config`:

- `up_to`: maximum number of hedged requests allowed for each initial request.
- **Purpose**: controls the redundancy level of hedged requests to improve response times.
- **Cost vs. Benefit**: increasing up_to can reduce latency but may increase costs, as some providers charge per request. Higher values provide diminishing returns on latency beyond a certain level.
- `quantile`: latency threshold, specified as a quantile (e.g., percentile), which determines when additional hedged requests should be sent.
- **Purpose**: controls when hedged requests are triggered based on response time distribution.
- **Cost vs. Benefit**: lower quantile (e.g., 0.7) initiates hedged requests sooner, potentially raising costs while lowering latency variance. A higher quantile (e.g., 0.95) will initiate hedged requests later, reducing cost by limiting redundancy.

By default, `hedging_config` is set as follows:

```yaml
hedging_config:
up_to: 3
quantile: 0.9
```

This configuration sends up to three additional requests if the initial request response time exceeds the 90th percentile.

## Index Header

In order to query series inside blocks from object storage, Store Gateway has to know certain initial info from each block index. In order to achieve so, on startup the Gateway builds an `index-header` for each block and stores it on local disk; such `index-header` is build by downloading specific pieces of original block's index, stored on local disk and then mmaped and used by Store Gateway.
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ require (
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/client_golang v1.20.4
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.60.0
github.com/prometheus/exporter-toolkit v0.12.0
// Prometheus maps version 2.x.y to tags v0.x.y.
github.com/prometheus/prometheus v0.55.1-0.20241102120812-a6fd22b9d2c8
github.com/sony/gobreaker v0.5.0
github.com/stretchr/testify v1.9.0
github.com/thanos-io/objstore v0.0.0-20240913074259-63feed0da069
github.com/thanos-io/objstore v0.0.0-20241024120700-168679cbbf20
github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
Expand Down Expand Up @@ -144,6 +144,7 @@ require (
github.com/google/s2a-go v0.1.8 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect
github.com/jcchavezs/porto v0.1.0 // indirect
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 // indirect
github.com/mdlayher/socket v0.4.1 // indirect
github.com/mdlayher/vsock v1.2.1 // indirect
github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a // indirect
Expand Down Expand Up @@ -191,10 +192,12 @@ require (
github.com/aws/smithy-go v1.11.1 // indirect
github.com/baidubce/bce-sdk-go v0.9.111 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/caio/go-tdigest v3.1.0+incompatible
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/chromedp/sysutil v1.0.0 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cristalhq/hedgedhttp v0.9.1
github.com/dennwc/varint v1.0.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/elastic/go-sysinfo v1.8.1 // indirect
Expand Down
14 changes: 10 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1452,6 +1452,8 @@ github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds=
github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -1508,6 +1510,8 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e h1:nOWmgQD3L/Z0bmm29iDxB7nlqjMnh7yD/PNOx9rnZmA=
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e/go.mod h1:+bSqRETXJ1uk2S93m//htzTVqu8DJPvlGEb3bSE9PzI=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.9.1 h1:g68L9cf8uUyQKQJwciD0A1Vgbsz+QgCjuB1I8FAsCDs=
github.com/cristalhq/hedgedhttp v0.9.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down Expand Up @@ -1970,6 +1974,8 @@ github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80 h1:6Yzfa6GP0rIo/kULo2bwGEkFvCePZ3qHDDTC3/J9Swo=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 h1:X/79QL0b4YJVO5+OsPH9rF2u428CIrGL/jLmPsoOQQ4=
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353/go.mod h1:N0SVk0uhy+E1PZ3C9ctsPRlvOPAFPkCNlcPBDkt0N3U=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20210210170715-a8dfcb80d3a7 h1:YjW+hUb8Fh2S58z4av4t/0cBMK/Q0aP48RocCFsC8yI=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20210210170715-a8dfcb80d3a7/go.mod h1:Spd59icnvRxSKuyijbbwe5AemzvcyXAUBgApa7VybMw=
Expand Down Expand Up @@ -2136,8 +2142,8 @@ github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrb
github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y=
github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI=
github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down Expand Up @@ -2249,8 +2255,8 @@ github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20240913074259-63feed0da069 h1:TUPZ6euAh8I62KrpDnBIg7k2C5HjgXQnVHoUUMacGwM=
github.com/thanos-io/objstore v0.0.0-20240913074259-63feed0da069/go.mod h1:Cba80S8NbVBBdyZKzra7San/jXvpAxArbpFymWzIZhg=
github.com/thanos-io/objstore v0.0.0-20241024120700-168679cbbf20 h1:NmVMYAsXPnj9zRG5dDj0SGqrHfbs/1parMRZTvwB8YE=
github.com/thanos-io/objstore v0.0.0-20241024120700-168679cbbf20/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4=
github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 h1:xPaP58g+3EPohdw4cv+6jv5+LcX6LynhHvQcYwTAMxQ=
github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
Expand Down
89 changes: 89 additions & 0 deletions pkg/exthttp/hedging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package exthttp

import (
"fmt"
"net/http"
"sync"
"time"

"github.com/caio/go-tdigest"
"github.com/cristalhq/hedgedhttp"
)

type CustomBucketConfig struct {
HedgingConfig HedgingConfig `yaml:"hedging_config"`
}

type HedgingConfig struct {
UpTo uint `yaml:"up_to"`
Quantile float64 `yaml:"quantile"`
}

func DefaultCustomBucketConfig() CustomBucketConfig {
return CustomBucketConfig{
HedgingConfig: HedgingConfig{
UpTo: 3,
Quantile: 0.9,
},
}
}

type hedgingRoundTripper struct {
Transport http.RoundTripper
TDigest *tdigest.TDigest
mu sync.RWMutex
config HedgingConfig
}

func (hrt *hedgingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()
resp, err := hrt.Transport.RoundTrip(req)
if err != nil {
return nil, err
}
duration := float64(time.Since(start).Milliseconds())
hrt.mu.Lock()
err = hrt.TDigest.Add(duration)
if err != nil {
return nil, err
}
hrt.mu.Unlock()
return resp, err
}

func (hrt *hedgingRoundTripper) nextFn() (int, time.Duration) {
hrt.mu.RLock()
defer hrt.mu.RUnlock()

delayMs := hrt.TDigest.Quantile(hrt.config.Quantile)
delay := time.Duration(delayMs) * time.Millisecond
upto := int(hrt.config.UpTo)
return upto, delay
}

func CreateHedgedTransportWithConfig(config CustomBucketConfig) func(rt http.RoundTripper) http.RoundTripper {
return func(rt http.RoundTripper) http.RoundTripper {
td, err := tdigest.New()
if err != nil {
panic(fmt.Sprintf("BUG: Failed to initialize T-Digest: %v", err))
}
hrt := &hedgingRoundTripper{
Transport: rt,
TDigest: td,
config: config.HedgingConfig,
}
cfg := hedgedhttp.Config{
Transport: hrt,
Upto: int(config.HedgingConfig.UpTo),
Next: hrt.nextFn,
}
hedgedrt, err := hedgedhttp.New(cfg)
if err != nil {
panic(fmt.Sprintf("BUG: Failed to create hedged transport: %v", err))
}
return hedgedrt
}
}
4 changes: 2 additions & 2 deletions pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func RunReplicate(
return errors.New("No supported bucket was configured to replicate from")
}

bkt, err := client.NewBucket(logger, fromConfContentYaml, component.Replicate.String())
bkt, err := client.NewBucket(logger, fromConfContentYaml, component.Replicate.String(), nil)
if err != nil {
return err
}
Expand All @@ -136,7 +136,7 @@ func RunReplicate(
return errors.New("No supported bucket was configured to replicate to")
}

toBkt, err := client.NewBucket(logger, toConfContentYaml, component.Replicate.String())
toBkt, err := client.NewBucket(logger, toConfContentYaml, component.Replicate.String(), nil)
if err != nil {
return err
}
Expand Down
Loading
Loading