Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/core/src/index/Meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ constexpr const char* SCALAR_INDEX_ENGINE_VERSION =
"scalar_index_engine_version";
constexpr const char* TANTIVY_INDEX_VERSION = "tantivy_index_version";
constexpr uint32_t TANTIVY_INDEX_LATEST_VERSION = 7;
// Index-config key for the "non-encoding" switch. The index builder reads
// the value as a string (defaulting to "false") and treats only the exact
// string "true" as enabled.
constexpr const char* INDEX_NON_ENCODING = "index.nonEncoding";

// index meta
constexpr const char* COLLECTION_ID = "collection_id";
Expand Down
7 changes: 6 additions & 1 deletion internal/core/src/indexbuilder/index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@
config[milvus::index::SCALAR_INDEX_ENGINE_VERSION] =
scalar_index_engine_version;

// check index encoding config
auto index_non_encoding_str =
config.value(milvus::index::INDEX_NON_ENCODING, "false");
bool index_non_encoding = index_non_encoding_str == "true";

Check warning on line 186 in internal/core/src/indexbuilder/index_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/indexbuilder/index_c.cpp#L184-L186

Added lines #L184 - L186 were not covered by tests

// init file manager
milvus::storage::FieldDataMeta field_meta{
build_index_info->collectionid(),
Expand All @@ -197,7 +202,7 @@
build_index_info->field_schema().name(),
field_type,
build_index_info->dim(),
};
index_non_encoding};

Check warning on line 205 in internal/core/src/indexbuilder/index_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/indexbuilder/index_c.cpp#L205

Added line #L205 was not covered by tests
auto chunk_manager =
milvus::storage::CreateChunkManager(storage_config);

Expand Down
9 changes: 6 additions & 3 deletions internal/core/src/storage/IndexData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ IndexData::Serialize(StorageType medium) {

std::vector<uint8_t>
IndexData::serialize_to_remote_file() {
AssertInfo(field_data_meta_.has_value(), "field data not exist");
AssertInfo(field_data_meta_.has_value(), "field data meta not exist");
AssertInfo(index_meta_.has_value(), "index meta not exist");
// create descriptor event
DescriptorEvent descriptor_event;
Expand All @@ -60,8 +60,11 @@ IndexData::serialize_to_remote_file() {
des_fix_part.field_id = field_data_meta_->field_id;
des_fix_part.start_timestamp = time_range_.first;
des_fix_part.end_timestamp = time_range_.second;
des_fix_part.data_type =
milvus::proto::schema::DataType(milvus::proto::schema::DataType::None);
des_fix_part.data_type = index_meta_->index_non_encoding
? milvus::proto::schema::DataType(
milvus::proto::schema::DataType::None)
: milvus::proto::schema::DataType(
payload_reader_->get_payload_datatype());
for (auto i = int8_t(EventType::DescriptorEvent);
i < int8_t(EventType::EventTypeEnd);
i++) {
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/storage/PayloadReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class PayloadReader {
if (payload_buf_) {
return payload_buf_->Size();
}
if (field_data_) {
return field_data_->Size();
}
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions internal/core/src/storage/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ struct IndexMeta {
std::string field_name;
DataType field_type;
int64_t dim;
bool index_non_encoding;
};

struct StorageConfig {
Expand Down
18 changes: 14 additions & 4 deletions internal/core/src/storage/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,11 +628,21 @@
IndexMeta index_meta,
FieldDataMeta field_meta,
std::string object_key) {
std::shared_ptr<IndexData> index_data = nullptr;
if (index_meta.index_non_encoding) {
index_data = std::make_shared<IndexData>(buf, batch_size);

Check warning on line 633 in internal/core/src/storage/Util.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/Util.cpp#L633

Added line #L633 was not covered by tests
// index-build tasks assigned by new milvus-coord nodes: write the index data without encoding
} else {
auto field_data = CreateFieldData(DataType::INT8, false);
field_data->FillFieldData(buf, batch_size);
auto payload_reader = std::make_shared<PayloadReader>(field_data);
index_data = std::make_shared<IndexData>(payload_reader);
// index-build tasks assigned by old milvus-coord nodes: fall back to int8 encoding
}
// the index does not use valid_data, so there is no need to set nullable==true
auto indexData = std::make_shared<IndexData>(buf, batch_size);
indexData->set_index_meta(index_meta);
indexData->SetFieldDataMeta(field_meta);
auto serialized_index_data = indexData->serialize_to_remote_file();
index_data->set_index_meta(index_meta);
index_data->SetFieldDataMeta(field_meta);
auto serialized_index_data = index_data->serialize_to_remote_file();
auto serialized_index_size = serialized_index_data.size();
chunk_manager->Write(
object_key, serialized_index_data.data(), serialized_index_size);
Expand Down
20 changes: 20 additions & 0 deletions internal/datacoord/index_engine_version_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ type IndexEngineVersionManager interface {

GetCurrentScalarIndexEngineVersion() int32
GetMinimalScalarIndexEngineVersion() int32
GetIndexNonEncoding() bool
}

// versionManagerImpl is the IndexEngineVersionManager implementation returned
// by newIndexEngineVersionManager. It keeps one entry per registered session
// in each map, keyed by the session's ServerID.
type versionManagerImpl struct {
// mu is locked by the accessors before they read the maps below.
mu lock.Mutex
// versions holds each session's IndexEngineVersion.
versions map[int64]sessionutil.IndexEngineVersion
// scalarIndexVersions holds each session's ScalarIndexEngineVersion.
scalarIndexVersions map[int64]sessionutil.IndexEngineVersion
// indexNonEncoding holds each session's IndexNonEncoding flag.
indexNonEncoding map[int64]bool
}

// newIndexEngineVersionManager builds an IndexEngineVersionManager whose
// per-node tables (index versions, scalar index versions and the
// non-encoding flags) all start out empty.
func newIndexEngineVersionManager() IndexEngineVersionManager {
	manager := &versionManagerImpl{}
	manager.versions = make(map[int64]sessionutil.IndexEngineVersion)
	manager.scalarIndexVersions = make(map[int64]sessionutil.IndexEngineVersion)
	manager.indexNonEncoding = make(map[int64]bool)
	return manager
}

Expand All @@ -58,6 +61,7 @@ func (m *versionManagerImpl) RemoveNode(session *sessionutil.Session) {

delete(m.versions, session.ServerID)
delete(m.scalarIndexVersions, session.ServerID)
delete(m.indexNonEncoding, session.ServerID)
}

func (m *versionManagerImpl) Update(session *sessionutil.Session) {
Expand All @@ -71,6 +75,7 @@ func (m *versionManagerImpl) addOrUpdate(session *sessionutil.Session) {
log.Info("addOrUpdate version", zap.Int64("nodeId", session.ServerID), zap.Int32("minimal", session.IndexEngineVersion.MinimalIndexVersion), zap.Int32("current", session.IndexEngineVersion.CurrentIndexVersion))
m.versions[session.ServerID] = session.IndexEngineVersion
m.scalarIndexVersions[session.ServerID] = session.ScalarIndexEngineVersion
m.indexNonEncoding[session.ServerID] = session.IndexNonEncoding
}

func (m *versionManagerImpl) GetCurrentIndexEngineVersion() int32 {
Expand Down Expand Up @@ -148,3 +153,18 @@ func (m *versionManagerImpl) GetMinimalScalarIndexEngineVersion() int32 {
log.Info("Merged minimal scalar index version", zap.Int32("minimal", minimal))
return minimal
}

// GetIndexNonEncoding reports whether every tracked node has its
// IndexNonEncoding flag set. It returns false as soon as one node has the
// flag unset, and also returns false (after logging) when no node is
// tracked, so callers fall back to the old index format by default.
func (m *versionManagerImpl) GetIndexNonEncoding() bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	if len(m.indexNonEncoding) == 0 {
		// by default, we fall back to old index format for safety
		log.Info("indexNonEncoding map is empty")
		return false
	}

	// The result is the logical AND over all nodes, so a single node
	// without the flag decides the answer.
	for _, nodeFlag := range m.indexNonEncoding {
		if !nodeFlag {
			return false
		}
	}
	return true
}
50 changes: 50 additions & 0 deletions internal/datacoord/index_engine_version_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,53 @@ func Test_IndexEngineVersionManager_GetMergedScalarIndexVersion(t *testing.T) {
assert.Equal(t, int32(20), m.GetCurrentScalarIndexEngineVersion())
assert.Equal(t, int32(0), m.GetMinimalScalarIndexEngineVersion())
}

// Test_IndexEngineVersionManager_GetIndexNoneEncoding walks the manager through
// startup, add, update and remove of sessions and checks after each step that
// GetIndexNonEncoding only returns true when every remaining session has
// IndexNonEncoding set.
func Test_IndexEngineVersionManager_GetIndexNoneEncoding(t *testing.T) {
m := newIndexEngineVersionManager()

// no session registered yet: the manager must report false
assert.False(t, m.GetIndexNonEncoding())

// startup with a single session (server 1) that has IndexNonEncoding unset
m.Startup(map[string]*sessionutil.Session{
"1": {
SessionRaw: sessionutil.SessionRaw{
ServerID: 1,
ScalarIndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 20, MinimalIndexVersion: 0},
IndexNonEncoding: false,
},
},
})
assert.False(t, m.GetIndexNonEncoding())

// add server 2, which has IndexNonEncoding set
m.AddNode(&sessionutil.Session{
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
ScalarIndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 10, MinimalIndexVersion: 5},
IndexNonEncoding: true,
},
})
// server 1 still has the flag unset, so the merged result must stay false
assert.False(t, m.GetIndexNonEncoding())

// update server 2 (flag still set); server 1 is unchanged, so the result stays false
m.Update(&sessionutil.Session{
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
ScalarIndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 2},
IndexNonEncoding: true,
},
})
assert.False(t, m.GetIndexNonEncoding())

// remove server 1, the only session with the flag unset
m.RemoveNode(&sessionutil.Session{
SessionRaw: sessionutil.SessionRaw{
ServerID: 1,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 3},
},
})
// only server 2 remains and it has the flag set, so the merged result is now true
assert.True(t, m.GetIndexNonEncoding())
}
45 changes: 45 additions & 0 deletions internal/datacoord/mock_index_engine_version_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,8 @@ func (s *Server) handleSessionEvent(ctx context.Context, role string, event *ses
case sessionutil.SessionAddEvent:
log.Info("received querynode register",
zap.String("address", event.Session.Address),
zap.Int64("serverID", event.Session.ServerID))
zap.Int64("serverID", event.Session.ServerID),
zap.Bool("indexNonEncoding", event.Session.IndexNonEncoding))
s.indexEngineVersionManager.AddNode(event.Session)
case sessionutil.SessionDelEvent:
log.Info("received querynode unregister",
Expand Down
45 changes: 0 additions & 45 deletions internal/datacoord/session/mock_worker_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,14 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
}
}
}

indexNonEncoding := "false"
if dependency.indexEngineVersionManager.GetIndexNonEncoding() {
indexNonEncoding = "true"
}
indexParams = append(indexParams, &commonpb.KeyValuePair{
Key: common.IndexNonEncoding,
Value: indexNonEncoding,
})
it.req = &workerpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
Expand Down Expand Up @@ -278,6 +285,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
zap.Int64("segID", segment.GetID()),
zap.Int32("CurrentIndexVersion", it.req.GetCurrentIndexVersion()),
zap.Int32("CurrentScalarIndexVersion", it.req.GetCurrentScalarIndexVersion()),
zap.String("IndexNonEncoding", indexNonEncoding),
zap.Int64("segID", segment.GetID()))
return true
}
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func (node *QueryNode) initSession() error {
minimalIndexVersion, currentIndexVersion := getIndexEngineVersion()
node.session = sessionutil.NewSession(node.ctx,
sessionutil.WithIndexEngineVersion(minimalIndexVersion, currentIndexVersion),
sessionutil.WithScalarIndexEngineVersion(common.MinimalScalarIndexEngineVersion, common.CurrentScalarIndexEngineVersion))
sessionutil.WithScalarIndexEngineVersion(common.MinimalScalarIndexEngineVersion, common.CurrentScalarIndexEngineVersion),
sessionutil.WithIndexNonEncoding())
if node.session == nil {
return fmt.Errorf("session is nil, the etcd client connection may have failed")
}
Expand Down
7 changes: 7 additions & 0 deletions internal/util/sessionutil/session_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type SessionRaw struct {
Version string `json:"Version"`
IndexEngineVersion IndexEngineVersion `json:"IndexEngineVersion,omitempty"`
ScalarIndexEngineVersion IndexEngineVersion `json:"ScalarIndexEngineVersion,omitempty"`
IndexNonEncoding bool `json:"IndexNonEncoding,omitempty"`
LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"`

HostName string `json:"HostName,omitempty"`
Expand Down Expand Up @@ -194,6 +195,12 @@ func WithScalarIndexEngineVersion(minimal, current int32) SessionOption {
}
}

// WithIndexNonEncoding returns a SessionOption that sets the session's
// IndexNonEncoding field to true when the option is applied.
func WithIndexNonEncoding() SessionOption {
	markNonEncoding := func(s *Session) {
		s.IndexNonEncoding = true
	}
	return markNonEncoding
}

func (s *Session) apply(opts ...SessionOption) {
for _, opt := range opts {
opt(s)
Expand Down
1 change: 1 addition & 0 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ const (
IndexOffsetCacheEnabledKey = "indexoffsetcache.enabled"
ReplicateIDKey = "replicate.id"
ReplicateEndTSKey = "replicate.endTS"
IndexNonEncoding = "index.nonEncoding"
)

const (
Expand Down
Loading