@loupipalien
Contributor

Purpose of this pull request

Added a batch_interval parameter that starts a scheduled task to flush the writer at a fixed interval.

Does this PR introduce any user-facing change?

Yes.
Currently, when running a source2milvus task in STREAMING mode, the Milvus sink flushes only after it has accumulated batch_size records. Until batch_size is reached, records read from the source stay in memory and are not flushed to Milvus; they are written only once new data arrives and fills the batch.
With this change, a flush is triggered as soon as either batch_size or batch_interval is reached.

How was this patch tested?

Added a new test case

Check list

Comment on lines 65 to 72
if (scheduler != null) {
    log.info("create Milvus sink writer with batch interval: {}", batchInterval);
    scheduler.scheduleAtFixedRate(
            new BatchWriterFlushRunnable(batchWriter),
            0,
            batchInterval,
            TimeUnit.MILLISECONDS);
}
Member

We do not recommend opening a separate thread to handle this issue. You can solve this problem through prepareCommit()

Contributor Author

@zhangshenghang Do you mean to periodically flush data through the prepareCommit() triggered by checkpoint.interval in STREAMING mode?

Member

> @zhangshenghang Do you mean to periodically flush data through the prepareCommit() triggered by checkpoint.interval in STREAMING mode?

Yes, you can refer to other connectors.
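A minimal sketch of the approach suggested here, assuming a writer that buffers records and is notified by the engine on each checkpoint. The class name, the String record type, and the in-memory `flushed` list are hypothetical stand-ins, and the `prepareCommit()` signature is simplified; this is not the actual Milvus sink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the real Milvus sink writer.
class BufferedSinkWriterSketch {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    // Stand-in for the remote collection so the sketch runs on its own.
    private final List<String> flushed = new ArrayList<>();

    BufferedSinkWriterSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffer each record; flush early once batch_size records have accumulated.
    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Assumed to be called by the engine on every checkpoint (checkpoint.interval),
    // so a partially filled batch never waits longer than one checkpoint.
    void prepareCommit() {
        flush();
    }

    private void flush() {
        flushed.addAll(buffer);
        buffer.clear();
    }

    int flushedCount() {
        return flushed.size();
    }

    public static void main(String[] args) {
        BufferedSinkWriterSketch writer = new BufferedSinkWriterSketch(3);
        writer.write("a");
        writer.write("b");
        System.out.println(writer.flushedCount()); // batch not full yet
        writer.prepareCommit();
        System.out.println(writer.flushedCount()); // checkpoint flushed the partial batch
    }
}
```

Because the flush runs on the thread that already drives the writer, no extra scheduler thread or locking is needed in this sketch.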

Contributor Author

@zhangshenghang Got it. One question: to keep the data visibility delay low, checkpoint.interval has to be set relatively small, but frequent checkpoints are likely to hurt performance. Adding a new batch_interval parameter would avoid frequent checkpoints, but the visibility delay of the last few records would still be checkpoint.interval rather than batch_interval, so it cannot guarantee a uniform visibility delay.
I need your suggestion: should I add a new batch_interval parameter, or use checkpoint.interval to control the interval flush?

Member

We are not willing to add this parameter because we already have checkpoint.interval, which can achieve the same effect. Are you willing to try to modify checkpoint.interval?

Contributor Author

Thanks for your suggestion, I will use checkpoint.interval to achieve this.

Member

@Hisoka-X left a comment

Thanks @loupipalien! Overall LGTM except for some minor problems.

String collection = "streaming_simple_example";
String vectorField = "book_intro";
int checkpointInterval = 30000;
new Thread(
Member

How about using CompletableFuture.runAsync?
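A small sketch of what starting the background job with `CompletableFuture.runAsync` could look like, in place of a hand-built `new Thread(...)`. The `runJob` method is a hypothetical stand-in for whatever the test launches; here it finishes immediately so the example can run on its own, whereas a real streaming job would keep running.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAsyncSketch {
    // Hypothetical stand-in for the job the test starts in the background.
    static void runJob(AtomicBoolean started) {
        started.set(true);
    }

    static boolean startAndWait() {
        AtomicBoolean started = new AtomicBoolean(false);
        // Runs on the common ForkJoinPool unless an Executor is passed as a second argument.
        CompletableFuture<Void> job = CompletableFuture.runAsync(() -> runJob(started));
        // join() rethrows a failure inside the task as a CompletionException,
        // so an error in the background job is not silently lost.
        job.join();
        return started.get();
    }

    public static void main(String[] args) {
        System.out.println(startAndWait());
    }
}
```

Holding on to the returned future also gives the test a handle it can check or cancel later, which a bare thread does not provide as directly.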

Comment on lines 748 to 751
waitCollectionReady(database, collection, vectorField);
do {
    count = countCollectionEntities(database, collection);
} while (count < 9);
Member

Please use Awaitility.await().atMost(60, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS) to verify the data in a loop.
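To illustrate why a bounded wait is preferable to the bare do/while loop, here is a plain-Java sketch of polling with a timeout and a poll interval. It is a stand-in written for this example, not Awaitility itself: the `pollUntil` helper is hypothetical, and it returns false on timeout, whereas Awaitility throws a ConditionTimeoutException.

```java
import java.util.function.BooleanSupplier;

class BoundedPollSketch {
    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean pollUntil(BooleanSupplier condition, long timeoutMillis, long pollIntervalMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // give up instead of spinning forever
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical condition: becomes true on the third check.
        boolean ok = pollUntil(() -> ++calls[0] >= 3, 1000, 10);
        System.out.println(ok + " after " + calls[0] + " checks");
    }
}
```

With a bound in place, a test whose data never arrives fails after the timeout rather than hanging the CI run, and the poll interval avoids querying the collection in a tight loop.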

Contributor Author

Done

@loupipalien force-pushed the milvus-writer-flush-by-interval branch from 88221c3 to 3978ae4 on October 20, 2025 02:59
@loupipalien
Contributor Author

@Hisoka-X @zhangshenghang CI for some modules still fails after rebasing onto dev. It doesn't seem to be caused by the changes in this PR. Can you help? ❤️

@dybyte
Contributor

dybyte commented Oct 21, 2025

Please pull the latest changes and try again. @loupipalien

@loupipalien force-pushed the milvus-writer-flush-by-interval branch from 3978ae4 to 75455f3 on October 22, 2025 12:54
@loupipalien
Contributor Author

loupipalien commented Oct 22, 2025

> Please pull the latest changes and try again. @loupipalien

@dybyte Thanks, it works
