Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
errors::*,
indexes::*,
key::{Key, KeyBuilder, KeyUpdater, KeysQuery, KeysResults},
network::{NetworkState, NetworkUpdate},
request::*,
search::*,
task_info::TaskInfo,
Expand Down Expand Up @@ -1148,6 +1149,46 @@ impl<Http: HttpClient> Client<Http> {
crate::tenant_tokens::generate_tenant_token(api_key_uid, search_rules, api_key, expires_at)
}

/// Get the current network state (/network)
pub async fn get_network_state(&self) -> Result<NetworkState, Error> {
self.http_client
.request::<(), (), NetworkState>(
&format!("{}/network", self.host),
Method::Get { query: () },
200,
)
.await
}

/// Partially update the network state (/network)
pub async fn update_network_state(&self, body: &NetworkUpdate) -> Result<NetworkState, Error> {
self.http_client
.request::<(), &NetworkUpdate, NetworkState>(
&format!("{}/network", self.host),
Method::Patch { query: (), body },
200,
)
.await
}

/// Convenience: set sharding=true/false
pub async fn set_sharding(&self, enabled: bool) -> Result<NetworkState, Error> {
let update = NetworkUpdate {
sharding: Some(enabled),
..NetworkUpdate::default()
};
self.update_network_state(&update).await
}

/// Convenience: set self to a remote name
pub async fn set_self_remote(&self, name: &str) -> Result<NetworkState, Error> {
let update = NetworkUpdate {
self_name: Some(name.to_string()),
..NetworkUpdate::default()
};
self.update_network_state(&update).await
}

fn sleep_backend(&self) -> SleepBackend {
SleepBackend::infer(self.http_client.is_tokio())
}
Expand Down Expand Up @@ -1207,6 +1248,49 @@ pub struct Version {

#[cfg(test)]
mod tests {
use super::*;
use mockito::Matcher;

#[tokio::test]
async fn test_network_update_and_deserialize_remotes() {
let mut s = mockito::Server::new_async().await;
let base = s.url();

let response_body = serde_json::json!({
"remotes": {
"ms-00": {
"url": "http://ms-00",
"searchApiKey": "SEARCH",
"writeApiKey": "WRITE"
}
},
"self": "ms-00",
"sharding": true
})
.to_string();

let _m = s
.mock("PATCH", "/network")
.match_body(Matcher::Regex(
r#"\{.*"sharding"\s*:\s*true.*\}"#.to_string(),
))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(response_body)
.create_async()
.await;

let client = Client::new(base, None::<String>).unwrap();
let updated = client
.set_sharding(true)
.await
.expect("update_network_state failed");
assert_eq!(updated.sharding, Some(true));
let remotes = updated.remotes.expect("remotes should be present");
let ms00 = remotes.get("ms-00").expect("ms-00 should exist");
assert_eq!(ms00.write_api_key.as_deref(), Some("WRITE"));
}

use big_s::S;
use time::OffsetDateTime;

Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ pub mod features;
pub mod indexes;
/// Module containing the [`Key`](key::Key) struct.
pub mod key;
/// Module for Network configuration API (sharding/remotes).
pub mod network;
pub mod request;
/// Module related to search queries and results.
pub mod search;
Expand Down
37 changes: 37 additions & 0 deletions src/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RemoteConfig {
pub url: String,
#[serde(rename = "searchApiKey")]
pub search_api_key: String,
#[serde(rename = "writeApiKey", skip_serializing_if = "Option::is_none")]
// present in responses since 1.19
pub write_api_key: Option<String>,
}

pub type RemotesMap = HashMap<String, RemoteConfig>;

/// Full network state returned by GET /network
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkState {
pub remotes: Option<RemotesMap>,
#[serde(rename = "self")]
pub self_name: Option<String>,
pub sharding: Option<bool>,
}

/// Partial update body for PATCH /network
#[derive(Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkUpdate {
#[serde(skip_serializing_if = "Option::is_none")]
pub remotes: Option<RemotesMap>,
#[serde(rename = "self", skip_serializing_if = "Option::is_none")]
pub self_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sharding: Option<bool>,
}
5 changes: 5 additions & 0 deletions src/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,12 @@ pub struct SearchQuery<'a, Http: HttpClient> {
#[derive(Debug, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct QueryFederationOptions {
/// Weight multiplier for this query when merging federated results
#[serde(skip_serializing_if = "Option::is_none")]
pub weight: Option<f32>,
/// Remote instance name to target when sharding; corresponds to a key in network.remotes
#[serde(skip_serializing_if = "Option::is_none")]
pub remote: Option<String>,
}

#[allow(missing_docs)]
Expand Down Expand Up @@ -766,6 +770,7 @@ impl<'a, 'b, Http: HttpClient> MultiSearchQuery<'a, 'b, Http> {
search_query,
QueryFederationOptions {
weight: Some(weight),
remote: None,
},
)
}
Expand Down
61 changes: 59 additions & 2 deletions src/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::{Map, Value};
use std::time::Duration;
use time::OffsetDateTime;

Expand Down Expand Up @@ -157,6 +158,9 @@ pub struct SucceededTask {
pub canceled_by: Option<usize>,
pub index_uid: Option<String>,
pub error: Option<MeilisearchError>,
/// Remotes object returned by the server for this task (present since Meilisearch 1.19)
#[serde(skip_serializing_if = "Option::is_none")]
pub remotes: Option<Map<String, Value>>,
#[serde(flatten)]
pub update_type: TaskType,
pub uid: u32,
Expand All @@ -174,6 +178,9 @@ pub struct EnqueuedTask {
#[serde(with = "time::serde::rfc3339")]
pub enqueued_at: OffsetDateTime,
pub index_uid: Option<String>,
/// Remotes object returned by the server for this enqueued task
#[serde(skip_serializing_if = "Option::is_none")]
pub remotes: Option<Map<String, Value>>,
#[serde(flatten)]
pub update_type: TaskType,
pub uid: u32,
Expand All @@ -193,6 +200,9 @@ pub struct ProcessingTask {
#[serde(with = "time::serde::rfc3339")]
pub started_at: OffsetDateTime,
pub index_uid: Option<String>,
/// Remotes object returned by the server for this processing task
#[serde(skip_serializing_if = "Option::is_none")]
pub remotes: Option<Map<String, Value>>,
#[serde(flatten)]
pub update_type: TaskType,
pub uid: u32,
Expand Down Expand Up @@ -738,6 +748,54 @@ impl<'a, Http: HttpClient> TasksQuery<'a, TasksPaginationFilters, Http> {

#[cfg(test)]
mod test {

#[test]
fn test_deserialize_enqueued_task_with_remotes() {
let json = r#"{
"enqueuedAt": "2022-02-03T13:02:38.369634Z",
"indexUid": "movies",
"status": "enqueued",
"type": "indexUpdate",
"uid": 12,
"remotes": { "ms-00": { "status": "ok" } }
}"#;
let task: Task = serde_json::from_str(json).unwrap();
match task {
Task::Enqueued { content } => {
let remotes = content.remotes.expect("remotes should be present");
assert!(remotes.contains_key("ms-00"));
}
_ => panic!("expected enqueued task"),
}
}

#[test]
fn test_deserialize_processing_task_with_remotes() {
let json = r#"{
"details": {
"indexedDocuments": null,
"receivedDocuments": 10
},
"duration": null,
"enqueuedAt": "2022-02-03T15:17:02.801341Z",
"finishedAt": null,
"indexUid": "movies",
"startedAt": "2022-02-03T15:17:02.812338Z",
"status": "processing",
"type": "documentAdditionOrUpdate",
"uid": 14,
"remotes": { "ms-00": { "status": "ok" } }
}"#;
let task: Task = serde_json::from_str(json).unwrap();
match task {
Task::Processing { content } => {
let remotes = content.remotes.expect("remotes should be present");
assert!(remotes.contains_key("ms-00"));
}
_ => panic!("expected processing task"),
}
}

use super::*;
use crate::{
client::*,
Expand Down Expand Up @@ -782,8 +840,7 @@ mod test {
enqueued_at,
index_uid: Some(index_uid),
update_type: TaskType::DocumentAdditionOrUpdate { details: None },
uid: 12,
}
uid: 12, .. }
}
if enqueued_at == datetime && index_uid == "meili"));

Expand Down