Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions internal/streamingnode/server/wal/adaptor/scanner_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ func (s *scannerAdaptorImpl) produceEventLoop(msgChan chan<- message.ImmutableMe
if wb, err = resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).WriteAheadBuffer(s.Context()); err != nil {
return err
}
// Trigger a persisted time tick to make sure the timetick is pushed forward.
// because the underlying wal may be deleted because of retention policy.
// So we cannot get the timetick from the wal.
// Trigger the timetick inspector to append a new persisted timetick,
// then the catch up scanner can see the latest timetick and make a catchup.
resource.Resource().TimeTickInspector().TriggerSync(s.Channel(), true)
}

scanner := newSwithableScanner(s.Name(), s.logger, s.innerWAL, wb, s.readOption.DeliverPolicy, msgChan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestScannerAdaptorReadError(t *testing.T) {

operator := mock_inspector.NewMockTimeTickSyncOperator(t)
operator.EXPECT().Channel().Return(types.PChannelInfo{})
operator.EXPECT().Sync(mock.Anything).Run(func(ctx context.Context) {
operator.EXPECT().Sync(mock.Anything, mock.Anything).Run(func(ctx context.Context, forcePersisted bool) {
sig1.Close()
})
wb := mock_wab.NewMockROWriteAheadBuffer(t)
Expand Down
2 changes: 1 addition & 1 deletion internal/streamingnode/server/wal/adaptor/wal_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (w *walAdaptorImpl) GetLatestMVCCTimestamp(ctx context.Context, vchannel st
currentMVCC := mvccManager.GetMVCCOfVChannel(vchannel)
if !currentMVCC.Confirmed {
// if the mvcc is not confirmed, trigger a sync operation to make it confirmed as soon as possible.
resource.Resource().TimeTickInspector().TriggerSync(w.rwWALImpls.Channel())
resource.Resource().TimeTickInspector().TriggerSync(w.rwWALImpls.Channel(), false)
}
return currentMVCC.Timetick, nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestWalAdaptorReadFail(t *testing.T) {
writeAheadBuffer := mock_wab.NewMockROWriteAheadBuffer(t)
operator := mock_inspector.NewMockTimeTickSyncOperator(t)
operator.EXPECT().Channel().Return(types.PChannelInfo{}).Maybe()
operator.EXPECT().Sync(mock.Anything).Return().Maybe()
operator.EXPECT().Sync(mock.Anything, mock.Anything).Return().Maybe()
operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(writeAheadBuffer, nil).Maybe()
resource.Resource().TimeTickInspector().RegisterSyncOperator(
operator,
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestWALAdaptor(t *testing.T) {

operator := mock_inspector.NewMockTimeTickSyncOperator(t)
operator.EXPECT().Channel().Return(types.PChannelInfo{})
operator.EXPECT().Sync(mock.Anything).Return()
operator.EXPECT().Sync(mock.Anything, mock.Anything).Return()
buffer := mock_wab.NewMockROWriteAheadBuffer(t)
operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(buffer, nil)
resource.Resource().TimeTickInspector().RegisterSyncOperator(operator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type timeTickSyncInspectorImpl struct {
operators *typeutil.ConcurrentMap[string, TimeTickSyncOperator]
}

func (s *timeTickSyncInspectorImpl) TriggerSync(pChannelInfo types.PChannelInfo) {
s.syncNotifier.AddAndNotify(pChannelInfo)
func (s *timeTickSyncInspectorImpl) TriggerSync(pChannelInfo types.PChannelInfo, persisted bool) {
s.syncNotifier.AddAndNotify(pChannelInfo, persisted)
}

// GetOperator gets the operator by pchannel info.
Expand Down Expand Up @@ -72,16 +72,16 @@ func (s *timeTickSyncInspectorImpl) background() {
return
case <-ticker.C:
s.operators.Range(func(_ string, operator TimeTickSyncOperator) bool {
operator.Sync(s.taskNotifier.Context())
operator.Sync(s.taskNotifier.Context(), false)
return true
})
case <-s.syncNotifier.WaitChan():
s.syncNotifier.Get().Range(func(pchannel types.PChannelInfo) bool {
signals := s.syncNotifier.Get()
for pchannel, persisted := range signals {
if operator, ok := s.operators.Get(pchannel.Name); ok {
operator.Sync(s.taskNotifier.Context())
operator.Sync(s.taskNotifier.Context(), persisted)
}
return true
})
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ type TimeTickSyncOperator interface {

// Sync trigger a sync operation, try to send the timetick message into wal.
// Sync operation is a blocking operation, and not thread-safe, will only call in one goroutine.
Sync(ctx context.Context)
Sync(ctx context.Context, forcePersisted bool)
}

// TimeTickSyncInspector is the inspector to sync time tick.
type TimeTickSyncInspector interface {
// TriggerSync adds a pchannel info and notify the sync operation.
// manually trigger the sync operation of pchannel.
TriggerSync(pChannelInfo types.PChannelInfo)
TriggerSync(pChannelInfo types.PChannelInfo, forcePersisted bool)

// RegisterSyncOperator registers a sync operator.
RegisterSyncOperator(operator TimeTickSyncOperator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ func TestInsepctor(t *testing.T) {
Term: 1,
}
operator.EXPECT().Channel().Return(pchannel)
operator.EXPECT().Sync(mock.Anything).Run(func(ctx context.Context) {})
operator.EXPECT().Sync(mock.Anything, mock.Anything).Run(func(ctx context.Context, forcePersisted bool) {})

i.RegisterSyncOperator(operator)
assert.Panics(t, func() {
i.RegisterSyncOperator(operator)
})
i.TriggerSync(pchannel)
i.TriggerSync(pchannel, false)
o := i.MustGetOperator(pchannel)
assert.NotNil(t, o)
time.Sleep(250 * time.Millisecond)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,40 @@ import (

"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// newSyncNotifier creates a new sync notifier.
func newSyncNotifier() *syncNotifier {
return &syncNotifier{
cond: syncutil.NewContextCond(&sync.Mutex{}),
signal: typeutil.NewSet[types.PChannelInfo](),
signal: map[types.PChannelInfo]bool{},
}
}

// syncNotifier is a notifier for sync signal.
type syncNotifier struct {
cond *syncutil.ContextCond
signal typeutil.Set[types.PChannelInfo]
signal map[types.PChannelInfo]bool
}

// AddAndNotify adds a signal and notifies the waiter.
func (n *syncNotifier) AddAndNotify(pChannelInfo types.PChannelInfo) {
func (n *syncNotifier) AddAndNotify(pChannelInfo types.PChannelInfo, persisted bool) {
n.cond.LockAndBroadcast()
n.signal.Insert(pChannelInfo)
n.cond.L.Unlock()
defer n.cond.L.Unlock()
if _, ok := n.signal[pChannelInfo]; !ok {
n.signal[pChannelInfo] = persisted
return
}
if persisted {
// only persisted signal can overwrite the previous signal
n.signal[pChannelInfo] = persisted
}
}

// WaitChan returns the wait channel.
func (n *syncNotifier) WaitChan() <-chan struct{} {
n.cond.L.Lock()
if n.signal.Len() > 0 {
if len(n.signal) > 0 {
n.cond.L.Unlock()
ch := make(chan struct{})
close(ch)
Expand All @@ -42,10 +48,10 @@ func (n *syncNotifier) WaitChan() <-chan struct{} {
}

// Get gets the signal.
func (n *syncNotifier) Get() typeutil.Set[types.PChannelInfo] {
func (n *syncNotifier) Get() map[types.PChannelInfo]bool {
n.cond.L.Lock()
signal := n.signal
n.signal = typeutil.NewSet[types.PChannelInfo]()
n.signal = map[types.PChannelInfo]bool{}
n.cond.L.Unlock()
return signal
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,42 @@ import (
func TestSyncNotifier(t *testing.T) {
n := newSyncNotifier()
ch := n.WaitChan()
assert.True(t, n.Get().Len() == 0)
assert.True(t, len(n.Get()) == 0)

shouldBeBlocked(ch)

n.AddAndNotify(types.PChannelInfo{
Name: "test",
Term: 1,
})
}, false)
// should not block
<-ch
assert.True(t, n.Get().Len() == 1)
assert.True(t, n.Get().Len() == 0)
assert.True(t, len(n.Get()) == 1)
assert.True(t, len(n.Get()) == 0)

n.AddAndNotify(types.PChannelInfo{
Name: "test",
Term: 1,
})
}, false)
ch = n.WaitChan()
<-ch

n.AddAndNotify(types.PChannelInfo{
Name: "test",
Term: 1,
}, true)
n.AddAndNotify(types.PChannelInfo{
Name: "test",
Term: 1,
}, false)
ch = n.WaitChan()
<-ch

signals := n.Get()
assert.Equal(t, 1, len(signals))
for _, v := range signals {
assert.True(t, v)
}
}

func shouldBeBlocked(ch <-chan struct{}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (impl *timeTickSyncOperator) Channel() types.PChannelInfo {

// Sync trigger a sync operation.
// Sync operation is not thread safe, so call it in a single goroutine.
func (impl *timeTickSyncOperator) Sync(ctx context.Context) {
func (impl *timeTickSyncOperator) Sync(ctx context.Context, persisted bool) {
// Sync operation cannot trigger until isReady.
if !impl.isReady() {
return
Expand All @@ -106,7 +106,7 @@ func (impl *timeTickSyncOperator) Sync(ctx context.Context) {
return nil, err
}
return appendResult.MessageID, nil
})
}, persisted)
if err != nil {
impl.logger.Warn("send time tick sync message failed", zap.Error(err))
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (impl *timeTickSyncOperator) Close() {

// sendTsMsg sends first timestamp message to wal.
// TODO: TT lag warning.
func (impl *timeTickSyncOperator) sendTsMsg(ctx context.Context, appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)) error {
func (impl *timeTickSyncOperator) sendTsMsg(ctx context.Context, appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error), forcePersisted bool) error {
// Sync the timestamp acknowledged details.
impl.syncAcknowledgedDetails(ctx)

Expand All @@ -248,7 +248,7 @@ func (impl *timeTickSyncOperator) sendTsMsg(ctx context.Context, appender func(c
// Construct time tick message.
ts := impl.ackDetails.LastAllAcknowledgedTimestamp()
lastConfirmedMessageID := impl.ackDetails.EarliestLastConfirmedMessageID()
persist := !impl.ackDetails.IsNoPersistedMessage()
persist := (!impl.ackDetails.IsNoPersistedMessage() || forcePersisted)

return impl.sendTsMsgToWAL(ctx, ts, lastConfirmedMessageID, persist, appender)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,24 @@ func TestTimeTickSyncOperator(t *testing.T) {
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Nil(t, r)
// should not trigger any wal operation, but only update the timetick.
operator.Sync(ctx)
operator.Sync(ctx, false)
r, err = wb.ReadFromExclusiveTimeTick(context.Background(), ts)
assert.NoError(t, err)
// should not block because timetick updates.
msg, err := r.Next(context.Background())
assert.NoError(t, err)
assert.NotNil(t, msg)
assert.Greater(t, msg.TimeTick(), ts)

// should trigger wal operation.
l.EXPECT().Append(mock.Anything, mock.Anything).Unset()
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) {
return &types.AppendResult{
MessageID: walimplstest.NewTestMessageID(1),
TimeTick: mm.TimeTick(),
}, nil
})
operator.Sync(ctx, true)
}

func shouldBlock(ch <-chan struct{}) {
Expand Down
Loading