diff --git a/CHANGELOG.md b/CHANGELOG.md index e3778004fab..2f0c57d30bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added +- [#6369](https://github.com/thanos-io/thanos/pull/6369) Receive: add az-aware replication support for Ketama algorithm - [#6185](https://github.com/thanos-io/thanos/pull/6185) Tracing: tracing in OTLP support configuring service_name. - [#6192](https://github.com/thanos-io/thanos/pull/6192) Store: add flag `bucket-web-label` to select the label to use as timeline title in web UI - [#6167](https://github.com/thanos-io/thanos/pull/6195) Receive: add flag `tsdb.too-far-in-future.time-window` to prevent clock skewed samples to pollute TSDB head and block all valid incoming samples. diff --git a/docs/components/receive.md b/docs/components/receive.md index 50bf682fad6..fbf08e4b631 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -95,6 +95,47 @@ The example content of `hashring.json`: With such configuration any receive listens for remote write on `10908/api/v1/receive` and will forward to correct one in hashring if needed for tenancy and replication. +### AZ-aware Ketama hashring (experimental) + +In order to ensure even spread for replication over nodes in different availability-zones, you can choose to include az definition in your hashring config. If we for example have a 6 node cluster, spread over 3 different availability zones; A, B and C, we could use the following example `hashring.json`: + +```json +[ + { + "endpoints": [ + { + "address": "127.0.0.1:10907", + "az": "A" + }, + { + "address": "127.0.0.1:11907", + "az": "B" + }, + { + "address": "127.0.0.1:12907", + "az": "C" + }, + { + "address": "127.0.0.1:13907", + "az": "A" + }, + { + "address": "127.0.0.1:14907", + "az": "B" + }, + { + "address": "127.0.0.1:15907", + "az": "C" + } + ] + } +] +``` + +This is only supported for the Ketama algorithm. + +**NOTE:** This feature is made available from v0.32 onwards. Receive can still operate with `endpoints` set to an array of IP strings in ketama mode. But to use AZ-aware hashring, you would need to migrate your existing hashring (and surrounding automation) to the new JSON structure mentioned above. + ## Limits & gates (experimental) Thanos Receive has some limits and gates that can be configured to control resource usage. Here's the difference between limits and gates: diff --git a/pkg/receive/config.go b/pkg/receive/config.go index a88942c16bf..2772e1fd989 100644 --- a/pkg/receive/config.go +++ b/pkg/receive/config.go @@ -37,12 +37,35 @@ const ( RouterIngestor ReceiverMode = "RouterIngestor" ) +type Endpoint struct { + Address string `json:"address"` + AZ string `json:"az"` +} + +func (e *Endpoint) UnmarshalJSON(data []byte) error { + // First try to unmarshal as a string. + err := json.Unmarshal(data, &e.Address) + if err == nil { + return nil + } + + // If that fails, try to unmarshal as an endpoint object. + type endpointAlias Endpoint + var configEndpoint endpointAlias + err = json.Unmarshal(data, &configEndpoint) + if err == nil { + e.Address = configEndpoint.Address + e.AZ = configEndpoint.AZ + } + return err +} + // HashringConfig represents the configuration for a hashring // a receive node knows about. type HashringConfig struct { Hashring string `json:"hashring,omitempty"` Tenants []string `json:"tenants,omitempty"` - Endpoints []string `json:"endpoints"` + Endpoints []Endpoint `json:"endpoints"` Algorithm HashringAlgorithm `json:"algorithm,omitempty"` ExternalLabels map[string]string `json:"external_labels,omitempty"` } diff --git a/pkg/receive/config_test.go b/pkg/receive/config_test.go index 47146fb424f..c1230f327e6 100644 --- a/pkg/receive/config_test.go +++ b/pkg/receive/config_test.go @@ -38,7 +38,7 @@ func TestValidateConfig(t *testing.T) { name: "valid config", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, }, }, err: nil, // means it's valid. @@ -71,3 +71,16 @@ func TestValidateConfig(t *testing.T) { }) } } + +func TestUnmarshalEndpointSlice(t *testing.T) { + t.Run("Endpoints as string slice", func(t *testing.T) { + var endpoints []Endpoint + testutil.Ok(t, json.Unmarshal([]byte(`["node-1"]`), &endpoints)) + testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1"}}) + }) + t.Run("Endpoints as endpoints slice", func(t *testing.T) { + var endpoints []Endpoint + testutil.Ok(t, json.Unmarshal([]byte(`[{"address": "node-1", "az": "az-1"}]`), &endpoints)) + testutil.Equals(t, endpoints, []Endpoint{{Address: "node-1", AZ: "az-1"}}) + }) +} diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 735c858e77f..ae4ec9b9850 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -195,7 +195,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin h.peers = peers addr := ag.newAddr() h.options.Endpoint = addr - cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint) + cfg[0].Endpoints = append(cfg[0].Endpoints, Endpoint{Address: h.options.Endpoint}) peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} } // Use hashmod as default. diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 4742aa8c988..18925cc4cc2 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -5,6 +5,7 @@ package receive import ( "fmt" + "math" "sort" "strconv" "sync" @@ -75,6 +76,17 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri // simpleHashring represents a group of nodes handling write requests by hashmoding individual series. type simpleHashring []string +func newSimpleHashring(endpoints []Endpoint) (Hashring, error) { + addresses := make([]string, len(endpoints)) + for i := range endpoints { + if endpoints[i].AZ != "" { + return nil, errors.New("Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.") + } + addresses[i] = endpoints[i].Address + } + return simpleHashring(addresses), nil +} + // Get returns a target to handle the given tenant and time series. func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { return s.GetN(tenant, ts, 0) @@ -90,6 +102,7 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st } type section struct { + az string endpointIndex uint64 hash uint64 replicas []uint64 @@ -104,25 +117,27 @@ func (p sections) Sort() { sort.Sort(p) } // ketamaHashring represents a group of nodes handling write requests with consistent hashing. type ketamaHashring struct { - endpoints []string + endpoints []Endpoint sections sections numEndpoints uint64 } -func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) { +func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) { numSections := len(endpoints) * sectionsPerNode if len(endpoints) < int(replicationFactor) { return nil, errors.New("ketama: amount of endpoints needs to be larger than replication factor") } - hash := xxhash.New() + availabilityZones := make(map[string]struct{}) ringSections := make(sections, 0, numSections) for endpointIndex, endpoint := range endpoints { + availabilityZones[endpoint.AZ] = struct{}{} for i := 1; i <= sectionsPerNode; i++ { - _, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i))) + _, _ = hash.Write([]byte(endpoint.Address + ":" + strconv.Itoa(i))) n := §ion{ + az: endpoint.AZ, endpointIndex: uint64(endpointIndex), hash: hash.Sum64(), replicas: make([]uint64, 0, replicationFactor), @@ -133,7 +148,7 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto } } sort.Sort(ringSections) - calculateSectionReplicas(ringSections, replicationFactor) + calculateSectionReplicas(ringSections, replicationFactor, availabilityZones) return &ketamaHashring{ endpoints: endpoints, @@ -142,11 +157,26 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto }, nil } +func sizeOfLeastOccupiedAZ(azSpread map[string]int64) int64 { + minValue := int64(math.MaxInt64) + for _, value := range azSpread { + if value < minValue { + minValue = value + } + } + return minValue +} + // calculateSectionReplicas pre-calculates replicas for each section, // ensuring that replicas for each ring section are owned by different endpoints. -func calculateSectionReplicas(ringSections sections, replicationFactor uint64) { +func calculateSectionReplicas(ringSections sections, replicationFactor uint64, availabilityZones map[string]struct{}) { for i, s := range ringSections { replicas := make(map[uint64]struct{}) + azSpread := make(map[string]int64) + for az := range availabilityZones { + // This is to make sure each az is initially represented + azSpread[az] = 0 + } j := i - 1 for uint64(len(replicas)) < replicationFactor { j = (j + 1) % len(ringSections) @@ -154,7 +184,12 @@ func calculateSectionReplicas(ringSections sections, replicationFactor uint64) { if _, ok := replicas[rep.endpointIndex]; ok { continue } + if len(azSpread) > 1 && azSpread[rep.az] > 0 && azSpread[rep.az] > sizeOfLeastOccupiedAZ(azSpread) { + // We want to ensure even AZ spread before we add more replicas within the same AZ + continue + } replicas[rep.endpointIndex] = struct{}{} + azSpread[rep.az]++ s.replicas = append(s.replicas, rep.endpointIndex) } } @@ -182,7 +217,7 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st } endpointIndex := c.sections[i].replicas[n] - return c.endpoints[endpointIndex], nil + return c.endpoints[endpointIndex].Address, nil } // multiHashring represents a set of hashrings. @@ -246,11 +281,11 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg for _, h := range cfg { var hashring Hashring var err error + activeAlgorithm := algorithm if h.Algorithm != "" { - hashring, err = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants) - } else { - hashring, err = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants) + activeAlgorithm = h.Algorithm } + hashring, err = newHashring(activeAlgorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants) if err != nil { return nil, err } @@ -267,10 +302,10 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg return m, nil } -func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) { +func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) { switch algorithm { case AlgorithmHashmod: - return simpleHashring(endpoints), nil + return newSimpleHashring(endpoints) case AlgorithmKetama: return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor) default: @@ -278,6 +313,6 @@ func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFac level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.", "hashring", hashring, "tenants", tenants) - return simpleHashring(endpoints), nil + return newSimpleHashring(endpoints) } } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 3a72aac9ebd..ad6d4db8553 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -5,8 +5,11 @@ package receive import ( "fmt" + "math" + "strings" "testing" + "github.com/efficientgo/core/testutil" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" @@ -44,7 +47,7 @@ func TestHashringGet(t *testing.T) { name: "simple", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, }, }, nodes: map[string]struct{}{"node1": {}}, @@ -53,11 +56,11 @@ func TestHashringGet(t *testing.T) { name: "specific", cfg: []HashringConfig{ { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant2"}, }, { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, }, }, nodes: map[string]struct{}{"node2": {}}, @@ -67,15 +70,15 @@ func TestHashringGet(t *testing.T) { name: "many tenants", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant2"}, }, { - Endpoints: []string{"node3"}, + Endpoints: []Endpoint{{Address: "node3"}}, Tenants: []string{"tenant3"}, }, }, @@ -86,15 +89,15 @@ func TestHashringGet(t *testing.T) { name: "many tenants error", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant2"}, }, { - Endpoints: []string{"node3"}, + Endpoints: []Endpoint{{Address: "node3"}}, Tenants: []string{"tenant3"}, }, }, @@ -104,11 +107,11 @@ func TestHashringGet(t *testing.T) { name: "many nodes", cfg: []HashringConfig{ { - Endpoints: []string{"node1", "node2", "node3"}, + Endpoints: []Endpoint{{Address: "node1"}, {Address: "node2"}, {Address: "node3"}}, Tenants: []string{"tenant1"}, }, { - Endpoints: []string{"node4", "node5", "node6"}, + Endpoints: []Endpoint{{Address: "node4"}, {Address: "node5"}, {Address: "node6"}}, }, }, nodes: map[string]struct{}{ @@ -122,11 +125,11 @@ func TestHashringGet(t *testing.T) { name: "many nodes default", cfg: []HashringConfig{ { - Endpoints: []string{"node1", "node2", "node3"}, + Endpoints: []Endpoint{{Address: "node1"}, {Address: "node2"}, {Address: "node3"}}, Tenants: []string{"tenant1"}, }, { - Endpoints: []string{"node4", "node5", "node6"}, + Endpoints: []Endpoint{{Address: "node4"}, {Address: "node5"}, {Address: "node6"}}, }, }, nodes: map[string]struct{}{ @@ -167,53 +170,53 @@ func TestKetamaHashringGet(t *testing.T) { } tests := []struct { name string - nodes []string + endpoints []Endpoint expectedNode string ts *prompb.TimeSeries n uint64 }{ { name: "base case", - nodes: []string{"node-1", "node-2", "node-3"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: baseTS, expectedNode: "node-2", }, { name: "base case with replication", - nodes: []string{"node-1", "node-2", "node-3"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: baseTS, n: 1, expectedNode: "node-1", }, { name: "base case with replication", - nodes: []string{"node-1", "node-2", "node-3"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: baseTS, n: 2, expectedNode: "node-3", }, { name: "base case with replication and reordered nodes", - nodes: []string{"node-1", "node-3", "node-2"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-3"}, {Address: "node-2"}}, ts: baseTS, n: 2, expectedNode: "node-3", }, { name: "base case with new node at beginning of ring", - nodes: []string{"node-0", "node-1", "node-2", "node-3"}, + endpoints: []Endpoint{{Address: "node-0"}, {Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: baseTS, expectedNode: "node-2", }, { name: "base case with new node at end of ring", - nodes: []string{"node-1", "node-2", "node-3", "node-4"}, + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}, {Address: "node-4"}}, ts: baseTS, expectedNode: "node-2", }, { - name: "base case with different timeseries", - nodes: []string{"node-1", "node-2", "node-3"}, + name: "base case with different timeseries", + endpoints: []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}}, ts: &prompb.TimeSeries{ Labels: []labelpb.ZLabel{ { @@ -228,7 +231,7 @@ func TestKetamaHashringGet(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - hashRing, err := newKetamaHashring(test.nodes, 10, test.n+1) + hashRing, err := newKetamaHashring(test.endpoints, 10, test.n+1) require.NoError(t, err) result, err := hashRing.GetN("tenant", test.ts, test.n) @@ -239,18 +242,18 @@ func TestKetamaHashringGet(t *testing.T) { } func TestKetamaHashringBadConfigIsRejected(t *testing.T) { - _, err := newKetamaHashring([]string{"node-1"}, 1, 2) + _, err := newKetamaHashring([]Endpoint{{Address: "node-1"}}, 1, 2) require.Error(t, err) } func TestKetamaHashringConsistency(t *testing.T) { series := makeSeries() - ringA := []string{"node-1", "node-2", "node-3"} + ringA := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} a1, err := assignSeries(series, ringA) require.NoError(t, err) - ringB := []string{"node-1", "node-2", "node-3"} + ringB := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} a2, err := assignSeries(series, ringB) require.NoError(t, err) @@ -266,18 +269,18 @@ func TestKetamaHashringConsistency(t *testing.T) { func TestKetamaHashringIncreaseAtEnd(t *testing.T) { series := makeSeries() - initialRing := []string{"node-1", "node-2", "node-3"} + initialRing := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} initialAssignments, err := assignSeries(series, initialRing) require.NoError(t, err) - resizedRing := []string{"node-1", "node-2", "node-3", "node-4", "node-5"} + resizedRing := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}, {Address: "node-4"}, {Address: "node-5"}} reassignments, err := assignSeries(series, resizedRing) require.NoError(t, err) // Assert that the initial nodes have no new keys after increasing the ring size for _, node := range initialRing { - for _, ts := range reassignments[node] { - foundInInitialAssignment := findSeries(initialAssignments, node, ts) + for _, ts := range reassignments[node.Address] { + foundInInitialAssignment := findSeries(initialAssignments, node.Address, ts) require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) } } @@ -286,18 +289,18 @@ func TestKetamaHashringIncreaseAtEnd(t *testing.T) { func TestKetamaHashringIncreaseInMiddle(t *testing.T) { series := makeSeries() - initialRing := []string{"node-1", "node-3"} + initialRing := []Endpoint{{Address: "node-1"}, {Address: "node-3"}} initialAssignments, err := assignSeries(series, initialRing) require.NoError(t, err) - resizedRing := []string{"node-1", "node-2", "node-3"} + resizedRing := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} reassignments, err := assignSeries(series, resizedRing) require.NoError(t, err) // Assert that the initial nodes have no new keys after increasing the ring size for _, node := range initialRing { - for _, ts := range reassignments[node] { - foundInInitialAssignment := findSeries(initialAssignments, node, ts) + for _, ts := range reassignments[node.Address] { + foundInInitialAssignment := findSeries(initialAssignments, node.Address, ts) require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) } } @@ -306,23 +309,289 @@ func TestKetamaHashringIncreaseInMiddle(t *testing.T) { func TestKetamaHashringReplicationConsistency(t *testing.T) { series := makeSeries() - initialRing := []string{"node-1", "node-4", "node-5"} + initialRing := []Endpoint{{Address: "node-1"}, {Address: "node-4"}, {Address: "node-5"}} initialAssignments, err := assignReplicatedSeries(series, initialRing, 2) require.NoError(t, err) - resizedRing := []string{"node-4", "node-3", "node-1", "node-2", "node-5"} + resizedRing := []Endpoint{{Address: "node-4"}, {Address: "node-3"}, {Address: "node-1"}, {Address: "node-2"}, {Address: "node-5"}} reassignments, err := assignReplicatedSeries(series, resizedRing, 2) require.NoError(t, err) // Assert that the initial nodes have no new keys after increasing the ring size for _, node := range initialRing { - for _, ts := range reassignments[node] { - foundInInitialAssignment := findSeries(initialAssignments, node, ts) + for _, ts := range reassignments[node.Address] { + foundInInitialAssignment := findSeries(initialAssignments, node.Address, ts) require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) } } } +func TestKetamaHashringReplicationConsistencyWithAZs(t *testing.T) { + for _, tt := range []struct { + initialRing []Endpoint + resizedRing []Endpoint + replicas uint64 + }{ + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "a", AZ: "1"}, {Address: "d", AZ: "2"}, {Address: "e", AZ: "4"}}, + replicas: 3, + }, + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "d", AZ: "1"}, {Address: "e", AZ: "2"}, {Address: "f", AZ: "3"}}, + replicas: 3, + }, + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "d", AZ: "4"}, {Address: "e", AZ: "5"}, {Address: "f", AZ: "6"}}, + replicas: 3, + }, + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}, {Address: "c", AZ: "3"}, {Address: "d", AZ: "4"}, {Address: "e", AZ: "5"}, {Address: "f", AZ: "6"}}, + replicas: 2, + }, + { + initialRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "c", AZ: "2"}, {Address: "f", AZ: "3"}}, + resizedRing: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "1"}, {Address: "c", AZ: "2"}, {Address: "d", AZ: "2"}, {Address: "f", AZ: "3"}}, + replicas: 2, + }, + } { + t.Run("", func(t *testing.T) { + series := makeSeries() + + initialAssignments, err := assignReplicatedSeries(series, tt.initialRing, tt.replicas) + require.NoError(t, err) + + reassignments, err := assignReplicatedSeries(series, tt.resizedRing, tt.replicas) + require.NoError(t, err) + + // Assert that the initial nodes have no new keys after increasing the ring size + for _, node := range tt.initialRing { + for _, ts := range reassignments[node.Address] { + foundInInitialAssignment := findSeries(initialAssignments, node.Address, ts) + require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) + } + } + }) + } +} + +func TestKetamaHashringEvenAZSpread(t *testing.T) { + tenant := "default-tenant" + ts := &prompb.TimeSeries{ + Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")), + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + } + + for _, tt := range []struct { + nodes []Endpoint + replicas uint64 + }{ + { + nodes: []Endpoint{ + {Address: "a", AZ: "1"}, + {Address: "b", AZ: "2"}, + {Address: "c", AZ: "1"}, + {Address: "d", AZ: "2"}, + }, + replicas: 1, + }, + { + nodes: []Endpoint{{Address: "a"}, {Address: "b"}, {Address: "c"}, {Address: "d"}}, + replicas: 1, + }, + { + nodes: []Endpoint{ + {Address: "a", AZ: "1"}, + {Address: "b", AZ: "2"}, + {Address: "c", AZ: "1"}, + {Address: "d", AZ: "2"}, + }, + replicas: 2, + }, + { + nodes: []Endpoint{ + {Address: "a", AZ: "1"}, + {Address: "b", AZ: "2"}, + {Address: "c", AZ: "3"}, + {Address: "d", AZ: "1"}, + {Address: "e", AZ: "2"}, + {Address: "f", AZ: "3"}, + }, + replicas: 3, + }, + { + nodes: []Endpoint{{Address: "a"}, {Address: "b"}, {Address: "c"}, {Address: "d"}, {Address: "e"}, {Address: "f"}, {Address: "g"}}, + replicas: 3, + }, + { + nodes: []Endpoint{ + {Address: "a", AZ: "1"}, + {Address: "b", AZ: "2"}, + {Address: "c", AZ: "3"}, + {Address: "d", AZ: "1"}, + {Address: "e", AZ: "2"}, + {Address: "f", AZ: "3"}, + {Address: "g", AZ: "4"}, + {Address: "h", AZ: "4"}, + {Address: "i", AZ: "4"}, + {Address: "j", AZ: "5"}, + {Address: "k", AZ: "5"}, + {Address: "l", AZ: "5"}, + }, + replicas: 10, + }, + } { + t.Run("", func(t *testing.T) { + hashRing, err := newKetamaHashring(tt.nodes, SectionsPerNode, tt.replicas) + testutil.Ok(t, err) + + availableAzs := make(map[string]int64) + for _, endpoint := range tt.nodes { + availableAzs[endpoint.AZ] = 0 + } + + azSpread := make(map[string]int64) + for i := 0; i < int(tt.replicas); i++ { + r, err := hashRing.GetN(tenant, ts, uint64(i)) + testutil.Ok(t, err) + + for _, n := range tt.nodes { + if !strings.HasPrefix(n.Address, r) { + continue + } + azSpread[n.AZ]++ + } + + } + + expectedAzSpreadLength := int(tt.replicas) + if int(tt.replicas) > len(availableAzs) { + expectedAzSpreadLength = len(availableAzs) + } + testutil.Equals(t, len(azSpread), expectedAzSpreadLength) + + for _, writeToAz := range azSpread { + minAz := sizeOfLeastOccupiedAZ(azSpread) + testutil.Assert(t, math.Abs(float64(writeToAz-minAz)) <= 1.0) + } + }) + } +} + +func TestKetamaHashringEvenNodeSpread(t *testing.T) { + tenant := "default-tenant" + + for _, tt := range []struct { + nodes []Endpoint + replicas uint64 + numSeries uint64 + }{ + { + nodes: []Endpoint{ + {Address: "a", AZ: "1"}, + {Address: "b", AZ: "2"}, + {Address: "c", AZ: "1"}, + {Address: "d", AZ: "2"}, + }, + replicas: 2, + numSeries: 1000, + }, + { + nodes: []Endpoint{{Address: "a"}, {Address: "b"}, {Address: "c"}, {Address: "d"}}, + replicas: 2, + numSeries: 1000, + }, + { + nodes: []Endpoint{ + {Address: "a", AZ: "1"}, + {Address: "b", AZ: "2"}, + {Address: "c", AZ: "3"}, + {Address: "d", AZ: "2"}, + {Address: "e", AZ: "1"}, + {Address: "f", AZ: "3"}, + }, + replicas: 3, + numSeries: 10000, + }, + { + nodes: []Endpoint{ + {Address: "a", AZ: "1"}, + {Address: "b", AZ: "2"}, + {Address: "c", AZ: "3"}, + {Address: "d", AZ: "2"}, + {Address: "e", AZ: "1"}, + {Address: "f", AZ: "3"}, + {Address: "g", AZ: "1"}, + {Address: "h", AZ: "2"}, + {Address: "i", AZ: "3"}, + }, + replicas: 2, + numSeries: 10000, + }, + { + nodes: []Endpoint{ + {Address: "a", AZ: "1"}, + {Address: "b", AZ: "2"}, + {Address: "c", AZ: "3"}, + {Address: "d", AZ: "2"}, + {Address: "e", AZ: "1"}, + {Address: "f", AZ: "3"}, + {Address: "g", AZ: "1"}, + {Address: "h", AZ: "2"}, + {Address: "i", AZ: "3"}, + }, + replicas: 9, + numSeries: 10000, + }, + } { + t.Run("", func(t *testing.T) { + hashRing, err := newKetamaHashring(tt.nodes, SectionsPerNode, tt.replicas) + testutil.Ok(t, err) + optimalSpread := int(tt.numSeries*tt.replicas) / len(tt.nodes) + nodeSpread := make(map[string]int) + for i := 0; i < int(tt.numSeries); i++ { + ts := &prompb.TimeSeries{ + Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", fmt.Sprintf("%d", i))), + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + } + for j := 0; j < int(tt.replicas); j++ { + r, err := hashRing.GetN(tenant, ts, uint64(j)) + testutil.Ok(t, err) + + nodeSpread[r]++ + } + } + for _, node := range nodeSpread { + diff := math.Abs(float64(node) - float64(optimalSpread)) + testutil.Assert(t, diff/float64(optimalSpread) < 0.1) + } + }) + } +} + +func TestInvalidAZHashringCfg(t *testing.T) { + for _, tt := range []struct { + cfg []HashringConfig + replicas uint64 + algorithm HashringAlgorithm + expectedError string + }{ + { + cfg: []HashringConfig{{Endpoints: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}}}}, + replicas: 2, + expectedError: "Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.", + }, + } { + t.Run("", func(t *testing.T) { + _, err := NewMultiHashring(tt.algorithm, tt.replicas, tt.cfg) + require.EqualError(t, err, tt.expectedError) + }) + } +} + func makeSeries() []prompb.TimeSeries { numSeries := 10000 series := make([]prompb.TimeSeries, numSeries) @@ -351,11 +620,11 @@ func findSeries(initialAssignments map[string][]prompb.TimeSeries, node string, return false } -func assignSeries(series []prompb.TimeSeries, nodes []string) (map[string][]prompb.TimeSeries, error) { +func assignSeries(series []prompb.TimeSeries, nodes []Endpoint) (map[string][]prompb.TimeSeries, error) { return assignReplicatedSeries(series, nodes, 0) } -func assignReplicatedSeries(series []prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]prompb.TimeSeries, error) { +func assignReplicatedSeries(series []prompb.TimeSeries, nodes []Endpoint, replicas uint64) (map[string][]prompb.TimeSeries, error) { hashRing, err := newKetamaHashring(nodes, SectionsPerNode, replicas) if err != nil { return nil, err diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index 7c2fae78b7f..55b59d182a2 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -38,7 +38,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "One tenant - No labels", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, }, }, @@ -50,7 +50,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "One tenant - One label", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -65,7 +65,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "One tenant - Multiple labels", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -83,7 +83,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple tenants - No labels", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, }, }, @@ -97,7 +97,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple tenants - One label", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -114,7 +114,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple tenants - Multiple labels", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name3": "value3", @@ -136,7 +136,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple hashrings - No repeated tenants", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -145,7 +145,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name6": "value6", @@ -173,7 +173,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { name: "Multiple hashrings - One repeated tenant", cfg: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name3": "value3", @@ -182,7 +182,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant1"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -245,7 +245,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { initialConfig := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -265,7 +265,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { changedConfig := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3", "tenant4", "tenant5"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -354,7 +354,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { initialConfig := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -363,7 +363,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name6": "value6", @@ -396,7 +396,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { name: "Adding labels", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -406,7 +406,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -435,7 +435,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { name: "Deleting some labels", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -443,7 +443,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -470,11 +470,11 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { name: "Deleting all labels", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, }, }, @@ -491,7 +491,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { name: "Changing values of some labels", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value3", @@ -500,7 +500,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant6"}, ExternalLabels: map[string]string{ "name4": "value6", @@ -584,7 +584,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { initialConfig := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name3": "value3", @@ -593,7 +593,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant1"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -624,7 +624,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { name: "Adding labels in first hashring that tenant appears", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -634,7 +634,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant1"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -660,7 +660,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { name: "Adding labels in second hashring that tenant appears", changedConfig: []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1", "tenant2", "tenant3"}, ExternalLabels: map[string]string{ "name1": "value1", @@ -669,7 +669,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { }, }, { - Endpoints: []string{"node2"}, + Endpoints: []Endpoint{{Address: "node2"}}, Tenants: []string{"tenant4", "tenant5", "tenant1"}, ExternalLabels: map[string]string{ "name4": "value4", @@ -752,7 +752,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { cfg := []HashringConfig{ { - Endpoints: []string{"node1"}, + Endpoints: []Endpoint{{Address: "node1"}}, Tenants: []string{"tenant1"}, ExternalLabels: map[string]string{ "replica": "0", diff --git a/test/e2e/native_histograms_test.go b/test/e2e/native_histograms_test.go index dc8b4a0c7a2..33810354fd8 100644 --- a/test/e2e/native_histograms_test.go +++ b/test/e2e/native_histograms_test.go @@ -106,9 +106,9 @@ func TestWriteNativeHistograms(t *testing.T) { ingestor1 := e2ethanos.NewReceiveBuilder(e, "ingestor1").WithIngestionEnabled().WithNativeHistograms().Init() h := receive.HashringConfig{ - Endpoints: []string{ - ingestor0.InternalEndpoint("grpc"), - ingestor1.InternalEndpoint("grpc"), + Endpoints: []receive.Endpoint{ + {Address: ingestor0.InternalEndpoint("grpc")}, + {Address: ingestor1.InternalEndpoint("grpc")}, }, } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 17a98386907..5a4220e325e 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -136,10 +136,10 @@ func TestReceive(t *testing.T) { i3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled().Init() h := receive.HashringConfig{ - Endpoints: []string{ - i1.InternalEndpoint("grpc"), - i2.InternalEndpoint("grpc"), - i3.InternalEndpoint("grpc"), + Endpoints: []receive.Endpoint{ + {Address: i1.InternalEndpoint("grpc")}, + {Address: i2.InternalEndpoint("grpc")}, + {Address: i3.InternalEndpoint("grpc")}, }, } @@ -236,15 +236,15 @@ func TestReceive(t *testing.T) { // Setup distributors r2 := e2ethanos.NewReceiveBuilder(e, "r2").WithRouting(2, receive.HashringConfig{ - Endpoints: []string{ - i2.InternalEndpoint("grpc"), - i3.InternalEndpoint("grpc"), + Endpoints: []receive.Endpoint{ + {Address: i2.InternalEndpoint("grpc")}, + {Address: i3.InternalEndpoint("grpc")}, }, }).Init() r1 := e2ethanos.NewReceiveBuilder(e, "r1").WithRouting(2, receive.HashringConfig{ - Endpoints: []string{ - i1.InternalEndpoint("grpc"), - r2.InternalEndpoint("grpc"), + Endpoints: []receive.Endpoint{ + {Address: i1.InternalEndpoint("grpc")}, + {Address: r2.InternalEndpoint("grpc")}, }, }).Init() testutil.Ok(t, e2e.StartAndWaitReady(i1, i2, i3, r1, r2)) @@ -330,10 +330,10 @@ func TestReceive(t *testing.T) { r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled() h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), - r2.InternalEndpoint("grpc"), - r3.InternalEndpoint("grpc"), + Endpoints: []receive.Endpoint{ + {Address: r1.InternalEndpoint("grpc")}, + {Address: r2.InternalEndpoint("grpc")}, + {Address: r3.InternalEndpoint("grpc")}, }, } @@ -402,10 +402,10 @@ func TestReceive(t *testing.T) { r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled() h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), - r2.InternalEndpoint("grpc"), - r3.InternalEndpoint("grpc"), + Endpoints: []receive.Endpoint{ + {Address: r1.InternalEndpoint("grpc")}, + {Address: r2.InternalEndpoint("grpc")}, + {Address: r3.InternalEndpoint("grpc")}, }, } @@ -469,10 +469,10 @@ func TestReceive(t *testing.T) { r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled() h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), - r2.InternalEndpoint("grpc"), - r3.InternalEndpoint("grpc"), + Endpoints: []receive.Endpoint{ + {Address: r1.InternalEndpoint("grpc")}, + {Address: r2.InternalEndpoint("grpc")}, + {Address: r3.InternalEndpoint("grpc")}, }, } @@ -522,8 +522,8 @@ func TestReceive(t *testing.T) { r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled() h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), + Endpoints: []receive.Endpoint{ + {Address: r1.InternalEndpoint("grpc")}, }, } @@ -659,10 +659,10 @@ func TestReceive(t *testing.T) { ingestor3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled() h := receive.HashringConfig{ - Endpoints: []string{ - ingestor1.InternalEndpoint("grpc"), - ingestor2.InternalEndpoint("grpc"), - ingestor3.InternalEndpoint("grpc"), + Endpoints: []receive.Endpoint{ + {Address: ingestor1.InternalEndpoint("grpc")}, + {Address: ingestor2.InternalEndpoint("grpc")}, + {Address: ingestor3.InternalEndpoint("grpc")}, }, }