@@ -30,7 +30,6 @@ import (
30
30
"github.com/milvus-io/milvus/internal/allocator"
31
31
"github.com/milvus-io/milvus/internal/kv"
32
32
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
33
- "github.com/milvus-io/milvus/pkg/common"
34
33
"github.com/milvus-io/milvus/pkg/log"
35
34
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
36
35
"github.com/milvus-io/milvus/pkg/util/hardware"
@@ -118,6 +117,7 @@ var topicMu = sync.Map{}
118
117
119
118
type rocksmq struct {
120
119
store * gorocksdb.DB
120
+ cfh []* gorocksdb.ColumnFamilyHandle
121
121
kv kv.BaseKV
122
122
idAllocator allocator.Interface
123
123
storeMu * sync.Mutex
@@ -222,8 +222,13 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error)
222
222
optsStore .IncreaseParallelism (parallelism )
223
223
// enable back ground flush
224
224
optsStore .SetMaxBackgroundFlushes (1 )
225
+ // use properties as the column families to store trace id
226
+ optsStore .SetCreateIfMissingColumnFamilies (true )
225
227
226
- db , err := gorocksdb .OpenDb (optsStore , name )
228
+ // db, err := gorocksdb.OpenDb(opts, name)
229
+ // use properties as the column families to store trace id
230
+ giveColumnFamilies := []string {"default" , "properties" }
231
+ db , cfHandles , err := gorocksdb .OpenDbColumnFamilies (optsStore , name , giveColumnFamilies , []* gorocksdb.Options {optsStore , optsStore })
227
232
if err != nil {
228
233
return nil , err
229
234
}
@@ -243,6 +248,7 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error)
243
248
244
249
rmq := & rocksmq {
245
250
store : db ,
251
+ cfh : cfHandles ,
246
252
kv : kv ,
247
253
idAllocator : mqIDAllocator ,
248
254
storeMu : & sync.Mutex {},
@@ -634,17 +640,17 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
634
640
for i := 0 ; i < msgLen && idStart + UniqueID (i ) < idEnd ; i ++ {
635
641
msgID := idStart + UniqueID (i )
636
642
key := path .Join (topicName , strconv .FormatInt (msgID , 10 ))
637
- batch .Put ([]byte (key ), messages [i ].Payload )
638
- properties , err := json .Marshal (messages [i ].Properties )
639
- if err != nil {
640
- log .Warn ("properties marshal failed" ,
641
- zap .Int64 ("msgID" , msgID ),
642
- zap .String ("topicName" , topicName ),
643
- zap .Error (err ))
644
- return nil , err
643
+ batch .PutCF (rmq .cfh [0 ], []byte (key ), messages [i ].Payload )
644
+ // batch.Put([]byte(key), messages[i].Payload)
645
+ if messages [i ].Properties != nil {
646
+ properties , err := json .Marshal (messages [i ].Properties )
647
+ if err != nil {
648
+ log .Warn ("properties marshal failed" , zap .Int64 ("msgID" , msgID ), zap .String ("topicName" , topicName ),
649
+ zap .Error (err ))
650
+ return nil , err
651
+ }
652
+ batch .PutCF (rmq .cfh [1 ], []byte (key ), properties )
645
653
}
646
- pKey := path .Join (common .PropertiesKey , topicName , strconv .FormatInt (msgID , 10 ))
647
- batch .Put ([]byte (pKey ), properties )
648
654
msgIDs [i ] = msgID
649
655
msgSizes [msgID ] = int64 (len (messages [i ].Payload ))
650
656
}
@@ -777,8 +783,10 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
777
783
readOpts := gorocksdb .NewDefaultReadOptions ()
778
784
defer readOpts .Destroy ()
779
785
prefix := topicName + "/"
780
- iter := rocksdbkv .NewRocksIteratorWithUpperBound (rmq .store , typeutil .AddOne (prefix ), readOpts )
786
+ iter := rocksdbkv .NewRocksIteratorCFWithUpperBound (rmq .store , rmq .cfh [0 ], typeutil .AddOne (prefix ), readOpts )
787
+ iterProperty := rocksdbkv .NewRocksIteratorCFWithUpperBound (rmq .store , rmq .cfh [1 ], typeutil .AddOne (prefix ), readOpts )
781
788
defer iter .Close ()
789
+ defer iterProperty .Close ()
782
790
783
791
var dataKey string
784
792
if currentID == DefaultMessageID {
@@ -787,30 +795,39 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
787
795
dataKey = path .Join (topicName , strconv .FormatInt (currentID , 10 ))
788
796
}
789
797
iter .Seek ([]byte (dataKey ))
798
+ iterProperty .Seek ([]byte (dataKey ))
799
+
790
800
consumerMessage := make ([]ConsumerMessage , 0 , n )
791
801
offset := 0
802
+
792
803
for ; iter .Valid () && offset < n ; iter .Next () {
793
804
key := iter .Key ()
794
805
val := iter .Value ()
795
806
strKey := string (key .Data ())
796
807
key .Free ()
797
- offset ++
808
+ properties := make (map [string ]string )
809
+ var propertiesValue []byte
810
+
798
811
msgID , err := strconv .ParseInt (strKey [len (topicName )+ 1 :], 10 , 64 )
799
812
if err != nil {
800
813
val .Free ()
801
814
return nil , err
802
815
}
803
- askedProperties := path .Join (common .PropertiesKey , topicName , strconv .FormatInt (msgID , 10 ))
804
- opts := gorocksdb .NewDefaultReadOptions ()
805
- defer opts .Destroy ()
806
- propertiesValue , err := rmq .store .GetBytes (opts , []byte (askedProperties ))
807
- if err != nil {
808
- return nil , err
816
+ offset ++
817
+
818
+ if iterProperty .Valid () && string (iterProperty .Key ().Data ()) == string (iter .Key ().Data ()) {
819
+ // the key of properties is the same with the key of payload
820
+ // to prevent mix message with or without property column family
821
+ propertiesValue = iterProperty .Value ().Data ()
822
+ iterProperty .Next ()
809
823
}
810
- properties := make (map [string ]string )
824
+
825
+ // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload
826
+ // will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3
827
+
828
+ // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq
829
+ // when produce before 2.2.0, but consume after 2.2.0, propertiesValue will be []
811
830
if len (propertiesValue ) != 0 {
812
- // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq
813
- // when produce before 2.2.0, but consume in 2.2.0, propertiesValue will be []
814
831
if err = json .Unmarshal (propertiesValue , & properties ); err != nil {
815
832
return nil , err
816
833
}
@@ -1008,11 +1025,11 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
1008
1025
func (rmq * rocksmq ) getLatestMsg (topicName string ) (int64 , error ) {
1009
1026
readOpts := gorocksdb .NewDefaultReadOptions ()
1010
1027
defer readOpts .Destroy ()
1011
- iter := rocksdbkv .NewRocksIterator (rmq .store , readOpts )
1028
+ iter := rocksdbkv .NewRocksIteratorCF (rmq .store , rmq . cfh [ 0 ] , readOpts )
1012
1029
defer iter .Close ()
1013
1030
1014
1031
prefix := topicName + "/"
1015
- // seek to the last message of thie topic
1032
+ // seek to the last message of the topic
1016
1033
iter .SeekForPrev ([]byte (typeutil .AddOne (prefix )))
1017
1034
1018
1035
// if iterate fail
@@ -1037,6 +1054,7 @@ func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) {
1037
1054
}
1038
1055
1039
1056
msgID , err := strconv .ParseInt (seekMsgID [len (topicName )+ 1 :], 10 , 64 )
1057
+
1040
1058
if err != nil {
1041
1059
return DefaultMessageID , err
1042
1060
}
0 commit comments