Skip to content

Commit e1574ec

Browse files
authored
Revert "Enables 0s deduplication window duration when the stream has sources (#4476)"
This reverts commit db96238.
1 parent f650e1a commit e1574ec

File tree

1 file changed

+28
-32
lines changed

1 file changed

+28
-32
lines changed

server/stream.go

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -805,32 +805,30 @@ func (mset *stream) rebuildDedupe() {
805805

806806
mset.ddloaded = true
807807

808-
if mset.cfg.Duplicates > time.Duration(0) {
809-
// We have some messages. Lookup starting sequence by duplicate time window.
810-
sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates))
811-
if sseq == 0 {
812-
return
813-
}
808+
// We have some messages. Lookup starting sequence by duplicate time window.
809+
sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates))
810+
if sseq == 0 {
811+
return
812+
}
814813

815-
var smv StoreMsg
816-
var state StreamState
817-
mset.store.FastState(&state)
814+
var smv StoreMsg
815+
var state StreamState
816+
mset.store.FastState(&state)
818817

819-
for seq := sseq; seq <= state.LastSeq; seq++ {
820-
sm, err := mset.store.LoadMsg(seq, &smv)
821-
if err != nil {
822-
continue
823-
}
824-
var msgId string
825-
if len(sm.hdr) > 0 {
826-
if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ {
827-
mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts})
828-
}
829-
}
830-
if seq == state.LastSeq {
831-
mset.lmsgId = msgId
818+
for seq := sseq; seq <= state.LastSeq; seq++ {
819+
sm, err := mset.store.LoadMsg(seq, &smv)
820+
if err != nil {
821+
continue
822+
}
823+
var msgId string
824+
if len(sm.hdr) > 0 {
825+
if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ {
826+
mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts})
832827
}
833828
}
829+
if seq == state.LastSeq {
830+
mset.lmsgId = msgId
831+
}
834832
}
835833
}
836834

@@ -1025,7 +1023,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
10251023
if cfg.MaxConsumers == 0 {
10261024
cfg.MaxConsumers = -1
10271025
}
1028-
if cfg.Duplicates == 0 && cfg.Mirror == nil && len(cfg.Sources) == 0 {
1026+
if cfg.Duplicates == 0 && cfg.Mirror == nil {
10291027
maxWindow := StreamDefaultDuplicatesWindow
10301028
if lim.Duplicates > 0 && maxWindow > lim.Duplicates {
10311029
maxWindow = lim.Duplicates
@@ -3488,15 +3486,13 @@ func (mset *stream) storeMsgId(dde *ddentry) {
34883486
// storeMsgIdLocked will store the message id for duplicate detection.
34893487
// Lock should he held.
34903488
func (mset *stream) storeMsgIdLocked(dde *ddentry) {
3491-
if mset.cfg.Duplicates > time.Duration(0) {
3492-
if mset.ddmap == nil {
3493-
mset.ddmap = make(map[string]*ddentry)
3494-
}
3495-
mset.ddmap[dde.id] = dde
3496-
mset.ddarr = append(mset.ddarr, dde)
3497-
if mset.ddtmr == nil {
3498-
mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds)
3499-
}
3489+
if mset.ddmap == nil {
3490+
mset.ddmap = make(map[string]*ddentry)
3491+
}
3492+
mset.ddmap[dde.id] = dde
3493+
mset.ddarr = append(mset.ddarr, dde)
3494+
if mset.ddtmr == nil {
3495+
mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds)
35003496
}
35013497
}
35023498

0 commit comments

Comments
 (0)