Skip to content

Commit 78bc688

Browse files
authored
Remove QueryMsgStream in MqFactory interface (milvus-io#26374)
Signed-off-by: Enwei Jiao <[email protected]>
1 parent 4742049 commit 78bc688

14 files changed

+7
-90
lines changed

docs/developer_guides/chap04_message_stream.md

-1
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ type Factory interface {
216216
Init(params *paramtable.ComponentParam) error
217217
NewMsgStream(ctx context.Context) (MsgStream, error)
218218
NewTtMsgStream(ctx context.Context) (MsgStream, error)
219-
NewQueryMsgStream(ctx context.Context) (MsgStream, error)
220219
}
221220

222221
// Pulsar

internal/datanode/flow_graph_dmstream_input_node_test.go

-4
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,6 @@ func (mm *mockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.M
5656
return &mockTtMsgStream{}, nil
5757
}
5858

59-
func (mm *mockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
60-
return nil, nil
61-
}
62-
6359
func (mm *mockMsgStreamFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error {
6460
return nil
6561
}

internal/indexnode/chunkmgr_mock.go

-5
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,6 @@ func (f *mockFactory) NewTtMsgStream(context.Context) (msgstream.MsgStream, erro
245245
return nil, errNotImplErr
246246
}
247247

248-
func (f *mockFactory) NewQueryMsgStream(context.Context) (msgstream.MsgStream, error) {
249-
// TODO
250-
return nil, errNotImplErr
251-
}
252-
253248
func (f *mockFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error {
254249
// TODO
255250
return nil

internal/mq/msgstream/mq_factory_test.go

-5
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,4 @@ func TestRmsFactory(t *testing.T) {
4141
_, err = rmsFactory.NewTtMsgStream(ctx)
4242
assert.NoError(t, err)
4343

44-
_, err = rmsFactory.NewQueryMsgStream(ctx)
45-
assert.NoError(t, err)
46-
47-
err = rmsFactory.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx")
48-
assert.NoError(t, err)
4944
}

internal/proxy/channels_mgr.go

+4-19
Original file line numberDiff line numberDiff line change
@@ -113,21 +113,12 @@ func getDmlChannelsFunc(ctx context.Context, rc types.RootCoord) getChannelsFunc
113113
}
114114
}
115115

116-
// streamType indicates which type of message stream should be created.
117-
type streamType int
118-
119-
const (
120-
dmlStreamType streamType = iota
121-
dqlStreamType
122-
)
123-
124116
type singleTypeChannelsMgr struct {
125117
infos map[UniqueID]streamInfos // collection id -> stream infos
126118
mu sync.RWMutex
127119

128120
getChannelsFunc getChannelsFuncType
129121
repackFunc repackFuncType
130-
singleStreamType streamType
131122
msgStreamFactory msgstream.Factory
132123
}
133124

@@ -184,15 +175,11 @@ func (mgr *singleTypeChannelsMgr) streamExistPrivate(collectionID UniqueID) bool
184175
return ok && streamInfos.stream != nil
185176
}
186177

187-
func createStream(factory msgstream.Factory, streamType streamType, pchans []pChan, repack repackFuncType) (msgstream.MsgStream, error) {
178+
func createStream(factory msgstream.Factory, pchans []pChan, repack repackFuncType) (msgstream.MsgStream, error) {
188179
var stream msgstream.MsgStream
189180
var err error
190181

191-
if streamType == dqlStreamType {
192-
stream, err = factory.NewQueryMsgStream(context.Background())
193-
} else {
194-
stream, err = factory.NewMsgStream(context.Background())
195-
}
182+
stream, err = factory.NewMsgStream(context.Background())
196183

197184
if err != nil {
198185
return nil, err
@@ -240,7 +227,7 @@ func (mgr *singleTypeChannelsMgr) createMsgStream(collectionID UniqueID) (msgstr
240227
return nil, err
241228
}
242229

243-
stream, err := createStream(mgr.msgStreamFactory, mgr.singleStreamType, channelInfos.pchans, mgr.repackFunc)
230+
stream, err := createStream(mgr.msgStreamFactory, channelInfos.pchans, mgr.repackFunc)
244231
if err != nil {
245232
// What if stream created by other goroutines?
246233
log.Error("failed to create message stream", zap.Error(err), zap.Int64("collection", collectionID))
@@ -309,13 +296,11 @@ func newSingleTypeChannelsMgr(
309296
getChannelsFunc getChannelsFuncType,
310297
msgStreamFactory msgstream.Factory,
311298
repackFunc repackFuncType,
312-
singleStreamType streamType,
313299
) *singleTypeChannelsMgr {
314300
return &singleTypeChannelsMgr{
315301
infos: make(map[UniqueID]streamInfos),
316302
getChannelsFunc: getChannelsFunc,
317303
repackFunc: repackFunc,
318-
singleStreamType: singleStreamType,
319304
msgStreamFactory: msgStreamFactory,
320305
}
321306
}
@@ -355,6 +340,6 @@ func newChannelsMgrImpl(
355340
msgStreamFactory msgstream.Factory,
356341
) *channelsMgrImpl {
357342
return &channelsMgrImpl{
358-
dmlChannelsMgr: newSingleTypeChannelsMgr(getDmlChannelsFunc, msgStreamFactory, dmlRepackFunc, dmlStreamType),
343+
dmlChannelsMgr: newSingleTypeChannelsMgr(getDmlChannelsFunc, msgStreamFactory, dmlRepackFunc),
359344
}
360345
}

internal/proxy/channels_mgr_test.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func Test_createStream(t *testing.T) {
213213
factory.fQStream = func(ctx context.Context) (msgstream.MsgStream, error) {
214214
return nil, errors.New("mock")
215215
}
216-
_, err := createStream(factory, dmlStreamType, nil, nil)
216+
_, err := createStream(factory, nil, nil)
217217
assert.Error(t, err)
218218
})
219219

@@ -222,7 +222,7 @@ func Test_createStream(t *testing.T) {
222222
factory.f = func(ctx context.Context) (msgstream.MsgStream, error) {
223223
return nil, errors.New("mock")
224224
}
225-
_, err := createStream(factory, dqlStreamType, nil, nil)
225+
_, err := createStream(factory, nil, nil)
226226
assert.Error(t, err)
227227
})
228228

@@ -231,7 +231,7 @@ func Test_createStream(t *testing.T) {
231231
factory.f = func(ctx context.Context) (msgstream.MsgStream, error) {
232232
return newMockMsgStream(), nil
233233
}
234-
_, err := createStream(factory, dmlStreamType, []string{"111"}, func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
234+
_, err := createStream(factory, []string{"111"}, func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
235235
return nil, nil
236236
})
237237
assert.NoError(t, err)
@@ -271,7 +271,6 @@ func Test_singleTypeChannelsMgr_createMsgStream(t *testing.T) {
271271
return channelInfos{vchans: []string{"111", "222"}, pchans: []string{"111"}}, nil
272272
},
273273
msgStreamFactory: factory,
274-
singleStreamType: dmlStreamType,
275274
repackFunc: nil,
276275
}
277276
_, err := m.createMsgStream(100)
@@ -289,7 +288,6 @@ func Test_singleTypeChannelsMgr_createMsgStream(t *testing.T) {
289288
return channelInfos{vchans: []string{"111", "222"}, pchans: []string{"111"}}, nil
290289
},
291290
msgStreamFactory: factory,
292-
singleStreamType: dmlStreamType,
293291
repackFunc: nil,
294292
}
295293
stream, err := m.createMsgStream(100)
@@ -356,7 +354,6 @@ func Test_singleTypeChannelsMgr_getStream(t *testing.T) {
356354
return channelInfos{vchans: []string{"111", "222"}, pchans: []string{"111"}}, nil
357355
},
358356
msgStreamFactory: factory,
359-
singleStreamType: dmlStreamType,
360357
repackFunc: nil,
361358
}
362359
stream, err := m.getOrCreateStream(100)

internal/proxy/mock_msgstream_test.go

-7
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,6 @@ func (m *mockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.Ms
5858
return nil, errors.New("mock")
5959
}
6060

61-
func (m *mockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
62-
if m.fQStream != nil {
63-
return m.fQStream(ctx)
64-
}
65-
return nil, errors.New("mock")
66-
}
67-
6861
func newMockMsgStreamFactory() *mockMsgStreamFactory {
6962
return &mockMsgStreamFactory{}
7063
}

internal/proxy/mock_test.go

-4
Original file line numberDiff line numberDiff line change
@@ -326,10 +326,6 @@ func (factory *simpleMockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (
326326
return newSimpleMockMsgStream(), nil
327327
}
328328

329-
func (factory *simpleMockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
330-
return newSimpleMockMsgStream(), nil
331-
}
332-
333329
func (factory *simpleMockMsgStreamFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error {
334330
return nil
335331
}

internal/util/dependency/factory.go

-4
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,6 @@ func (f *DefaultFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStrea
144144
return f.msgStreamFactory.NewTtMsgStream(ctx)
145145
}
146146

147-
func (f *DefaultFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
148-
return f.msgStreamFactory.NewQueryMsgStream(ctx)
149-
}
150-
151147
func (f *DefaultFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error {
152148
return f.msgStreamFactory.NewMsgStreamDisposer(ctx)
153149
}

pkg/mq/msgstream/common_mq_factory.go

-10
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,6 @@ func (f *CommonFactory) NewTtMsgStream(ctx context.Context) (ms MsgStream, err e
4040
return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, cli, f.DispatcherFactory.NewUnmarshalDispatcher())
4141
}
4242

43-
// NewQueryMsgStream is used to generate a new QueryMsgstream object
44-
func (f *CommonFactory) NewQueryMsgStream(ctx context.Context) (ms MsgStream, err error) {
45-
defer wrapError(&err, "NewQueryMsgStream")
46-
cli, err := f.Newer()
47-
if err != nil {
48-
return nil, err
49-
}
50-
return NewMqMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, cli, f.DispatcherFactory.NewUnmarshalDispatcher())
51-
}
52-
5343
// NewMsgStreamDisposer returns a function that can be used to dispose of a message stream.
5444
// The returned function takes a slice of channel names and a subscription name, and
5545
// disposes of the message stream associated with those arguments.

pkg/mq/msgstream/factory_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,6 @@ func testFactoryCommonOperation(t *testing.T, f Factory) {
8282
_, err = f.NewTtMsgStream(ctx)
8383
assert.NoError(t, err)
8484

85-
_, err = f.NewQueryMsgStream(ctx)
86-
assert.NoError(t, err)
87-
8885
err = f.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx")
8986
assert.NoError(t, err)
9087
}

pkg/mq/msgstream/mq_factory.go

-9
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,6 @@ func (f *PmsFactory) getAuthentication() (pulsar.Authentication, error) {
110110
return auth, nil
111111
}
112112

113-
// NewQueryMsgStream is used to generate a new QueryMsgstream object
114-
func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
115-
return f.NewMsgStream(ctx)
116-
}
117-
118113
func (f *PmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error {
119114
return func(channels []string, subname string) error {
120115
// try to delete the old subscription
@@ -166,10 +161,6 @@ func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
166161
return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher())
167162
}
168163

169-
func (f *KmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
170-
return f.NewMsgStream(ctx)
171-
}
172-
173164
func (f *KmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error {
174165
return func(channels []string, subname string) error {
175166
msgstream, err := f.NewMsgStream(ctx)

pkg/mq/msgstream/mq_factory_test.go

-12
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ func TestPmsFactory(t *testing.T) {
3434
_, err = pmsFactory.NewTtMsgStream(ctx)
3535
assert.NoError(t, err)
3636

37-
_, err = pmsFactory.NewQueryMsgStream(ctx)
38-
assert.NoError(t, err)
39-
4037
err = pmsFactory.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx")
4138
assert.NoError(t, err)
4239
}
@@ -58,9 +55,6 @@ func TestPmsFactoryWithAuth(t *testing.T) {
5855
_, err = pmsFactory.NewTtMsgStream(ctx)
5956
assert.NoError(t, err)
6057

61-
_, err = pmsFactory.NewQueryMsgStream(ctx)
62-
assert.NoError(t, err)
63-
6458
Params.Save(Params.PulsarCfg.AuthParams.Key, "")
6559
pmsFactory = NewPmsFactory(config)
6660

@@ -71,9 +65,6 @@ func TestPmsFactoryWithAuth(t *testing.T) {
7165
_, err = pmsFactory.NewTtMsgStream(ctx)
7266
assert.Error(t, err)
7367

74-
_, err = pmsFactory.NewQueryMsgStream(ctx)
75-
assert.Error(t, err)
76-
7768
}
7869

7970
func TestKafkaFactory(t *testing.T) {
@@ -86,9 +77,6 @@ func TestKafkaFactory(t *testing.T) {
8677
_, err = kmsFactory.NewTtMsgStream(ctx)
8778
assert.NoError(t, err)
8879

89-
_, err = kmsFactory.NewQueryMsgStream(ctx)
90-
assert.NoError(t, err)
91-
9280
// err = kmsFactory.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx")
9381
// assert.NoError(t, err)
9482
}

pkg/mq/msgstream/msgstream.go

-1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,5 @@ type MsgStream interface {
7373
type Factory interface {
7474
NewMsgStream(ctx context.Context) (MsgStream, error)
7575
NewTtMsgStream(ctx context.Context) (MsgStream, error)
76-
NewQueryMsgStream(ctx context.Context) (MsgStream, error)
7776
NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
7877
}

0 commit comments

Comments
 (0)