diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c1a542acf58..032abd402d7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3408,7 +3408,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } panic(err.Error()) } - s, cc := js.server(), js.cluster + s := js.server() var removed bool if md.NoErase { @@ -3417,9 +3417,18 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco removed, err = mset.eraseMsg(md.Seq) } - // Cluster reset error. + var isLeader bool + if node := mset.raftNode(); node != nil && node.Leader() { + isLeader = true + } + if err == ErrStoreEOF { - return 0, err + if isLeader && !isRecovering { + var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} + resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err)) + s.sendAPIErrResponse(md.Client, mset.account(), md.Subject, md.Reply, _EMPTY_, s.jsonResponse(resp)) + } + continue } if err != nil && !isRecovering { @@ -3427,10 +3436,6 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco md.Seq, md.Client.serviceAccount(), md.Stream, err) } - js.mu.RLock() - isLeader := cc.isStreamLeader(md.Client.serviceAccount(), md.Stream) - js.mu.RUnlock() - if isLeader && !isRecovering { var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} if err != nil { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index c75b697cee3..d1a5c4248a8 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -10194,6 +10194,33 @@ func TestJetStreamClusterPersistModeAsync(t *testing.T) { require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("async persist mode is not supported on replicated streams"))) } +func TestJetStreamClusterDeleteMsgEOF(t *testing.T) { + for _, replicas := range []int{1, 3} { + t.Run(fmt.Sprintf("R%d", replicas), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: replicas, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + require_Error(t, js.DeleteMsg("TEST", 0), NewJSNoMessageFoundError()) + require_NoError(t, js.DeleteMsg("TEST", 1)) + require_Error(t, js.DeleteMsg("TEST", 1), NewJSNoMessageFoundError()) + require_Error(t, js.DeleteMsg("TEST", 2), NewJSStreamMsgDeleteFailedError(ErrStoreEOF)) + }) + } +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value.