diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 8753f363fd4..ea157180cb2 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1997,19 +1997,22 @@ func (rg *raftGroup) setPreferred() { } // createRaftGroup is called to spin up this raft group if needed. -func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType, labels pprofLabels) error { +func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType, labels pprofLabels) (RaftNode, error) { + // Must hold JS lock throughout, otherwise two parallel calls for the same raft group could result + // in duplicate instances for the same identifier, if the current Raft node is shutting down. + // We can release the lock temporarily while waiting for the Raft node to shut down. js.mu.Lock() + defer js.mu.Unlock() + s, cc := js.srv, js.cluster if cc == nil || cc.meta == nil { - js.mu.Unlock() - return NewJSClusterNotActiveError() + return nil, NewJSClusterNotActiveError() } // If this is a single peer raft group or we are not a member return. if len(rg.Peers) <= 1 || !rg.isMember(cc.meta.ID()) { - js.mu.Unlock() // Nothing to do here. - return nil + return nil, nil } // Check if we already have this assigned. @@ -2057,17 +2060,15 @@ retry: } } rg.node = node - js.mu.Unlock() - return nil + return node, nil } s.Debugf("JetStream cluster creating raft group:%+v", rg) - js.mu.Unlock() sysAcc := s.SystemAccount() if sysAcc == nil { s.Debugf("JetStream cluster detected shutdown processing raft group: %+v", rg) - return errors.New("shutting down") + return nil, errors.New("shutting down") } // Check here to see if we have a max HA Assets limit set. @@ -2076,7 +2077,7 @@ retry: s.Warnf("Maximum HA Assets limit reached: %d", maxHaAssets) // Since the meta leader assigned this, send a statsz update to them to get them up to date. go s.sendStatszUpdate() - return errors.New("system limit reached") + return nil, errors.New("system limit reached") } } @@ -2097,14 +2098,14 @@ retry: ) if err != nil { s.Errorf("Error creating filestore WAL: %v", err) - return err + return nil, err } store = fs } else { ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage}) if err != nil { s.Errorf("Error creating memstore WAL: %v", err) - return err + return nil, err } store = ms } @@ -2118,17 +2119,15 @@ retry: n, err := s.startRaftNode(accName, cfg, labels) if err != nil || n == nil { s.Debugf("Error creating raft group: %v", err) - return err + return nil, err } - // Need locking here for the assignment to avoid data-race reports - js.mu.Lock() + // Need JS lock to be held for the assignment to avoid data-race reports rg.node = n // See if we are preferred and should start campaign immediately. if n.ID() == rg.Preferred && n.Term() == 0 { n.CampaignImmediately() } - js.mu.Unlock() - return nil + return n, nil } func (mset *stream) raftGroup() *raftGroup { @@ -3752,7 +3751,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme js.mu.RUnlock() // Process the raft group and make sure it's running if needed. - err := js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{ + _, err := js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{ "type": "stream", "account": acc.Name, "stream": sa.Config.Name, diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 9ae2bd407ec..1d94b3c2a59 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -7552,7 +7552,7 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) { checkNodeIsClosed(ca) // We create a new RAFT group, the health check should detect this skew and restart. - err = sjs.createRaftGroup(globalAccountName, ca.Group, MemoryStorage, pprofLabels{}) + _, err = sjs.createRaftGroup(globalAccountName, ca.Group, MemoryStorage, pprofLabels{}) require_NoError(t, err) sjs.mu.Lock() // We set creating to now, since previously it would delete all data but NOT restart if created within <10s. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 774e9dc70f8..cc77b581284 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -5321,3 +5321,71 @@ func TestJetStreamClusterObserverNotElectedMetaLeader(t *testing.T) { } } } + +func TestJetStreamClusterParallelCreateRaftGroup(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Replicas: 3, + }) + require_NoError(t, err) + checkFor(t, time.Second, 100*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) + + ml := c.leader() + sjs := ml.getJetStream() + sa := sjs.streamAssignment(globalAccountName, "TEST") + require_NotNil(t, sa) + acc, err := ml.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + + rg, storage := sa.Group, sa.Group.Storage + rn := mset.raftNode().(*raft) + // Do first half of Stop. + require_NotEqual(t, rn.state.Swap(int32(Closed)), int32(Closed)) + + var wg sync.WaitGroup + var finish sync.WaitGroup + wg.Add(2) + finish.Add(2) + + var mu sync.Mutex + var nodes []RaftNode + + // Call createRaftGroup in parallel. + for i := 0; i < 2; i++ { + go func() { + wg.Done() + defer finish.Done() + if n, rerr := sjs.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{}); rerr == nil { + mu.Lock() + nodes = append(nodes, n) + mu.Unlock() + } + }() + } + + // Wait for both goroutines, and allow some time for both to have entered createRaftGroup. + wg.Wait() + time.Sleep(100 * time.Millisecond) + + // Do second half of Stop while goroutines are in createRaftGroup. + rn.leaderState.Store(false) + close(rn.quit) + + // Wait for node and goroutines to stop. + rn.WaitForStop() + finish.Wait() + + // Should only create one new node instance. + require_Len(t, len(nodes), 2) + require_Equal(t, nodes[0], nodes[1]) +}