@@ -21,7 +21,7 @@ use trieve_server::handlers::group_handler::dataset_owns_group;
21
21
use trieve_server:: operators:: chunk_operator:: {
22
22
bulk_insert_chunk_metadata_query, bulk_revert_insert_chunk_metadata_query,
23
23
get_row_count_for_organization_id_query, insert_chunk_boost, insert_chunk_metadata_query,
24
- update_chunk_boost_query, update_chunk_metadata_query,
24
+ update_chunk_boost_query, update_chunk_metadata_query, update_dataset_chunk_count ,
25
25
} ;
26
26
use trieve_server:: operators:: clickhouse_operator:: { ClickHouseEvent , EventQueue } ;
27
27
use trieve_server:: operators:: dataset_operator:: {
@@ -567,9 +567,9 @@ pub async fn bulk_upload_chunks(
567
567
"calling_BULK_insert_chunk_metadata_query" ,
568
568
) ;
569
569
570
- let only_insert_qdrant = std :: env :: var ( "ONLY_INSERT_QDRANT" ) . unwrap_or ( "false" . to_string ( ) ) ;
570
+ let only_insert_qdrant = dataset_config . QDRANT_ONLY ;
571
571
572
- let inserted_chunk_metadatas = if only_insert_qdrant == "true" {
572
+ let inserted_chunk_metadatas = if only_insert_qdrant {
573
573
ingestion_data
574
574
. clone ( )
575
575
. into_iter ( )
@@ -733,7 +733,13 @@ pub async fn bulk_upload_chunks(
733
733
) )
734
734
. then (
735
735
|( chunk_data, embedding_vector, splade_vector, bm25_vector) | async {
736
- let qdrant_point_id = chunk_data. chunk_metadata . qdrant_point_id ;
736
+ let mut qdrant_point_id = chunk_data. chunk_metadata . qdrant_point_id ;
737
+ if only_insert_qdrant {
738
+ if let Some ( tracking_id) = chunk_data. clone ( ) . chunk_metadata . tracking_id {
739
+ qdrant_point_id =
740
+ uuid:: Uuid :: new_v5 ( & uuid:: Uuid :: NAMESPACE_OID , tracking_id. as_bytes ( ) ) ;
741
+ }
742
+ }
737
743
738
744
let chunk_tags: Option < Vec < Option < String > > > =
739
745
if let Some ( ref group_ids) = chunk_data. group_ids {
@@ -789,7 +795,6 @@ pub async fn bulk_upload_chunks(
789
795
) ;
790
796
}
791
797
792
- // If qdrant_point_id does not exist, does not get written to qdrant
793
798
Ok ( PointStruct :: new (
794
799
qdrant_point_id. to_string ( ) ,
795
800
vector_payload,
@@ -816,18 +821,31 @@ pub async fn bulk_upload_chunks(
816
821
"calling_BULK_create_new_qdrant_points_query" ,
817
822
) ;
818
823
819
- let create_point_result =
824
+ let create_point_result: Result < ( ) , ServiceError > =
820
825
bulk_upsert_qdrant_points_query ( qdrant_points, dataset_config. clone ( ) ) . await ;
821
826
822
827
insert_tx. finish ( ) ;
823
828
824
- if let Err ( err) = create_point_result {
825
- if !upsert_by_tracking_id_being_used {
826
- bulk_revert_insert_chunk_metadata_query ( inserted_chunk_metadata_ids, web_pool. clone ( ) )
829
+ if !only_insert_qdrant {
830
+ if let Err ( err) = create_point_result {
831
+ if !upsert_by_tracking_id_being_used {
832
+ bulk_revert_insert_chunk_metadata_query (
833
+ inserted_chunk_metadata_ids,
834
+ web_pool. clone ( ) ,
835
+ )
827
836
. await ?;
828
- }
837
+ }
829
838
830
- return Err ( err) ;
839
+ return Err ( err) ;
840
+ }
841
+ } else {
842
+ create_point_result?;
843
+ update_dataset_chunk_count (
844
+ payload. dataset_id ,
845
+ inserted_chunk_metadata_ids. len ( ) as i32 ,
846
+ web_pool. clone ( ) ,
847
+ )
848
+ . await ?;
831
849
}
832
850
833
851
Ok ( inserted_chunk_metadata_ids)
@@ -841,14 +859,16 @@ async fn upload_chunk(
841
859
web_pool : actix_web:: web:: Data < models:: Pool > ,
842
860
reqwest_client : reqwest:: Client ,
843
861
) -> Result < uuid:: Uuid , ServiceError > {
844
- let tx_ctx = sentry:: TransactionContext :: new (
845
- "ingestion worker upload_chunk" ,
846
- "ingestion worker upload_chunk" ,
847
- ) ;
848
- let transaction = sentry:: start_transaction ( tx_ctx) ;
849
- sentry:: configure_scope ( |scope| scope. set_span ( Some ( transaction. clone ( ) . into ( ) ) ) ) ;
850
-
862
+ let dataset_id = payload. dataset_id ;
863
+ let qdrant_only = dataset_config. QDRANT_ONLY ;
851
864
let mut qdrant_point_id = uuid:: Uuid :: new_v4 ( ) ;
865
+ if qdrant_only {
866
+ if let Some ( tracking_id) = payload. chunk . tracking_id . clone ( ) {
867
+ qdrant_point_id =
868
+ uuid:: Uuid :: new_v5 ( & uuid:: Uuid :: NAMESPACE_OID , tracking_id. as_bytes ( ) ) ;
869
+ }
870
+ }
871
+
852
872
let content = match payload. chunk . convert_html_to_text . unwrap_or ( true ) {
853
873
true => convert_html_to_text ( & ( payload. chunk . chunk_html . clone ( ) . unwrap_or_default ( ) ) ) ,
854
874
false => payload. chunk . chunk_html . clone ( ) . unwrap_or_default ( ) ,
@@ -1015,44 +1035,50 @@ async fn upload_chunk(
1015
1035
1016
1036
//if collision is not nil, insert chunk with collision
1017
1037
let chunk_metadata_id = {
1038
+ let original_id = payload. ingest_specific_chunk_metadata . id ;
1039
+ let mut inserted_chunk_id = original_id;
1018
1040
payload. ingest_specific_chunk_metadata . qdrant_point_id = qdrant_point_id;
1019
1041
1020
- let insert_tx = transaction. start_child (
1021
- "calling_insert_chunk_metadata_query" ,
1022
- "calling_insert_chunk_metadata_query" ,
1023
- ) ;
1024
-
1025
- let inserted_chunk = insert_chunk_metadata_query (
1026
- chunk_metadata. clone ( ) ,
1027
- payload. chunk . group_ids . clone ( ) ,
1028
- payload. dataset_id ,
1029
- payload. upsert_by_tracking_id ,
1030
- web_pool. clone ( ) ,
1031
- )
1032
- . await ?;
1033
-
1034
- if payload. chunk . fulltext_boost . is_some ( ) || payload. chunk . semantic_boost . is_some ( ) {
1035
- insert_chunk_boost (
1036
- ChunkBoost {
1037
- chunk_id : inserted_chunk. id ,
1038
- fulltext_boost_phrase : payload. chunk . fulltext_boost . clone ( ) . map ( |x| x. phrase ) ,
1039
- fulltext_boost_factor : payload. chunk . fulltext_boost . map ( |x| x. boost_factor ) ,
1040
- semantic_boost_phrase : payload. chunk . semantic_boost . clone ( ) . map ( |x| x. phrase ) ,
1041
- semantic_boost_factor : payload
1042
- . chunk
1043
- . semantic_boost
1044
- . map ( |x| x. distance_factor as f64 ) ,
1045
- } ,
1042
+ let group_tag_set = if qdrant_only {
1043
+ None
1044
+ } else {
1045
+ let inserted_chunk = insert_chunk_metadata_query (
1046
+ chunk_metadata. clone ( ) ,
1047
+ payload. chunk . group_ids . clone ( ) ,
1048
+ payload. dataset_id ,
1049
+ payload. upsert_by_tracking_id ,
1046
1050
web_pool. clone ( ) ,
1047
1051
)
1048
1052
. await ?;
1049
- }
1050
-
1051
- insert_tx. finish ( ) ;
1053
+ inserted_chunk_id = inserted_chunk. id ;
1054
+
1055
+ if payload. chunk . fulltext_boost . is_some ( ) || payload. chunk . semantic_boost . is_some ( ) {
1056
+ insert_chunk_boost (
1057
+ ChunkBoost {
1058
+ chunk_id : inserted_chunk. id ,
1059
+ fulltext_boost_phrase : payload
1060
+ . chunk
1061
+ . fulltext_boost
1062
+ . clone ( )
1063
+ . map ( |x| x. phrase ) ,
1064
+ fulltext_boost_factor : payload. chunk . fulltext_boost . map ( |x| x. boost_factor ) ,
1065
+ semantic_boost_phrase : payload
1066
+ . chunk
1067
+ . semantic_boost
1068
+ . clone ( )
1069
+ . map ( |x| x. phrase ) ,
1070
+ semantic_boost_factor : payload
1071
+ . chunk
1072
+ . semantic_boost
1073
+ . map ( |x| x. distance_factor as f64 ) ,
1074
+ } ,
1075
+ web_pool. clone ( ) ,
1076
+ )
1077
+ . await ?;
1078
+ }
1052
1079
1053
- qdrant_point_id = inserted_chunk. qdrant_point_id ;
1080
+ qdrant_point_id = inserted_chunk. qdrant_point_id ;
1054
1081
1055
- let chunk_tags: Option < Vec < Option < String > > > =
1056
1082
if let Some ( ref group_ids) = payload. chunk . group_ids {
1057
1083
Some (
1058
1084
get_groups_from_group_ids_query ( group_ids. clone ( ) , web_pool. clone ( ) )
@@ -1065,10 +1091,11 @@ async fn upload_chunk(
1065
1091
)
1066
1092
} else {
1067
1093
None
1068
- } ;
1094
+ }
1095
+ } ;
1069
1096
1070
1097
let qdrant_payload =
1071
- QdrantPayload :: new ( chunk_metadata, payload. chunk . group_ids , None , chunk_tags ) ;
1098
+ QdrantPayload :: new ( chunk_metadata, payload. chunk . group_ids , None , group_tag_set ) ;
1072
1099
1073
1100
let vector_name = match & embedding_vector {
1074
1101
Some ( embedding_vector) => match embedding_vector. len ( ) {
@@ -1109,28 +1136,27 @@ async fn upload_chunk(
1109
1136
vector_payload,
1110
1137
qdrant_payload,
1111
1138
) ;
1112
- let insert_tx = transaction. start_child (
1113
- "calling_bulk_create_new_qdrant_points_query" ,
1114
- "calling_bulk_create_new_qdrant_points_query" ,
1115
- ) ;
1116
1139
1117
- if let Err ( e) = bulk_upsert_qdrant_points_query ( vec ! [ point] , dataset_config) . await {
1140
+ let upsert_qdrant_point_result =
1141
+ bulk_upsert_qdrant_points_query ( vec ! [ point] , dataset_config) . await ;
1142
+
1143
+ if let Err ( e) = upsert_qdrant_point_result {
1118
1144
log:: error!( "Failed to create qdrant point: {:?}" , e) ;
1119
1145
1120
- if payload. upsert_by_tracking_id {
1121
- bulk_revert_insert_chunk_metadata_query ( vec ! [ inserted_chunk . id ] , web_pool. clone ( ) )
1146
+ if !qdrant_only && ( payload. upsert_by_tracking_id || original_id == inserted_chunk_id ) {
1147
+ bulk_revert_insert_chunk_metadata_query ( vec ! [ inserted_chunk_id ] , web_pool. clone ( ) )
1122
1148
. await ?;
1123
1149
}
1124
1150
1125
1151
return Err ( e) ;
1126
1152
} ;
1153
+ if qdrant_only {
1154
+ update_dataset_chunk_count ( dataset_id, 1_i32 , web_pool. clone ( ) ) . await ?;
1155
+ }
1127
1156
1128
- insert_tx. finish ( ) ;
1129
-
1130
- inserted_chunk. id
1157
+ inserted_chunk_id
1131
1158
} ;
1132
1159
1133
- transaction. finish ( ) ;
1134
1160
Ok ( chunk_metadata_id)
1135
1161
}
1136
1162
0 commit comments