1
- use actix_web:: web;
2
- use futures:: future:: join_all;
3
- use itertools:: Itertools ;
4
- use serde:: { Deserialize , Serialize } ;
5
- use ureq:: json;
6
- use utoipa:: ToSchema ;
7
-
1
+ use super :: chunk_operator:: get_metadata_from_tracking_id_query;
8
2
use crate :: {
9
3
data:: models:: {
10
4
ClusterAnalyticsFilter , ClusterTopicsClickhouse , DatasetAnalytics , Granularity ,
@@ -26,8 +20,14 @@ use crate::{
26
20
CTRDataRequestBody , GetTopDatasetsRequestBody , RateQueryRequest ,
27
21
} ,
28
22
} ;
29
-
30
- use super :: chunk_operator:: get_metadata_from_tracking_id_query;
23
+ use actix_web:: web;
24
+ use diesel:: prelude:: * ;
25
+ use diesel_async:: RunQueryDsl ;
26
+ use futures:: future:: join_all;
27
+ use itertools:: Itertools ;
28
+ use serde:: { Deserialize , Serialize } ;
29
+ use ureq:: json;
30
+ use utoipa:: ToSchema ;
31
31
32
32
#[ derive( Debug , Serialize , Deserialize , ToSchema ) ]
33
33
pub struct SearchClusterResponse {
@@ -1359,7 +1359,10 @@ pub async fn set_query_rating_query(
1359
1359
pub async fn get_top_datasets_query (
1360
1360
data : GetTopDatasetsRequestBody ,
1361
1361
clickhouse_client : & clickhouse:: Client ,
1362
+ pool : web:: Data < Pool > ,
1362
1363
) -> Result < Vec < TopDatasetsResponse > , ServiceError > {
1364
+ use crate :: data:: schema:: datasets:: dsl as datasets_columns;
1365
+
1363
1366
let mut query_string = format ! (
1364
1367
"SELECT
1365
1368
dataset_id,
@@ -1393,7 +1396,7 @@ pub async fn get_top_datasets_query(
1393
1396
LIMIT 10" ,
1394
1397
) ;
1395
1398
1396
- let clickhouse_query = clickhouse_client
1399
+ let clickhouse_resp_data = clickhouse_client
1397
1400
. query ( query_string. as_str ( ) )
1398
1401
. fetch_all :: < TopDatasetsResponseClickhouse > ( )
1399
1402
. await
@@ -1402,9 +1405,34 @@ pub async fn get_top_datasets_query(
1402
1405
ServiceError :: InternalServerError ( "Error fetching query" . to_string ( ) )
1403
1406
} ) ?;
1404
1407
1405
- let response = clickhouse_query
1408
+ let dataset_ids = clickhouse_resp_data
1409
+ . iter ( )
1410
+ . map ( |x| x. dataset_id )
1411
+ . collect :: < Vec < _ > > ( ) ;
1412
+ let mut conn = pool
1413
+ . get ( )
1414
+ . await
1415
+ . map_err ( |_| ServiceError :: BadRequest ( "Could not get database connection" . to_string ( ) ) ) ?;
1416
+ let dataset_id_and_tracking_ids = datasets_columns:: datasets
1417
+ . select ( ( datasets_columns:: id, datasets_columns:: tracking_id) )
1418
+ . filter ( datasets_columns:: id. eq_any ( dataset_ids) )
1419
+ . load :: < ( uuid:: Uuid , Option < String > ) > ( & mut conn)
1420
+ . await
1421
+ . map_err ( |e| {
1422
+ log:: error!( "Error fetching dataset ids: {:?}" , e) ;
1423
+ ServiceError :: InternalServerError ( "Error fetching dataset ids" . to_string ( ) )
1424
+ } ) ?;
1425
+
1426
+ let response = clickhouse_resp_data
1406
1427
. into_iter ( )
1407
- . map ( |x| x. into ( ) )
1428
+ . map ( |x| {
1429
+ let mut top_dataset_resps = TopDatasetsResponse :: from ( x. clone ( ) ) ;
1430
+ top_dataset_resps. dataset_tracking_id = dataset_id_and_tracking_ids
1431
+ . iter ( )
1432
+ . find ( |( id, _) | id == & x. dataset_id )
1433
+ . and_then ( |( _, tracking_id) | tracking_id. clone ( ) ) ;
1434
+ top_dataset_resps
1435
+ } )
1408
1436
. collect :: < Vec < _ > > ( ) ;
1409
1437
1410
1438
Ok ( response)
0 commit comments