Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#6369](https://github.com/thanos-io/thanos/pull/6369) Receive: add az-aware replication support for Ketama algorithm
- [#6185](https://github.com/thanos-io/thanos/pull/6185) Tracing: tracing in OTLP support configuring service_name.
- [#6192](https://github.com/thanos-io/thanos/pull/6192) Store: add flag `bucket-web-label` to select the label to use as timeline title in web UI
- [#6167](https://github.com/thanos-io/thanos/pull/6195) Receive: add flag `tsdb.too-far-in-future.time-window` to prevent clock skewed samples to pollute TSDB head and block all valid incoming samples.
Expand Down
41 changes: 41 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,47 @@ The example content of `hashring.json`:

With such configuration any receive listens for remote write on `<ip>10908/api/v1/receive` and will forward to correct one in hashring if needed for tenancy and replication.

### AZ-aware Ketama hashring (experimental)

In order to ensure even spread for replication over nodes in different availability-zones, you can choose to include az definition in your hashring config. If we for example have a 6 node cluster, spread over 3 different availability zones; A, B and C, we could use the following example `hashring.json`:

```json
[
{
"endpoints": [
{
"address": "127.0.0.1:10907",
"az": "A"
},
{
"address": "127.0.0.1:11907",
"az": "B"
},
{
"address": "127.0.0.1:12907",
"az": "C"
},
{
"address": "127.0.0.1:13907",
"az": "A"
},
{
"address": "127.0.0.1:14907",
"az": "B"
},
{
"address": "127.0.0.1:15907",
"az": "C"
}
]
}
]
```

This is only supported for the Ketama algorithm.

**NOTE:** This feature is made available from v0.32 onwards. Receive can still operate with `endpoints` set to an array of IP strings in ketama mode. But to use AZ-aware hashring, you would need to migrate your existing hashring (and surrounding automation) to the new JSON structure mentioned above.

## Limits & gates (experimental)

Thanos Receive has some limits and gates that can be configured to control resource usage. Here's the difference between limits and gates:
Expand Down
25 changes: 24 additions & 1 deletion pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,35 @@ const (
RouterIngestor ReceiverMode = "RouterIngestor"
)

type Endpoint struct {
Address string `json:"address"`
AZ string `json:"az"`
}

func (e *Endpoint) UnmarshalJSON(data []byte) error {
// First try to unmarshal as a string.
err := json.Unmarshal(data, &e.Address)
if err == nil {
return nil
}

// If that fails, try to unmarshal as an endpoint object.
type endpointAlias Endpoint
var configEndpoint endpointAlias
err = json.Unmarshal(data, &configEndpoint)
if err == nil {
e.Address = configEndpoint.Address
e.AZ = configEndpoint.AZ
}
return err
}

// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels map[string]string `json:"external_labels,omitempty"`
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/receive/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestValidateConfig(t *testing.T) {
name: "valid config",
cfg: []HashringConfig{
{
Endpoints: []string{"node1"},
Endpoints: []Endpoint{{Address: "node1"}},
},
},
err: nil, // means it's valid.
Expand Down Expand Up @@ -71,3 +71,16 @@ func TestValidateConfig(t *testing.T) {
})
}
}

func TestUnmarshalEndpointSlice(t *testing.T) {
t.Run("Endpoints as string slice", func(t *testing.T) {
var endpoints []Endpoint
testutil.Ok(t, json.Unmarshal([]byte(`["node-1"]`), &endpoints))
testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1"}})
})
t.Run("Endpoints as endpoints slice", func(t *testing.T) {
var endpoints []Endpoint
testutil.Ok(t, json.Unmarshal([]byte(`[{"address": "node-1", "az": "az-1"}]`), &endpoints))
testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1", AZ: "az-1"}})
})
}
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
h.peers = peers
addr := ag.newAddr()
h.options.Endpoint = addr
cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
cfg[0].Endpoints = append(cfg[0].Endpoints, Endpoint{Address: h.options.Endpoint})
peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
}
// Use hashmod as default.
Expand Down
61 changes: 48 additions & 13 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receive

import (
"fmt"
"math"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -75,6 +76,17 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri
// simpleHashring represents a group of nodes handling write requests by hashmoding individual series.
type simpleHashring []string

func newSimpleHashring(endpoints []Endpoint) (Hashring, error) {
addresses := make([]string, len(endpoints))
for i := range endpoints {
if endpoints[i].AZ != "" {
return nil, errors.New("Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.")
}
addresses[i] = endpoints[i].Address
}
return simpleHashring(addresses), nil
}

// Get returns a target to handle the given tenant and time series.
func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
return s.GetN(tenant, ts, 0)
Expand All @@ -90,6 +102,7 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
}

type section struct {
az string
endpointIndex uint64
hash uint64
replicas []uint64
Expand All @@ -104,25 +117,27 @@ func (p sections) Sort() { sort.Sort(p) }

// ketamaHashring represents a group of nodes handling write requests with consistent hashing.
type ketamaHashring struct {
endpoints []string
endpoints []Endpoint
sections sections
numEndpoints uint64
}

func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
numSections := len(endpoints) * sectionsPerNode

if len(endpoints) < int(replicationFactor) {
return nil, errors.New("ketama: amount of endpoints needs to be larger than replication factor")

}

hash := xxhash.New()
availabilityZones := make(map[string]struct{})
ringSections := make(sections, 0, numSections)
for endpointIndex, endpoint := range endpoints {
availabilityZones[endpoint.AZ] = struct{}{}
for i := 1; i <= sectionsPerNode; i++ {
_, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i)))
_, _ = hash.Write([]byte(endpoint.Address + ":" + strconv.Itoa(i)))
n := &section{
az: endpoint.AZ,
endpointIndex: uint64(endpointIndex),
hash: hash.Sum64(),
replicas: make([]uint64, 0, replicationFactor),
Expand All @@ -133,7 +148,7 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto
}
}
sort.Sort(ringSections)
calculateSectionReplicas(ringSections, replicationFactor)
calculateSectionReplicas(ringSections, replicationFactor, availabilityZones)

return &ketamaHashring{
endpoints: endpoints,
Expand All @@ -142,19 +157,39 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto
}, nil
}

func sizeOfLeastOccupiedAZ(azSpread map[string]int64) int64 {
minValue := int64(math.MaxInt64)
for _, value := range azSpread {
if value < minValue {
minValue = value
}
}
return minValue
}

// calculateSectionReplicas pre-calculates replicas for each section,
// ensuring that replicas for each ring section are owned by different endpoints.
func calculateSectionReplicas(ringSections sections, replicationFactor uint64) {
func calculateSectionReplicas(ringSections sections, replicationFactor uint64, availabilityZones map[string]struct{}) {
for i, s := range ringSections {
replicas := make(map[uint64]struct{})
azSpread := make(map[string]int64)
for az := range availabilityZones {
// This is to make sure each az is initially represented
azSpread[az] = 0
}
j := i - 1
for uint64(len(replicas)) < replicationFactor {
j = (j + 1) % len(ringSections)
rep := ringSections[j]
if _, ok := replicas[rep.endpointIndex]; ok {
continue
}
if len(azSpread) > 1 && azSpread[rep.az] > 0 && azSpread[rep.az] > sizeOfLeastOccupiedAZ(azSpread) {
// We want to ensure even AZ spread before we add more replicas within the same AZ
continue
}
replicas[rep.endpointIndex] = struct{}{}
azSpread[rep.az]++
s.replicas = append(s.replicas, rep.endpointIndex)
}
}
Expand Down Expand Up @@ -182,7 +217,7 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
}

endpointIndex := c.sections[i].replicas[n]
return c.endpoints[endpointIndex], nil
return c.endpoints[endpointIndex].Address, nil
}

// multiHashring represents a set of hashrings.
Expand Down Expand Up @@ -246,11 +281,11 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
for _, h := range cfg {
var hashring Hashring
var err error
activeAlgorithm := algorithm
if h.Algorithm != "" {
hashring, err = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
} else {
hashring, err = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
activeAlgorithm = h.Algorithm
}
hashring, err = newHashring(activeAlgorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
if err != nil {
return nil, err
}
Expand All @@ -267,17 +302,17 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
return m, nil
}

func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints), nil
return newSimpleHashring(endpoints)
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor)
default:
l := log.NewNopLogger()
level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.",
"hashring", hashring,
"tenants", tenants)
return simpleHashring(endpoints), nil
return newSimpleHashring(endpoints)
}
}
Loading