Merged
28 changes: 14 additions & 14 deletions docs/en/connector-v2/sink/Milvus.md
Expand Up @@ -39,20 +39,20 @@ This Milvus sink connector write data to Milvus or Zilliz Cloud, it has the foll

## Sink Options

| Name | Type | Required | Default | Description |
|----------------------|---------|----------|------------------------------|------------------------------------------------------------------------------------|
| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. |
| token | String | Yes | - | User:password |
| database | String | No | - | Write data to which database, default is source database. |
| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. |
| enable_auto_id | boolean | No | false | Primary key column enable autoId. |
| enable_upsert | boolean | No | false | Upsert data not insert. |
| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. |
| batch_size | int | No | 1000 | Write batch size. |
| partition_key | String | No | | Milvus partition key field |
| create_index | boolean | No | false | Automatically create vector indexes for collection to improve query performance. |
| load_collection | boolean | No | false | Load collection into Milvus memory for immediate query availability. |
| collection_description | Map<String, String> | No | {} | Collection descriptions map where key is collection name and value is description. |
| Name                   | Type                | Required | Default                      | Description                                                                                                                                        |
|------------------------|---------------------|----------|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
| url                    | String              | Yes      | -                            | The URL to connect to Milvus or Zilliz Cloud.                                                                                                      |
| token                  | String              | Yes      | -                            | Credentials in `user:password` format.                                                                                                             |
| database               | String              | No       | -                            | The database to write data to; defaults to the source database.                                                                                    |
| schema_save_mode       | enum                | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | Automatically create the table when it does not exist.                                                                                             |
| enable_auto_id         | boolean             | No       | false                        | Enable autoId for the primary key column.                                                                                                          |
| enable_upsert          | boolean             | No       | false                        | Use upsert instead of insert when writing data.                                                                                                    |
| enable_dynamic_field   | boolean             | No       | true                         | Create the table with dynamic fields enabled.                                                                                                      |
| batch_size             | int                 | No       | 1000                         | Write batch size. A flush is triggered when the number of buffered records reaches `batch_size` or the time reaches `checkpoint.interval`.         |
| partition_key          | String              | No       |                              | Milvus partition key field.                                                                                                                        |
| create_index           | boolean             | No       | false                        | Automatically create vector indexes for the collection to improve query performance.                                                               |
| load_collection        | boolean             | No       | false                        | Load the collection into Milvus memory for immediate query availability.                                                                           |
| collection_description | Map<String, String> | No       | {}                           | Map of collection descriptions, where the key is the collection name and the value is its description.                                             |
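
For reference, a configuration exercising the new options might look like the following sketch (mirroring the example in the zh docs; the URL, credentials, and collection names are placeholders):

```bash
sink {
  Milvus {
    url = "http://127.0.0.1:19530"
    token = "username:password"
    batch_size = 1000
    create_index = true
    load_collection = true
    collection_description = {
      "user_vectors" = "User embedding vectors for recommendation"
      "product_vectors" = "Product feature vectors for search"
    }
  }
}
```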

## Task Example

45 changes: 33 additions & 12 deletions docs/zh/connector-v2/sink/Milvus.md
@@ -19,7 +19,7 @@ The Milvus sink connector writes data to Milvus or Zilliz Cloud; it has the following features:

## Data Type Mapping

| Milvus Data Type    | SeaTunnel Data Type |
|---------------------|---------------------|
| INT8 | TINYINT |
| INT16 | SMALLINT |
Expand All @@ -39,20 +39,24 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能

## Sink Options

| Name                 | Type    | Required | Default                      | Description                                                     |
|----------------------|---------|----------|------------------------------|-----------------------------------------------------------------|
| url                  | String  | Yes      | -                            | The URL to connect to Milvus or Zilliz Cloud.                   |
| token                | String  | Yes      | -                            | Credentials in `user:password` format.                          |
| database             | String  | No       | -                            | The database to write data to; defaults to the source database. |
| schema_save_mode     | enum    | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | Automatically create the table when it does not exist.          |
| enable_auto_id       | boolean | No       | false                        | Enable autoId for the primary key column.                       |
| enable_upsert        | boolean | No       | false                        | Whether to enable upsert.                                       |
| enable_dynamic_field | boolean | No       | true                         | Whether to create the table with dynamic fields enabled.        |
| batch_size           | int     | No       | 1000                         | Write batch size.                                               |
| partition_key        | String  | No       |                              | Milvus partition key field.                                     |
| Name                   | Type                | Required | Default                      | Description                                                                                                                                |
|------------------------|---------------------|----------|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|
| url                    | String              | Yes      | -                            | The URL to connect to Milvus or Zilliz Cloud.                                                                                              |
| token                  | String              | Yes      | -                            | Credentials in `user:password` format.                                                                                                     |
| database               | String              | No       | -                            | The database to write data to; defaults to the source database.                                                                            |
| schema_save_mode       | enum                | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | Automatically create the table when it does not exist.                                                                                     |
| enable_auto_id         | boolean             | No       | false                        | Enable autoId for the primary key column.                                                                                                  |
| enable_upsert          | boolean             | No       | false                        | Whether to enable upsert.                                                                                                                  |
| enable_dynamic_field   | boolean             | No       | true                         | Whether to create the table with dynamic fields enabled.                                                                                   |
| batch_size             | int                 | No       | 1000                         | Write batch size. A flush is triggered when the number of buffered records reaches `batch_size` or the time reaches `checkpoint.interval`. |
| partition_key          | String              | No       |                              | Milvus partition key field.                                                                                                                |
| create_index           | boolean             | No       | false                        | Automatically create vector indexes for the collection to improve query performance.                                                       |
| load_collection        | boolean             | No       | false                        | Load the collection into Milvus memory so it is immediately available for queries.                                                         |
| collection_description | Map<String, String> | No       | {}                           | Map of collection descriptions, where the key is the collection name and the value is its description.                                     |

## Task Example

### Basic Configuration
```bash
sink {
Milvus {
Expand All @@ -63,6 +67,23 @@ sink {
}
```

### Advanced Configuration with Index and Loading
```bash
sink {
Milvus {
url = "http://127.0.0.1:19530"
token = "username:password"
batch_size = 1000
create_index = true
load_collection = true
collection_description = {
"user_vectors" = "User embedding vectors for recommendation"
"product_vectors" = "Product feature vectors for search"
}
}
}
```

## Changelog

<ChangeLog />
Expand Up @@ -82,6 +82,7 @@ public class MilvusSinkOptions extends MilvusBaseOptions {
.intType()
.defaultValue(1000)
.withDescription("writer batch size");

public static final Option<Integer> RATE_LIMIT =
Options.key("rate_limit")
.intType()
Expand Up @@ -61,13 +61,7 @@ public MilvusSinkWriter(
public void write(SeaTunnelRow element) {
batchWriter.addToBatch(element);
if (batchWriter.needFlush()) {
try {
// Flush the batch writer
batchWriter.flush();
} catch (Exception e) {
log.error("flush Milvus sink writer failed", e);
throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e);
}
flush();
}
}

Expand All @@ -81,6 +75,7 @@ public void write(SeaTunnelRow element) {
*/
@Override
public Optional<MilvusCommitInfo> prepareCommit() throws IOException {
flush();
return Optional.empty();
}

Expand Down Expand Up @@ -110,4 +105,14 @@ public void close() throws IOException {
throw new MilvusConnectorException(MilvusConnectionErrorCode.CLOSE_CLIENT_ERROR, e);
}
}

private void flush() {
try {
// Flush the batch writer
batchWriter.flush();
} catch (Exception e) {
log.error("flush Milvus sink writer failed", e);
throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e);
}
}
}
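
The refactor above centralizes the flush logic: `write()` flushes when the batch is full, and `prepareCommit()` flushes any remainder at checkpoint time. A minimal sketch of that pattern (the `BatchBuffer` class and its method names are illustrative stand-ins, not the connector's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the flush-on-batch / flush-on-checkpoint pattern.
// BatchBuffer is a hypothetical stand-in for the connector's batch writer.
class BatchBuffer<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();
    private int flushCount = 0; // counts how many flushes have happened

    BatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    // Mirrors write(): buffer the element, flush when the batch is full.
    void write(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Mirrors prepareCommit(): flush whatever is buffered at checkpoint time.
    void prepareCommit() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // A real writer would send `buffer` to Milvus here and wrap failures
        // in a connector exception, as the PR's shared flush() method does.
        buffer.clear();
        flushCount++;
    }

    int getFlushCount() { return flushCount; }
    int getBufferedCount() { return buffer.size(); }
}
```

With `batchSize = 3`, writing seven records triggers two flushes and leaves one record buffered; `prepareCommit()` then drains that remainder, which is exactly why the streaming test below expects 9 rows after the batch flushes and 10 after the checkpoint interval.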
Expand Up @@ -58,6 +58,7 @@
import io.milvus.grpc.IndexDescription;
import io.milvus.grpc.KeyValuePair;
import io.milvus.grpc.MutationResult;
import io.milvus.grpc.QueryResults;
import io.milvus.param.ConnectParam;
import io.milvus.param.IndexType;
import io.milvus.param.MetricType;
Expand All @@ -69,6 +70,7 @@
import io.milvus.param.collection.HasCollectionParam;
import io.milvus.param.collection.LoadCollectionParam;
import io.milvus.param.dml.InsertParam;
import io.milvus.param.dml.QueryParam;
import io.milvus.param.index.CreateIndexParam;
import io.milvus.param.index.DescribeIndexParam;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -82,6 +84,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -715,4 +719,114 @@ private void verifyIndexesExist(String database, String collection) {

log.info("Index verification passed for collection: {}.{}", database, collection);
}

    @TestTemplate
    public void testStreamingFakeToMilvus(TestContainer container)
            throws IOException, InterruptedException {
        // flush by checkpoint interval
        String jobId = "1";
        String database = "streaming_test";
        String collection = "streaming_simple_example";
        String vectorField = "book_intro";
        int checkpointInterval = 30000;
        CompletableFuture.runAsync(
                () -> {
                    try {
                        container.executeJob(
                                "/streaming-fake-to-milvus.conf",
                                jobId,
                                "database=" + database,
                                "collection=" + collection,
                                "batch_size=3");
                    } catch (IOException | InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });

        // count write records
        waitCollectionReady(database, collection, vectorField);
        Awaitility.await()
                .atMost(60, TimeUnit.SECONDS)
                .pollInterval(2, TimeUnit.SECONDS)
                .until(() -> countCollectionEntities(database, collection) >= 9);
        Assertions.assertEquals(9, countCollectionEntities(database, collection));
        TimeUnit.MILLISECONDS.sleep(checkpointInterval);
        Assertions.assertEquals(10, countCollectionEntities(database, collection));

        // cancel jobs
        container.cancelJob(jobId);
    }

    private void waitCollectionReady(
            String databaseName, String collectionName, String vectorFieldName) {
        // assert table exist
        Awaitility.await()
                .atMost(60, TimeUnit.SECONDS)
                .pollInterval(2, TimeUnit.SECONDS)
                .until(
                        () -> {
                            R<Boolean> hasCollectionResponse =
                                    this.milvusClient.hasCollection(
                                            HasCollectionParam.newBuilder()
                                                    .withDatabaseName(databaseName)
                                                    .withCollectionName(collectionName)
                                                    .build());
                            Assertions.assertEquals(
                                    R.Status.Success.getCode(),
                                    hasCollectionResponse.getStatus(),
                                    Optional.ofNullable(hasCollectionResponse.getException())
                                            .map(Exception::getMessage)
                                            .orElse(""));
                            return hasCollectionResponse.getData();
                        });

        // create index
        R<RpcStatus> createIndexResponse =
                milvusClient.createIndex(
                        CreateIndexParam.newBuilder()
                                .withDatabaseName(databaseName)
                                .withCollectionName(collectionName)
                                .withFieldName(vectorFieldName)
                                .withIndexType(IndexType.FLAT)
                                .withMetricType(MetricType.L2)
                                .build());
        Assertions.assertEquals(
                R.Status.Success.getCode(),
                createIndexResponse.getStatus(),
                Optional.ofNullable(createIndexResponse.getException())
                        .map(Exception::getMessage)
                        .orElse(""));

        // load collection
        R<RpcStatus> loadCollectionResponse =
                milvusClient.loadCollection(
                        LoadCollectionParam.newBuilder()
                                .withDatabaseName(databaseName)
                                .withCollectionName(collectionName)
                                .build());
        Assertions.assertEquals(
                R.Status.Success.getCode(),
                loadCollectionResponse.getStatus(),
                Optional.ofNullable(loadCollectionResponse.getException())
                        .map(Exception::getMessage)
                        .orElse(""));
    }

    private long countCollectionEntities(String databaseName, String collectionName) {
        R<QueryResults> queryResults =
                milvusClient.query(
                        QueryParam.newBuilder()
                                .withDatabaseName(databaseName)
                                .withCollectionName(collectionName)
                                .withOutFields(Collections.singletonList("count(*)"))
                                .build());
        Assertions.assertEquals(R.Status.Success.getCode(), queryResults.getStatus());
        return queryResults
                .getData()
                .getFieldsData(0)
                .getScalars()
                .getLongData()
                .getDataList()
                .get(0);
    }
}