Skip to content

Commit b03e605

Browse files
authored
enhance: add proxy and datanode checker when wal balance startup (milvus-io#40877)
issue: milvus-io#40532 - balance should be enabled only when there is no proxy or datanode whose version is lower than 2.6.0 --------- Signed-off-by: chyezh <[email protected]>
1 parent cef1d16 commit b03e605

File tree

21 files changed

+210
-144
lines changed

21 files changed

+210
-144
lines changed

internal/mocks/util/streamingutil/service/mock_discoverer/mock_Discoverer.go

-45
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/mocks/util/streamingutil/service/mock_resolver/mock_Resolver.go

+24-13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/streamingcoord/client/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type Client interface {
6868
func NewClient(etcdCli *clientv3.Client) Client {
6969
// StreamingCoord is resolved through the RootCoord session (see the role below), so it is expected to run on the RootCoord node.
7070
role := sessionutil.GetSessionPrefixByRole(typeutil.RootCoordRole)
71-
rb := resolver.NewSessionExclusiveBuilder(etcdCli, role)
71+
rb := resolver.NewSessionExclusiveBuilder(etcdCli, role, ">=2.6.0-dev")
7272
dialTimeout := paramtable.Get().StreamingCoordGrpcClientCfg.DialTimeout.GetAsDuration(time.Millisecond)
7373
dialOptions := getDialOptions(rb)
7474
conn := lazygrpc.NewConn(func(ctx context.Context) (*grpc.ClientConn, error) {

internal/streamingcoord/server/balancer/balancer_impl.go

+36
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010

1111
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/channel"
1212
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
13+
"github.com/milvus-io/milvus/internal/util/sessionutil"
14+
"github.com/milvus-io/milvus/internal/util/streamingutil/service/resolver"
1315
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
1416
"github.com/milvus-io/milvus/pkg/v2/log"
1517
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
@@ -122,6 +124,11 @@ func (b *balancerImpl) execute() {
122124
b.logger.Info("balancer execute finished")
123125
}()
124126

127+
if err := b.blockUntilAllNodeIsGreaterThan260(b.ctx); err != nil {
128+
b.logger.Warn("fail to block until all node is greater than 2.6.0", zap.Error(err))
129+
return
130+
}
131+
125132
balanceTimer := typeutil.NewBackoffTimer(&backoffConfigFetcher{})
126133
nodeChanged, err := resource.Resource().StreamingNodeManagerClient().WatchNodeChanged(b.backgroundTaskNotifier.Context())
127134
if err != nil {
@@ -164,6 +171,35 @@ func (b *balancerImpl) execute() {
164171
}
165172
}
166173

174+
// blockUntilAllNodeIsGreaterThan260 block until all node is greater than 2.6.0.
175+
// It's just a protection, but didn't promised that there will never be a node with version < 2.6.0 join the cluster.
176+
// These promise can only be achieved by the cluster dev-ops.
177+
func (b *balancerImpl) blockUntilAllNodeIsGreaterThan260(ctx context.Context) error {
178+
doneErr := errors.New("done")
179+
expectedRoles := []string{typeutil.ProxyRole, typeutil.DataNodeRole}
180+
for _, role := range expectedRoles {
181+
logger := b.logger.With(zap.String("role", role))
182+
logger.Info("start to wait that the nodes is greater than 2.6.0")
183+
// Check if there's any proxy or data node with version < 2.6.0.
184+
proxyResolver := resolver.NewSessionBuilder(resource.Resource().ETCD(), sessionutil.GetSessionPrefixByRole(role), "<2.6.0")
185+
r := proxyResolver.Resolver()
186+
err := r.Watch(ctx, func(vs resolver.VersionedState) error {
187+
if len(vs.Sessions()) == 0 {
188+
return doneErr
189+
}
190+
logger.Info("session changes", zap.Int("sessionCount", len(vs.Sessions())))
191+
return nil
192+
})
193+
if err != nil && !errors.Is(err, doneErr) {
194+
logger.Info("fail to wait that the nodes is greater than 2.6.0", zap.Error(err))
195+
return err
196+
}
197+
logger.Info("all nodes is greater than 2.6.0")
198+
proxyResolver.Close()
199+
}
200+
return nil
201+
}
202+
167203
// applyAllRequest apply all request in the request channel.
168204
func (b *balancerImpl) applyAllRequest() {
169205
for {

internal/streamingcoord/server/balancer/balancer_test.go

+36-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package balancer_test
22

33
import (
44
"context"
5+
"encoding/json"
6+
"path"
57
"testing"
68
"time"
79

@@ -14,15 +16,23 @@ import (
1416
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
1517
_ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy"
1618
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
19+
"github.com/milvus-io/milvus/internal/util/sessionutil"
1720
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
1821
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
22+
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
1923
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
2024
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
2125
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
2226
)
2327

2428
func TestBalancer(t *testing.T) {
2529
paramtable.Init()
30+
err := etcd.InitEtcdServer(true, "", t.TempDir(), "stdout", "info")
31+
assert.NoError(t, err)
32+
defer etcd.StopEtcdServer()
33+
34+
etcdClient, err := etcd.GetEmbedEtcdClient()
35+
assert.NoError(t, err)
2636

2737
streamingNodeManager := mock_manager.NewMockManagerClient(t)
2838
streamingNodeManager.EXPECT().WatchNodeChanged(mock.Anything).Return(make(chan struct{}), nil)
@@ -57,7 +67,7 @@ func TestBalancer(t *testing.T) {
5767
}, nil)
5868

5969
catalog := mock_metastore.NewMockStreamingCoordCataLog(t)
60-
resource.InitForTest(resource.OptStreamingCatalog(catalog), resource.OptStreamingManagerClient(streamingNodeManager))
70+
resource.InitForTest(resource.OptETCD(etcdClient), resource.OptStreamingCatalog(catalog), resource.OptStreamingManagerClient(streamingNodeManager))
6171
catalog.EXPECT().ListPChannel(mock.Anything).Unset()
6272
catalog.EXPECT().ListPChannel(mock.Anything).RunAndReturn(func(ctx context.Context) ([]*streamingpb.PChannelMeta, error) {
6373
return []*streamingpb.PChannelMeta{
@@ -89,11 +99,36 @@ func TestBalancer(t *testing.T) {
8999
})
90100
catalog.EXPECT().SavePChannels(mock.Anything, mock.Anything).Return(nil).Maybe()
91101

102+
// Test for lower datanode and proxy version protection.
103+
metaRoot := paramtable.Get().EtcdCfg.MetaRootPath.GetValue()
104+
proxyPath1 := path.Join(metaRoot, sessionutil.DefaultServiceRoot, typeutil.ProxyRole+"-1")
105+
r := sessionutil.SessionRaw{Version: "2.5.11", ServerID: 1}
106+
data, _ := json.Marshal(r)
107+
resource.Resource().ETCD().Put(context.Background(), proxyPath1, string(data))
108+
proxyPath2 := path.Join(metaRoot, sessionutil.DefaultServiceRoot, typeutil.ProxyRole+"-2")
109+
r = sessionutil.SessionRaw{Version: "2.5.11", ServerID: 2}
110+
data, _ = json.Marshal(r)
111+
resource.Resource().ETCD().Put(context.Background(), proxyPath2, string(data))
112+
metaRoot = paramtable.Get().EtcdCfg.MetaRootPath.GetValue()
113+
dataNodePath := path.Join(metaRoot, sessionutil.DefaultServiceRoot, typeutil.DataNodeRole)
114+
resource.Resource().ETCD().Put(context.Background(), dataNodePath, string(data))
115+
92116
ctx := context.Background()
93117
b, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair")
94118
assert.NoError(t, err)
95119
assert.NotNil(t, b)
96120

121+
ctx1, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
122+
defer cancel()
123+
err = b.WatchChannelAssignments(ctx1, func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error {
124+
assert.Len(t, relations, 2)
125+
return nil
126+
})
127+
assert.ErrorIs(t, err, context.DeadlineExceeded)
128+
resource.Resource().ETCD().Delete(context.Background(), proxyPath1)
129+
resource.Resource().ETCD().Delete(context.Background(), proxyPath2)
130+
resource.Resource().ETCD().Delete(context.Background(), dataNodePath)
131+
97132
b.MarkAsUnavailable(ctx, []types.PChannelInfo{{
98133
Name: "test-channel-1",
99134
Term: 1,

internal/streamingnode/client/manager/manager_client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type ManagerClient interface {
4747
// NewManagerClient creates a new manager client.
4848
func NewManagerClient(etcdCli *clientv3.Client) ManagerClient {
4949
role := sessionutil.GetSessionPrefixByRole(typeutil.StreamingNodeRole)
50-
rb := resolver.NewSessionBuilder(etcdCli, role)
50+
rb := resolver.NewSessionBuilder(etcdCli, role, ">=2.6.0-dev")
5151
dialTimeout := paramtable.Get().StreamingNodeGrpcClientCfg.DialTimeout.GetAsDuration(time.Millisecond)
5252
dialOptions := getDialOptions(rb)
5353
conn := lazygrpc.NewConn(func(ctx context.Context) (*grpc.ClientConn, error) {

internal/streamingnode/client/manager/manager_client_impl.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ func (c *managerClientImpl) CollectAllStatus(ctx context.Context) (map[int64]*ty
6161
defer c.lifetime.Done()
6262

6363
// Get all discovered streamingnode.
64-
state := c.rb.Resolver().GetLatestState()
64+
state, err := c.rb.Resolver().GetLatestState(ctx)
65+
if err != nil {
66+
return nil, err
67+
}
6568
if len(state.State.Addresses) == 0 {
6669
return make(map[int64]*types.StreamingNodeStatus), nil
6770
}
@@ -73,7 +76,10 @@ func (c *managerClientImpl) CollectAllStatus(ctx context.Context) (map[int64]*ty
7376
}
7477

7578
// Collect status may cost some time, so we need to check the lifetime again.
76-
newState := c.rb.Resolver().GetLatestState()
79+
newState, err := c.rb.Resolver().GetLatestState(ctx)
80+
if err != nil {
81+
return nil, err
82+
}
7783
if newState.Version.GT(state.Version) {
7884
newSession := newState.Sessions()
7985
for serverID := range result {

internal/streamingnode/client/manager/manager_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ func TestManager(t *testing.T) {
5050
assert.False(t, ok)
5151

5252
// Test CollectAllStatus
53-
r.EXPECT().GetLatestState().RunAndReturn(func() discoverer.VersionedState {
53+
r.EXPECT().GetLatestState(mock.Anything).RunAndReturn(func(ctx context.Context) (discoverer.VersionedState, error) {
5454
return discoverer.VersionedState{
5555
Version: typeutil.VersionInt64(1),
5656
State: resolver.State{},
57-
}
57+
}, nil
5858
})
5959
// No address here.
6060
nodes, err := m.CollectAllStatus(context.Background())
@@ -75,11 +75,11 @@ func TestManager(t *testing.T) {
7575
{1: false, 2: false, 3: true},
7676
{1: true, 2: false},
7777
}
78-
r.EXPECT().GetLatestState().Unset()
79-
r.EXPECT().GetLatestState().RunAndReturn(func() discoverer.VersionedState {
78+
r.EXPECT().GetLatestState(mock.Anything).Unset()
79+
r.EXPECT().GetLatestState(mock.Anything).RunAndReturn(func(ctx context.Context) (discoverer.VersionedState, error) {
8080
s := newVersionedState(int64(i), states[i])
8181
i++
82-
return s
82+
return s, nil
8383
})
8484

8585
nodes, err = m.CollectAllStatus(context.Background())

internal/util/streamingutil/service/discoverer/channel_assignment_discoverer.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ func (d *channelAssignmentDiscoverer) NewVersionedState() VersionedState {
3636

3737
// channelAssignmentDiscoverer implements the resolver.Discoverer interface.
3838
func (d *channelAssignmentDiscoverer) Discover(ctx context.Context, cb func(VersionedState) error) error {
39-
// Always send the current state first.
40-
// Outside logic may lost the last state before retry Discover function.
41-
if err := cb(d.parseState()); err != nil {
42-
return err
39+
if d.lastDiscovery != nil {
40+
// Always send the current state first if there is one.
41+
// Outside logic may have lost the last state before retrying the Discover function.
42+
if err := cb(d.parseState()); err != nil {
43+
return err
44+
}
4345
}
4446
return d.assignmentWatcher.AssignmentDiscover(ctx, func(assignments *types.VersionedStreamingNodeAssignments) error {
4547
d.lastDiscovery = assignments
@@ -50,10 +52,6 @@ func (d *channelAssignmentDiscoverer) Discover(ctx context.Context, cb func(Vers
5052
// parseState parses the addresses from the discovery response.
5153
// Always perform a copy here.
5254
func (d *channelAssignmentDiscoverer) parseState() VersionedState {
53-
if d.lastDiscovery == nil {
54-
return d.NewVersionedState()
55-
}
56-
5755
addrs := make([]resolver.Address, 0, len(d.lastDiscovery.Assignments))
5856
for _, assignment := range d.lastDiscovery.Assignments {
5957
assignment := assignment

internal/util/streamingutil/service/discoverer/channel_assignment_discoverer_test.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,8 @@ func TestChannelAssignmentDiscoverer(t *testing.T) {
3636
})
3737

3838
d := NewChannelAssignmentDiscoverer(w)
39-
s := d.NewVersionedState()
40-
assert.True(t, s.Version.EQ(typeutil.VersionInt64Pair{Global: -1, Local: -1}))
4139

4240
expected := []*types.VersionedStreamingNodeAssignments{
43-
{
44-
Version: typeutil.VersionInt64Pair{Global: -1, Local: -1},
45-
Assignments: map[int64]types.StreamingNodeAssignment{},
46-
},
4741
{
4842
Version: typeutil.VersionInt64Pair{
4943
Global: 1,
@@ -81,6 +75,7 @@ func TestChannelAssignmentDiscoverer(t *testing.T) {
8175
},
8276
}
8377

78+
ch <- expected[0]
8479
idx := 0
8580
ctx, cancel := context.WithCancel(context.Background())
8681
defer cancel()

internal/util/streamingutil/service/discoverer/discoverer.go

-3
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ import (
1313
// 1. concurrent safe.
1414
// 2. the version of discovery may be repeated or decreasing. So user should check the version in callback.
1515
type Discoverer interface {
16-
// NewVersionedState returns a lowest versioned state.
17-
NewVersionedState() VersionedState
18-
1916
// Discover watches the service discovery on this goroutine.
2017
// 1. Call the callback when the discovery is changed, and block until the discovery is canceled or break down.
2118
// 2. Discover should always send the current state first and then block.

0 commit comments

Comments
 (0)