Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace: Asynchronously pre-allocate keyspaces #9019

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@
Config map[string]string
// CreateTime is the timestamp used to record creation time.
CreateTime int64
// IsPreAlloc indicates whether the keyspace is pre-allocated when the cluster starts.
IsPreAlloc bool
}

// NewKeyspaceManager creates a Manager of keyspace related data.
Expand Down Expand Up @@ -163,24 +161,28 @@
// Initialize pre-alloc keyspace.
preAlloc := manager.config.GetPreAlloc()
for _, keyspaceName := range preAlloc {
config, err := manager.kgm.GetKeyspaceConfigByKind(endpoint.Basic)
if err != nil {
return err
}
req := &CreateKeyspaceRequest{
Name: keyspaceName,
CreateTime: now,
IsPreAlloc: true,
Config: config,
}
keyspace, err := manager.CreateKeyspace(req)
// Ignore the keyspaceExists error for the same reason as saving default keyspace.
if err != nil && err != errs.ErrKeyspaceExists {
return err
}
if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, config[TSOKeyspaceGroupIDKey], keyspace.GetId(), opAdd); err != nil {
return err
}
go func() {
config, err := manager.kgm.GetKeyspaceConfigByKind(endpoint.Basic)
if err != nil {
log.Error("[keyspace] failed to get keyspace config for pre-alloc keyspace", zap.String("keyspaceName", keyspaceName), zap.Error(err))
return
}

Check warning on line 169 in pkg/keyspace/keyspace.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/keyspace.go#L167-L169

Added lines #L167 - L169 were not covered by tests
req := &CreateKeyspaceRequest{
Name: keyspaceName,
CreateTime: now,
Config: config,
}
keyspace, err := manager.CreateKeyspace(req)
// Ignore the keyspaceExists error for the same reason as saving default keyspace.
if err != nil && err != errs.ErrKeyspaceExists {
log.Error("[keyspace] failed to create pre-alloc keyspace", zap.String("keyspaceName", keyspaceName), zap.Error(err))
return
}

Check warning on line 180 in pkg/keyspace/keyspace.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/keyspace.go#L178-L180

Added lines #L178 - L180 were not covered by tests
if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, config[TSOKeyspaceGroupIDKey], keyspace.GetId(), opAdd); err != nil {
log.Error("[keyspace] failed to update pre-alloc keyspace for group", zap.String("keyspaceName", keyspaceName), zap.Error(err))
return
}

Check warning on line 184 in pkg/keyspace/keyspace.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/keyspace.go#L182-L184

Added lines #L182 - L184 were not covered by tests
}()
}
return nil
}
Expand Down Expand Up @@ -232,11 +234,8 @@
)
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
err = manager.splitKeyspaceRegion(newID, manager.config.ToWaitRegionSplit())
if err != nil {
err2 := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
idPath := keypath.KeyspaceIDPath(request.Name)
Expand Down Expand Up @@ -531,7 +530,6 @@
}
return nil
})

if err != nil {
log.Warn("[keyspace] failed to update keyspace config",
zap.Uint32("keyspace-id", meta.GetId()),
Expand Down
1 change: 0 additions & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest {
testConfig2: "200",
},
CreateTime: now,
IsPreAlloc: true, // skip wait region split
}
}
return requests
Expand Down
1 change: 0 additions & 1 deletion server/apiv2/handlers/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func CreateKeyspace(c *gin.Context) {
Name: createParams.Name,
Config: createParams.Config,
CreateTime: time.Now().Unix(),
IsPreAlloc: false,
}
meta, err := manager.CreateKeyspace(req)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func NewTestSingleConfig(c *assertutil.Checker) *config.Config {
})

c.AssertNil(cfg.Adjust(nil, false))
cfg.Keyspace.WaitRegionSplit = false

return cfg
}
Expand Down
53 changes: 53 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ var (
WaitLeaderCheckInterval = 500 * time.Millisecond
// WaitLeaderRetryTimes represents the maximum number of loops of WaitLeader.
WaitLeaderRetryTimes = 100

// WaitPreAllocKeyspacesInterval represents the time interval of WaitPreAllocKeyspaces running check.
WaitPreAllocKeyspacesInterval = 500 * time.Millisecond
// WaitPreAllocKeyspacesRetryTimes represents the maximum number of loops of WaitPreAllocKeyspaces.
WaitPreAllocKeyspacesRetryTimes = 100
)

// TestServer is only for test.
Expand Down Expand Up @@ -380,6 +385,12 @@ func (s *TestServer) BootstrapCluster() error {
if resp.GetHeader().GetError() != nil {
return errors.New(resp.GetHeader().GetError().String())
}

err = s.waitPreAllocKeyspaces()
if err != nil {
return err
}

return nil
}

Expand All @@ -396,6 +407,48 @@ func (s *TestServer) WaitLeader() bool {
return false
}

func (s *TestServer) waitPreAllocKeyspaces() error {
keyspaces := s.GetConfig().Keyspace.PreAlloc
if len(keyspaces) == 0 {
return nil
}

manager := s.GetKeyspaceManager()
idx := 0
Outer:
for range WaitPreAllocKeyspacesRetryTimes {
for idx < len(keyspaces) {
_, err := manager.LoadKeyspace(keyspaces[idx])
if errors.ErrorEqual(err, errs.ErrKeyspaceNotFound) {
time.Sleep(WaitPreAllocKeyspacesInterval)
continue Outer
}
if err != nil {
return errors.Trace(err)
}

idx += 1
}
return nil
}
return errors.New("wait pre-alloc keyspaces retry limit exceeded")
}

// GetPreAllocKeyspaceIDs returns the pre-allocated keyspace IDs.
func (s *TestServer) GetPreAllocKeyspaceIDs() ([]uint32, error) {
keyspaces := s.GetConfig().Keyspace.PreAlloc
ids := make([]uint32, 0, len(keyspaces))
manager := s.GetKeyspaceManager()
for _, keyspace := range keyspaces {
meta, err := manager.LoadKeyspace(keyspace)
if err != nil {
return nil, errors.Trace(err)
}
ids = append(ids, meta.GetId())
}
return ids, nil
}

// GetTSOAllocatorManager returns the server's TSO Allocator Manager.
func (s *TestServer) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.server.GetTSOAllocatorManager()
Expand Down
4 changes: 1 addition & 3 deletions tests/integrations/client/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func mustMakeTestKeyspaces(re *require.Assertions, server *server.Server, start
testConfig2: "200",
},
CreateTime: now,
IsPreAlloc: true, // skip wait region split
})
re.NoError(err)
}
Expand Down Expand Up @@ -105,8 +104,7 @@ func mustCreateKeyspaceAtState(re *require.Assertions, server *server.Server, in
meta, err := manager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
Name: fmt.Sprintf("test_keyspace_%d", index),
Config: nil,
CreateTime: 0, // Use 0 to indicate unchanged keyspace.
IsPreAlloc: true, // skip wait region split
CreateTime: 0, // Use 0 to indicate unchanged keyspace.
})
re.NoError(err)
switch state {
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func TestTSOServerStartFirst(t *testing.T) {

cluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = []string{"k1", "k2"}
conf.Keyspace.WaitRegionSplit = false
})
defer cluster.Destroy()
re.NoError(err)
Expand Down
28 changes: 17 additions & 11 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
conf.Keyspace.WaitRegionSplit = false
})
re.NoError(err)
defer tc.Destroy()
Expand Down Expand Up @@ -735,10 +736,12 @@ func TestGetTSOImmediately(t *testing.T) {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))

// Init PD config but not start.
keyspaces := []string{
"keyspace_a", "keyspace_b",
}
tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
conf.Keyspace.PreAlloc = keyspaces
conf.Keyspace.WaitRegionSplit = false
})
re.NoError(err)
defer tc.Destroy()
Expand Down Expand Up @@ -782,14 +785,17 @@ func TestGetTSOImmediately(t *testing.T) {
return p0 == kg0.Members[0].Address && p1 == kg1.Members[1].Address
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx,
caller.TestComponent,
[]string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
_, _, err = cli.GetTS(ctx)
re.NoError(err)
cli.Close()
for _, name := range keyspaces {
apiCtx := pd.NewAPIContextV2(name)
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx,
caller.TestComponent,
[]string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
_, _, err = cli.GetTS(ctx)
re.NoError(err)
cli.Close()
}

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
Expand Down
2 changes: 1 addition & 1 deletion tests/server/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
suite.cancel = cancel
cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = preAllocKeyspace
conf.Keyspace.WaitRegionSplit = false
})
suite.cluster = cluster
re.NoError(err)
Expand Down Expand Up @@ -86,7 +87,6 @@ func (suite *keyspaceTestSuite) TestRegionLabeler() {
keyspaces[i], err = manager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
Name: fmt.Sprintf("test_keyspace_%d", i),
CreateTime: now,
IsPreAlloc: true, // skip wait region split
})
re.NoError(err)
}
Expand Down
6 changes: 6 additions & 0 deletions tools/pd-ctl/tests/keyspace/keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestSplitKeyspaceGroup(t *testing.T) {
}
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
conf.Keyspace.WaitRegionSplit = false
})
re.NoError(err)
defer tc.Destroy()
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestExternalAllocNodeWhenStart(t *testing.T) {
}
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
conf.Keyspace.WaitRegionSplit = false
})
re.NoError(err)
defer tc.Destroy()
Expand Down Expand Up @@ -199,6 +201,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) {
}
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
conf.Keyspace.WaitRegionSplit = false
})
re.NoError(err)
defer tc.Destroy()
Expand Down Expand Up @@ -303,6 +306,7 @@ func TestMergeKeyspaceGroup(t *testing.T) {
}
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
conf.Keyspace.WaitRegionSplit = false
})
re.NoError(err)
defer tc.Destroy()
Expand Down Expand Up @@ -422,6 +426,7 @@ func TestKeyspaceGroupState(t *testing.T) {
}
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
conf.Keyspace.WaitRegionSplit = false
})
re.NoError(err)
defer tc.Destroy()
Expand Down Expand Up @@ -513,6 +518,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) {
}
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
conf.Keyspace.WaitRegionSplit = false
})
re.NoError(err)
defer tc.Destroy()
Expand Down
7 changes: 5 additions & 2 deletions tools/pd-ctl/tests/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestKeyspace(t *testing.T) {
}
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
conf.Keyspace.WaitRegionSplit = false
})
re.NoError(err)
defer tc.Destroy()
Expand All @@ -67,6 +68,8 @@ func TestKeyspace(t *testing.T) {
leaderServer := tc.GetLeaderServer()
re.NoError(leaderServer.BootstrapCluster())
defaultKeyspaceGroupID := fmt.Sprintf("%d", constant.DefaultKeyspaceGroupID)
keyspaceIDs, err := leaderServer.GetPreAllocKeyspaceIDs()
re.NoError(err)

var k api.KeyspaceMeta
keyspaceName := "keyspace_1"
Expand All @@ -77,13 +80,13 @@ func TestKeyspace(t *testing.T) {
re.NoError(json.Unmarshal(output, &k))
return k.GetName() == keyspaceName
})
re.Equal(uint32(1), k.GetId())
re.Equal(keyspaceIDs[0], k.GetId())
re.Equal(defaultKeyspaceGroupID, k.Config[keyspace.TSOKeyspaceGroupIDKey])

// split keyspace group.
newGroupID := "2"
testutil.Eventually(re, func() bool {
args := []string{"-u", pdAddr, "keyspace-group", "split", "0", newGroupID, "1"}
args := []string{"-u", pdAddr, "keyspace-group", "split", "0", newGroupID, strconv.Itoa(int(keyspaceIDs[0]))}
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)
return strings.Contains(string(output), "Success")
Expand Down
Loading