Skip to content

Commit f97127a

Browse files
chyezhyiwangdr
andauthored
add nats mq wrappers (milvus-io#24445)
bug fixup, configurable natsmq, add unittest, pass e2e. move natsmq to pkg project Signed-off-by: chyezh <[email protected]> Co-authored-by: yiwangdr <[email protected]>
1 parent 5f7099a commit f97127a

33 files changed

+3295
-242
lines changed

.gitignore

+5
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ cpp_coverage/
8383

8484
# virtualenv
8585
venv/
86+
.venv/
87+
88+
# gopls generated
89+
go.work
90+
go.work.sum
8691

8792
# docker compose volumes
8893
deployments/docker/*/volumes

cmd/roles/roles.go

+2-14
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ func runComponent[T component](ctx context.Context,
8888
localMsg bool,
8989
runWg *sync.WaitGroup,
9090
creator func(context.Context, dependency.Factory) (T, error),
91-
metricRegister func(*prometheus.Registry)) T {
91+
metricRegister func(*prometheus.Registry),
92+
) T {
9293
var role T
9394
var wg sync.WaitGroup
9495

@@ -247,19 +248,6 @@ func (mr *MilvusRoles) Run(local bool, alias string) {
247248

248249
paramtable.Init()
249250
params := paramtable.Get()
250-
251-
if params.RocksmqEnable() {
252-
path, err := params.Load("rocksmq.path")
253-
if err != nil {
254-
panic(err)
255-
}
256-
257-
if err = rocksmqimpl.InitRocksMQ(path); err != nil {
258-
panic(err)
259-
}
260-
defer stopRocksmq()
261-
}
262-
263251
if params.EtcdCfg.UseEmbedEtcd.GetAsBool() {
264252
// Start etcd server.
265253
etcd.InitEtcdServer(

configs/milvus.yaml

+36-16
Original file line numberDiff line numberDiff line change
@@ -75,23 +75,28 @@ minio:
7575
# For more information, refer to
7676
# aws: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use.html
7777
# gcp: https://cloud.google.com/storage/docs/access-control/iam
78-
# aliyun (ack): https://www.alibabacloud.com/help/en/container-service-for-kubernetes/latest/use-rrsa-to-enforce-access-control
78+
# aliyun (ack): https://www.alibabacloud.com/help/en/container-service-for-kubernetes/latest/use-rrsa-to-enforce-access-control
7979
# aliyun (ecs): https://www.alibabacloud.com/help/en/elastic-compute-service/latest/attach-an-instance-ram-role
8080
useIAM: false
81-
# Cloud Provider of S3. Supports: "aws", "gcp", "aliyun".
81+
# Cloud Provider of S3. Supports: "aws", "gcp", "aliyun".
8282
# You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio
8383
# You can use "gcp" for other cloud provider supports S3 API with signature v2
84-
# You can use "aliyun" for other cloud provider uses virtual host style bucket
84+
# You can use "aliyun" for other cloud provider uses virtual host style bucket
8585
# When useIAM enabled, only "aws", "gcp", "aliyun" is supported for now
8686
cloudProvider: aws
8787
# Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws".
8888
# Leave it empty if you want to use AWS default endpoint
89-
iamEndpoint:
89+
iamEndpoint:
9090

91-
# Milvus supports three MQ: rocksmq(based on RockDB), Pulsar and Kafka, which should be reserved in config what you use.
92-
# There is a note about enabling priority if we config multiple mq in this file
93-
# 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka
94-
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported)
91+
# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka.
92+
# You can change your mq by setting mq.type field.
93+
# If you don't set mq.type field as default, there is a note about enabling priority if we config multiple mq in this file.
94+
# 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka
95+
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode)
96+
mq:
97+
# Default value: "default"
98+
# Valid values: [default, pulsar, kafka, rocksmq, natsmq]
99+
type: default
95100

96101
# Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services.
97102
pulsar:
@@ -104,9 +109,9 @@ pulsar:
104109

105110
# If you want to enable kafka, needs to comment the pulsar configs
106111
# kafka:
107-
# brokerList:
108-
# saslUsername:
109-
# saslPassword:
112+
# brokerList:
113+
# saslUsername:
114+
# saslPassword:
110115
# saslMechanisms: PLAIN
111116
# securityProtocol: SASL_SSL
112117

@@ -120,6 +125,21 @@ rocksmq:
120125
retentionSizeInMB: 8192 # 8 GB, 8 * 1024 MB, The retention size of the message in rocksmq.
121126
compactionInterval: 86400 # 1 day, trigger rocksdb compaction every day to remove deleted data
122127

128+
# natsmq configuration.
129+
natsmq:
130+
server: # server side configuration for natsmq.
131+
port: 4222 # 4222 by default, Port for nats server listening.
132+
storeDir: /var/lib/milvus/nats # /var/lib/milvus/nats by default, directory to use for JetStream storage of nats.
133+
maxFileStore: 17179869184 # (B) 16GB by default, Maximum size of the 'file' storage.
134+
maxPayload: 8388608 # (B) 8MB by default, Maximum number of bytes in a message payload.
135+
maxPending: 67108864 # (B) 64MB by default, Maximum number of bytes buffered for a connection Applies to client connections.
136+
initializeTimeout: 4000 # (ms) 4s by default, waiting for initialization of natsmq finished.
137+
monitor:
138+
debug: false # false by default, If true enable debug log messages.
139+
logTime: true # true by default, If set to false, log without timestamps.
140+
logFile: # no log file by default, Log file path relative to.. .
141+
logSizeLimit: 0 # (B) 0, unlimited by default, Size in bytes after the log file rolls over to a new one.
142+
123143
# Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests
124144
rootCoord:
125145
dmlChannelNum: 16 # The number of dml channels created at system startup
@@ -192,7 +212,7 @@ queryCoord:
192212
clientMaxRecvSize: 268435456
193213
taskMergeCap: 1
194214
taskExecutionCap: 256
195-
enableActiveStandby: false # Enable active-standby
215+
enableActiveStandby: false # Enable active-standby
196216

197217
# Related configuration of queryNode, used to run hybrid search between vector and scalar data.
198218
queryNode:
@@ -203,7 +223,7 @@ queryNode:
203223
stats:
204224
publishInterval: 1000 # Interval for querynode to report node information (milliseconds)
205225
segcore:
206-
knowhereThreadPoolNumRatio: 4
226+
knowhereThreadPoolNumRatio: 4
207227
# Use more threads to make better use of SSD throughput in disk index.
208228
# This parameter is only useful when enable-disk = true.
209229
# And this value should be a number greater than 1 and less than 32.
@@ -288,7 +308,7 @@ dataCoord:
288308
compactableProportion: 0.5
289309
# over (compactableProportion * segment max # of rows) rows.
290310
# MUST BE GREATER THAN OR EQUAL TO <smallProportion>!!!
291-
# During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
311+
# During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
292312
expansionRate: 1.25
293313
enableCompaction: true # Enable data segment compaction
294314
compaction:
@@ -331,7 +351,7 @@ dataNode:
331351
log:
332352
level: info # Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
333353
file:
334-
rootPath: # root dir path to put logs, default "" means no log file will print. please adjust in embedded Milvus: /tmp/milvus/logs
354+
rootPath: # root dir path to put logs, default "" means no log file will print. please adjust in embedded Milvus: /tmp/milvus/logs
335355
maxSize: 300 # MB
336356
maxAge: 10 # Maximum time for log retention in day.
337357
maxBackups: 20
@@ -552,7 +572,7 @@ trace:
552572
# Fractions >= 1 will always sample. Fractions < 0 are treated as zero.
553573
sampleFraction: 0
554574
jaeger:
555-
url: # when exporter is jaeger should set the jaeger's URL
575+
url: # when exporter is jaeger should set the jaeger's URL
556576

557577
autoIndex:
558578
params:

go.mod

+15-9
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,10 @@ require (
1414
github.com/casbin/casbin/v2 v2.44.2
1515
github.com/casbin/json-adapter/v2 v2.0.0
1616
github.com/cockroachdb/errors v1.9.1
17-
github.com/confluentinc/confluent-kafka-go v1.9.1
1817
github.com/gin-gonic/gin v1.9.0
1918
github.com/gofrs/flock v0.8.1
2019
github.com/golang/protobuf v1.5.3
21-
github.com/klauspost/compress v1.14.4
20+
github.com/klauspost/compress v1.16.5
2221
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
2322
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230531124827-410c849303a9
2423
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
@@ -39,11 +38,11 @@ require (
3938
go.uber.org/atomic v1.10.0
4039
go.uber.org/multierr v1.6.0
4140
go.uber.org/zap v1.17.0
42-
golang.org/x/crypto v0.5.0
41+
golang.org/x/crypto v0.8.0
4342
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17
4443
golang.org/x/oauth2 v0.6.0
4544
golang.org/x/sync v0.1.0
46-
golang.org/x/text v0.8.0
45+
golang.org/x/text v0.9.0
4746
google.golang.org/grpc v1.54.0
4847
gorm.io/driver/mysql v1.3.5
4948
gorm.io/gorm v1.23.8
@@ -56,13 +55,20 @@ require (
5655
github.com/benesch/cgosymbolizer v0.0.0-20190515212042-bec6fe6e597b // indirect
5756
github.com/bytedance/sonic v1.8.0 // indirect
5857
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
58+
github.com/confluentinc/confluent-kafka-go v1.9.1 // indirect
5959
github.com/containerd/cgroups v1.1.0 // indirect
6060
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
6161
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
6262
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
63+
github.com/minio/highwayhash v1.0.2 // indirect
64+
github.com/nats-io/jwt/v2 v2.4.1 // indirect
65+
github.com/nats-io/nats-server/v2 v2.9.17 // indirect
66+
github.com/nats-io/nats.go v1.24.0 // indirect
67+
github.com/nats-io/nkeys v0.4.4 // indirect
68+
github.com/nats-io/nuid v1.0.1 // indirect
6369
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
6470
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
65-
go.uber.org/automaxprocs v1.4.0 // indirect
71+
go.uber.org/automaxprocs v1.5.1 // indirect
6672
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
6773
)
6874

@@ -190,10 +196,10 @@ require (
190196
go.opentelemetry.io/otel/sdk v1.13.0 // indirect
191197
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
192198
golang.org/x/mod v0.9.0 // indirect
193-
golang.org/x/net v0.8.0 // indirect
194-
golang.org/x/sys v0.6.0 // indirect
195-
golang.org/x/term v0.6.0 // indirect
196-
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
199+
golang.org/x/net v0.9.0 // indirect
200+
golang.org/x/sys v0.7.0 // indirect
201+
golang.org/x/term v0.7.0 // indirect
202+
golang.org/x/time v0.3.0 // indirect
197203
golang.org/x/tools v0.7.0 // indirect
198204
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
199205
gonum.org/v1/gonum v0.9.3 // indirect

0 commit comments

Comments
 (0)