Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,19 +1997,22 @@ func (rg *raftGroup) setPreferred() {
}

// createRaftGroup is called to spin up this raft group if needed.
func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType, labels pprofLabels) error {
func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType, labels pprofLabels) (RaftNode, error) {
// Must hold JS lock throughout, otherwise two parallel calls for the same raft group could result
// in duplicate instances for the same identifier, if the current Raft node is shutting down.
// We can release the lock temporarily while waiting for the Raft node to shut down.
js.mu.Lock()
defer js.mu.Unlock()

s, cc := js.srv, js.cluster
if cc == nil || cc.meta == nil {
js.mu.Unlock()
return NewJSClusterNotActiveError()
return nil, NewJSClusterNotActiveError()
}

// If this is a single peer raft group or we are not a member return.
if len(rg.Peers) <= 1 || !rg.isMember(cc.meta.ID()) {
js.mu.Unlock()
// Nothing to do here.
return nil
return nil, nil
}

// Check if we already have this assigned.
Expand Down Expand Up @@ -2057,17 +2060,15 @@ retry:
}
}
rg.node = node
js.mu.Unlock()
return nil
return node, nil
}

s.Debugf("JetStream cluster creating raft group:%+v", rg)
js.mu.Unlock()

sysAcc := s.SystemAccount()
if sysAcc == nil {
s.Debugf("JetStream cluster detected shutdown processing raft group: %+v", rg)
return errors.New("shutting down")
return nil, errors.New("shutting down")
}

// Check here to see if we have a max HA Assets limit set.
Expand All @@ -2076,7 +2077,7 @@ retry:
s.Warnf("Maximum HA Assets limit reached: %d", maxHaAssets)
// Since the meta leader assigned this, send a statsz update to them to get them up to date.
go s.sendStatszUpdate()
return errors.New("system limit reached")
return nil, errors.New("system limit reached")
}
}

Expand All @@ -2097,14 +2098,14 @@ retry:
)
if err != nil {
s.Errorf("Error creating filestore WAL: %v", err)
return err
return nil, err
}
store = fs
} else {
ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage})
if err != nil {
s.Errorf("Error creating memstore WAL: %v", err)
return err
return nil, err
}
store = ms
}
Expand All @@ -2118,17 +2119,15 @@ retry:
n, err := s.startRaftNode(accName, cfg, labels)
if err != nil || n == nil {
s.Debugf("Error creating raft group: %v", err)
return err
return nil, err
}
// Need locking here for the assignment to avoid data-race reports
js.mu.Lock()
// Need JS lock to be held for the assignment to avoid data-race reports
rg.node = n
// See if we are preferred and should start campaign immediately.
if n.ID() == rg.Preferred && n.Term() == 0 {
n.CampaignImmediately()
}
js.mu.Unlock()
return nil
return n, nil
}

func (mset *stream) raftGroup() *raftGroup {
Expand Down Expand Up @@ -3752,7 +3751,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
js.mu.RUnlock()

// Process the raft group and make sure it's running if needed.
err := js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{
_, err := js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{
"type": "stream",
"account": acc.Name,
"stream": sa.Config.Name,
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7552,7 +7552,7 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
checkNodeIsClosed(ca)

// We create a new RAFT group, the health check should detect this skew and restart.
err = sjs.createRaftGroup(globalAccountName, ca.Group, MemoryStorage, pprofLabels{})
_, err = sjs.createRaftGroup(globalAccountName, ca.Group, MemoryStorage, pprofLabels{})
require_NoError(t, err)
sjs.mu.Lock()
// We set creating to now, since previously it would delete all data but NOT restart if created within <10s.
Expand Down
68 changes: 68 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5321,3 +5321,71 @@ func TestJetStreamClusterObserverNotElectedMetaLeader(t *testing.T) {
}
}
}

// TestJetStreamClusterParallelCreateRaftGroup reproduces two concurrent
// createRaftGroup calls for the same raft group while the existing Raft node
// is mid-shutdown, and asserts both callers receive the SAME node instance
// (no duplicate nodes for one group identifier).
func TestJetStreamClusterParallelCreateRaftGroup(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Create a replicated stream so a raft group exists for it.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Replicas: 3,
})
require_NoError(t, err)
// Wait until the cluster agrees on the stream's state before poking internals.
checkFor(t, time.Second, 100*time.Millisecond, func() error {
return checkState(t, c, globalAccountName, "TEST")
})

// Grab the leader's JetStream internals: the stream assignment, account,
// and stream handle we need to reach the underlying raft group/node.
ml := c.leader()
sjs := ml.getJetStream()
sa := sjs.streamAssignment(globalAccountName, "TEST")
require_NotNil(t, sa)
acc, err := ml.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)

rg, storage := sa.Group, sa.Group.Storage
rn := mset.raftNode().(*raft)
// Do first half of Stop: flip the node's state to Closed, but do not yet
// signal quit. This leaves the node "shutting down" from the perspective
// of createRaftGroup. Swap must not already report Closed.
require_NotEqual(t, rn.state.Swap(int32(Closed)), int32(Closed))

// wg gates "both goroutines have started"; finish gates "both have returned".
var wg sync.WaitGroup
var finish sync.WaitGroup
wg.Add(2)
finish.Add(2)

// mu guards nodes, which collects the RaftNode each caller got back.
var mu sync.Mutex
var nodes []RaftNode

// Call createRaftGroup in parallel.
for i := 0; i < 2; i++ {
go func() {
wg.Done()
defer finish.Done()
if n, rerr := sjs.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{}); rerr == nil {
mu.Lock()
nodes = append(nodes, n)
mu.Unlock()
}
}()
}

// Wait for both goroutines, and allow some time for both to have entered createRaftGroup.
wg.Wait()
time.Sleep(100 * time.Millisecond)

// Do second half of Stop while goroutines are in createRaftGroup:
// drop leader state and signal quit so the old node fully stops.
rn.leaderState.Store(false)
close(rn.quit)

// Wait for node and goroutines to stop.
rn.WaitForStop()
finish.Wait()

// Should only create one new node instance: both parallel callers must
// have received results, and those results must be the identical node.
require_Len(t, len(nodes), 2)
require_Equal(t, nodes[0], nodes[1])
}
Loading