Description
Is there an existing issue for this?
- [x] I have searched the existing issues
Environment
- Milvus version: 2.6.0-20250904-4662aff36-4026c70
- Deployment mode (standalone or cluster): standalone
- MQ type (rocksmq, pulsar or kafka): both
- SDK version (e.g. pymilvus v2.0.0rc2): pymilvus 2.7.0rc25
- OS (Ubuntu or CentOS):
- CPU/Memory:
- GPU:
- Others:
Current Behavior

Expected Behavior
L0 compaction should succeed.
Steps To Reproduce
```python
#!/usr/bin/env python3
"""
Milvus mixed-workload concurrency test script (final version).
Simplified for stable operation.
"""
import threading
import time
import random

from pymilvus import MilvusClient, DataType, CollectionSchema, FieldSchema
from pymilvus.milvus_client.index import IndexParams

# ================= Configuration =================
MILVUS_URI = "https://in01-26602b75ab60116.aws-us-west-2.vectordb-uat3.zillizcloud.com:19530"
TOKEN = "root:J2)VWbwC(O/Weo}1@3[O1D$pNPO>RnPo"
DATABASE = "cmek_database_6e28ghE7"
COLLECTION_NAME = "final_mixed_test"
DIM = 768

# Business pressure configuration (number of requests per operation type)
BUSINESS_CONFIG = {
    "insert": 500000,  # Number of insert operations
    "upsert": 2500,    # Number of upsert operations
    "delete": 100,     # Number of delete operations
    "search": 250000,  # Number of search operations
}
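# Note: deletes (and upserts, which internally delete and re-insert) are
# buffered by Milvus in L0 delta segments; the delete/upsert load configured
# above produces the L0 segments that the manual compaction at the end of
# this script is expected to clear.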
# Global statistics
results = {
    "insert": {"success": 0, "fail": 0, "times": [], "errors": []},
    "upsert": {"success": 0, "fail": 0, "times": [], "errors": []},
    "delete": {"success": 0, "fail": 0, "times": [], "errors": []},
    "search": {"success": 0, "fail": 0, "times": [], "errors": []},
}
results_lock = threading.Lock()

# Monitoring flag
monitoring = True

# ================= Utility Functions =================
def random_vector():
    """Generate random vector"""
    return [random.random() for _ in range(DIM)]

def random_data(count=1):
    """Generate random test data"""
    data = []
    for _ in range(count):
        data.append({
            "embedding": random_vector(),
            "text": f"text_{random.randint(1, 10000)}",
            "category": random.randint(1, 10000),
            "score": random.random(),
        })
    return data

def record_result(operation, success, response_time, error_message=None):
    """Record operation result"""
    with results_lock:
        results[operation]["success" if success else "fail"] += 1
        results[operation]["times"].append(response_time)
        if not success and error_message:
            results[operation]["errors"].append(error_message)
# ================= Business Operation Functions =================
def insert_worker():
    """Insert operation worker thread"""
    print(f"Starting insert operations, target requests: {BUSINESS_CONFIG['insert']}")
    for i in range(BUSINESS_CONFIG['insert']):
        start_time = time.time()
        try:
            data = random_data(1)
            # Remove the id field since the collection has auto_id=True
            if "id" in data[0]:
                del data[0]["id"]
            result = client.insert(collection_name=COLLECTION_NAME, data=data)
            response_time = time.time() - start_time
            if result.get('insert_count', 0) > 0:
                record_result("insert", True, response_time)
            else:
                error_msg = f"Insert failed: {result}"
                record_result("insert", False, response_time, error_msg)
                print(error_msg)
        except Exception as e:
            response_time = time.time() - start_time
            error_msg = f"Insert error: {e}"
            record_result("insert", False, response_time, error_msg)
            print(error_msg)
        # Control request frequency
        # time.sleep(0.001)
def upsert_worker():
    """Upsert operation worker thread"""
    print(f"Starting upsert operations, target requests: {BUSINESS_CONFIG['upsert']}")
    # First insert some data so there are existing IDs to upsert against
    print("Inserting some data for upsert...")
    initial_data = random_data(10)  # Insert 10 records
    try:
        insert_result = client.insert(collection_name=COLLECTION_NAME, data=initial_data)
        inserted_ids = insert_result.get('ids', [])
        print(f"Inserted {len(inserted_ids)} records for upsert")
    except Exception as e:
        print(f"Initial insert failed: {e}")
        inserted_ids = []
    for i in range(BUSINESS_CONFIG['upsert']):
        start_time = time.time()
        try:
            data = random_data(1)
            # Reuse an existing ID for the upsert
            if inserted_ids:
                data[0]["id"] = inserted_ids[i % len(inserted_ids)]
            result = client.upsert(collection_name=COLLECTION_NAME, data=data)
            response_time = time.time() - start_time
            if result.get('upsert_count', 0) > 0:
                record_result("upsert", True, response_time)
            else:
                error_msg = f"Upsert failed: {result}"
                record_result("upsert", False, response_time, error_msg)
                print(error_msg)
        except Exception as e:
            response_time = time.time() - start_time
            error_msg = f"Upsert error: {e}"
            record_result("upsert", False, response_time, error_msg)
            print(error_msg)
        # time.sleep(0.002)
def delete_worker():
    """Delete operation worker thread"""
    print(f"Starting delete operations, target requests: {BUSINESS_CONFIG['delete']}")
    for i in range(BUSINESS_CONFIG['delete']):
        start_time = time.time()
        try:
            # Randomly delete the data of one category
            category = random.randint(1, 10000)
            filter_expr = f"category == {category}"
            client.delete(collection_name=COLLECTION_NAME, filter=filter_expr)
            response_time = time.time() - start_time
            record_result("delete", True, response_time)
        except Exception as e:
            response_time = time.time() - start_time
            error_msg = f"Delete error: {e}"
            record_result("delete", False, response_time, error_msg)
            print(error_msg)
        # time.sleep(0.003)
def search_worker():
    """Search operation worker thread"""
    print(f"Starting search operations, target requests: {BUSINESS_CONFIG['search']}")
    for i in range(BUSINESS_CONFIG['search']):
        start_time = time.time()
        try:
            search_vector = random_vector()
            # Randomly decide whether to use a filter expression
            use_filter = random.random() < 0.3
            filter_expr = f"category == {random.randint(1, 10)}" if use_filter else None
            # NOTE: 'nprobe' is an IVF-family parameter; for an HNSW index the
            # search-time parameter is 'ef'.
            result = client.search(
                collection_name=COLLECTION_NAME,
                data=[search_vector],
                anns_field="embedding",
                search_params={"metric_type": "L2", "params": {"nprobe": 10}},
                limit=10,
                filter=filter_expr
            )
            response_time = time.time() - start_time
            if result and len(result) > 0:
                record_result("search", True, response_time)
            else:
                error_msg = "Search failed: no results"
                record_result("search", False, response_time, error_msg)
        except Exception as e:
            response_time = time.time() - start_time
            error_msg = f"Search error: {e}"
            record_result("search", False, response_time, error_msg)
            print(error_msg)
        # time.sleep(0.002)
def monitor_progress():
    """Monitor progress: print statistics every 60 seconds"""
    global monitoring
    while monitoring:
        time.sleep(60)  # Wait 60 seconds
        if not monitoring:
            break
        with results_lock:
            print("\n" + "=" * 60)
            print(f"📊 Real-time Monitoring Report - {time.strftime('%Y-%m-%d %H:%M:%S')}")
            print("=" * 60)
            total_requests = 0
            total_success = 0
            total_fail = 0
            for operation, stats in results.items():
                success = stats["success"]
                fail = stats["fail"]
                total = success + fail
                total_requests += total
                total_success += success
                total_fail += fail
                if total > 0:
                    success_rate = success / total
                    fail_rate = fail / total
                    avg_time = sum(stats["times"]) / len(stats["times"]) if stats["times"] else 0
                    print(f"\n🔹 {operation.upper()}:")
                    print(f"  Total requests: {total}")
                    print(f"  Success: {success} ({success_rate:.2%})")
                    print(f"  Failed: {fail} ({fail_rate:.2%})")
                    print(f"  Average response time: {avg_time:.3f}s")
                    # Show the most recent errors
                    if stats["errors"]:
                        recent_errors = stats["errors"][-3:]  # Last 3 errors
                        print("  Recent errors:")
                        for error in recent_errors:
                            print(f"    - {error}")
            # Overall statistics
            if total_requests > 0:
                overall_success_rate = total_success / total_requests
                overall_fail_rate = total_fail / total_requests
                print("\n📈 Overall Statistics:")
                print(f"  Total requests: {total_requests}")
                print(f"  Total success: {total_success} ({overall_success_rate:.2%})")
                print(f"  Total failed: {total_fail} ({overall_fail_rate:.2%})")
            print("=" * 60)
# ================= Main Function =================
def main():
    global client, monitoring
    print("=== Milvus Mixed Business Concurrency Test ===")

    # Connect to Milvus
    print("Connecting to Milvus...")
    client = MilvusClient(uri=MILVUS_URI, token=TOKEN)
    client.using_database(DATABASE)

    # Drop the existing collection (if any)
    if client.has_collection(COLLECTION_NAME):
        print(f"Deleting existing collection: {COLLECTION_NAME}")
        client.drop_collection(COLLECTION_NAME)

    # Create the collection
    print(f"Creating collection: {COLLECTION_NAME}")
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIM),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000, is_clustering_key=True),
        FieldSchema(name="category", dtype=DataType.INT64),
        FieldSchema(name="score", dtype=DataType.FLOAT),
    ]
    schema = CollectionSchema(fields=fields)
    client.create_collection(collection_name=COLLECTION_NAME, schema=schema)

    # Create the index
    print("Creating index...")
    index_params = IndexParams()
    # NOTE: 'nlist' is an IVF-family parameter; an HNSW index is normally
    # configured with 'M' and 'efConstruction'.
    index_params.add_index(
        field_name="embedding",
        index_type="HNSW",
        metric_type="L2",
        params={"nlist": 1024}
    )
    client.create_index(
        collection_name=COLLECTION_NAME,
        index_params=index_params
    )

    # Load the collection
    print("Loading collection...")
    client.load_collection(COLLECTION_NAME)

    # Wait for loading to complete
    print("Waiting for collection loading to complete...")
    time.sleep(3)

    # Check the load state
    load_state = client.get_load_state(COLLECTION_NAME)
    print(f"Collection load state: {load_state}")
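    # Note: a more robust alternative to the fixed sleep above would be to poll
    # the load state until it reports Loaded, e.g. (sketch; the exact shape of
    # the dict returned by get_load_state may vary by pymilvus version):
    #
    #     while "Loaded" not in str(client.get_load_state(COLLECTION_NAME)):
    #         time.sleep(1)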
    # Start the concurrent test
    print("\nStarting concurrent testing...")
    start_time = time.time()

    # Create and start the monitoring thread
    monitor_thread = threading.Thread(target=monitor_progress, daemon=True)
    monitor_thread.start()

    # Create and start the business threads
    threads = [
        threading.Thread(target=insert_worker),
        threading.Thread(target=upsert_worker),
        threading.Thread(target=delete_worker),
        threading.Thread(target=search_worker),
    ]
    for t in threads:
        t.start()

    # Wait for all business threads to complete
    for t in threads:
        t.join()

    # Stop monitoring
    monitoring = False

    end_time = time.time()
    duration = end_time - start_time

    # Print the results
    print("\n=== Test Results ===")
    print(f"Total test duration: {duration:.2f} seconds")
    print("-" * 50)
    total_requests = 0
    total_success = 0
    for operation, stats in results.items():
        success = stats["success"]
        fail = stats["fail"]
        total = success + fail
        total_requests += total
        total_success += success
        if total > 0:
            success_rate = success / total
            avg_time = sum(stats["times"]) / len(stats["times"]) if stats["times"] else 0
            rps = total / duration
            print(f"{operation.upper()}:")
            print(f"  Total requests: {total}")
            print(f"  Success: {success}")
            print(f"  Failed: {fail}")
            print(f"  Success rate: {success_rate:.2%}")
            print(f"  Average response time: {avg_time:.3f}s")
            print(f"  Requests per second: {rps:.2f}")
            print()

    # Overall statistics
    overall_success_rate = total_success / total_requests if total_requests > 0 else 0
    overall_rps = total_requests / duration
    print("Overall Statistics:")
    print(f"  Total requests: {total_requests}")
    print(f"  Total success: {total_success}")
    print(f"  Total failed: {total_requests - total_success}")
    print(f"  Overall success rate: {overall_success_rate:.2%}")
    print(f"  Overall error rate: {1 - overall_success_rate:.2%}")
    print(f"  Overall QPS: {overall_rps:.2f}")

    # Trigger a manual compaction; this is where L0 compaction is expected to run
    res = client.compact(collection_name=COLLECTION_NAME)
    print(f"Compact result: {res}")

if __name__ == "__main__":
    main()
```
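
For reference, a minimal sketch of how the compaction job triggered at the end of the script could be polled afterwards, assuming the same `client`, `COLLECTION_NAME`, and connection settings as above, and assuming `compact()` returns a job ID that `get_compaction_state()` accepts (the exact state strings may vary by pymilvus version):

```python
# Minimal sketch: poll the compaction job started by the script above.
job_id = client.compact(collection_name=COLLECTION_NAME)
while True:
    state = client.get_compaction_state(job_id)  # e.g. "Executing" until done
    print(f"Compaction job {job_id}: {state}")
    if state != "Executing":  # state names are an assumption
        break
    time.sleep(5)
```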
Milvus Log
Anything else?
No response