From f8451b37e13800cdc25e0ab2fede0e26e05570bb Mon Sep 17 00:00:00 2001 From: skeptrune Date: Thu, 12 Dec 2024 20:26:29 -0800 Subject: [PATCH] wip: can't deserialize firecrawl response in crawl_operator from crawl-worker --- .vscode/launch.json | 16 +- clients/ts-sdk/openapi.json | 258 +++++++++++++++++++++++++ clients/ts-sdk/src/types.gen.ts | 88 +++++++++ docker-compose-firecrawl.yml | 8 +- server/src/bin/crawl-worker.rs | 48 ++--- server/src/data/models.rs | 14 +- server/src/handlers/dataset_handler.rs | 4 +- server/src/handlers/file_handler.rs | 87 ++++++++- server/src/lib.rs | 10 + server/src/operators/crawl_operator.rs | 183 +++++++++++++++++- 10 files changed, 679 insertions(+), 37 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index e695d36e8e..c500c54e24 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -7,7 +7,7 @@ "name": "Debug executable 'trieve-server'", "cargo": { "args": [ - "+default", + "+nightly", "build", "--manifest-path=./server/Cargo.toml", "--bin=trieve-server", @@ -35,6 +35,20 @@ "args": [], "cwd": "${workspaceFolder}/server" }, + { + "type": "lldb", + "request": "launch", + "name": "Debug executable 'crawl-worker'", + "cargo": { + "args": [ + "build", + "--manifest-path=./server/Cargo.toml", + "--bin=crawl-worker" + ] + }, + "args": [], + "cwd": "${workspaceFolder}/server" + }, { "type": "lldb", "request": "launch", diff --git a/clients/ts-sdk/openapi.json b/clients/ts-sdk/openapi.json index 2cc419d7a7..e0417e8c8f 100644 --- a/clients/ts-sdk/openapi.json +++ b/clients/ts-sdk/openapi.json @@ -4387,6 +4387,42 @@ ] } }, + "/api/file/html_page": { + "post": { + "tags": [ + "File" + ], + "summary": "Upload HTML Page", + "description": "Chunk HTML by headings and queue for indexing into the specified dataset.", + "operationId": "upload_html_page", + "requestBody": { + "description": "JSON request payload to upload a file", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/UploadHtmlPageReqPayload" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "Confirmation that html is being processed" + }, + "400": { + "description": "Service error relating to processing the file", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponseBody" + } + } + } + } + } + } + }, "/api/file/{file_id}": { "get": { "tags": [ @@ -8460,6 +8496,15 @@ "type": "string", "description": "The URL to crawl", "nullable": true + }, + "webhook_metadata": { + "description": "Metadata to send back with the webhook call for each successful page scrape", + "nullable": true + }, + "webhook_url": { + "type": "string", + "description": "Host to call back on the webhook for each successful page scrape", + "nullable": true } }, "example": { @@ -9642,6 +9687,44 @@ "dot" ] }, + "Document": { + "type": "object", + "required": [ + "metadata" + ], + "properties": { + "extract": { + "type": "string", + "nullable": true + }, + "html": { + "type": "string", + "nullable": true + }, + "links": { + "type": "array", + "items": { + "type": "string" + }, + "nullable": true + }, + "markdown": { + "type": "string", + "nullable": true + }, + "metadata": { + "$ref": "#/components/schemas/Metadata" + }, + "rawHtml": { + "type": "string", + "nullable": true + }, + "screenshot": { + "type": "string", + "nullable": true + } + } + }, "EditMessageReqPayload": { "type": "object", "required": [ @@ -11680,6 +11763,152 @@ "updated_at": "2021-01-01 00:00:00.000" } }, + "Metadata": { + "type": "object", 
+ "properties": { + "articleSection": { + "type": "string", + "nullable": true + }, + "articleTag": { + "type": "string", + "nullable": true + }, + "dcDate": { + "type": "string", + "nullable": true + }, + "dcDateCreated": { + "type": "string", + "nullable": true + }, + "dcDescription": { + "type": "string", + "nullable": true + }, + "dcSubject": { + "type": "string", + "nullable": true + }, + "dcTermsAudience": { + "type": "string", + "nullable": true + }, + "dcTermsCreated": { + "type": "string", + "nullable": true + }, + "dcTermsKeywords": { + "type": "string", + "nullable": true + }, + "dcTermsSubject": { + "type": "string", + "nullable": true + }, + "dcTermsType": { + "type": "string", + "nullable": true + }, + "dcType": { + "type": "string", + "nullable": true + }, + "description": { + "type": "string", + "nullable": true + }, + "error": { + "type": "string", + "nullable": true + }, + "keywords": { + "type": "string", + "nullable": true + }, + "language": { + "type": "string", + "nullable": true + }, + "modifiedTime": { + "type": "string", + "nullable": true + }, + "ogAudio": { + "type": "string", + "nullable": true + }, + "ogDescription": { + "type": "string", + "nullable": true + }, + "ogDeterminer": { + "type": "string", + "nullable": true + }, + "ogImage": { + "type": "string", + "nullable": true + }, + "ogLocale": { + "type": "string", + "nullable": true + }, + "ogLocaleAlternate": { + "type": "array", + "items": { + "type": "string" + }, + "nullable": true + }, + "ogSiteName": { + "type": "string", + "nullable": true + }, + "ogTitle": { + "type": "string", + "nullable": true + }, + "ogUrl": { + "type": "string", + "nullable": true + }, + "ogVideo": { + "type": "string", + "nullable": true + }, + "publishedTime": { + "type": "string", + "nullable": true + }, + "robots": { + "type": "string", + "nullable": true + }, + "site_map": { + "allOf": [ + { + "$ref": "#/components/schemas/Sitemap" + } + ], + "nullable": true + }, + "sourceURL": { + "type": "string", + "nullable": true + }, + "statusCode": { + "type": "integer", + "format": "int32", + "nullable": true, + "minimum": 0 + }, + "title": { + "type": "string", + "nullable": true + } + } + }, "MmrOptions": { "type": "object", "description": "MMR Options lets you specify different methods to rerank the chunks in the result set using Maximal Marginal Relevance. 
If not specified, this defaults to the score of the chunks.", @@ -15025,6 +15254,17 @@ "pos_in_queue": 1 } }, + "Sitemap": { + "type": "object", + "required": [ + "changefreq" + ], + "properties": { + "changefreq": { + "type": "string" + } + } + }, "SlimChunkMetadata": { "type": "object", "required": [ @@ -16378,6 +16618,24 @@ } } }, + "UploadHtmlPageReqPayload": { + "type": "object", + "required": [ + "data", + "metadata", + "scrapeId" + ], + "properties": { + "data": { + "$ref": "#/components/schemas/Document" + }, + "metadata": {}, + "scrapeId": { + "type": "string", + "format": "uuid" + } + } + }, "UsageGraphPoint": { "type": "object", "required": [ diff --git a/clients/ts-sdk/src/types.gen.ts b/clients/ts-sdk/src/types.gen.ts index 7fa1b05421..14db79524f 100644 --- a/clients/ts-sdk/src/types.gen.ts +++ b/clients/ts-sdk/src/types.gen.ts @@ -624,6 +624,14 @@ export type CrawlOptions = { * The URL to crawl */ site_url?: (string) | null; + /** + * Metadata to send back with the webhook call for each successful page scrape + */ + webhook_metadata?: unknown; + /** + * Host to call back on the webhook for each successful page scrape + */ + webhook_url?: (string) | null; }; /** @@ -1084,6 +1092,16 @@ export type DeprecatedSearchOverGroupsResponseBody = { export type DistanceMetric = 'euclidean' | 'cosine' | 'manhattan' | 'dot'; +export type Document = { + extract?: (string) | null; + html?: (string) | null; + links?: Array<(string)> | null; + markdown?: (string) | null; + metadata: Metadata; + rawHtml?: (string) | null; + screenshot?: (string) | null; +}; + export type EditMessageReqPayload = { /** * If concat user messages query is set to true, all of the user messages in the topic will be concatenated together and used as the search query. If not specified, this defaults to false. Default is false. @@ -1879,6 +1897,42 @@ export type Message = { updated_at: string; }; +export type Metadata = { + articleSection?: (string) | null; + articleTag?: (string) | null; + dcDate?: (string) | null; + dcDateCreated?: (string) | null; + dcDescription?: (string) | null; + dcSubject?: (string) | null; + dcTermsAudience?: (string) | null; + dcTermsCreated?: (string) | null; + dcTermsKeywords?: (string) | null; + dcTermsSubject?: (string) | null; + dcTermsType?: (string) | null; + dcType?: (string) | null; + description?: (string) | null; + error?: (string) | null; + keywords?: (string) | null; + language?: (string) | null; + modifiedTime?: (string) | null; + ogAudio?: (string) | null; + ogDescription?: (string) | null; + ogDeterminer?: (string) | null; + ogImage?: (string) | null; + ogLocale?: (string) | null; + ogLocaleAlternate?: Array<(string)> | null; + ogSiteName?: (string) | null; + ogTitle?: (string) | null; + ogUrl?: (string) | null; + ogVideo?: (string) | null; + publishedTime?: (string) | null; + robots?: (string) | null; + site_map?: ((Sitemap) | null); + sourceURL?: (string) | null; + statusCode?: (number) | null; + title?: (string) | null; +}; + /** * MMR Options lets you specify different methods to rerank the chunks in the result set using Maximal Marginal Relevance. If not specified, this defaults to the score of the chunks. 
*/ @@ -2778,6 +2832,10 @@ export type SingleQueuedChunkResponse = { pos_in_queue: number; }; +export type Sitemap = { + changefreq: string; +}; + export type SlimChunkMetadata = { created_at: string; dataset_id: string; @@ -3270,6 +3328,12 @@ export type UploadFileResponseBody = { file_metadata: File; }; +export type UploadHtmlPageReqPayload = { + data: Document; + metadata: unknown; + scrapeId: string; +}; + export type UsageGraphPoint = { requests: number; time_stamp: string; @@ -4238,6 +4302,15 @@ export type CreatePresignedUrlForCsvJsonlData = { export type CreatePresignedUrlForCsvJsonlResponse = (CreatePresignedUrlForCsvJsonResponseBody); +export type UploadHtmlPageData = { + /** + * JSON request payload to upload a file + */ + requestBody: UploadHtmlPageReqPayload; +}; + +export type UploadHtmlPageResponse = (void); + export type GetFileHandlerData = { /** * The id of the file to fetch @@ -5719,6 +5792,21 @@ export type $OpenApiTs = { }; }; }; + '/api/file/html_page': { + post: { + req: UploadHtmlPageData; + res: { + /** + * Confirmation that html is being processed + */ + 204: void; + /** + * Service error relating to processing the file + */ + 400: ErrorResponseBody; + }; + }; + }; '/api/file/{file_id}': { get: { req: GetFileHandlerData; diff --git a/docker-compose-firecrawl.yml b/docker-compose-firecrawl.yml index bcd62cfb7a..0e5d6e6531 100644 --- a/docker-compose-firecrawl.yml +++ b/docker-compose-firecrawl.yml @@ -2,7 +2,7 @@ name: firecrawl services: # Firecrawl services playwright-service: - image: trieve/puppeteer-service-ts:v0.0.7 + image: trieve/puppeteer-service-ts:v0.0.8 environment: - PORT=3000 - PROXY_SERVER=${PROXY_SERVER} @@ -15,7 +15,7 @@ services: - backend firecrawl-api: - image: trieve/firecrawl:v0.0.47 + image: trieve/firecrawl:v0.0.48 networks: - backend environment: @@ -27,7 +27,6 @@ services: - BULL_AUTH_KEY=${BULL_AUTH_KEY} - TEST_API_KEY=${TEST_API_KEY} - HOST=${HOST:-0.0.0.0} - - SELF_HOSTED_WEBHOOK_URL=${SELF_HOSTED_WEBHOOK_URL} - LOGGING_LEVEL=${LOGGING_LEVEL} extra_hosts: - "host.docker.internal:host-gateway" @@ -38,7 +37,7 @@ services: command: ["pnpm", "run", "start:production"] firecrawl-worker: - image: trieve/firecrawl:v0.0.46 + image: trieve/firecrawl:v0.0.48 networks: - backend environment: @@ -51,7 +50,6 @@ services: - TEST_API_KEY=${TEST_API_KEY} - SCRAPING_BEE_API_KEY=${SCRAPING_BEE_API_KEY} - HOST=${HOST:-0.0.0.0} - - SELF_HOSTED_WEBHOOK_URL=${SELF_HOSTED_WEBHOOK_URL} - LOGGING_LEVEL=${LOGGING_LEVEL} extra_hosts: - "host.docker.internal:host-gateway" diff --git a/server/src/bin/crawl-worker.rs b/server/src/bin/crawl-worker.rs index d39d337687..5dea9529c8 100644 --- a/server/src/bin/crawl-worker.rs +++ b/server/src/bin/crawl-worker.rs @@ -296,17 +296,17 @@ async fn get_chunks_with_firecrawl( let page_count = data.len(); for page in data { - let page = match page { + let crawl_doc = match page { Some(page) => page, None => continue, }; - if page.metadata.status_code != Some(200) { - log::error!("Error getting metadata for page: {:?}", page.metadata); + if crawl_doc.metadata.status_code != Some(200) { + log::error!("Error getting metadata for page: {:?}", crawl_doc.metadata); continue; } - let page_link = page + let page_link = crawl_doc .metadata .source_url .clone() @@ -316,14 +316,18 @@ async fn get_chunks_with_firecrawl( if page_link.is_empty() { println!( "Error page source_url is not present for page_metadata: {:?}", - page.metadata + crawl_doc.metadata ); continue; } - let page_title = page.metadata.og_title.clone().unwrap_or_default(); - 
let page_description = page.metadata.og_description.clone().unwrap_or_default(); - let page_html = page.html.clone().unwrap_or_default(); + let page_title = crawl_doc.metadata.og_title.clone().unwrap_or_default(); + let page_description = crawl_doc + .metadata + .og_description + .clone() + .unwrap_or(crawl_doc.metadata.description.unwrap_or_default().clone()); + let page_html = crawl_doc.html.clone().unwrap_or_default(); let page_tags = get_tags(page_link.clone()); if let Some(spec) = &spec { @@ -588,20 +592,20 @@ async fn get_chunks_with_firecrawl( #[allow(clippy::print_stdout)] async fn crawl( - scrape_request: CrawlRequest, + crawl_request: CrawlRequest, pool: web::Data, redis_pool: web::Data, ) -> Result { - log::info!("Starting crawl for scrape_id: {}", scrape_request.id); + log::info!("Starting crawl for scrape_id: {}", crawl_request.id); let (page_count, chunks_created) = if let Some(ScrapeOptions::Shopify(_)) = - scrape_request.crawl_options.scrape_options.clone() + crawl_request.crawl_options.scrape_options.clone() { let mut cur_page = 1; let mut chunk_count = 0; loop { let mut chunks: Vec = Vec::new(); - let cleaned_url = scrape_request.url.trim_end_matches("/"); + let cleaned_url = crawl_request.url.trim_end_matches("/"); let url = format!("{}/products.json?page={}", cleaned_url, cur_page); let response: ShopifyResponse = ureq::get(&url) @@ -622,7 +626,7 @@ async fn crawl( &product, &product.variants[0], cleaned_url, - &scrape_request, + &crawl_request, )?); } else { for variant in &product.variants { @@ -630,7 +634,7 @@ async fn crawl( &product, variant, cleaned_url, - &scrape_request, + &crawl_request, )?); } } @@ -640,7 +644,7 @@ async fn crawl( for chunk in chunks_to_upload { let (chunk_ingestion_message, chunk_metadatas) = - create_chunk_metadata(chunk.to_vec(), scrape_request.dataset_id).await?; + create_chunk_metadata(chunk.to_vec(), crawl_request.dataset_id).await?; let mut redis_conn = redis_pool .get() @@ -671,12 +675,12 @@ async fn crawl( (cur_page, chunk_count) } else { let (chunks, page_count) = - get_chunks_with_firecrawl(scrape_request.clone(), pool.clone()).await?; + get_chunks_with_firecrawl(crawl_request.clone(), pool.clone()).await?; let chunks_to_upload = chunks.chunks(120); - for chunk in chunks_to_upload { + for batch in chunks_to_upload { let (chunk_ingestion_message, chunk_metadatas) = - create_chunk_metadata(chunk.to_vec(), scrape_request.dataset_id).await?; + create_chunk_metadata(batch.to_vec(), crawl_request.dataset_id).await?; let mut redis_conn = redis_pool .get() @@ -703,21 +707,21 @@ async fn crawl( }; update_crawl_status( - scrape_request.scrape_id, + crawl_request.scrape_id, CrawlStatus::Completed, pool.clone(), ) .await?; update_next_crawl_at( - scrape_request.scrape_id, - scrape_request.next_crawl_at + scrape_request.interval, + crawl_request.scrape_id, + crawl_request.next_crawl_at + crawl_request.interval, pool.clone(), ) .await?; Ok(ScrapeReport { - request_id: scrape_request.id, + request_id: crawl_request.id, pages_scraped: page_count, chunks_created, }) diff --git a/server/src/data/models.rs b/server/src/data/models.rs index bd48ac1586..cafc2943de 100644 --- a/server/src/data/models.rs +++ b/server/src/data/models.rs @@ -7688,6 +7688,10 @@ pub struct CrawlOptions { pub body_remove_strings: Option>, /// Options for including an openapi spec in the crawl pub scrape_options: Option, + /// Host to call back on the webhook for each successful page scrape + pub webhook_url: Option, + /// Metadata to send back with the webhook call for each 
successful page scrape + pub webhook_metadata: Option, } #[derive(Serialize, Deserialize, Debug, ToSchema, Clone)] @@ -7733,6 +7737,8 @@ impl CrawlOptions { .body_remove_strings .clone() .or(other.body_remove_strings.clone()), + webhook_url: None, + webhook_metadata: None, } } } @@ -7753,6 +7759,10 @@ pub struct FirecrawlCrawlRequest { pub allow_external_links: Option, #[serde(skip_serializing_if = "Option::is_none")] pub scrape_options: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub webhook_url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub webhook_metadata: Option, } #[derive(Debug, Serialize, Deserialize, Clone, ToSchema, Default)] @@ -7780,9 +7790,11 @@ impl From for FirecrawlCrawlRequest { scrape_options: Some(FirecrawlScraperOptions { include_tags: crawl_options.include_tags, exclude_tags: crawl_options.exclude_tags, - formats: Some(vec!["html".to_string(), "rawHtml".to_string()]), + formats: Some(vec!["rawHtml".to_string()]), wait_for: Some(1000), }), + webhook_url: crawl_options.webhook_url, + webhook_metadata: crawl_options.webhook_metadata, } } } diff --git a/server/src/handlers/dataset_handler.rs b/server/src/handlers/dataset_handler.rs index 653c29fcc3..37ce707cfa 100644 --- a/server/src/handlers/dataset_handler.rs +++ b/server/src/handlers/dataset_handler.rs @@ -9,7 +9,7 @@ use crate::{ middleware::auth_middleware::{verify_admin, verify_owner}, operators::{ crawl_operator::{ - crawl, get_crawl_request_by_dataset_id_query, update_crawl_settings_for_dataset, + create_crawl_query, get_crawl_request_by_dataset_id_query, update_crawl_settings_for_dataset, validate_crawl_options, }, dataset_operator::{ @@ -175,7 +175,7 @@ pub async fn create_dataset( let d = create_dataset_query(dataset.clone(), pool.clone()).await?; if let Some(crawl_options) = data.crawl_options.clone() { - crawl( + create_crawl_query( crawl_options.clone(), pool.clone(), redis_pool.clone(), diff --git a/server/src/handlers/file_handler.rs b/server/src/handlers/file_handler.rs index 561e6a4577..353a78ff81 100644 --- a/server/src/handlers/file_handler.rs +++ b/server/src/handlers/file_handler.rs @@ -10,10 +10,11 @@ use crate::{ errors::ServiceError, middleware::auth_middleware::verify_member, operators::{ + crawl_operator::{process_crawl_doc, Document}, file_operator::{ delete_file_query, get_aws_bucket, get_dataset_file_query, get_file_query, }, - organization_operator::get_file_size_sum_org, + organization_operator::{get_file_size_sum_org, hash_function}, }, }; use actix_web::{web, HttpResponse}; @@ -205,6 +206,90 @@ pub async fn upload_file_handler( Ok(HttpResponse::Ok().json(result)) } +#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct UploadHtmlPageReqPayload { + pub data: Document, + pub metadata: serde_json::Value, + pub scrape_id: uuid::Uuid, +} + +/// Upload HTML Page +/// +/// Chunk HTML by headings and queue for indexing into the specified dataset. 
+#[utoipa::path(
+    post,
+    path = "/file/html_page",
+    context_path = "/api",
+    tag = "File",
+    request_body(content = UploadHtmlPageReqPayload, description = "JSON request payload to upload a file", content_type = "application/json"),
+    responses(
+        (status = 204, description = "Confirmation that html is being processed"),
+        (status = 400, description = "Service error relating to processing the file", body = ErrorResponseBody),
+    ),
+)]
+pub async fn upload_html_page(
+    data: web::Json<UploadHtmlPageReqPayload>,
+    redis_pool: web::Data<RedisPool>,
+) -> Result<HttpResponse, actix_web::Error> {
+    let req_payload = data.into_inner();
+
+    let dataset_id = req_payload
+        .metadata
+        .as_object()
+        .ok_or_else(|| {
+            ServiceError::BadRequest("metadata field must be a JSON object".to_string())
+        })?
+        .get("dataset_id")
+        .ok_or_else(|| {
+            ServiceError::BadRequest("metadata field is required to specify dataset_id".to_string())
+        })?
+        .as_str()
+        .ok_or_else(|| {
+            ServiceError::BadRequest("metadata field must have a valid dataset_id".to_string())
+        })?
+        .parse::<uuid::Uuid>()
+        .map_err(|_| {
+            log::error!("metadata field must have a valid dataset_id");
+            ServiceError::BadRequest("metadata field must have a valid dataset_id".to_string())
+        })?;
+
+    let webhook_secret = req_payload
+        .metadata
+        .as_object()
+        .ok_or_else(|| {
+            ServiceError::BadRequest("metadata field must be a JSON object".to_string())
+        })?
+        .get("webhook_secret")
+        .ok_or_else(|| {
+            ServiceError::BadRequest("metadata field is required to specify webhook_secret".to_string())
+        })?
+        .as_str()
+        .ok_or_else(|| {
+            ServiceError::BadRequest("metadata field must have a valid webhook_secret".to_string())
+        })?
+        .parse::<String>()
+        .map_err(|_| {
+            log::error!("metadata field must have a valid webhook_secret");
+            ServiceError::BadRequest("metadata field must have a valid webhook_secret".to_string())
+        })?;
+
+    let cur_secret = hash_function(
+        std::env::var("STRIPE_WEBHOOK_SECRET")
+            .unwrap_or("firecrawl".to_string())
+            .as_str(),
+    );
+
+    if webhook_secret != cur_secret {
+        log::error!("Webhook secret does not match.");
+        return Err(ServiceError::BadRequest("Webhook secret does not match.".to_string()).into());
+    }
+
+    process_crawl_doc(dataset_id, req_payload.data, redis_pool).await?;
+
+    Ok(HttpResponse::NoContent().finish())
+}
+
 /// Get File Signed URL
 ///
 /// Get a signed s3 url corresponding to the file_id requested such that you can download the file.
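For reference, the sketch below (not part of the patch) shows the body shape the new /api/file/html_page handler deserializes: an UploadHtmlPageReqPayload with camelCase keys, whose top-level `metadata` must carry the `dataset_id` and `webhook_secret` that create_crawl_query places in `webhook_metadata`. Per the commit message, Firecrawl's actual webhook payload may not yet match this shape; the UUIDs and secret here are placeholders only.

use serde_json::json;

// Illustrative only: placeholders, not real values. The real webhook_secret is
// hash_function(STRIPE_WEBHOOK_SECRET or "firecrawl"), and the top-level
// `metadata` object is expected to echo the crawl's webhook_metadata.
fn example_upload_html_page_body() -> serde_json::Value {
    json!({
        "scrapeId": "00000000-0000-0000-0000-000000000000",
        "data": {
            "html": "<h2>Getting Started</h2><p>...</p>",
            "metadata": {
                "statusCode": 200,
                "sourceURL": "https://example.com/docs/getting-started",
                "ogTitle": "Getting Started",
                "ogDescription": "Short page summary"
            }
        },
        "metadata": {
            "dataset_id": "00000000-0000-0000-0000-000000000000",
            "webhook_secret": "<output of hash_function(...)>"
        }
    })
}
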
diff --git a/server/src/lib.rs b/server/src/lib.rs index 263b53d689..d29a23c12c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -225,6 +225,7 @@ impl Modify for SecurityAddon { handlers::file_handler::get_file_handler, handlers::file_handler::delete_file_handler, handlers::file_handler::create_presigned_url_for_csv_jsonl, + handlers::file_handler::upload_html_page, handlers::event_handler::get_events, handlers::organization_handler::create_organization, handlers::organization_handler::get_organization, @@ -353,6 +354,7 @@ impl Modify for SecurityAddon { handlers::file_handler::UploadFileResponseBody, handlers::file_handler::CreatePresignedUrlForCsvJsonlReqPayload, handlers::file_handler::CreatePresignedUrlForCsvJsonResponseBody, + handlers::file_handler::UploadHtmlPageReqPayload, handlers::invitation_handler::InvitationData, handlers::event_handler::GetEventsData, handlers::organization_handler::CreateOrganizationReqPayload, @@ -387,6 +389,9 @@ impl Modify for SecurityAddon { operators::analytics_operator::CTRRecommendationsWithoutClicksResponse, operators::analytics_operator::PopularFiltersResponse, operators::chunk_operator::HighlightStrategy, + operators::crawl_operator::Document, + operators::crawl_operator::Metadata, + operators::crawl_operator::Sitemap, handlers::stripe_handler::CreateSetupCheckoutSessionResPayload, handlers::page_handler::PublicPageSearchOptions, handlers::page_handler::OpenGraphMetadata, @@ -1097,6 +1102,11 @@ pub fn main() -> std::io::Result<()> { web::post().to(handlers::file_handler::upload_file_handler), ), ) + .service( + web::resource("/html_page").route( + web::post().to(handlers::file_handler::upload_html_page), + ), + ) .service( web::resource("/csv_or_jsonl") .route(web::post().to(handlers::file_handler::create_presigned_url_for_csv_jsonl)), diff --git a/server/src/operators/crawl_operator.rs b/server/src/operators/crawl_operator.rs index d15b1ffcbf..bd4745fab9 100644 --- a/server/src/operators/crawl_operator.rs +++ b/server/src/operators/crawl_operator.rs @@ -2,7 +2,10 @@ use crate::data::models::CrawlOptions; use crate::data::models::CrawlStatus; use crate::data::models::FirecrawlCrawlRequest; use crate::data::models::RedisPool; +use crate::handlers::chunk_handler::ChunkReqPayload; use crate::handlers::chunk_handler::CrawlInterval; +use crate::handlers::chunk_handler::FullTextBoost; +use crate::handlers::chunk_handler::SemanticBoost; use crate::{ data::models::{CrawlRequest, CrawlRequestPG, Pool, ScrapeOptions}, errors::ServiceError, @@ -16,7 +19,10 @@ use reqwest::Url; use scraper::Html; use scraper::Selector; use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; +use super::chunk_operator::create_chunk_metadata; +use super::organization_operator::hash_function; use super::parse_operator::convert_html_to_text; #[derive(Debug, Serialize, Deserialize, Clone)] @@ -39,7 +45,7 @@ pub enum Status { Cancelled, } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)] pub struct Document { pub markdown: Option, pub extract: Option, @@ -51,7 +57,7 @@ pub struct Document { pub metadata: Metadata, } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)] pub struct Metadata { pub title: Option, pub description: Option, @@ -114,7 +120,7 @@ pub struct Metadata { pub site_map: Option, } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)] pub struct Sitemap { pub changefreq: String, } @@ -135,7 
+141,7 @@ pub fn validate_crawl_options(crawl_options: &CrawlOptions) -> Result, redis_pool: web::Data, @@ -143,6 +149,20 @@ pub async fn crawl( ) -> Result { validate_crawl_options(&crawl_options)?; + let webhook_url = format!( + "{}/api/file/html_page", + std::env::var("FOO_BAR").unwrap_or( + "https://5b0f-2600-1700-460-1070-f5b9-429e-fb2-70d5.ngrok-free.app".to_string() + ) + ); + let webhook_metadata = serde_json::json!({ + "dataset_id": dataset_id, + "webhook_secret": hash_function(std::env::var("STRIPE_WEBHOOK_SECRET").unwrap_or("firecrawl".to_string()).as_str()) + }); + let mut crawl_options = crawl_options.clone(); + crawl_options.webhook_url = Some(webhook_url); + crawl_options.webhook_metadata = Some(webhook_metadata); + let scrape_id = if let Some(ScrapeOptions::Shopify(_)) = crawl_options.scrape_options { uuid::Uuid::nil() } else { @@ -178,6 +198,7 @@ pub async fn get_crawl_request_by_dataset_id_query( crawl_requests_table::dataset_id, crawl_requests_table::created_at, )) + .order_by(crawl_requests_table::created_at.desc()) .first(&mut conn) .await .optional() @@ -406,7 +427,7 @@ pub async fn update_crawl_settings_for_dataset( ) })?; - crawl( + create_crawl_query( merged_options.clone(), pool.clone(), redis_pool.clone(), @@ -676,3 +697,155 @@ fn extract_all_headings(html: &str) -> String { .collect::>() .join("\n") } + +pub async fn process_crawl_doc( + dataset_id: uuid::Uuid, + crawl_doc: Document, + redis_pool: web::Data, +) -> Result<(), ServiceError> { + if crawl_doc.metadata.status_code != Some(200) { + log::error!("Error getting metadata for page: {:?}", crawl_doc.metadata); + return Err(ServiceError::BadRequest( + "Error getting metadata for page".to_string(), + )); + } + + let page_link = crawl_doc + .metadata + .source_url + .clone() + .unwrap_or_default() + .trim_end_matches("/") + .to_string(); + if page_link.is_empty() { + log::error!( + "Error page source_url is not present for page_metadata: {:?}", + crawl_doc.metadata + ); + return Ok(()); + } + + let page_title = crawl_doc.metadata.og_title.clone().unwrap_or_default(); + let page_description = crawl_doc + .metadata + .og_description + .clone() + .unwrap_or(crawl_doc.metadata.description.unwrap_or_default().clone()); + let page_html = crawl_doc.html.clone().unwrap_or_default(); + let page_tags = get_tags(page_link.clone()); + + let chunked_html = chunk_html(&page_html.clone(), None, None); + let mut chunks = vec![]; + + for chunk in chunked_html { + let heading = chunk.0.clone(); + let chunk_html = chunk.1.clone(); + + if chunk_html.is_empty() { + log::error!("Skipping empty chunk for page: {}", page_link); + return Ok(()); + } + + let mut metadata = serde_json::json!({ + "url": page_link.clone(), + "hierarchy": chunk.0.clone(), + }); + + let mut semantic_boost_phrase = heading.clone(); + let mut fulltext_boost_phrase = heading.clone(); + metadata["heading"] = serde_json::json!(heading.clone()); + + if !page_title.is_empty() { + semantic_boost_phrase.push_str(format!("\n\n{}", page_title).as_str()); + fulltext_boost_phrase.push_str(format!("\n\n{}", page_title).as_str()); + + metadata["title"] = serde_json::json!(page_title.clone()); + metadata["hierarchy"] + .as_array_mut() + .unwrap_or(&mut vec![]) + .insert(0, serde_json::json!(page_title.clone())); + } + if !page_description.is_empty() { + semantic_boost_phrase.push_str(format!("\n\n{}", page_description).as_str()); + + metadata["description"] = serde_json::json!(page_description.clone()); + } + + let tracking_hash_val = if heading.is_empty() { + 
chunk_html.clone() + } else { + heading.clone() + }; + + let chunk = ChunkReqPayload { + chunk_html: Some(chunk_html.clone()), + link: Some(page_link.clone()), + tag_set: Some(page_tags.clone()), + metadata: Some(serde_json::json!(metadata)), + tracking_id: Some(hash_function(&format!( + "{}{}", + page_link + .trim_end_matches("/") + .split("/") + .collect::>() + .split_at(3) + .1 + .join("/"), + tracking_hash_val + ))), + upsert_by_tracking_id: Some(true), + group_tracking_ids: Some(vec![if !page_title.is_empty() { + page_title.clone() + } else { + page_link.clone() + }]), + fulltext_boost: if !fulltext_boost_phrase.is_empty() { + Some(FullTextBoost { + phrase: fulltext_boost_phrase, + boost_factor: 1.3, + }) + } else { + None + }, + semantic_boost: if !semantic_boost_phrase.is_empty() { + Some(SemanticBoost { + phrase: semantic_boost_phrase, + distance_factor: 0.3, + }) + } else { + None + }, + convert_html_to_text: Some(true), + ..Default::default() + }; + chunks.push(chunk); + } + + log::info!("Uploading {} chunks for page: {}", chunks.len(), page_link); + let chunks_to_upload = chunks.chunks(120); + for batch in chunks_to_upload { + let (chunk_ingestion_message, chunk_metadatas) = + create_chunk_metadata(batch.to_vec(), dataset_id).await?; + + let mut redis_conn = redis_pool + .get() + .await + .map_err(|err| ServiceError::BadRequest(err.to_string()))?; + + if !chunk_metadatas.is_empty() { + let serialized_message: String = serde_json::to_string(&chunk_ingestion_message) + .map_err(|_| { + ServiceError::BadRequest("Failed to Serialize BulkUploadMessage".to_string()) + })?; + + redis::cmd("lpush") + .arg("ingestion") + .arg(&serialized_message) + .query_async::(&mut *redis_conn) + .await + .map_err(|err| ServiceError::BadRequest(err.to_string()))?; + } + } + + Ok(()) +}
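
As a closing note on process_crawl_doc above, the sketch below (not part of the patch) mirrors how each chunk's tracking_id is derived: the scheme and host are dropped from the page URL and the remaining path is hashed together with the chunk heading, or with the chunk HTML when the heading is empty. `my_hash` is a stand-in for the server's hash_function helper, and, like the original, this assumes a full URL with scheme and host.

// Mirrors the tracking_id computation in process_crawl_doc: split_at(3) skips
// ["https:", "", "example.com"], leaving only the path segments to hash with
// the heading (or chunk HTML when the heading is empty).
fn example_tracking_id(
    page_link: &str,
    heading_or_html: &str,
    my_hash: impl Fn(&str) -> String,
) -> String {
    let path_only = page_link
        .trim_end_matches("/")
        .split("/")
        .collect::<Vec<&str>>()
        .split_at(3)
        .1
        .join("/");

    // e.g. "https://example.com/docs/getting-started" with heading
    // "Installation" hashes the string "docs/getting-startedInstallation".
    my_hash(&format!("{}{}", path_only, heading_or_html))
}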