diff --git a/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go b/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go index f05452ae7c0f2..002b461b42aaa 100644 --- a/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go +++ b/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go @@ -130,9 +130,9 @@ func (_c *MockTimeTickSyncOperator_MVCCManager_Call) RunAndReturn(run func(conte return _c } -// Sync provides a mock function with given fields: ctx -func (_m *MockTimeTickSyncOperator) Sync(ctx context.Context) { - _m.Called(ctx) +// Sync provides a mock function with given fields: ctx, forcePersisted +func (_m *MockTimeTickSyncOperator) Sync(ctx context.Context, forcePersisted bool) { + _m.Called(ctx, forcePersisted) } // MockTimeTickSyncOperator_Sync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Sync' @@ -142,13 +142,14 @@ type MockTimeTickSyncOperator_Sync_Call struct { // Sync is a helper method to define mock.On call // - ctx context.Context -func (_e *MockTimeTickSyncOperator_Expecter) Sync(ctx interface{}) *MockTimeTickSyncOperator_Sync_Call { - return &MockTimeTickSyncOperator_Sync_Call{Call: _e.mock.On("Sync", ctx)} +// - forcePersisted bool +func (_e *MockTimeTickSyncOperator_Expecter) Sync(ctx interface{}, forcePersisted interface{}) *MockTimeTickSyncOperator_Sync_Call { + return &MockTimeTickSyncOperator_Sync_Call{Call: _e.mock.On("Sync", ctx, forcePersisted)} } -func (_c *MockTimeTickSyncOperator_Sync_Call) Run(run func(ctx context.Context)) *MockTimeTickSyncOperator_Sync_Call { +func (_c *MockTimeTickSyncOperator_Sync_Call) Run(run func(ctx context.Context, forcePersisted bool)) *MockTimeTickSyncOperator_Sync_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) + 
run(args[0].(context.Context), args[1].(bool)) }) return _c } @@ -158,7 +159,7 @@ func (_c *MockTimeTickSyncOperator_Sync_Call) Return() *MockTimeTickSyncOperator return _c } -func (_c *MockTimeTickSyncOperator_Sync_Call) RunAndReturn(run func(context.Context)) *MockTimeTickSyncOperator_Sync_Call { +func (_c *MockTimeTickSyncOperator_Sync_Call) RunAndReturn(run func(context.Context, bool)) *MockTimeTickSyncOperator_Sync_Call { _c.Call.Return(run) return _c } diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index ceed16de898a2..6b9e1b7416beb 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -129,6 +129,12 @@ func (s *scannerAdaptorImpl) produceEventLoop(msgChan chan<- message.ImmutableMe if wb, err = resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).WriteAheadBuffer(s.Context()); err != nil { return err } + // Force a persisted time tick so that the time tick is guaranteed to move forward. + // The underlying wal data may already have been deleted by the retention policy, + // in which case the time tick cannot be read back from the wal. + // Asking the timetick inspector to append a new persisted time tick + // lets the catch-up scanner see the latest time tick and catch up. 
+ resource.Resource().TimeTickInspector().TriggerSync(s.Channel(), true) } scanner := newSwithableScanner(s.Name(), s.logger, s.innerWAL, wb, s.readOption.DeliverPolicy, msgChan) diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go index f85460713a2d6..a1ce7d855e1ca 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go @@ -30,7 +30,7 @@ func TestScannerAdaptorReadError(t *testing.T) { operator := mock_inspector.NewMockTimeTickSyncOperator(t) operator.EXPECT().Channel().Return(types.PChannelInfo{}) - operator.EXPECT().Sync(mock.Anything).Run(func(ctx context.Context) { + operator.EXPECT().Sync(mock.Anything, mock.Anything).Run(func(ctx context.Context, forcePersisted bool) { sig1.Close() }) wb := mock_wab.NewMockROWriteAheadBuffer(t) diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index ccb35ec70867b..9e1b27fbeb38d 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -96,7 +96,7 @@ func (w *walAdaptorImpl) GetLatestMVCCTimestamp(ctx context.Context, vchannel st currentMVCC := mvccManager.GetMVCCOfVChannel(vchannel) if !currentMVCC.Confirmed { // if the mvcc is not confirmed, trigger a sync operation to make it confirmed as soon as possible. 
- resource.Resource().TimeTickInspector().TriggerSync(w.rwWALImpls.Channel()) + resource.Resource().TimeTickInspector().TriggerSync(w.rwWALImpls.Channel(), false) } return currentMVCC.Timetick, nil } diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go index 691ea9669b2b6..67785ceee2944 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go @@ -65,7 +65,7 @@ func TestWalAdaptorReadFail(t *testing.T) { writeAheadBuffer := mock_wab.NewMockROWriteAheadBuffer(t) operator := mock_inspector.NewMockTimeTickSyncOperator(t) operator.EXPECT().Channel().Return(types.PChannelInfo{}).Maybe() - operator.EXPECT().Sync(mock.Anything).Return().Maybe() + operator.EXPECT().Sync(mock.Anything, mock.Anything).Return().Maybe() operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(writeAheadBuffer, nil).Maybe() resource.Resource().TimeTickInspector().RegisterSyncOperator( operator, @@ -95,7 +95,7 @@ func TestWALAdaptor(t *testing.T) { operator := mock_inspector.NewMockTimeTickSyncOperator(t) operator.EXPECT().Channel().Return(types.PChannelInfo{}) - operator.EXPECT().Sync(mock.Anything).Return() + operator.EXPECT().Sync(mock.Anything, mock.Anything).Return() buffer := mock_wab.NewMockROWriteAheadBuffer(t) operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(buffer, nil) resource.Resource().TimeTickInspector().RegisterSyncOperator(operator) diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go index 717d9fcb1328a..b03f2ce46c778 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go @@ -29,8 +29,8 @@ type timeTickSyncInspectorImpl struct { operators *typeutil.ConcurrentMap[string, 
TimeTickSyncOperator] } -func (s *timeTickSyncInspectorImpl) TriggerSync(pChannelInfo types.PChannelInfo) { - s.syncNotifier.AddAndNotify(pChannelInfo) +func (s *timeTickSyncInspectorImpl) TriggerSync(pChannelInfo types.PChannelInfo, persisted bool) { + s.syncNotifier.AddAndNotify(pChannelInfo, persisted) } // GetOperator gets the operator by pchannel info. @@ -72,16 +72,16 @@ func (s *timeTickSyncInspectorImpl) background() { return case <-ticker.C: s.operators.Range(func(_ string, operator TimeTickSyncOperator) bool { - operator.Sync(s.taskNotifier.Context()) + operator.Sync(s.taskNotifier.Context(), false) return true }) case <-s.syncNotifier.WaitChan(): - s.syncNotifier.Get().Range(func(pchannel types.PChannelInfo) bool { + signals := s.syncNotifier.Get() + for pchannel, persisted := range signals { if operator, ok := s.operators.Get(pchannel.Name); ok { - operator.Sync(s.taskNotifier.Context()) + operator.Sync(s.taskNotifier.Context(), persisted) } - return true - }) + } } } } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go index 6bf0f33f1bd4d..6b28f496222ca 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go @@ -20,14 +20,14 @@ type TimeTickSyncOperator interface { // Sync trigger a sync operation, try to send the timetick message into wal. // Sync operation is a blocking operation, and not thread-safe, will only call in one goroutine. - Sync(ctx context.Context) + Sync(ctx context.Context, forcePersisted bool) } // TimeTickSyncInspector is the inspector to sync time tick. type TimeTickSyncInspector interface { // TriggerSync adds a pchannel info and notify the sync operation. // manually trigger the sync operation of pchannel. 
- TriggerSync(pChannelInfo types.PChannelInfo) + TriggerSync(pChannelInfo types.PChannelInfo, forcePersisted bool) // RegisterSyncOperator registers a sync operator. RegisterSyncOperator(operator TimeTickSyncOperator) diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector_test.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector_test.go index b9ef7bc2e4c77..2681c42c08d53 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector_test.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector_test.go @@ -24,13 +24,13 @@ func TestInsepctor(t *testing.T) { Term: 1, } operator.EXPECT().Channel().Return(pchannel) - operator.EXPECT().Sync(mock.Anything).Run(func(ctx context.Context) {}) + operator.EXPECT().Sync(mock.Anything, mock.Anything).Run(func(ctx context.Context, forcePersisted bool) {}) i.RegisterSyncOperator(operator) assert.Panics(t, func() { i.RegisterSyncOperator(operator) }) - i.TriggerSync(pchannel) + i.TriggerSync(pchannel, false) o := i.MustGetOperator(pchannel) assert.NotNil(t, o) time.Sleep(250 * time.Millisecond) diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go index ca44463b811d3..106158d819ffe 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go @@ -5,34 +5,40 @@ import ( "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/util/syncutil" - "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // newSyncNotifier creates a new sync notifier. 
func newSyncNotifier() *syncNotifier { return &syncNotifier{ cond: syncutil.NewContextCond(&sync.Mutex{}), - signal: typeutil.NewSet[types.PChannelInfo](), + signal: map[types.PChannelInfo]bool{}, } } // syncNotifier is a notifier for sync signal. type syncNotifier struct { cond *syncutil.ContextCond - signal typeutil.Set[types.PChannelInfo] + signal map[types.PChannelInfo]bool } // AddAndNotify adds a signal and notifies the waiter. -func (n *syncNotifier) AddAndNotify(pChannelInfo types.PChannelInfo) { +func (n *syncNotifier) AddAndNotify(pChannelInfo types.PChannelInfo, persisted bool) { n.cond.LockAndBroadcast() - n.signal.Insert(pChannelInfo) - n.cond.L.Unlock() + defer n.cond.L.Unlock() + if _, ok := n.signal[pChannelInfo]; !ok { + n.signal[pChannelInfo] = persisted + return + } + if persisted { + // Only persisted=true overwrites the stored flag, so a later non-persisted request never clears a recorded persisted one. + n.signal[pChannelInfo] = persisted + } } // WaitChan returns the wait channel. func (n *syncNotifier) WaitChan() <-chan struct{} { n.cond.L.Lock() - if n.signal.Len() > 0 { + if len(n.signal) > 0 { n.cond.L.Unlock() ch := make(chan struct{}) close(ch) @@ -42,10 +48,10 @@ func (n *syncNotifier) WaitChan() <-chan struct{} { } // Get gets the signal. 
-func (n *syncNotifier) Get() typeutil.Set[types.PChannelInfo] { +func (n *syncNotifier) Get() map[types.PChannelInfo]bool { n.cond.L.Lock() signal := n.signal - n.signal = typeutil.NewSet[types.PChannelInfo]() + n.signal = map[types.PChannelInfo]bool{} n.cond.L.Unlock() return signal } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go index 0ac7dd133a293..68cd395ac15ea 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go @@ -11,25 +11,42 @@ import ( func TestSyncNotifier(t *testing.T) { n := newSyncNotifier() ch := n.WaitChan() - assert.True(t, n.Get().Len() == 0) + assert.True(t, len(n.Get()) == 0) shouldBeBlocked(ch) n.AddAndNotify(types.PChannelInfo{ Name: "test", Term: 1, - }) + }, false) // should not block <-ch - assert.True(t, n.Get().Len() == 1) - assert.True(t, n.Get().Len() == 0) + assert.True(t, len(n.Get()) == 1) + assert.True(t, len(n.Get()) == 0) n.AddAndNotify(types.PChannelInfo{ Name: "test", Term: 1, - }) + }, false) ch = n.WaitChan() <-ch + + n.AddAndNotify(types.PChannelInfo{ + Name: "test", + Term: 1, + }, true) + n.AddAndNotify(types.PChannelInfo{ + Name: "test", + Term: 1, + }, false) + ch = n.WaitChan() + <-ch + + signals := n.Get() + assert.Equal(t, 1, len(signals)) + for _, v := range signals { + assert.True(t, v) + } } func shouldBeBlocked(ch <-chan struct{}) { diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go index b016b57e2e4e0..db1571ceb201f 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go @@ -93,7 +93,7 @@ func (impl 
*timeTickSyncOperator) Channel() types.PChannelInfo { // Sync trigger a sync operation. // Sync operation is not thread safe, so call it in a single goroutine. -func (impl *timeTickSyncOperator) Sync(ctx context.Context) { +func (impl *timeTickSyncOperator) Sync(ctx context.Context, persisted bool) { // Sync operation cannot trigger until isReady. if !impl.isReady() { return @@ -106,7 +106,7 @@ func (impl *timeTickSyncOperator) Sync(ctx context.Context) { return nil, err } return appendResult.MessageID, nil - }) + }, persisted) if err != nil { impl.logger.Warn("send time tick sync message failed", zap.Error(err)) } @@ -235,7 +235,7 @@ func (impl *timeTickSyncOperator) Close() { // sendTsMsg sends first timestamp message to wal. // TODO: TT lag warning. -func (impl *timeTickSyncOperator) sendTsMsg(ctx context.Context, appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)) error { +func (impl *timeTickSyncOperator) sendTsMsg(ctx context.Context, appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error), forcePersisted bool) error { // Sync the timestamp acknowledged details. impl.syncAcknowledgedDetails(ctx) @@ -248,7 +248,7 @@ func (impl *timeTickSyncOperator) sendTsMsg(ctx context.Context, appender func(c // Construct time tick message. 
ts := impl.ackDetails.LastAllAcknowledgedTimestamp() lastConfirmedMessageID := impl.ackDetails.EarliestLastConfirmedMessageID() - persist := !impl.ackDetails.IsNoPersistedMessage() + persist := (!impl.ackDetails.IsNoPersistedMessage() || forcePersisted) return impl.sendTsMsgToWAL(ctx, ts, lastConfirmedMessageID, persist, appender) } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go index 4e1f1c0d52d7e..f648e25d56fcf 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go @@ -74,7 +74,7 @@ func TestTimeTickSyncOperator(t *testing.T) { assert.ErrorIs(t, err, context.DeadlineExceeded) assert.Nil(t, r) // should not trigger any wal operation, but only update the timetick. - operator.Sync(ctx) + operator.Sync(ctx, false) r, err = wb.ReadFromExclusiveTimeTick(context.Background(), ts) assert.NoError(t, err) // should not block because timetick updates. @@ -82,6 +82,16 @@ func TestTimeTickSyncOperator(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, msg) assert.Greater(t, msg.TimeTick(), ts) + + // should trigger wal operation. + l.EXPECT().Append(mock.Anything, mock.Anything).Unset() + l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) { + return &types.AppendResult{ + MessageID: walimplstest.NewTestMessageID(1), + TimeTick: mm.TimeTick(), + }, nil + }) + operator.Sync(ctx, true) } func shouldBlock(ch <-chan struct{}) {