diff --git a/src/client.rs b/src/client.rs
index f9dfe98c..5e02abaa 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -8,6 +8,7 @@ use crate::{
     errors::*,
     indexes::*,
     key::{Key, KeyBuilder, KeyUpdater, KeysQuery, KeysResults},
+    network::{NetworkState, NetworkUpdate},
     request::*,
     search::*,
     task_info::TaskInfo,
@@ -1148,6 +1149,46 @@ impl Client {
         crate::tenant_tokens::generate_tenant_token(api_key_uid, search_rules, api_key, expires_at)
     }
 
+    /// Get the current network state (`GET /network`).
+    pub async fn get_network_state(&self) -> Result<NetworkState, Error> {
+        self.http_client
+            .request::<(), (), NetworkState>(
+                &format!("{}/network", self.host),
+                Method::Get { query: () },
+                200,
+            )
+            .await
+    }
+
+    /// Partially update the network state (`PATCH /network`).
+    pub async fn update_network_state(&self, body: &NetworkUpdate) -> Result<NetworkState, Error> {
+        self.http_client
+            .request::<(), &NetworkUpdate, NetworkState>(
+                &format!("{}/network", self.host),
+                Method::Patch { query: (), body },
+                200,
+            )
+            .await
+    }
+
+    /// Convenience helper: enable or disable sharding.
+    pub async fn set_sharding(&self, enabled: bool) -> Result<NetworkState, Error> {
+        let update = NetworkUpdate {
+            sharding: Some(enabled),
+            ..NetworkUpdate::default()
+        };
+        self.update_network_state(&update).await
+    }
+
+    /// Convenience helper: set `self` to the given remote name.
+    pub async fn set_self_remote(&self, name: &str) -> Result<NetworkState, Error> {
+        let update = NetworkUpdate {
+            self_name: Some(name.to_string()),
+            ..NetworkUpdate::default()
+        };
+        self.update_network_state(&update).await
+    }
+
     fn sleep_backend(&self) -> SleepBackend {
         SleepBackend::infer(self.http_client.is_tokio())
     }
@@ -1207,6 +1248,49 @@ pub struct Version {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+    use mockito::Matcher;
+
+    #[tokio::test]
+    async fn test_network_update_and_deserialize_remotes() {
+        let mut s = mockito::Server::new_async().await;
+        let base = s.url();
+
+        let response_body = serde_json::json!({
+            "remotes": {
+                "ms-00": {
+                    "url": "http://ms-00",
+                    "searchApiKey": "SEARCH",
+                    "writeApiKey": "WRITE"
+                }
+            },
+            "self": "ms-00",
+            "sharding": true
+        })
+        .to_string();
+
+        let _m = s
+            .mock("PATCH", "/network")
+            .match_body(Matcher::Regex(
+                r#"\{.*"sharding"\s*:\s*true.*\}"#.to_string(),
+            ))
+            .with_status(200)
+            .with_header("content-type", "application/json")
+            .with_body(response_body)
+            .create_async()
+            .await;
+
+        let client = Client::new(base, None::<String>).unwrap();
+        let updated = client
+            .set_sharding(true)
+            .await
+            .expect("set_sharding failed");
+        assert_eq!(updated.sharding, Some(true));
+        let remotes = updated.remotes.expect("remotes should be present");
+        let ms00 = remotes.get("ms-00").expect("ms-00 should exist");
+        assert_eq!(ms00.write_api_key.as_deref(), Some("WRITE"));
+    }
+
     use big_s::S;
     use time::OffsetDateTime;
diff --git a/src/lib.rs b/src/lib.rs
index e4c8d5ac..2ba31fa2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -244,6 +244,8 @@ pub mod features;
 pub mod indexes;
 /// Module containing the [`Key`](key::Key) struct.
 pub mod key;
+/// Module for the network configuration API (sharding and remotes).
+pub mod network;
 pub mod request;
 /// Module related to search queries and results.
 pub mod search;
diff --git a/src/network.rs b/src/network.rs
new file mode 100644
index 00000000..d221f781
--- /dev/null
+++ b/src/network.rs
@@ -0,0 +1,37 @@
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+#[derive(Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct RemoteConfig {
+    pub url: String,
+    #[serde(rename = "searchApiKey")]
+    pub search_api_key: String,
+    /// Present in responses since Meilisearch 1.19.
+    #[serde(rename = "writeApiKey", skip_serializing_if = "Option::is_none")]
+    pub write_api_key: Option<String>,
+}
+
+pub type RemotesMap = HashMap<String, RemoteConfig>;
+
+/// Full network state returned by `GET /network`.
+#[derive(Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct NetworkState {
+    pub remotes: Option<RemotesMap>,
+    #[serde(rename = "self")]
+    pub self_name: Option<String>,
+    pub sharding: Option<bool>,
+}
+
+/// Partial update body for `PATCH /network`.
+#[derive(Default, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct NetworkUpdate {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub remotes: Option<RemotesMap>,
+    #[serde(rename = "self", skip_serializing_if = "Option::is_none")]
+    pub self_name: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub sharding: Option<bool>,
+}
diff --git a/src/search.rs b/src/search.rs
index 084a0d3f..53e2b872 100644
--- a/src/search.rs
+++ b/src/search.rs
@@ -415,8 +415,12 @@ pub struct SearchQuery<'a, Http: HttpClient> {
 
 #[derive(Debug, Serialize, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct QueryFederationOptions {
+    /// Weight multiplier applied to this query when merging federated results.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub weight: Option<f32>,
+    /// Remote instance to target when sharding; corresponds to a key in the network `remotes` map.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub remote: Option<String>,
 }
 
 #[allow(missing_docs)]
@@ -766,6 +770,7 @@ impl<'a, 'b, Http: HttpClient> MultiSearchQuery<'a, 'b, Http> {
             search_query,
             QueryFederationOptions {
                 weight: Some(weight),
+                remote: None,
             },
         )
     }
diff --git a/src/tasks.rs b/src/tasks.rs
index 9977eae4..c06b21d7 100644
--- a/src/tasks.rs
+++ b/src/tasks.rs
@@ -1,4 +1,5 @@
 use serde::{Deserialize, Deserializer, Serialize};
+use serde_json::{Map, Value};
 use std::time::Duration;
 use time::OffsetDateTime;
@@ -157,6 +158,9 @@ pub struct SucceededTask {
     pub canceled_by: Option<u32>,
     pub index_uid: Option<String>,
     pub error: Option<MeilisearchError>,
+    /// Remotes object returned by the server for this task (present since Meilisearch 1.19).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub remotes: Option<Map<String, Value>>,
     #[serde(flatten)]
     pub update_type: TaskType,
     pub uid: u32,
@@ -174,6 +178,9 @@ pub struct EnqueuedTask {
     #[serde(with = "time::serde::rfc3339")]
     pub enqueued_at: OffsetDateTime,
     pub index_uid: Option<String>,
+    /// Remotes object returned by the server for this enqueued task.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub remotes: Option<Map<String, Value>>,
     #[serde(flatten)]
     pub update_type: TaskType,
     pub uid: u32,
@@ -193,6 +200,9 @@ pub struct ProcessingTask {
     #[serde(with = "time::serde::rfc3339")]
     pub started_at: OffsetDateTime,
     pub index_uid: Option<String>,
+    /// Remotes object returned by the server for this processing task.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub remotes: Option<Map<String, Value>>,
     #[serde(flatten)]
     pub update_type: TaskType,
     pub uid: u32,
@@ -738,6 +748,54 @@ impl<'a, Http: HttpClient> TasksQuery<'a, TasksPaginationFilters, Http> {
 
 #[cfg(test)]
 mod test {
+
+    #[test]
+    fn test_deserialize_enqueued_task_with_remotes() {
+        let json = r#"{
+            "enqueuedAt": "2022-02-03T13:02:38.369634Z",
+            "indexUid": "movies",
+            "status": "enqueued",
+            "type": "indexUpdate",
+            "uid": 12,
+            "remotes": { "ms-00": { "status": "ok" } }
+        }"#;
+        let task: Task = serde_json::from_str(json).unwrap();
+        match task {
+            Task::Enqueued { content } => {
+                let remotes = content.remotes.expect("remotes should be present");
+                assert!(remotes.contains_key("ms-00"));
+            }
+            _ => panic!("expected enqueued task"),
+        }
+    }
+
+    #[test]
+    fn test_deserialize_processing_task_with_remotes() {
+        let json = r#"{
+            "details": {
+                "indexedDocuments": null,
+                "receivedDocuments": 10
+            },
+            "duration": null,
+            "enqueuedAt": "2022-02-03T15:17:02.801341Z",
+            "finishedAt": null,
+            "indexUid": "movies",
+            "startedAt": "2022-02-03T15:17:02.812338Z",
+            "status": "processing",
+            "type": "documentAdditionOrUpdate",
+            "uid": 14,
+            "remotes": { "ms-00": { "status": "ok" } }
+        }"#;
+        let task: Task = serde_json::from_str(json).unwrap();
+        match task {
+            Task::Processing { content } => {
+                let remotes = content.remotes.expect("remotes should be present");
+                assert!(remotes.contains_key("ms-00"));
+            }
+            _ => panic!("expected processing task"),
+        }
+    }
+
     use super::*;
     use crate::{
         client::*,
@@ -782,8 +840,7 @@ mod test {
                     enqueued_at,
                     index_uid: Some(index_uid),
                     update_type: TaskType::DocumentAdditionOrUpdate { details: None },
-                    uid: 12,
-                }
+                    uid: 12, ..
+                }
             } if enqueued_at == datetime && index_uid == "meili"));