Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (
var errChannelLifetimeUnrecoverable = errors.New("channel lifetime unrecoverable")

// RecoverWALFlusher recovers the wal flusher.
func RecoverWALFlusher(param interceptors.InterceptorBuildParam) *WALFlusherImpl {
func RecoverWALFlusher(param *interceptors.InterceptorBuildParam) *WALFlusherImpl {
flusher := &WALFlusherImpl{
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
wal: param.WAL,
logger: resource.Resource().Logger().With(
log.FieldComponent("flusher"),
zap.String("pchannel", param.WALImpls.Channel().Name)),
metrics: newFlusherMetrics(param.WALImpls.Channel()),
zap.String("pchannel", param.ChannelInfo.Name)),
metrics: newFlusherMetrics(param.ChannelInfo),
}
go flusher.Execute()
return flusher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
internaltypes "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
Expand Down Expand Up @@ -72,13 +71,9 @@ func TestWALFlusher(t *testing.T) {
resource.OptStreamingNodeCatalog(snMeta),
resource.OptChunkManager(mock_storage.NewMockChunkManager(t)),
)
walImpl := mock_walimpls.NewMockWALImpls(t)
walImpl.EXPECT().Channel().Return(types.PChannelInfo{Name: "pchannel"})

l := newMockWAL(t, false)
param := interceptors.InterceptorBuildParam{
WALImpls: walImpl,
WAL: syncutil.NewFuture[wal.WAL](),
param := &interceptors.InterceptorBuildParam{
WAL: syncutil.NewFuture[wal.WAL](),
}
param.WAL.Set(l)
flusher := RecoverWALFlusher(param)
Expand Down
110 changes: 110 additions & 0 deletions internal/streamingnode/server/wal/adaptor/initializing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package adaptor

import (
"context"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/mvcc"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// buildInterceptorParams builds the interceptor params for the walimpls.
func buildInterceptorParams(ctx context.Context, underlyingWALImpls walimpls.WALImpls) (*interceptors.InterceptorBuildParam, error) {
msg, err := sendFirstTimeTick(ctx, underlyingWALImpls)
if err != nil {
return nil, err
}

Check warning on line 28 in internal/streamingnode/server/wal/adaptor/initializing.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/adaptor/initializing.go#L27-L28

Added lines #L27 - L28 were not covered by tests

capacity := int(paramtable.Get().StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize())
keepalive := paramtable.Get().StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse()
writeAheadBuffer := wab.NewWirteAheadBuffer(
underlyingWALImpls.Channel().Name,
resource.Resource().Logger().With(),
capacity,
keepalive,
msg,
)
mvccManager := mvcc.NewMVCCManager(msg.TimeTick())
return &interceptors.InterceptorBuildParam{
ChannelInfo: underlyingWALImpls.Channel(),
WAL: syncutil.NewFuture[wal.WAL](),
InitializedTimeTick: msg.TimeTick(),
InitializedMessageID: msg.MessageID(),
WriteAheadBuffer: writeAheadBuffer,
MVCCManager: mvccManager,
}, nil
}

// sendFirstTimeTick sends the first timetick message to walimpls.
// It is used to make a fence operation with the underlying walimpls and get the timetick and last message id to recover the wal state.
func sendFirstTimeTick(ctx context.Context, underlyingWALImpls walimpls.WALImpls) (message.ImmutableMessage, error) {
logger := resource.Resource().Logger()
logger.Info("start to sync first time tick")
defer logger.Info("sync first time tick done")

backoffTimer := typeutil.NewBackoffTimer(typeutil.BackoffTimerConfig{
Default: 5 * time.Second,
Backoff: typeutil.BackoffConfig{
InitialInterval: 20 * time.Millisecond,
Multiplier: 2.0,
MaxInterval: 5 * time.Second,
},
})
backoffTimer.EnableBackoff()

var lastErr error
sourceID := paramtable.GetNodeID()
// Send first timetick message to wal before interceptor is ready.
for count := 0; ; count++ {
if count > 0 {
nextTimer, nextBalanceInterval := backoffTimer.NextTimer()
logger.Warn(
"send first time tick failed",
zap.Duration("nextBalanceInterval", nextBalanceInterval),
zap.Int("retryCount", count),
zap.Error(lastErr),
)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-nextTimer:

Check warning on line 82 in internal/streamingnode/server/wal/adaptor/initializing.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/adaptor/initializing.go#L72-L82

Added lines #L72 - L82 were not covered by tests
}
}

// Sent first timetick message to wal before ready.
// New TT is always greater than all tt on previous streamingnode.
// A fencing operation of underlying WAL is needed to make exclusive produce of topic.
// Otherwise, the TT principle may be violated.
// And sendTsMsg must be done, to help ackManager to get first LastConfirmedMessageID
// !!! Send a timetick message into walimpls directly is safe.
resource.Resource().TSOAllocator().Sync()
ts, err := resource.Resource().TSOAllocator().Allocate(ctx)
if err != nil {
lastErr = errors.Wrap(err, "allocate timestamp failed")
continue

Check warning on line 96 in internal/streamingnode/server/wal/adaptor/initializing.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/adaptor/initializing.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}
msg, err := timetick.NewTimeTickMsg(ts, nil, sourceID)
if err != nil {
lastErr = errors.Wrap(err, "at build time tick msg")
continue

Check warning on line 101 in internal/streamingnode/server/wal/adaptor/initializing.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/adaptor/initializing.go#L100-L101

Added lines #L100 - L101 were not covered by tests
}
msgID, err := underlyingWALImpls.Append(ctx, msg)
if err != nil {
lastErr = errors.Wrap(err, "send first timestamp message failed")
continue

Check warning on line 106 in internal/streamingnode/server/wal/adaptor/initializing.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/adaptor/initializing.go#L105-L106

Added lines #L105 - L106 were not covered by tests
}
return msg.IntoImmutableMessage(msgID), nil
}
}
5 changes: 4 additions & 1 deletion internal/streamingnode/server/wal/adaptor/opener.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@
}

// wrap the wal into walExtend with cleanup function and interceptors.
wal := adaptImplsToWAL(l, o.interceptorBuilders, func() {
wal, err := adaptImplsToWAL(ctx, l, o.interceptorBuilders, func() {
o.walInstances.Remove(id)
logger.Info("wal deleted from opener")
})
if err != nil {
return nil, err
}

Check warning on line 65 in internal/streamingnode/server/wal/adaptor/opener.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/adaptor/opener.go#L64-L65

Added lines #L64 - L65 were not covered by tests

o.walInstances.Insert(id, wal)
logger.Info("new wal created")
Expand Down
2 changes: 2 additions & 0 deletions internal/streamingnode/server/wal/adaptor/opener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/v2/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/v2/mocks/streaming/util/mock_message"
Expand Down Expand Up @@ -40,6 +41,7 @@ func TestOpenerAdaptorFailure(t *testing.T) {
}

func TestOpenerAdaptor(t *testing.T) {
resource.InitForTest(t)
// Build basic opener.
basicOpener := mock_walimpls.NewMockOpenerImpls(t)
basicOpener.EXPECT().Open(mock.Anything, mock.Anything).RunAndReturn(
Expand Down
4 changes: 1 addition & 3 deletions internal/streamingnode/server/wal/adaptor/scanner_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,13 @@ func (s *scannerAdaptorImpl) produceEventLoop(msgChan chan<- message.ImmutableMe
var wb wab.ROWriteAheadBuffer
var err error
if s.Channel().AccessMode == types.AccessModeRW {
if wb, err = resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).WriteAheadBuffer(s.Context()); err != nil {
return err
}
// Trigger a persisted time tick to make sure the timetick is pushed forward.
// because the underlying wal may be deleted because of retention policy.
// So we cannot get the timetick from the wal.
// Trigger the timetick inspector to append a new persisted timetick,
// then the catch up scanner can see the latest timetick and make a catchup.
resource.Resource().TimeTickInspector().TriggerSync(s.Channel(), true)
wb = resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).WriteAheadBuffer()
}

scanner := newSwithableScanner(s.Name(), s.logger, s.innerWAL, wb, s.readOption.DeliverPolicy, msgChan)
Expand Down
Loading
Loading