From 285b765290f415cc9e9319bd50d58f815a03066a Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 6 Feb 2023 00:46:15 +0100 Subject: [PATCH 1/2] provide two methods to add documents from an async reader --- Cargo.toml | 1 + src/indexes.rs | 134 +++++++++++++++++++++++++++++++++++++++++++++++++ src/request.rs | 94 ++++++++++++++++++++++++++++++++++ 3 files changed, 229 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 44ae64c4..e4ab55e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ meilisearch-index-setting-macro = { path = "meilisearch-index-setting-macro", ve [target.'cfg(not(target_arch = "wasm32"))'.dependencies] futures = "0.3" +futures-io = "0.3.26" isahc = { version = "1.0", features = ["http2", "text-decoding"], default_features = false } uuid = { version = "1.1.2", features = ["v4"] } diff --git a/src/indexes.rs b/src/indexes.rs index ee8b793b..c37b9817 100644 --- a/src/indexes.rs +++ b/src/indexes.rs @@ -559,6 +559,73 @@ impl Index { .await } + /// Add a raw and unchecked payload to meilisearch. + /// This can be useful if your application is only forwarding data from other sources. + /// + /// If you send an already existing document (same id) the **whole existing document** will be overwritten by the new document. + /// Fields previously in the document not present in the new document are removed. + /// + /// For a partial update of the document see [Index::add_or_update_unchecked_payload]. + /// + /// # Example + /// + /// ``` + /// use serde::{Serialize, Deserialize}; + /// + /// # use meilisearch_sdk::{client::*, indexes::*}; + /// # use std::thread::sleep; + /// # use std::time::Duration; + /// # + /// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700"); + /// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey"); + /// # futures::executor::block_on(async move { + /// let client = Client::new(MEILISEARCH_URL, MEILISEARCH_API_KEY); + /// let movie_index = client.index("add_or_replace_unchecked_payload"); + /// + /// let task = movie_index.add_or_replace_unchecked_payload( + /// r#"{ "id": 1, "body": "doggo" } + /// { "id": 2, "body": "catto" }"#.as_bytes(), + /// "application/x-ndjson", + /// Some("id"), + /// ).await.unwrap(); + /// // Meilisearch may take some time to execute the request so we are going to wait till it's completed + /// client.wait_for_task(task, None, None).await.unwrap(); + /// + /// let movies = movie_index.get_documents::().await.unwrap(); + /// assert!(movies.results.len() == 2); + /// # movie_index.delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap(); + /// # }); + /// ``` + #[cfg(not(target_arch = "wasm32"))] + pub async fn add_or_replace_unchecked_payload< + T: futures_io::AsyncRead + Send + Sync + 'static, + >( + &self, + payload: T, + content_type: &str, + primary_key: Option<&str>, + ) -> Result { + let url = if let Some(primary_key) = primary_key { + format!( + "{}/indexes/{}/documents?primaryKey={}", + self.client.host, self.uid, primary_key + ) + } else { + format!("{}/indexes/{}/documents", self.client.host, self.uid) + }; + stream_request::<(), T, TaskInfo>( + &url, + &self.client.api_key, + Method::Post { + query: (), + body: payload, + }, + content_type, + 202, + ) + .await + } + /// Alias for [Index::add_or_replace]. pub async fn add_documents( &self, @@ -648,6 +715,73 @@ impl Index { .await } + /// Add a raw and unchecked payload to meilisearch. + /// This can be useful if your application is only forwarding data from other sources. + /// + /// If you send an already existing document (same id) the old document will be only partially updated according to the fields of the new document. + /// Thus, any fields not present in the new document are kept and remained unchanged. + /// + /// To completely overwrite a document, check out the [Index::add_or_replace_unchecked_payload] documents method. + /// + /// # Example + /// + /// ``` + /// use serde::{Serialize, Deserialize}; + /// + /// # use meilisearch_sdk::{client::*, indexes::*}; + /// # use std::thread::sleep; + /// # use std::time::Duration; + /// # + /// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700"); + /// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey"); + /// # futures::executor::block_on(async move { + /// let client = Client::new(MEILISEARCH_URL, MEILISEARCH_API_KEY); + /// let movie_index = client.index("add_or_replace_unchecked_payload"); + /// + /// let task = movie_index.add_or_update_unchecked_payload( + /// r#"{ "id": 1, "body": "doggo" } + /// { "id": 2, "body": "catto" }"#.as_bytes(), + /// "application/x-ndjson", + /// Some("id"), + /// ).await.unwrap(); + /// // Meilisearch may take some time to execute the request so we are going to wait till it's completed + /// client.wait_for_task(task, None, None).await.unwrap(); + /// + /// let movies = movie_index.get_documents::().await.unwrap(); + /// assert!(movies.results.len() == 2); + /// # movie_index.delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap(); + /// # }); + /// ``` + #[cfg(not(target_arch = "wasm32"))] + pub async fn add_or_update_unchecked_payload< + T: futures_io::AsyncRead + Send + Sync + 'static, + >( + &self, + payload: T, + content_type: &str, + primary_key: Option<&str>, + ) -> Result { + let url = if let Some(primary_key) = primary_key { + format!( + "{}/indexes/{}/documents?primaryKey={}", + self.client.host, self.uid, primary_key + ) + } else { + format!("{}/indexes/{}/documents", self.client.host, self.uid) + }; + stream_request::<(), T, TaskInfo>( + &url, + &self.client.api_key, + Method::Put { + query: (), + body: payload, + }, + content_type, + 202, + ) + .await + } + /// Delete all documents in the index. /// /// # Example diff --git a/src/request.rs b/src/request.rs index 3a111598..87568b2b 100644 --- a/src/request.rs +++ b/src/request.rs @@ -115,6 +115,100 @@ pub(crate) async fn request< parse_response(status, expected_status_code, body) } +#[cfg(not(target_arch = "wasm32"))] +pub(crate) async fn stream_request< + 'a, + Query: Serialize, + Body: futures_io::AsyncRead + Send + Sync + 'static, + Output: DeserializeOwned + 'static, +>( + url: &str, + apikey: &str, + method: Method, + content_type: &str, + expected_status_code: u16, +) -> Result { + use isahc::http::header; + use isahc::*; + + let auth = format!("Bearer {}", apikey); + let user_agent = qualified_version(); + + let mut response = match method { + Method::Get { query } => { + let url = add_query_parameters(url, &query)?; + + Request::get(url) + .header(header::AUTHORIZATION, auth) + .header(header::USER_AGENT, user_agent) + .body(()) + .map_err(|_| crate::errors::Error::InvalidRequest)? + .send_async() + .await? + } + Method::Delete { query } => { + let url = add_query_parameters(url, &query)?; + + Request::delete(url) + .header(header::AUTHORIZATION, auth) + .header(header::USER_AGENT, user_agent) + .body(()) + .map_err(|_| crate::errors::Error::InvalidRequest)? + .send_async() + .await? + } + Method::Post { query, body } => { + let url = add_query_parameters(url, &query)?; + + Request::post(url) + .header(header::AUTHORIZATION, auth) + .header(header::USER_AGENT, user_agent) + .header(header::CONTENT_TYPE, content_type) + .body(AsyncBody::from_reader(body)) + .map_err(|_| crate::errors::Error::InvalidRequest)? + .send_async() + .await? + } + Method::Patch { query, body } => { + let url = add_query_parameters(url, &query)?; + + Request::patch(url) + .header(header::AUTHORIZATION, auth) + .header(header::USER_AGENT, user_agent) + .header(header::CONTENT_TYPE, content_type) + .body(AsyncBody::from_reader(body)) + .map_err(|_| crate::errors::Error::InvalidRequest)? + .send_async() + .await? + } + Method::Put { query, body } => { + let url = add_query_parameters(url, &query)?; + + Request::put(url) + .header(header::AUTHORIZATION, auth) + .header(header::USER_AGENT, user_agent) + .header(header::CONTENT_TYPE, content_type) + .body(AsyncBody::from_reader(body)) + .map_err(|_| crate::errors::Error::InvalidRequest)? + .send_async() + .await? + } + }; + + let status = response.status().as_u16(); + + let mut body = response + .text() + .await + .map_err(|e| crate::errors::Error::HttpError(e.into()))?; + + if body.is_empty() { + body = "null".to_string(); + } + + parse_response(status, expected_status_code, body) +} + #[cfg(target_arch = "wasm32")] pub fn add_query_parameters( mut url: String, From 148b2a249ac00b02daa871d81c5ca9b52f64d149 Mon Sep 17 00:00:00 2001 From: Charlotte Vermandel Date: Mon, 20 Feb 2023 15:38:02 +0100 Subject: [PATCH 2/2] Fix clippy error --- src/request.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/request.rs b/src/request.rs index 7e09acc5..81c2a255 100644 --- a/src/request.rs +++ b/src/request.rs @@ -131,7 +131,7 @@ pub(crate) async fn stream_request< use isahc::http::header; use isahc::*; - let auth = format!("Bearer {}", apikey); + let auth = format!("Bearer {apikey}"); let user_agent = qualified_version(); let mut response = match method {