From d71e23857fe811d18008443d5f13f49566811931 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Mon, 22 Sep 2025 01:13:13 +0530 Subject: [PATCH 1/4] Add sharding support for Network methods --- src/client.rs | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 ++ src/network.rs | 37 ++++++++++++++++++++++ src/search.rs | 5 +++ src/tasks.rs | 62 +++++++++++++++++++++++++++++++++++-- 5 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 src/network.rs diff --git a/src/client.rs b/src/client.rs index f9dfe98c..5e02abaa 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,6 +8,7 @@ use crate::{ errors::*, indexes::*, key::{Key, KeyBuilder, KeyUpdater, KeysQuery, KeysResults}, + network::{NetworkState, NetworkUpdate}, request::*, search::*, task_info::TaskInfo, @@ -1148,6 +1149,46 @@ impl Client { crate::tenant_tokens::generate_tenant_token(api_key_uid, search_rules, api_key, expires_at) } + /// Get the current network state (/network) + pub async fn get_network_state(&self) -> Result { + self.http_client + .request::<(), (), NetworkState>( + &format!("{}/network", self.host), + Method::Get { query: () }, + 200, + ) + .await + } + + /// Partially update the network state (/network) + pub async fn update_network_state(&self, body: &NetworkUpdate) -> Result { + self.http_client + .request::<(), &NetworkUpdate, NetworkState>( + &format!("{}/network", self.host), + Method::Patch { query: (), body }, + 200, + ) + .await + } + + /// Convenience: set sharding=true/false + pub async fn set_sharding(&self, enabled: bool) -> Result { + let update = NetworkUpdate { + sharding: Some(enabled), + ..NetworkUpdate::default() + }; + self.update_network_state(&update).await + } + + /// Convenience: set self to a remote name + pub async fn set_self_remote(&self, name: &str) -> Result { + let update = NetworkUpdate { + self_name: Some(name.to_string()), + ..NetworkUpdate::default() + }; + self.update_network_state(&update).await + } + fn sleep_backend(&self) -> SleepBackend { SleepBackend::infer(self.http_client.is_tokio()) } @@ -1207,6 +1248,49 @@ pub struct Version { #[cfg(test)] mod tests { + use super::*; + use mockito::Matcher; + + #[tokio::test] + async fn test_network_update_and_deserialize_remotes() { + let mut s = mockito::Server::new_async().await; + let base = s.url(); + + let response_body = serde_json::json!({ + "remotes": { + "ms-00": { + "url": "http://ms-00", + "searchApiKey": "SEARCH", + "writeApiKey": "WRITE" + } + }, + "self": "ms-00", + "sharding": true + }) + .to_string(); + + let _m = s + .mock("PATCH", "/network") + .match_body(Matcher::Regex( + r#"\{.*"sharding"\s*:\s*true.*\}"#.to_string(), + )) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(response_body) + .create_async() + .await; + + let client = Client::new(base, None::).unwrap(); + let updated = client + .set_sharding(true) + .await + .expect("update_network_state failed"); + assert_eq!(updated.sharding, Some(true)); + let remotes = updated.remotes.expect("remotes should be present"); + let ms00 = remotes.get("ms-00").expect("ms-00 should exist"); + assert_eq!(ms00.write_api_key.as_deref(), Some("WRITE")); + } + use big_s::S; use time::OffsetDateTime; diff --git a/src/lib.rs b/src/lib.rs index e4c8d5ac..2ba31fa2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -244,6 +244,8 @@ pub mod features; pub mod indexes; /// Module containing the [`Key`](key::Key) struct. pub mod key; +/// Module for Network configuration API (sharding/remotes). +pub mod network; pub mod request; /// Module related to search queries and results. pub mod search; diff --git a/src/network.rs b/src/network.rs new file mode 100644 index 00000000..9f7857e0 --- /dev/null +++ b/src/network.rs @@ -0,0 +1,37 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RemoteConfig { + pub url: String, + #[serde(rename = "searchApiKey")] + pub search_api_key: String, + #[serde(rename = "writeApiKey", skip_serializing_if = "Option::is_none")] + // present in responses since 1.19 + pub write_api_key: Option, +} + +pub type RemotesMap = HashMap; + +/// Full network state returned by GET /network +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NetworkState { + pub remotes: Option, + #[serde(rename = "self")] + pub self_name: Option, + pub sharding: Option, +} + +/// Partial update body for PATCH /network +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NetworkUpdate { + #[serde(skip_serializing_if = "Option::is_none")] + pub remotes: Option, + #[serde(rename = "self", skip_serializing_if = "Option::is_none")] + pub self_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub sharding: Option, +} diff --git a/src/search.rs b/src/search.rs index 084a0d3f..53e2b872 100644 --- a/src/search.rs +++ b/src/search.rs @@ -415,8 +415,12 @@ pub struct SearchQuery<'a, Http: HttpClient> { #[derive(Debug, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct QueryFederationOptions { + /// Weight multiplier for this query when merging federated results #[serde(skip_serializing_if = "Option::is_none")] pub weight: Option, + /// Remote instance name to target when sharding; corresponds to a key in network.remotes + #[serde(skip_serializing_if = "Option::is_none")] + pub remote: Option, } #[allow(missing_docs)] @@ -766,6 +770,7 @@ impl<'a, 'b, Http: HttpClient> MultiSearchQuery<'a, 'b, Http> { search_query, QueryFederationOptions { weight: Some(weight), + remote: None, }, ) } diff --git a/src/tasks.rs b/src/tasks.rs index 9977eae4..080ef8a8 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::{Map, Value}; use std::time::Duration; use time::OffsetDateTime; @@ -157,6 +158,9 @@ pub struct SucceededTask { pub canceled_by: Option, pub index_uid: Option, pub error: Option, + /// Remotes object returned by the server for this task (present since Meilisearch 1.19) + #[serde(skip_serializing_if = "Option::is_none")] + pub remotes: Option>, #[serde(flatten)] pub update_type: TaskType, pub uid: u32, @@ -174,6 +178,9 @@ pub struct EnqueuedTask { #[serde(with = "time::serde::rfc3339")] pub enqueued_at: OffsetDateTime, pub index_uid: Option, + /// Remotes object returned by the server for this enqueued task + #[serde(skip_serializing_if = "Option::is_none")] + pub remotes: Option>, #[serde(flatten)] pub update_type: TaskType, pub uid: u32, @@ -193,6 +200,9 @@ pub struct ProcessingTask { #[serde(with = "time::serde::rfc3339")] pub started_at: OffsetDateTime, pub index_uid: Option, + /// Remotes object returned by the server for this processing task + #[serde(skip_serializing_if = "Option::is_none")] + pub remotes: Option>, #[serde(flatten)] pub update_type: TaskType, pub uid: u32, @@ -738,6 +748,55 @@ impl<'a, Http: HttpClient> TasksQuery<'a, TasksPaginationFilters, Http> { #[cfg(test)] mod test { + use super::*; + + #[test] + fn test_deserialize_enqueued_task_with_remotes() { + let json = r#"{ + "enqueuedAt": "2022-02-03T13:02:38.369634Z", + "indexUid": "movies", + "status": "enqueued", + "type": "indexUpdate", + "uid": 12, + "remotes": { "ms-00": { "status": "ok" } } +}"#; + let task: Task = serde_json::from_str(json).unwrap(); + match task { + Task::Enqueued { content } => { + let remotes = content.remotes.expect("remotes should be present"); + assert!(remotes.contains_key("ms-00")); + } + _ => panic!("expected enqueued task"), + } + } + + #[test] + fn test_deserialize_processing_task_with_remotes() { + let json = r#"{ + "details": { + "indexedDocuments": null, + "receivedDocuments": 10 + }, + "duration": null, + "enqueuedAt": "2022-02-03T15:17:02.801341Z", + "finishedAt": null, + "indexUid": "movies", + "startedAt": "2022-02-03T15:17:02.812338Z", + "status": "processing", + "type": "documentAdditionOrUpdate", + "uid": 14, + "remotes": { "ms-00": { "status": "ok" } } +}"#; + let task: Task = serde_json::from_str(json).unwrap(); + match task { + Task::Processing { content } => { + let remotes = content.remotes.expect("remotes should be present"); + assert!(remotes.contains_key("ms-00")); + } + _ => panic!("expected processing task"), + } + } + use super::*; use crate::{ client::*, @@ -782,8 +841,7 @@ mod test { enqueued_at, index_uid: Some(index_uid), update_type: TaskType::DocumentAdditionOrUpdate { details: None }, - uid: 12, - } + uid: 12, .. } } if enqueued_at == datetime && index_uid == "meili")); From ca562a0599e50165a08eb8192a40c1eace255794 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Mon, 22 Sep 2025 09:56:12 +0530 Subject: [PATCH 2/4] fixed formatting --- src/tasks.rs | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/tasks.rs b/src/tasks.rs index 080ef8a8..08a05d96 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -753,13 +753,13 @@ mod test { #[test] fn test_deserialize_enqueued_task_with_remotes() { let json = r#"{ - "enqueuedAt": "2022-02-03T13:02:38.369634Z", - "indexUid": "movies", - "status": "enqueued", - "type": "indexUpdate", - "uid": 12, - "remotes": { "ms-00": { "status": "ok" } } -}"#; + "enqueuedAt": "2022-02-03T13:02:38.369634Z", + "indexUid": "movies", + "status": "enqueued", + "type": "indexUpdate", + "uid": 12, + "remotes": { "ms-00": { "status": "ok" } } + }"#; let task: Task = serde_json::from_str(json).unwrap(); match task { Task::Enqueued { content } => { @@ -773,20 +773,20 @@ mod test { #[test] fn test_deserialize_processing_task_with_remotes() { let json = r#"{ - "details": { - "indexedDocuments": null, - "receivedDocuments": 10 - }, - "duration": null, - "enqueuedAt": "2022-02-03T15:17:02.801341Z", - "finishedAt": null, - "indexUid": "movies", - "startedAt": "2022-02-03T15:17:02.812338Z", - "status": "processing", - "type": "documentAdditionOrUpdate", - "uid": 14, - "remotes": { "ms-00": { "status": "ok" } } -}"#; + "details": { + "indexedDocuments": null, + "receivedDocuments": 10 + }, + "duration": null, + "enqueuedAt": "2022-02-03T15:17:02.801341Z", + "finishedAt": null, + "indexUid": "movies", + "startedAt": "2022-02-03T15:17:02.812338Z", + "status": "processing", + "type": "documentAdditionOrUpdate", + "uid": 14, + "remotes": { "ms-00": { "status": "ok" } } + }"#; let task: Task = serde_json::from_str(json).unwrap(); match task { Task::Processing { content } => { From 59c2da18b5732dbb332145f77a03869f23b841fb Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 24 Sep 2025 00:43:46 +0530 Subject: [PATCH 3/4] removed Debug --- src/network.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/network.rs b/src/network.rs index 9f7857e0..d221f781 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RemoteConfig { pub url: String, @@ -15,7 +15,7 @@ pub struct RemoteConfig { pub type RemotesMap = HashMap; /// Full network state returned by GET /network -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct NetworkState { pub remotes: Option, @@ -25,7 +25,7 @@ pub struct NetworkState { } /// Partial update body for PATCH /network -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Default, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct NetworkUpdate { #[serde(skip_serializing_if = "Option::is_none")] From d27dbed003edc8f1471b4c1d474bde72ed55aa62 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 24 Sep 2025 00:51:33 +0530 Subject: [PATCH 4/4] nit --- src/tasks.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/tasks.rs b/src/tasks.rs index 08a05d96..c06b21d7 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -748,7 +748,6 @@ impl<'a, Http: HttpClient> TasksQuery<'a, TasksPaginationFilters, Http> { #[cfg(test)] mod test { - use super::*; #[test] fn test_deserialize_enqueued_task_with_remotes() {