@@ -157,16 +157,14 @@ pub async fn create_crawl_query(
         .as_ref()
         .is_some_and(|f| matches!(f, &ScrapeOptions::Youtube(_)))
     {
-        create_youtube_crawl_request(crawl_options, dataset_id, broccoli_queue)
+        create_youtube_crawl_request(crawl_options, dataset_id, pool, broccoli_queue)
             .await
             .map_err(|err| ServiceError::BadRequest(format!("Could not crawl site: {}", err)))?;
         Ok(None)
     } else {
         let webhook_url = format!(
             "{}/api/file/html_page",
-            std::env::var("FOO_BAR").unwrap_or(
-                "https://5b0f-2600-1700-460-1070-f5b9-429e-fb2-70d5.ngrok-free.app".to_string()
-            )
+            std::env::var("BASE_SERVER_URL").unwrap_or("https://api.trieve.ai".to_string())
         );
         let webhook_metadata = serde_json::json!({
             "dataset_id": dataset_id,
@@ -307,10 +305,45 @@ pub async fn create_crawl_request(
 pub async fn create_youtube_crawl_request(
     crawl_options: CrawlOptions,
     dataset_id: uuid::Uuid,
+    pool: web::Data<Pool>,
     broccoli_queue: web::Data<BroccoliQueue>,
 ) -> Result<(), ServiceError> {
+    use crate::data::schema::crawl_requests::dsl as crawl_requests_table;
+
+    let interval = match crawl_options.interval {
+        Some(CrawlInterval::Daily) => std::time::Duration::from_secs(60 * 60 * 24),
+        Some(CrawlInterval::Weekly) => std::time::Duration::from_secs(60 * 60 * 24 * 7),
+        Some(CrawlInterval::Monthly) => std::time::Duration::from_secs(60 * 60 * 24 * 30),
+        None => std::time::Duration::from_secs(60 * 60 * 24),
+    };
+
+    let new_crawl_request: CrawlRequestPG = CrawlRequest {
+        id: uuid::Uuid::new_v4(),
+        url: crawl_options.site_url.clone().unwrap_or_default(),
+        status: CrawlStatus::Pending,
+        interval,
+        next_crawl_at: chrono::Utc::now().naive_utc(),
+        crawl_options,
+        scrape_id: uuid::Uuid::default(),
+        dataset_id,
+        created_at: chrono::Utc::now().naive_utc(),
+        attempt_number: 0,
+    }
+    .into();
+
+    let mut conn = pool
+        .get()
+        .await
+        .map_err(|e| ServiceError::InternalServerError(e.to_string()))?;
+
+    diesel::insert_into(crawl_requests_table::crawl_requests)
+        .values(&new_crawl_request)
+        .execute(&mut conn)
+        .await
+        .map_err(|e| ServiceError::InternalServerError(e.to_string()))?;
+
     let message = VideoCrawlMessage {
-        channel_url: crawl_options.site_url.clone().unwrap_or_default(),
+        channel_url: new_crawl_request.url.clone(),
         dataset_id,
     };
     broccoli_queue
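
The new persistence step above records the YouTube crawl in crawl_requests and maps the optional crawl interval to a recrawl period, defaulting to daily when unset. A standalone sketch of that mapping, separate from the diff (the CrawlInterval enum below is a local stand-in for the crate's type):

use std::time::Duration;

// Local stand-in for the crate's CrawlInterval; illustration only.
enum CrawlInterval {
    Daily,
    Weekly,
    Monthly,
}

// Mirrors the match in create_youtube_crawl_request: unset intervals fall back to daily.
fn recrawl_period(interval: Option<CrawlInterval>) -> Duration {
    match interval {
        Some(CrawlInterval::Weekly) => Duration::from_secs(60 * 60 * 24 * 7),
        Some(CrawlInterval::Monthly) => Duration::from_secs(60 * 60 * 24 * 30),
        Some(CrawlInterval::Daily) | None => Duration::from_secs(60 * 60 * 24),
    }
}
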
@@ -375,100 +408,91 @@ pub async fn update_crawl_settings_for_dataset(
     redis_pool: web::Data<RedisPool>,
 ) -> Result<(), ServiceError> {
     use crate::data::schema::crawl_requests::dsl as crawl_requests_table;
-    let mut merged_options = crawl_options.clone();
-    if crawl_options
-        .scrape_options
-        .as_ref()
-        .is_some_and(|f| matches!(f, &ScrapeOptions::Youtube(_)))
-    {
-        let mut conn = pool
-            .get()
-            .await
-            .map_err(|e| ServiceError::InternalServerError(e.to_string()))?;
-
-        let prev_crawl_req = crawl_requests_table::crawl_requests
-            .select((
-                crawl_requests_table::id,
-                crawl_requests_table::url,
-                crawl_requests_table::status,
-                crawl_requests_table::next_crawl_at,
-                crawl_requests_table::interval,
-                crawl_requests_table::crawl_options,
-                crawl_requests_table::scrape_id,
-                crawl_requests_table::dataset_id,
-                crawl_requests_table::created_at,
-            ))
-            .filter(crawl_requests_table::dataset_id.eq(dataset_id))
-            .first::<CrawlRequestPG>(&mut conn)
-            .await
-            .optional()?;
+    let mut conn = pool
+        .get()
+        .await
+        .map_err(|e| ServiceError::InternalServerError(e.to_string()))?;

-        if let Some(ref url) = crawl_options.site_url {
-            diesel::update(
-                crawl_requests_table::crawl_requests
-                    .filter(crawl_requests_table::dataset_id.eq(dataset_id)),
-            )
-            .set(crawl_requests_table::url.eq(url))
-            .execute(&mut conn)
-            .await
-            .map_err(|e| {
-                log::error!("Error updating url on crawl_requests: {:?}", e);
-                ServiceError::InternalServerError(
-                    "Error updating url on crawl_requests".to_string(),
-                )
-            })?;
-        }
+    let prev_crawl_req = crawl_requests_table::crawl_requests
+        .select((
+            crawl_requests_table::id,
+            crawl_requests_table::url,
+            crawl_requests_table::status,
+            crawl_requests_table::next_crawl_at,
+            crawl_requests_table::interval,
+            crawl_requests_table::crawl_options,
+            crawl_requests_table::scrape_id,
+            crawl_requests_table::dataset_id,
+            crawl_requests_table::created_at,
+        ))
+        .filter(crawl_requests_table::dataset_id.eq(dataset_id))
+        .first::<CrawlRequestPG>(&mut conn)
+        .await
+        .optional()?;

-        if let Some(interval) = crawl_options.interval.clone() {
-            let interval = match interval {
-                CrawlInterval::Daily => std::time::Duration::from_secs(60 * 60 * 24),
-                CrawlInterval::Weekly => std::time::Duration::from_secs(60 * 60 * 24 * 7),
-                CrawlInterval::Monthly => std::time::Duration::from_secs(60 * 60 * 24 * 30),
-            };
-            diesel::update(
-                crawl_requests_table::crawl_requests
-                    .filter(crawl_requests_table::dataset_id.eq(dataset_id)),
-            )
-            .set(crawl_requests_table::interval.eq(interval.as_secs() as i32))
-            .execute(&mut conn)
-            .await
-            .map_err(|e| {
-                log::error!("Error updating interval on crawl_requests: {:?}", e);
-                ServiceError::InternalServerError(
-                    "Error updating interval on crawl_requests".to_string(),
-                )
-            })?;
-        }
+    if let Some(ref url) = crawl_options.site_url {
+        diesel::update(
+            crawl_requests_table::crawl_requests
+                .filter(crawl_requests_table::dataset_id.eq(dataset_id)),
+        )
+        .set(crawl_requests_table::url.eq(url))
+        .execute(&mut conn)
+        .await
+        .map_err(|e| {
+            log::error!("Error updating url on crawl_requests: {:?}", e);
+            ServiceError::InternalServerError("Error updating url on crawl_requests".to_string())
+        })?;
+    }

-        merged_options = if let Some(prev_crawl_req) = prev_crawl_req {
-            let previous_crawl_options: CrawlOptions =
-                serde_json::from_value(prev_crawl_req.crawl_options)
-                    .map_err(|e| ServiceError::InternalServerError(e.to_string()))?;
-            crawl_options.merge(previous_crawl_options)
-        } else {
-            crawl_options
+    if let Some(interval) = crawl_options.interval.clone() {
+        let interval = match interval {
+            CrawlInterval::Daily => std::time::Duration::from_secs(60 * 60 * 24),
+            CrawlInterval::Weekly => std::time::Duration::from_secs(60 * 60 * 24 * 7),
+            CrawlInterval::Monthly => std::time::Duration::from_secs(60 * 60 * 24 * 30),
         };
-
         diesel::update(
             crawl_requests_table::crawl_requests
                 .filter(crawl_requests_table::dataset_id.eq(dataset_id)),
         )
-        .set(crawl_requests_table::crawl_options.eq(
-            serde_json::to_value(merged_options.clone()).map_err(|e| {
-                log::error!("Failed to serialize crawl options: {:?}", e);
-                ServiceError::BadRequest("Failed to serialize crawl options".to_string())
-            })?,
-        ))
+        .set(crawl_requests_table::interval.eq(interval.as_secs() as i32))
         .execute(&mut conn)
         .await
         .map_err(|e| {
-            log::error!("Error updating crawl options on crawl_requests: {:?}", e);
+            log::error!("Error updating interval on crawl_requests: {:?}", e);
             ServiceError::InternalServerError(
-                "Error updating crawl options on crawl_requests".to_string(),
+                "Error updating interval on crawl_requests".to_string(),
             )
         })?;
     }

+    let merged_options = if let Some(prev_crawl_req) = prev_crawl_req {
+        let previous_crawl_options: CrawlOptions =
+            serde_json::from_value(prev_crawl_req.crawl_options)
+                .map_err(|e| ServiceError::InternalServerError(e.to_string()))?;
+        crawl_options.merge(previous_crawl_options)
+    } else {
+        crawl_options
+    };
+
+    diesel::update(
+        crawl_requests_table::crawl_requests
+            .filter(crawl_requests_table::dataset_id.eq(dataset_id)),
+    )
+    .set(crawl_requests_table::crawl_options.eq(
+        serde_json::to_value(merged_options.clone()).map_err(|e| {
+            log::error!("Failed to serialize crawl options: {:?}", e);
+            ServiceError::BadRequest("Failed to serialize crawl options".to_string())
+        })?,
+    ))
+    .execute(&mut conn)
+    .await
+    .map_err(|e| {
+        log::error!("Error updating crawl options on crawl_requests: {:?}", e);
+        ServiceError::InternalServerError(
+            "Error updating crawl options on crawl_requests".to_string(),
+        )
+    })?;
+
     create_crawl_query(
         merged_options.clone(),
         pool.clone(),
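
The refactored update path above stores CrawlOptions as JSON on the crawl_requests row, reads the previous row back, merges it with the incoming options, and re-serializes the result. A rough sketch of that serde_json round-trip under stated assumptions (CrawlOptionsLike is a hypothetical stand-in for the crate's CrawlOptions, and the merge rule shown is only an illustration; the real merge logic lives on the crate's type):

use serde::{Deserialize, Serialize};

// Hypothetical stand-in for the crate's CrawlOptions; illustration only.
#[derive(Serialize, Deserialize, Clone)]
struct CrawlOptionsLike {
    site_url: Option<String>,
    interval: Option<String>,
}

// Deserialize the JSON stored on the crawl_requests row, merge, and serialize the
// merged options back into a serde_json::Value suitable for the diesel update.
fn remerge(
    stored: serde_json::Value,
    incoming: CrawlOptionsLike,
) -> Result<serde_json::Value, serde_json::Error> {
    let previous: CrawlOptionsLike = serde_json::from_value(stored)?;
    // Assumed merge rule for this sketch: incoming fields win, previous fields fill gaps.
    let merged = CrawlOptionsLike {
        site_url: incoming.site_url.or(previous.site_url),
        interval: incoming.interval.or(previous.interval),
    };
    serde_json::to_value(merged)
}
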