From 13ed94c33f842bd81a8afffe996c5c43387761f7 Mon Sep 17 00:00:00 2001 From: "chenlantian.michael" Date: Fri, 17 Oct 2025 12:21:18 +0800 Subject: [PATCH 1/8] feat: support milvus sink writer flush by interval --- .../milvus/config/MilvusSinkOptions.java | 7 ++ .../milvus/sink/MilvusBufferBatchWriter.java | 4 + .../milvus/sink/MilvusSinkWriter.java | 84 ++++++++++++++-- .../e2e/connector/v2/milvus/MilvusIT.java | 97 +++++++++++++++++++ .../resources/streaming-fake-to-milvus.conf | 69 +++++++++++++ 5 files changed, 253 insertions(+), 8 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java index 1c7d00f3a98..176a194c229 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java @@ -82,6 +82,13 @@ public class MilvusSinkOptions extends MilvusBaseOptions { .intType() .defaultValue(1000) .withDescription("writer batch size"); + + public static final Option BATCH_INTERVAL = + Options.key("batch_interval") + .intType() + .defaultValue(1000) + .withDescription("writer batch interval"); + public static final Option RATE_LIMIT = Options.key("rate_limit") .intType() diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java index 73f4ac516e3..71408b4a00a 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java @@ -220,6 +220,10 @@ public boolean needFlush() { return this.writeCache.get() >= this.batchSize; } + public long cachedSize() { + return this.writeCache.get(); + } + public void flush() throws Exception { log.info("Starting to put {} records to Milvus.", this.writeCache.get()); // Flush the batch writer diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 98b2b46c3b4..3c693ed6e8a 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo; @@ -32,6 +33,11 @@ import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; /** MilvusSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Milvus. */ @Slf4j @@ -41,6 +47,12 @@ public class MilvusSinkWriter private final MilvusBufferBatchWriter batchWriter; private ReadonlyConfig config; + private final ScheduledExecutorService scheduler; + + private final ReentrantLock lock = new ReentrantLock(); + + private final AtomicReference exception = new AtomicReference<>(); + public MilvusSinkWriter( Context context, CatalogTable catalogTable, @@ -48,6 +60,16 @@ public MilvusSinkWriter( List milvusSinkStates) { this.batchWriter = new MilvusBufferBatchWriter(catalogTable, config); this.config = config; + int batchInterval = config.get(MilvusSinkOptions.BATCH_INTERVAL); + this.scheduler = batchInterval > 0 ? Executors.newSingleThreadScheduledExecutor() : null; + if (scheduler != null) { + log.info("create Milvus sink writer with batch interval: {}", batchInterval); + scheduler.scheduleAtFixedRate( + new BatchWriterFlushRunnable(batchWriter), + 0, + batchInterval, + TimeUnit.MILLISECONDS); + } log.info("create Milvus sink writer success"); log.info("MilvusSinkWriter config: " + config); } @@ -59,15 +81,22 @@ public MilvusSinkWriter( */ @Override public void write(SeaTunnelRow element) { - batchWriter.addToBatch(element); - if (batchWriter.needFlush()) { - try { - // Flush the batch writer - batchWriter.flush(); - } catch (Exception e) { - log.error("flush Milvus sink writer failed", e); - throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e); + try { + lock.lock(); + checkExceptionAndRethrow(); + batchWriter.addToBatch(element); + if (batchWriter.needFlush()) { + try { + // Flush the batch writer + batchWriter.flush(); + } catch (Exception e) { + log.error("flush Milvus sink writer failed", e); + throw new MilvusConnectorException( + MilvusConnectionErrorCode.WRITE_DATA_FAIL, e); + } } + } finally { + lock.unlock(); } } @@ -101,13 +130,52 @@ public void abortPrepare() {} @Override public void close() throws IOException { try { + lock.lock(); + // make sure any errors from batch writer flush are handled + checkExceptionAndRethrow(); log.info("Stopping Milvus Client"); batchWriter.flush(); batchWriter.close(); + scheduler.shutdown(); log.info("Stop Milvus Client success"); } catch (Exception e) { log.error("Stop Milvus Client failed", e); throw new MilvusConnectorException(MilvusConnectionErrorCode.CLOSE_CLIENT_ERROR, e); + } finally { + lock.unlock(); + } + } + + /** BatchWriterFlushRunnable is a runnable that will invoke batch writer flush method */ + private class BatchWriterFlushRunnable implements Runnable { + private final MilvusBufferBatchWriter batchWriter; + + public BatchWriterFlushRunnable(MilvusBufferBatchWriter batchWriter) { + this.batchWriter = batchWriter; + } + + @Override + public void run() { + try { + lock.lock(); + if (batchWriter.cachedSize() == 0) { + return; + } + batchWriter.flush(); + } catch (Exception e) { + log.error("flush Milvus sink writer failed", e); + exception.compareAndSet(null, e); + } finally { + lock.unlock(); + } + } + } + + /** Check exception in current thread, if exception is not null, throw it. */ + private void checkExceptionAndRethrow() { + if (exception.get() != null) { + throw new MilvusConnectorException( + MilvusConnectionErrorCode.WRITE_DATA_FAIL, exception.get()); } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java index 3aed4f14550..e7f7b84f4d3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.e2e.connector.v2.milvus; +import io.milvus.grpc.QueryResults; +import io.milvus.param.dml.QueryParam; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; @@ -715,4 +717,99 @@ private void verifyIndexesExist(String database, String collection) { log.info("Index verification passed for collection: {}.{}", database, collection); } + + @TestTemplate + public void testStreamingFakeToMilvus(TestContainer container) throws IOException, InterruptedException { + // collection1 not flush by batch interval + String jobId1 = "1"; + String database1 = "test1"; + String collection1 = "simple_example_1"; + new Thread(() -> { + try { + container.executeJob("/streaming-fake-to-milvus.conf", jobId1, + "database=" + database1, "collection=" + collection1, "batch_size=3", "batch_interval=0"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + + // collection2 flush by batch interval + String jobId2 = "2"; + String database2 = "test2"; + String collection2 = "simple_example_2"; + new Thread(() -> { + try { + container.executeJob("/streaming-fake-to-milvus.conf", jobId2, + "database=" + database2, "collection=" + collection2, "batch_size=3", "batch_interval=1000"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + + // collection2 count write records + long count; + waitCollectionReady(database2, collection2); + do { + count = countCollectionEntities(database2, collection2); + } while (count < 9); + TimeUnit.SECONDS.sleep(10); + count = countCollectionEntities(database2, collection2); + Assertions.assertEquals(10, count); + + // collection1 count write records + waitCollectionReady(database1, collection1); + do { + count = countCollectionEntities(database1, collection1); + } while (count < 9); + TimeUnit.SECONDS.sleep(10); + count = countCollectionEntities(database1, collection1); + Assertions.assertEquals(9, count); + + // cancel jobs + container.cancelJob(jobId1); + container.cancelJob(jobId2); + } + + private void waitCollectionReady(String databaseName, String collectionName) throws InterruptedException { + // assert table exist + R hasCollectionResponse; + do { + TimeUnit.SECONDS.sleep(1); + hasCollectionResponse = + this.milvusClient.hasCollection( + HasCollectionParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), hasCollectionResponse.getStatus()); + } while (!hasCollectionResponse.getData()); + + // assert index exist + R describeIndexResponse; + do { + TimeUnit.SECONDS.sleep(1); + describeIndexResponse = + this.milvusClient.describeIndex( + DescribeIndexParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), describeIndexResponse.getStatus()); + } while (describeIndexResponse.getData() == null || describeIndexResponse.getData().getIndexDescriptionsList().isEmpty()); + + // load collection + R loadCollectionResponse = milvusClient.loadCollection(LoadCollectionParam.newBuilder() + .withDatabaseName(databaseName).withCollectionName(collectionName).build()); + Assertions.assertEquals(R.Status.Success.getCode(), loadCollectionResponse.getStatus()); + } + + private long countCollectionEntities(String databaseName, String collectionName) { + R queryResults = milvusClient.query(QueryParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .withOutFields(Collections.singletonList("count(*)")) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), queryResults.getStatus()); + return queryResults.getData().getFieldsData(0).getScalars().getLongData().getDataList().get(0); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf new file mode 100644 index 00000000000..eaa4a315930 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "STREAMING" +} + +source { + FakeSource { + row.num = 10 + vector.dimension= 4 + schema = { + table = ${collection} + columns = [ + { + name = book_id + type = bigint + nullable = false + defaultValue = 0 + comment = "primary key id" + }, + { + name = book_intro + type = float_vector + columnScale =4 + comment = "vector" + }, + { + name = book_title + type = string + nullable = true + comment = "topic" + } + ] + primaryKey { + name = book_id + columnNames = [book_id] + } + } + } +} + +sink { + Milvus { + url = "http://milvus-e2e:19530" + token = "root:Milvus" + database = ${database} + load_collection = true + create_index = true + enable_upsert = false + batch_size = ${batch_size} + batch_interval = ${batch_interval} + } +} \ No newline at end of file From d1edacd984884764556a894e58709f7246615ceb Mon Sep 17 00:00:00 2001 From: "chenlantian.michael" Date: Fri, 17 Oct 2025 13:00:04 +0800 Subject: [PATCH 2/8] chore: code style --- .../e2e/connector/v2/milvus/MilvusIT.java | 87 ++++++++++++------- 1 file changed, 58 insertions(+), 29 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java index e7f7b84f4d3..1f2a4874962 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.e2e.connector.v2.milvus; -import io.milvus.grpc.QueryResults; -import io.milvus.param.dml.QueryParam; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; @@ -60,6 +58,7 @@ import io.milvus.grpc.IndexDescription; import io.milvus.grpc.KeyValuePair; import io.milvus.grpc.MutationResult; +import io.milvus.grpc.QueryResults; import io.milvus.param.ConnectParam; import io.milvus.param.IndexType; import io.milvus.param.MetricType; @@ -71,6 +70,7 @@ import io.milvus.param.collection.HasCollectionParam; import io.milvus.param.collection.LoadCollectionParam; import io.milvus.param.dml.InsertParam; +import io.milvus.param.dml.QueryParam; import io.milvus.param.index.CreateIndexParam; import io.milvus.param.index.DescribeIndexParam; import lombok.extern.slf4j.Slf4j; @@ -719,32 +719,47 @@ private void verifyIndexesExist(String database, String collection) { } @TestTemplate - public void testStreamingFakeToMilvus(TestContainer container) throws IOException, InterruptedException { + public void testStreamingFakeToMilvus(TestContainer container) + throws IOException, InterruptedException { // collection1 not flush by batch interval String jobId1 = "1"; String database1 = "test1"; String collection1 = "simple_example_1"; - new Thread(() -> { - try { - container.executeJob("/streaming-fake-to-milvus.conf", jobId1, - "database=" + database1, "collection=" + collection1, "batch_size=3", "batch_interval=0"); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - }).start(); + new Thread( + () -> { + try { + container.executeJob( + "/streaming-fake-to-milvus.conf", + jobId1, + "database=" + database1, + "collection=" + collection1, + "batch_size=3", + "batch_interval=0"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }) + .start(); // collection2 flush by batch interval String jobId2 = "2"; String database2 = "test2"; String collection2 = "simple_example_2"; - new Thread(() -> { - try { - container.executeJob("/streaming-fake-to-milvus.conf", jobId2, - "database=" + database2, "collection=" + collection2, "batch_size=3", "batch_interval=1000"); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - }).start(); + new Thread( + () -> { + try { + container.executeJob( + "/streaming-fake-to-milvus.conf", + jobId2, + "database=" + database2, + "collection=" + collection2, + "batch_size=3", + "batch_interval=1000"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }) + .start(); // collection2 count write records long count; @@ -770,7 +785,8 @@ public void testStreamingFakeToMilvus(TestContainer container) throws IOExceptio container.cancelJob(jobId2); } - private void waitCollectionReady(String databaseName, String collectionName) throws InterruptedException { + private void waitCollectionReady(String databaseName, String collectionName) + throws InterruptedException { // assert table exist R hasCollectionResponse; do { @@ -795,21 +811,34 @@ private void waitCollectionReady(String databaseName, String collectionName) thr .withCollectionName(collectionName) .build()); Assertions.assertEquals(R.Status.Success.getCode(), describeIndexResponse.getStatus()); - } while (describeIndexResponse.getData() == null || describeIndexResponse.getData().getIndexDescriptionsList().isEmpty()); + } while (describeIndexResponse.getData() == null + || describeIndexResponse.getData().getIndexDescriptionsList().isEmpty()); // load collection - R loadCollectionResponse = milvusClient.loadCollection(LoadCollectionParam.newBuilder() - .withDatabaseName(databaseName).withCollectionName(collectionName).build()); + R loadCollectionResponse = + milvusClient.loadCollection( + LoadCollectionParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .build()); Assertions.assertEquals(R.Status.Success.getCode(), loadCollectionResponse.getStatus()); } private long countCollectionEntities(String databaseName, String collectionName) { - R queryResults = milvusClient.query(QueryParam.newBuilder() - .withDatabaseName(databaseName) - .withCollectionName(collectionName) - .withOutFields(Collections.singletonList("count(*)")) - .build()); + R queryResults = + milvusClient.query( + QueryParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .withOutFields(Collections.singletonList("count(*)")) + .build()); Assertions.assertEquals(R.Status.Success.getCode(), queryResults.getStatus()); - return queryResults.getData().getFieldsData(0).getScalars().getLongData().getDataList().get(0); + return queryResults + .getData() + .getFieldsData(0) + .getScalars() + .getLongData() + .getDataList() + .get(0); } } From 861f06f48f94bdfd69d61d1ae3e4c7c40e1cce80 Mon Sep 17 00:00:00 2001 From: "chenlantian.michael" Date: Fri, 17 Oct 2025 14:34:56 +0800 Subject: [PATCH 3/8] chore: create collection index by manul --- .../e2e/connector/v2/milvus/MilvusIT.java | 57 ++++++++++++------- .../resources/streaming-fake-to-milvus.conf | 2 - 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java index 1f2a4874962..477f7f96072 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -84,6 +84,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -725,6 +726,7 @@ public void testStreamingFakeToMilvus(TestContainer container) String jobId1 = "1"; String database1 = "test1"; String collection1 = "simple_example_1"; + String vectorField1 = "book_intro"; new Thread( () -> { try { @@ -745,6 +747,7 @@ public void testStreamingFakeToMilvus(TestContainer container) String jobId2 = "2"; String database2 = "test2"; String collection2 = "simple_example_2"; + String vectorField2 = "book_intro"; new Thread( () -> { try { @@ -763,20 +766,20 @@ public void testStreamingFakeToMilvus(TestContainer container) // collection2 count write records long count; - waitCollectionReady(database2, collection2); + waitCollectionReady(database2, collection2, vectorField2); do { count = countCollectionEntities(database2, collection2); } while (count < 9); - TimeUnit.SECONDS.sleep(10); + TimeUnit.SECONDS.sleep(3); count = countCollectionEntities(database2, collection2); Assertions.assertEquals(10, count); // collection1 count write records - waitCollectionReady(database1, collection1); + waitCollectionReady(database1, collection1, vectorField1); do { count = countCollectionEntities(database1, collection1); } while (count < 9); - TimeUnit.SECONDS.sleep(10); + TimeUnit.SECONDS.sleep(3); count = countCollectionEntities(database1, collection1); Assertions.assertEquals(9, count); @@ -785,7 +788,8 @@ public void testStreamingFakeToMilvus(TestContainer container) container.cancelJob(jobId2); } - private void waitCollectionReady(String databaseName, String collectionName) + private void waitCollectionReady( + String databaseName, String collectionName, String vectorFieldName) throws InterruptedException { // assert table exist R hasCollectionResponse; @@ -797,22 +801,30 @@ private void waitCollectionReady(String databaseName, String collectionName) .withDatabaseName(databaseName) .withCollectionName(collectionName) .build()); - Assertions.assertEquals(R.Status.Success.getCode(), hasCollectionResponse.getStatus()); + Assertions.assertEquals( + R.Status.Success.getCode(), + hasCollectionResponse.getStatus(), + Optional.ofNullable(hasCollectionResponse.getException()) + .map(Exception::getMessage) + .orElse("")); } while (!hasCollectionResponse.getData()); - // assert index exist - R describeIndexResponse; - do { - TimeUnit.SECONDS.sleep(1); - describeIndexResponse = - this.milvusClient.describeIndex( - DescribeIndexParam.newBuilder() - .withDatabaseName(databaseName) - .withCollectionName(collectionName) - .build()); - Assertions.assertEquals(R.Status.Success.getCode(), describeIndexResponse.getStatus()); - } while (describeIndexResponse.getData() == null - || describeIndexResponse.getData().getIndexDescriptionsList().isEmpty()); + // create index + R createIndexResponse = + milvusClient.createIndex( + CreateIndexParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .withFieldName(vectorFieldName) + .withIndexType(IndexType.FLAT) + .withMetricType(MetricType.L2) + .build()); + Assertions.assertEquals( + R.Status.Success.getCode(), + createIndexResponse.getStatus(), + Optional.ofNullable(createIndexResponse.getException()) + .map(Exception::getMessage) + .orElse("")); // load collection R loadCollectionResponse = @@ -821,7 +833,12 @@ private void waitCollectionReady(String databaseName, String collectionName) .withDatabaseName(databaseName) .withCollectionName(collectionName) .build()); - Assertions.assertEquals(R.Status.Success.getCode(), loadCollectionResponse.getStatus()); + Assertions.assertEquals( + R.Status.Success.getCode(), + loadCollectionResponse.getStatus(), + Optional.ofNullable(loadCollectionResponse.getException()) + .map(Exception::getMessage) + .orElse("")); } private long countCollectionEntities(String databaseName, String collectionName) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf index eaa4a315930..7e8fe7d1f19 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf @@ -60,8 +60,6 @@ sink { url = "http://milvus-e2e:19530" token = "root:Milvus" database = ${database} - load_collection = true - create_index = true enable_upsert = false batch_size = ${batch_size} batch_interval = ${batch_interval} From ec0adfcce789ad9c08e8e28954513b368470d9e6 Mon Sep 17 00:00:00 2001 From: "chenlantian.michael" Date: Fri, 17 Oct 2025 15:28:53 +0800 Subject: [PATCH 4/8] fix: use different database and collection --- .../seatunnel/e2e/connector/v2/milvus/MilvusIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java index 477f7f96072..0222ab15f83 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -724,8 +724,8 @@ public void testStreamingFakeToMilvus(TestContainer container) throws IOException, InterruptedException { // collection1 not flush by batch interval String jobId1 = "1"; - String database1 = "test1"; - String collection1 = "simple_example_1"; + String database1 = "streaming_test1"; + String collection1 = "streaming_simple_example_1"; String vectorField1 = "book_intro"; new Thread( () -> { @@ -745,8 +745,8 @@ public void testStreamingFakeToMilvus(TestContainer container) // collection2 flush by batch interval String jobId2 = "2"; - String database2 = "test2"; - String collection2 = "simple_example_2"; + String database2 = "streaming_test2"; + String collection2 = "streaming_simple_example_2"; String vectorField2 = "book_intro"; new Thread( () -> { From 60fdd429ab355594fb91809ca7904d51d28ac955 Mon Sep 17 00:00:00 2001 From: "chenlantian.michael" Date: Fri, 17 Oct 2025 16:01:23 +0800 Subject: [PATCH 5/8] doc: update milvus sink docs --- docs/en/connector-v2/sink/Milvus.md | 29 +++++++++--------- docs/zh/connector-v2/sink/Milvus.md | 46 +++++++++++++++++++++-------- 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md index 0f1232ed0c0..8d8b4e3aa7d 100644 --- a/docs/en/connector-v2/sink/Milvus.md +++ b/docs/en/connector-v2/sink/Milvus.md @@ -39,20 +39,21 @@ This Milvus sink connector write data to Milvus or Zilliz Cloud, it has the foll ## Sink Options -| Name | Type | Required | Default | Description | -|----------------------|---------|----------|------------------------------|------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. | -| token | String | Yes | - | User:password | -| database | String | No | - | Write data to which database, default is source database. | -| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. | -| enable_auto_id | boolean | No | false | Primary key column enable autoId. | -| enable_upsert | boolean | No | false | Upsert data not insert. | -| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. | -| batch_size | int | No | 1000 | Write batch size. | -| partition_key | String | No | | Milvus partition key field | -| create_index | boolean | No | false | Automatically create vector indexes for collection to improve query performance. | -| load_collection | boolean | No | false | Load collection into Milvus memory for immediate query availability. | -| collection_description | Map | No | {} | Collection descriptions map where key is collection name and value is description. | +| Name | Type | Required | Default | Description | +|------------------------|---------------------|----------|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. | +| token | String | Yes | - | User:password | +| database | String | No | - | Write data to which database, default is source database. | +| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. | +| enable_auto_id | boolean | No | false | Primary key column enable autoId. | +| enable_upsert | boolean | No | false | Upsert data not insert. | +| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. | +| batch_size | int | No | 1000 | Write batch size. | +| batch_interval | int | No | 1000 | Write batch interval, it can be disabled if config a non-positive. Writing flush will be triggered when either `batch_size` or `batch_interval` is met. | +| partition_key | String | No | | Milvus partition key field | +| create_index | boolean | No | false | Automatically create vector indexes for collection to improve query performance. | +| load_collection | boolean | No | false | Load collection into Milvus memory for immediate query availability. | +| collection_description | Map | No | {} | Collection descriptions map where key is collection name and value is description. | ## Task Example diff --git a/docs/zh/connector-v2/sink/Milvus.md b/docs/zh/connector-v2/sink/Milvus.md index 37c1d467eed..83c24e32e9a 100644 --- a/docs/zh/connector-v2/sink/Milvus.md +++ b/docs/zh/connector-v2/sink/Milvus.md @@ -19,7 +19,7 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能 ##数据类型映射 -| Milvus数据类型 | SeaTunnel 数据类型 | +| Milvus数据类型 | SeaTunnel 数据类型 | |---------------------|---------------------| | INT8 | TINYINT | | INT16 | SMALLINT | @@ -39,20 +39,25 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能 ## Sink 选项 -| 名字 | 类型 | 是否必传 | 默认值 | 描述 | -|----------------------|---------|----------|------------------------------|-----------------------------------------------------------| -| url | String | 是 | - | 连接到Milvus或Zilliz Cloud的URL。 | -| token | String | 是 | - | 用户:密码 | -| database | String | 否 | - | 将数据写入哪个数据库,默认为源数据库。 | -| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。 | -| enable_auto_id | boolean | 否 | false | 主键列启用autoId。 | -| enable_upsert | boolean | 否 | false | 是否启用upsert。 | -| enable_dynamic_field | boolean | 否 | true | 是否启用带动态字段的创建表。 | -| batch_size | int | 否 | 1000 | 写入批大小。 | -| partition_key | String | 否 | | Milvus分区键字段 | +| 名字 | 类型 | 是否必传 | 默认值 | 描述 | +|------------------------|---------------------|------|------------------------------|------------------------------------------------------------------| +| url | String | 是 | - | 连接到Milvus或Zilliz Cloud的URL。 | +| token | String | 是 | - | 用户:密码 | +| database | String | 否 | - | 将数据写入哪个数据库,默认为源数据库。 | +| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。 | +| enable_auto_id | boolean | 否 | false | 主键列启用autoId。 | +| enable_upsert | boolean | 否 | false | 是否启用upsert。 | +| enable_dynamic_field | boolean | 否 | true | 是否启用带动态字段的创建表。 | +| batch_size | int | 否 | 1000 | 写入批大小。 | +| batch_interval | int | 否 | 1000 | 写入批间隔,非正数配置可关闭此特性。满足 `batch_size` 或 `batch_interval` 任一条件会触发写入刷新 | +| partition_key | String | 否 | | Milvus分区键字段 | +| create_index | boolean | No | false | 自动为集合创建向量索引以提高查询性能 | +| load_collection | boolean | No | false | 将集合加载到 Milvus 内存中以便立即进行查询 | +| collection_description | Map | No | {} | 集合描述映射,其中键是集合名称,值是描述 | ## 任务示例 +### 基础配置 ```bash sink { Milvus { @@ -63,6 +68,23 @@ sink { } ``` +### 带 Index 和 Loading 的高级配置 +```bash +sink { + Milvus { + url = "http://127.0.0.1:19530" + token = "username:password" + batch_size = 1000 + create_index = true + load_collection = true + collection_description = { + "user_vectors" = "User embedding vectors for recommendation" + "product_vectors" = "Product feature vectors for search" + } + } +} +``` + ## 变更日志 \ No newline at end of file From 77cad371465961291cfb510e1f032013873be6bc Mon Sep 17 00:00:00 2001 From: "chenlantian.michael" Date: Sun, 19 Oct 2025 21:00:40 +0800 Subject: [PATCH 6/8] modify: use checkpoint interval to control periodically flush --- docs/en/connector-v2/sink/Milvus.md | 29 +++---- docs/zh/connector-v2/sink/Milvus.md | 29 +++---- .../milvus/config/MilvusSinkOptions.java | 6 -- .../milvus/sink/MilvusBufferBatchWriter.java | 4 - .../milvus/sink/MilvusSinkWriter.java | 85 +++---------------- .../e2e/connector/v2/milvus/MilvusIT.java | 65 ++++---------- .../resources/streaming-fake-to-milvus.conf | 2 +- 7 files changed, 58 insertions(+), 162 deletions(-) diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md index 8d8b4e3aa7d..59f8920950e 100644 --- a/docs/en/connector-v2/sink/Milvus.md +++ b/docs/en/connector-v2/sink/Milvus.md @@ -39,21 +39,20 @@ This Milvus sink connector write data to Milvus or Zilliz Cloud, it has the foll ## Sink Options -| Name | Type | Required | Default | Description | -|------------------------|---------------------|----------|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. | -| token | String | Yes | - | User:password | -| database | String | No | - | Write data to which database, default is source database. | -| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. | -| enable_auto_id | boolean | No | false | Primary key column enable autoId. | -| enable_upsert | boolean | No | false | Upsert data not insert. | -| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. | -| batch_size | int | No | 1000 | Write batch size. | -| batch_interval | int | No | 1000 | Write batch interval, it can be disabled if config a non-positive. Writing flush will be triggered when either `batch_size` or `batch_interval` is met. | -| partition_key | String | No | | Milvus partition key field | -| create_index | boolean | No | false | Automatically create vector indexes for collection to improve query performance. | -| load_collection | boolean | No | false | Load collection into Milvus memory for immediate query availability. | -| collection_description | Map | No | {} | Collection descriptions map where key is collection name and value is description. | +| Name | Type | Required | Default | Description | +|------------------------|---------------------|----------|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. | +| token | String | Yes | - | User:password | +| database | String | No | - | Write data to which database, default is source database. | +| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. | +| enable_auto_id | boolean | No | false | Primary key column enable autoId. | +| enable_upsert | boolean | No | false | Upsert data not insert. | +| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. | +| batch_size | int | No | 1000 | Write batch size. When the number of buffered records reaches `batch_size` or the time reaches `checkpoint.interval`, it will trigger a write flush | +| partition_key | String | No | | Milvus partition key field | +| create_index | boolean | No | false | Automatically create vector indexes for collection to improve query performance. | +| load_collection | boolean | No | false | Load collection into Milvus memory for immediate query availability. | +| collection_description | Map | No | {} | Collection descriptions map where key is collection name and value is description. | ## Task Example diff --git a/docs/zh/connector-v2/sink/Milvus.md b/docs/zh/connector-v2/sink/Milvus.md index 83c24e32e9a..b992c907fda 100644 --- a/docs/zh/connector-v2/sink/Milvus.md +++ b/docs/zh/connector-v2/sink/Milvus.md @@ -39,21 +39,20 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能 ## Sink 选项 -| 名字 | 类型 | 是否必传 | 默认值 | 描述 | -|------------------------|---------------------|------|------------------------------|------------------------------------------------------------------| -| url | String | 是 | - | 连接到Milvus或Zilliz Cloud的URL。 | -| token | String | 是 | - | 用户:密码 | -| database | String | 否 | - | 将数据写入哪个数据库,默认为源数据库。 | -| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。 | -| enable_auto_id | boolean | 否 | false | 主键列启用autoId。 | -| enable_upsert | boolean | 否 | false | 是否启用upsert。 | -| enable_dynamic_field | boolean | 否 | true | 是否启用带动态字段的创建表。 | -| batch_size | int | 否 | 1000 | 写入批大小。 | -| batch_interval | int | 否 | 1000 | 写入批间隔,非正数配置可关闭此特性。满足 `batch_size` 或 `batch_interval` 任一条件会触发写入刷新 | -| partition_key | String | 否 | | Milvus分区键字段 | -| create_index | boolean | No | false | 自动为集合创建向量索引以提高查询性能 | -| load_collection | boolean | No | false | 将集合加载到 Milvus 内存中以便立即进行查询 | -| collection_description | Map | No | {} | 集合描述映射,其中键是集合名称,值是描述 | +| 名字 | 类型 | 是否必传 | 默认值 | 描述 | +|------------------------|---------------------|------|------------------------------|---------------------------------------------------------------------| +| url | String | 是 | - | 连接到Milvus或Zilliz Cloud的URL。 | +| token | String | 是 | - | 用户:密码 | +| database | String | 否 | - | 将数据写入哪个数据库,默认为源数据库。 | +| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。 | +| enable_auto_id | boolean | 否 | false | 主键列启用autoId。 | +| enable_upsert | boolean | 否 | false | 是否启用upsert。 | +| enable_dynamic_field | boolean | 否 | true | 是否启用带动态字段的创建表。 | +| batch_size | int | 否 | 1000 | 写入批大小。当缓冲记录数达到 `batch_size` 或时间达到 `checkpoint.interval` 时,将触发一次写入刷新 | +| partition_key | String | 否 | | Milvus分区键字段 | +| create_index | boolean | No | false | 自动为集合创建向量索引以提高查询性能 | +| load_collection | boolean | No | false | 将集合加载到 Milvus 内存中以便立即进行查询 | +| collection_description | Map | No | {} | 集合描述映射,其中键是集合名称,值是描述 | ## 任务示例 diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java index 176a194c229..4113be01b8d 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java @@ -83,12 +83,6 @@ public class MilvusSinkOptions extends MilvusBaseOptions { .defaultValue(1000) .withDescription("writer batch size"); - public static final Option BATCH_INTERVAL = - Options.key("batch_interval") - .intType() - .defaultValue(1000) - .withDescription("writer batch interval"); - public static final Option RATE_LIMIT = Options.key("rate_limit") .intType() diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java index 71408b4a00a..73f4ac516e3 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java @@ -220,10 +220,6 @@ public boolean needFlush() { return this.writeCache.get() >= this.batchSize; } - public long cachedSize() { - return this.writeCache.get(); - } - public void flush() throws Exception { log.info("Starting to put {} records to Milvus.", this.writeCache.get()); // Flush the batch writer diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 3c693ed6e8a..55402896ace 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -22,7 +22,6 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo; @@ -33,11 +32,6 @@ import java.io.IOException; import java.util.List; import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; /** MilvusSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Milvus. */ @Slf4j @@ -47,12 +41,6 @@ public class MilvusSinkWriter private final MilvusBufferBatchWriter batchWriter; private ReadonlyConfig config; - private final ScheduledExecutorService scheduler; - - private final ReentrantLock lock = new ReentrantLock(); - - private final AtomicReference exception = new AtomicReference<>(); - public MilvusSinkWriter( Context context, CatalogTable catalogTable, @@ -60,16 +48,6 @@ public MilvusSinkWriter( List milvusSinkStates) { this.batchWriter = new MilvusBufferBatchWriter(catalogTable, config); this.config = config; - int batchInterval = config.get(MilvusSinkOptions.BATCH_INTERVAL); - this.scheduler = batchInterval > 0 ? Executors.newSingleThreadScheduledExecutor() : null; - if (scheduler != null) { - log.info("create Milvus sink writer with batch interval: {}", batchInterval); - scheduler.scheduleAtFixedRate( - new BatchWriterFlushRunnable(batchWriter), - 0, - batchInterval, - TimeUnit.MILLISECONDS); - } log.info("create Milvus sink writer success"); log.info("MilvusSinkWriter config: " + config); } @@ -81,22 +59,9 @@ public MilvusSinkWriter( */ @Override public void write(SeaTunnelRow element) { - try { - lock.lock(); - checkExceptionAndRethrow(); - batchWriter.addToBatch(element); - if (batchWriter.needFlush()) { - try { - // Flush the batch writer - batchWriter.flush(); - } catch (Exception e) { - log.error("flush Milvus sink writer failed", e); - throw new MilvusConnectorException( - MilvusConnectionErrorCode.WRITE_DATA_FAIL, e); - } - } - } finally { - lock.unlock(); + batchWriter.addToBatch(element); + if (batchWriter.needFlush()) { + flush(); } } @@ -110,6 +75,7 @@ public void write(SeaTunnelRow element) { */ @Override public Optional prepareCommit() throws IOException { + flush(); return Optional.empty(); } @@ -130,52 +96,23 @@ public void abortPrepare() {} @Override public void close() throws IOException { try { - lock.lock(); - // make sure any errors from batch writer flush are handled - checkExceptionAndRethrow(); log.info("Stopping Milvus Client"); batchWriter.flush(); batchWriter.close(); - scheduler.shutdown(); log.info("Stop Milvus Client success"); } catch (Exception e) { log.error("Stop Milvus Client failed", e); throw new MilvusConnectorException(MilvusConnectionErrorCode.CLOSE_CLIENT_ERROR, e); - } finally { - lock.unlock(); - } - } - - /** BatchWriterFlushRunnable is a runnable that will invoke batch writer flush method */ - private class BatchWriterFlushRunnable implements Runnable { - private final MilvusBufferBatchWriter batchWriter; - - public BatchWriterFlushRunnable(MilvusBufferBatchWriter batchWriter) { - this.batchWriter = batchWriter; - } - - @Override - public void run() { - try { - lock.lock(); - if (batchWriter.cachedSize() == 0) { - return; - } - batchWriter.flush(); - } catch (Exception e) { - log.error("flush Milvus sink writer failed", e); - exception.compareAndSet(null, e); - } finally { - lock.unlock(); - } } } - /** Check exception in current thread, if exception is not null, throw it. */ - private void checkExceptionAndRethrow() { - if (exception.get() != null) { - throw new MilvusConnectorException( - MilvusConnectionErrorCode.WRITE_DATA_FAIL, exception.get()); + private void flush() { + try { + // Flush the batch writer + batchWriter.flush(); + } catch (Exception e) { + log.error("flush Milvus sink writer failed", e); + throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e); } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java index 0222ab15f83..7383077b69c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -85,6 +85,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -722,70 +723,40 @@ private void verifyIndexesExist(String database, String collection) { @TestTemplate public void testStreamingFakeToMilvus(TestContainer container) throws IOException, InterruptedException { - // collection1 not flush by batch interval - String jobId1 = "1"; - String database1 = "streaming_test1"; - String collection1 = "streaming_simple_example_1"; - String vectorField1 = "book_intro"; + // flush by checkpoint interval + String jobId = "1"; + String database = "streaming_test"; + String collection = "streaming_simple_example"; + String vectorField = "book_intro"; + int checkpointInterval = 30000; new Thread( () -> { try { container.executeJob( "/streaming-fake-to-milvus.conf", - jobId1, - "database=" + database1, - "collection=" + collection1, - "batch_size=3", - "batch_interval=0"); + jobId, + "database=" + database, + "collection=" + collection, + "batch_size=3"); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } }) .start(); - // collection2 flush by batch interval - String jobId2 = "2"; - String database2 = "streaming_test2"; - String collection2 = "streaming_simple_example_2"; - String vectorField2 = "book_intro"; - new Thread( - () -> { - try { - container.executeJob( - "/streaming-fake-to-milvus.conf", - jobId2, - "database=" + database2, - "collection=" + collection2, - "batch_size=3", - "batch_interval=1000"); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - }) - .start(); - - // collection2 count write records + // count write records long count; - waitCollectionReady(database2, collection2, vectorField2); - do { - count = countCollectionEntities(database2, collection2); - } while (count < 9); - TimeUnit.SECONDS.sleep(3); - count = countCollectionEntities(database2, collection2); - Assertions.assertEquals(10, count); - - // collection1 count write records - waitCollectionReady(database1, collection1, vectorField1); + waitCollectionReady(database, collection, vectorField); do { - count = countCollectionEntities(database1, collection1); + count = countCollectionEntities(database, collection); } while (count < 9); - TimeUnit.SECONDS.sleep(3); - count = countCollectionEntities(database1, collection1); Assertions.assertEquals(9, count); + TimeUnit.MILLISECONDS.sleep(checkpointInterval); + count = countCollectionEntities(database, collection); + Assertions.assertEquals(10, count); // cancel jobs - container.cancelJob(jobId1); - container.cancelJob(jobId2); + container.cancelJob(jobId); } private void waitCollectionReady( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf index 7e8fe7d1f19..4f7a8b9ab2a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf @@ -18,6 +18,7 @@ env { parallelism = 1 job.mode = "STREAMING" + checkpoint.interval = 30000 } source { @@ -62,6 +63,5 @@ sink { database = ${database} enable_upsert = false batch_size = ${batch_size} - batch_interval = ${batch_interval} } } \ No newline at end of file From 42b22c7daa85e51f7d348588c25f4e259bb56129 Mon Sep 17 00:00:00 2001 From: "chenlantian.michael" Date: Sun, 19 Oct 2025 21:06:54 +0800 Subject: [PATCH 7/8] chore: code format --- .../org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java index 7383077b69c..0b05313a25d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -85,7 +85,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; From 75455f35d64c879e0b28db1a837010724d75e1df Mon Sep 17 00:00:00 2001 From: "chenlantian.michael" Date: Sun, 19 Oct 2025 23:05:33 +0800 Subject: [PATCH 8/8] chore: optimize test case --- .../e2e/connector/v2/milvus/MilvusIT.java | 79 ++++++++++--------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java index 0b05313a25d..1ad1fc9882c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -85,6 +85,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -728,56 +729,56 @@ public void testStreamingFakeToMilvus(TestContainer container) String collection = "streaming_simple_example"; String vectorField = "book_intro"; int checkpointInterval = 30000; - new Thread( - () -> { - try { - container.executeJob( - "/streaming-fake-to-milvus.conf", - jobId, - "database=" + database, - "collection=" + collection, - "batch_size=3"); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - }) - .start(); + CompletableFuture.runAsync( + () -> { + try { + container.executeJob( + "/streaming-fake-to-milvus.conf", + jobId, + "database=" + database, + "collection=" + collection, + "batch_size=3"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }); // count write records - long count; waitCollectionReady(database, collection, vectorField); - do { - count = countCollectionEntities(database, collection); - } while (count < 9); - Assertions.assertEquals(9, count); + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .until(() -> countCollectionEntities(database, collection) >= 9); + Assertions.assertEquals(9, countCollectionEntities(database, collection)); TimeUnit.MILLISECONDS.sleep(checkpointInterval); - count = countCollectionEntities(database, collection); - Assertions.assertEquals(10, count); + Assertions.assertEquals(10, countCollectionEntities(database, collection)); // cancel jobs container.cancelJob(jobId); } private void waitCollectionReady( - String databaseName, String collectionName, String vectorFieldName) - throws InterruptedException { + String databaseName, String collectionName, String vectorFieldName) { // assert table exist - R hasCollectionResponse; - do { - TimeUnit.SECONDS.sleep(1); - hasCollectionResponse = - this.milvusClient.hasCollection( - HasCollectionParam.newBuilder() - .withDatabaseName(databaseName) - .withCollectionName(collectionName) - .build()); - Assertions.assertEquals( - R.Status.Success.getCode(), - hasCollectionResponse.getStatus(), - Optional.ofNullable(hasCollectionResponse.getException()) - .map(Exception::getMessage) - .orElse("")); - } while (!hasCollectionResponse.getData()); + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .until( + () -> { + R hasCollectionResponse = + this.milvusClient.hasCollection( + HasCollectionParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .build()); + Assertions.assertEquals( + R.Status.Success.getCode(), + hasCollectionResponse.getStatus(), + Optional.ofNullable(hasCollectionResponse.getException()) + .map(Exception::getMessage) + .orElse("")); + return hasCollectionResponse.getData(); + }); // create index R createIndexResponse =