Skip to content

Commit 46570f6

Browse files
committed
Run Member tests in parallel
Introduce port allocator and remove unused MemberNumber. On my local machine it brings down execution time from 5m to 32s. Issue: etcd-io#18983 Signed-off-by: Aleksander Mistewicz <[email protected]>
1 parent f03dee9 commit 46570f6

File tree

5 files changed

+90
-15
lines changed

5 files changed

+90
-15
lines changed

tests/common/member_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ func TestMemberList(t *testing.T) {
3434

3535
for _, tc := range clusterTestCases() {
3636
t.Run(tc.name, func(t *testing.T) {
37+
t.Parallel()
38+
3739
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
3840
defer cancel()
3941
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(tc.config))
@@ -113,6 +115,8 @@ func TestMemberAdd(t *testing.T) {
113115
for _, quorumTc := range quorumTcs {
114116
for _, clusterTc := range clusterTestCases() {
115117
t.Run(learnerTc.name+"/"+quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
118+
t.Parallel()
119+
116120
ctxTimeout := 10 * time.Second
117121
if quorumTc.waitForQuorum {
118122
ctxTimeout += etcdserver.HealthInterval
@@ -198,6 +202,8 @@ func TestMemberRemove(t *testing.T) {
198202
continue
199203
}
200204
t.Run(quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
205+
t.Parallel()
206+
201207
ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second)
202208
defer cancel()
203209
c := clusterTc.config

tests/framework/e2e/cluster.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,6 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP
407407
if cfg.Logger == nil {
408408
cfg.Logger = zaptest.NewLogger(t)
409409
}
410-
if cfg.BasePort == 0 {
411-
cfg.BasePort = EtcdProcessBasePort
412-
}
413410
if cfg.ServerConfig.SnapshotCount == 0 {
414411
cfg.ServerConfig.SnapshotCount = etcdserver.DefaultSnapshotCount
415412
}
@@ -518,6 +515,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
518515
peer2Port := port + 3
519516
clientHTTPPort := port + 4
520517

518+
var allocatedPorts []int
519+
if cfg.BasePort == 0 {
520+
clientPort = uniquePorts.Alloc()
521+
peerPort = uniquePorts.Alloc()
522+
metricsPort = uniquePorts.Alloc()
523+
peer2Port = uniquePorts.Alloc()
524+
clientHTTPPort = uniquePorts.Alloc()
525+
allocatedPorts = []int{clientPort, peerPort, metricsPort, peer2Port, clientHTTPPort}
526+
}
527+
521528
if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
522529
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
523530
curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)}
@@ -639,7 +646,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
639646
}
640647
var gofailPort int
641648
if cfg.GoFailEnabled {
642-
gofailPort = (i+1)*10000 + 2381
649+
gofailPort = uniquePorts.Alloc()
650+
allocatedPorts = append(allocatedPorts, gofailPort)
643651
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
644652
}
645653

@@ -662,6 +670,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
662670
GoFailClientTimeout: cfg.GoFailClientTimeout,
663671
Proxy: proxyCfg,
664672
LazyFSEnabled: cfg.LazyFSEnabled,
673+
AllocatedPorts: allocatedPorts,
665674
}
666675
}
667676

tests/framework/e2e/etcd_process.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,11 @@ type EtcdServerProcessConfig struct {
8888

8989
Name string
9090

91-
PeerURL url.URL
92-
ClientURL string
93-
ClientHTTPURL string
94-
MetricsURL string
91+
PeerURL url.URL
92+
ClientURL string
93+
ClientHTTPURL string
94+
MetricsURL string
95+
AllocatedPorts []int
9596

9697
InitialToken string
9798
InitialCluster string
@@ -248,6 +249,11 @@ func (ep *EtcdServerProcess) Close() error {
248249
ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath))
249250
return os.RemoveAll(ep.cfg.DataDirPath)
250251
}
252+
253+
for _, port := range ep.cfg.AllocatedPorts {
254+
uniquePorts.Free(port)
255+
}
256+
ep.cfg.AllocatedPorts = nil
251257
return nil
252258
}
253259

tests/framework/e2e/port_alloc.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2024 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package e2e
16+
17+
import "sync"
18+
19+
// uniquePorts is a global instance of testPorts.
20+
var uniquePorts *testPorts
21+
22+
func init() {
23+
uniquePorts = newTestPorts(11000, 19000)
24+
}
25+
26+
// testPorts is used to allocate listen ports for etcd instance in tests
27+
// in a safe way for concurrent use (i.e. running tests in parallel).
28+
type testPorts struct {
29+
mux sync.Mutex
30+
unused map[int]bool
31+
}
32+
33+
// newTestPorts keeps track of unused ports in the specified range.
34+
func newTestPorts(start, end int) *testPorts {
35+
m := make(map[int]bool, end-start)
36+
for i := start; i < end; i++ {
37+
m[i] = true
38+
}
39+
return &testPorts{unused: m}
40+
}
41+
42+
// Alloc allocates a new port or panics if none is available.
43+
func (pa *testPorts) Alloc() int {
44+
pa.mux.Lock()
45+
defer pa.mux.Unlock()
46+
for port := range pa.unused {
47+
delete(pa.unused, port)
48+
return port
49+
}
50+
panic("all ports are used")
51+
}
52+
53+
// Free makes port available for allocation through Alloc.
54+
func (pa *testPorts) Free(port int) {
55+
pa.mux.Lock()
56+
defer pa.mux.Unlock()
57+
pa.unused[port] = true
58+
}

tests/framework/integration/cluster.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -260,13 +260,12 @@ func (c *Cluster) ProtoMembers() []*pb.Member {
260260
}
261261

262262
func (c *Cluster) mustNewMember(t testutil.TB) *Member {
263-
memberNumber := c.LastMemberNum
263+
uniqueNumber := atomic.AddInt32(&UniqueNumber, 1)*10 + int32(c.LastMemberNum)
264264
c.LastMemberNum++
265265

266266
m := MustNewMember(t,
267267
MemberConfig{
268-
Name: fmt.Sprintf("m%v", memberNumber),
269-
MemberNumber: memberNumber,
268+
Name: fmt.Sprintf("m%v", uniqueNumber),
270269
AuthToken: c.Cfg.AuthToken,
271270
PeerTLS: c.Cfg.PeerTLS,
272271
ClientTLS: c.Cfg.ClientTLS,
@@ -549,7 +548,6 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
549548
type Member struct {
550549
config.ServerConfig
551550
UniqNumber int
552-
MemberNumber int
553551
Port string
554552
PeerListeners, ClientListeners []net.Listener
555553
GRPCListener net.Listener
@@ -591,7 +589,6 @@ type Member struct {
591589
type MemberConfig struct {
592590
Name string
593591
UniqNumber int64
594-
MemberNumber int
595592
PeerTLS *transport.TLSInfo
596593
ClientTLS *transport.TLSInfo
597594
AuthToken string
@@ -624,8 +621,7 @@ type MemberConfig struct {
624621
func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
625622
var err error
626623
m := &Member{
627-
MemberNumber: mcfg.MemberNumber,
628-
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
624+
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
629625
}
630626

631627
peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)

0 commit comments

Comments
 (0)