retry getMore
isabelatkinson committed Apr 24, 2024
1 parent 5ee0bd6 commit 9ad9e88
Showing 10 changed files with 146 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/client/executor.rs
@@ -1043,7 +1043,7 @@ impl Error {
}
}

if let Some(ref mut session) = session {
if let Some(session) = session {
if self.contains_label(TRANSIENT_TRANSACTION_ERROR)
|| self.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT)
{
2 changes: 1 addition & 1 deletion src/client/session/test/causal_consistency.rs
@@ -201,7 +201,7 @@ async fn first_op_update_op_time() {
let event = {
let mut events = client.events.clone();
events
.get_command_events(&[name])
.get_command_events_mut(&[name])
.into_iter()
.find(|e| matches!(e, CommandEvent::Succeeded(_) | CommandEvent::Failed(_)))
.unwrap_or_else(|| panic!("no event found for {}", name))
11 changes: 9 additions & 2 deletions src/operation/bulk_write.rs
@@ -11,7 +11,7 @@ use crate::{
checked::Checked,
cmap::{Command, RawCommandResponse, StreamDescription},
cursor::CursorSpecification,
error::{ClientBulkWriteError, Error, ErrorKind, Result},
error::{ClientBulkWriteError, Error, ErrorKind, Result, RETRYABLE_WRITE_ERROR},
operation::OperationWithDefaults,
options::{BulkWriteOptions, OperationType, WriteModel},
results::{BulkWriteResult, DeleteResult, InsertOneResult, UpdateResult},
@@ -276,7 +276,7 @@ impl<'a> OperationWithDefaults for BulkWrite<'a> {
context: ExecutionContext<'b>,
) -> BoxFuture<'b, Result<Self::O>> {
async move {
let response: WriteResponseBody<Response> = response.body()?;
let mut response: WriteResponseBody<Response> = response.body()?;

let mut bulk_write_error = ClientBulkWriteError::default();

@@ -345,6 +345,13 @@ impl<'a> OperationWithDefaults for BulkWrite<'a> {
}
}
Err(error) => {
// Retry the entire bulkWrite command if cursor iteration fails.
let labels = response.labels.get_or_insert_with(Default::default);
let retryable_write_error = RETRYABLE_WRITE_ERROR.to_string();
if !labels.contains(&retryable_write_error) {
labels.push(retryable_write_error);
}

let error = Error::new(
ErrorKind::ClientBulkWrite(bulk_write_error),
response.labels,
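For context, the hunk above is what makes a failed getMore during bulk-write cursor iteration retryable: the RetryableWriteError label is appended to the response's error labels (only if not already present) before those labels are attached to the ClientBulkWrite error, so the retry machinery re-runs the entire bulkWrite. A minimal standalone sketch of that labeling pattern, using a simplified stand-in for the driver's WriteResponseBody:

```rust
// Standalone sketch of the label-marking pattern above; `Response` is a
// simplified stand-in for the driver's WriteResponseBody.
const RETRYABLE_WRITE_ERROR: &str = "RetryableWriteError";

#[derive(Debug, Default, PartialEq)]
struct Response {
    // Mirrors the optional errorLabels field on the server response.
    labels: Option<Vec<String>>,
}

fn mark_retryable(response: &mut Response) {
    // Create an empty label list if the server returned none, then add the
    // retryable-write label exactly once.
    let labels = response.labels.get_or_insert_with(Default::default);
    let retryable = RETRYABLE_WRITE_ERROR.to_string();
    if !labels.contains(&retryable) {
        labels.push(retryable);
    }
}

fn main() {
    let mut response = Response::default();
    mark_retryable(&mut response);
    mark_retryable(&mut response); // idempotent: the label is not duplicated
    assert_eq!(response.labels, Some(vec![RETRYABLE_WRITE_ERROR.to_string()]));
}
```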
103 changes: 99 additions & 4 deletions src/test/bulk_write.rs
@@ -1,5 +1,5 @@
use crate::{
bson::doc,
bson::{doc, Document},
error::{ClientBulkWriteError, ErrorKind},
options::WriteModel,
test::{
@@ -91,6 +91,11 @@ async fn max_message_size_bytes_batching() {
let first_event = command_started_events
.next()
.expect("no first event observed");

let mut command = first_event.command.clone();
command.remove("ops");
dbg!("{}", command);

let first_len = first_event.command.get_array("ops").unwrap().len();
assert_eq!(first_len, num_models - 1);

@@ -217,7 +222,7 @@ async fn successful_cursor_iteration() {

let max_bson_object_size = client.server_info.max_bson_object_size as usize;

let collection = client.database("db").collection::<bson::Document>("coll");
let collection = client.database("db").collection::<Document>("coll");
collection.drop().await.unwrap();

let models = vec![
@@ -256,6 +261,7 @@ async fn successful_cursor_iteration() {
#[tokio::test(flavor = "multi_thread")]
async fn failed_cursor_iteration() {
let mut options = get_client_options().await.clone();
options.retry_writes = Some(false);
if TestClient::new().await.is_sharded() {
options.hosts.drain(1..);
}
@@ -275,7 +281,7 @@ async fn failed_cursor_iteration() {
let fail_point = FailPoint::fail_command(&["getMore"], FailPointMode::Times(1)).error_code(8);
let _guard = client.enable_fail_point(fail_point).await.unwrap();

let collection = client.database("db").collection::<bson::Document>("coll");
let collection = client.database("db").collection::<Document>("coll");
collection.drop().await.unwrap();

let models = vec![
@@ -344,7 +350,7 @@ async fn cursor_iteration_in_a_transaction() {

let max_bson_object_size = client.server_info.max_bson_object_size as usize;

let collection = client.database("db").collection::<bson::Document>("coll");
let collection = client.database("db").collection::<Document>("coll");
collection.drop().await.unwrap();

let mut session = client.start_session().await.unwrap();
@@ -436,3 +442,92 @@ async fn namespace_batching() {
.unwrap();
assert_eq!(second_ns, "db.coll1");
}

#[tokio::test(flavor = "multi_thread")]
async fn get_more_is_retried() {
let mut options = get_client_options().await.clone();
if TestClient::new().await.is_sharded() {
options.hosts.drain(1..);
}
let client = Client::test_builder()
.options(options)
.monitor_events()
.build()
.await;

if client.server_version_lt(8, 0) || client.is_standalone() {
log_uncaptured(
"skipping get_more_is_retried: bulkWrite requires 8.0+, retryable writes require \
non-standalone",
);
return;
}

let max_bson_object_size = client.server_info.max_bson_object_size as usize;

let fail_point = FailPoint::fail_command(&["getMore"], FailPointMode::Times(1)).error_code(6);
let _guard = client.enable_fail_point(fail_point).await.unwrap();

let collection = client.database("db").collection::<Document>("coll");
collection.drop().await.unwrap();

let models = vec![
WriteModel::ReplaceOne {
namespace: collection.namespace(),
filter: doc! { "_id": "a".repeat(max_bson_object_size / 2) },
replacement: doc! { "x": 1 },
array_filters: None,
collation: None,
hint: None,
upsert: Some(true),
},
WriteModel::ReplaceOne {
namespace: collection.namespace(),
filter: doc! { "_id": "b".repeat(max_bson_object_size / 2) },
replacement: doc! { "x": 1 },
array_filters: None,
collation: None,
hint: None,
upsert: Some(true),
},
];

let _ = client.bulk_write(models).verbose_results(true).await;

let mut command_events = client
.events
.get_command_events(&["bulkWrite", "getMore"])
.into_iter();

let bulk_write_event = command_events.next().unwrap();
let started_event = bulk_write_event.as_command_started().unwrap();
assert_eq!(started_event.command_name, "bulkWrite");

let bulk_write_event = command_events.next().unwrap();
let succeeded_event = bulk_write_event.as_command_succeeded().unwrap();
assert_eq!(succeeded_event.command_name, "bulkWrite");

let get_more_event = command_events.next().unwrap();
let started_event = get_more_event.as_command_started().unwrap();
assert_eq!(started_event.command_name, "getMore");

let get_more_event = command_events.next().unwrap();
let failed_event = get_more_event.as_command_failed().unwrap();
assert_eq!(failed_event.command_name, "getMore");

let bulk_write_event = command_events.next().unwrap();
let started_event = bulk_write_event.as_command_started().unwrap();
assert_eq!(started_event.command_name, "bulkWrite");

let bulk_write_event = command_events.next().unwrap();
let succeeded_event = bulk_write_event.as_command_succeeded().unwrap();
assert_eq!(succeeded_event.command_name, "bulkWrite");

let get_more_event = command_events.next().unwrap();
let started_event = get_more_event.as_command_started().unwrap();
assert_eq!(started_event.command_name, "getMore");

let get_more_event = command_events.next().unwrap();
let succeeded_event = get_more_event.as_command_succeeded().unwrap();
assert_eq!(succeeded_event.command_name, "getMore");
}
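The assertion chain at the end of get_more_is_retried encodes a single expected sequence: the initial bulkWrite succeeds, its first getMore fails on the fail point, the whole bulkWrite is retried and succeeds, and the retried command's getMore succeeds. A hypothetical condensed form of those same checks, assuming the `command_events` iterator and the event accessors shown in this commit:

```rust
// Hypothetical rewrite of the assertions above; `command_events` is the
// iterator built from get_command_events(&["bulkWrite", "getMore"]).
let expected = [
    ("bulkWrite", "started"),
    ("bulkWrite", "succeeded"),
    ("getMore", "started"),
    ("getMore", "failed"),    // fail point fires once
    ("bulkWrite", "started"), // retry triggered by the RetryableWriteError label
    ("bulkWrite", "succeeded"),
    ("getMore", "started"),
    ("getMore", "succeeded"),
];
for (name, outcome) in expected {
    let event = command_events.next().unwrap();
    let observed = match outcome {
        "started" => event.as_command_started().map(|e| e.command_name.as_str()),
        "succeeded" => event.as_command_succeeded().map(|e| e.command_name.as_str()),
        _ => event.as_command_failed().map(|e| e.command_name.as_str()),
    };
    assert_eq!(observed, Some(name), "expected {} to be {}", name, outcome);
}
```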
8 changes: 4 additions & 4 deletions src/test/change_stream.rs
@@ -86,7 +86,7 @@ async fn tracks_resume_token() -> Result<()> {
let events: Vec<_> = {
let mut events = client.events.clone();
events
.get_command_events(&["aggregate", "getMore"])
.get_command_events_mut(&["aggregate", "getMore"])
.into_iter()
.filter_map(|ev| match ev {
CommandEvent::Succeeded(s) => Some(s),
@@ -232,7 +232,7 @@ async fn empty_batch_not_closed() -> Result<()> {
#[allow(deprecated)]
let events = {
let mut events = client.events.clone();
events.get_command_events(&["aggregate", "getMore"])
events.get_command_events_mut(&["aggregate", "getMore"])
};
let cursor_id = match &events[1] {
CommandEvent::Succeeded(CommandSucceededEvent { reply, .. }) => {
@@ -318,7 +318,7 @@ async fn resume_start_at_operation_time() -> Result<()> {
#[allow(deprecated)]
let events = {
let mut events = client.events.clone();
events.get_command_events(&["aggregate"])
events.get_command_events_mut(&["aggregate"])
};
assert_eq!(events.len(), 4);

@@ -365,7 +365,7 @@ async fn batch_end_resume_token() -> Result<()> {
#[allow(deprecated)]
let commands = {
let mut events = client.events.clone();
events.get_command_events(&["aggregate", "getMore"])
events.get_command_events_mut(&["aggregate", "getMore"])
};
assert!(matches!(commands.last(), Some(
CommandEvent::Succeeded(CommandSucceededEvent {
2 changes: 1 addition & 1 deletion src/test/cursor.rs
@@ -142,7 +142,7 @@ async fn batch_exhaustion() {
let replies: Vec<_> = {
let mut events = client.events.clone();
events
.get_command_events(&["getMore"])
.get_command_events_mut(&["getMore"])
.into_iter()
.filter_map(|e| e.as_command_succeeded().map(|e| e.reply.clone()))
.collect()
4 changes: 2 additions & 2 deletions src/test/spec/retryable_reads.rs
@@ -202,7 +202,7 @@ async fn retry_read_different_mongos() {
#[allow(deprecated)]
let events = {
let mut events = client.events.clone();
events.get_command_events(&["find"])
events.get_command_events_mut(&["find"])
};
assert!(
matches!(
@@ -263,7 +263,7 @@ async fn retry_read_same_mongos() {
#[allow(deprecated)]
let events = {
let mut events = client.events.clone();
events.get_command_events(&["find"])
events.get_command_events_mut(&["find"])
};
assert!(
matches!(
4 changes: 2 additions & 2 deletions src/test/spec/retryable_writes.rs
@@ -603,7 +603,7 @@ async fn retry_write_different_mongos() {
#[allow(deprecated)]
let events = {
let mut events = client.events.clone();
events.get_command_events(&["insert"])
events.get_command_events_mut(&["insert"])
};
assert!(
matches!(
@@ -665,7 +665,7 @@ async fn retry_write_same_mongos() {
#[allow(deprecated)]
let events = {
let mut events = client.events.clone();
events.get_command_events(&["insert"])
events.get_command_events_mut(&["insert"])
};
assert!(
matches!(
9 changes: 8 additions & 1 deletion src/test/util/event.rs
@@ -6,7 +6,7 @@ use crate::{
bson::doc,
event::{
cmap::CmapEvent,
command::{CommandEvent, CommandStartedEvent, CommandSucceededEvent},
command::{CommandEvent, CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
sdam::SdamEvent,
},
test::get_client_options,
@@ -110,6 +110,13 @@ impl CommandEvent {
_ => None,
}
}

pub(crate) fn as_command_failed(&self) -> Option<&CommandFailedEvent> {
match self {
CommandEvent::Failed(e) => Some(e),
_ => None,
}
}
}

#[derive(Clone, Debug)]
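The as_command_failed helper added above mirrors the existing as_command_started/as_command_succeeded accessors and is what lets get_more_is_retried assert on the failed getMore. A self-contained sketch of the same variant-accessor pattern, using simplified stand-ins for the driver's event types:

```rust
// Simplified stand-ins for the driver's command event types, showing the
// variant-accessor pattern used by as_command_failed above.
struct CommandFailedEvent {
    command_name: String,
}

enum CommandEvent {
    Started,
    Succeeded,
    Failed(CommandFailedEvent),
}

impl CommandEvent {
    // Returns the inner event only for the Failed variant, so a test can
    // `.unwrap()` at a position where it expects a failure.
    fn as_command_failed(&self) -> Option<&CommandFailedEvent> {
        match self {
            CommandEvent::Failed(e) => Some(e),
            _ => None,
        }
    }
}

fn main() {
    let event = CommandEvent::Failed(CommandFailedEvent {
        command_name: "getMore".to_string(),
    });
    assert_eq!(event.as_command_failed().unwrap().command_name, "getMore");
    assert!(CommandEvent::Started.as_command_failed().is_none());
    assert!(CommandEvent::Succeeded.as_command_failed().is_none());
}
```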
20 changes: 19 additions & 1 deletion src/test/util/event_buffer.rs
@@ -232,10 +232,28 @@ impl EventBuffer<Event> {
.collect()
}

pub(crate) fn get_command_events(&self, command_names: &[&str]) -> Vec<CommandEvent> {
self.inner
.events
.lock()
.unwrap()
.data
.iter()
.filter_map(|(event, _)| match event {
Event::Command(command_event)
if command_names.contains(&command_event.command_name()) =>
{
Some(command_event.clone())
}
_ => None,
})
.collect()
}

/// Remove all command events from the buffer, returning those matching any of the command
/// names.
#[deprecated = "use immutable methods"]
pub(crate) fn get_command_events(&mut self, command_names: &[&str]) -> Vec<CommandEvent> {
pub(crate) fn get_command_events_mut(&mut self, command_names: &[&str]) -> Vec<CommandEvent> {
let mut out = vec![];
self.retain(|ev| match ev {
Event::Command(cev) => {
Expand Down

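Taken together, the event_buffer.rs changes split the old draining accessor in two: a new immutable get_command_events that copies matching events out of the buffer, and the renamed, deprecated get_command_events_mut that still removes all command events and returns the matching ones. A hypothetical snippet inside a test, assuming client.events is a populated EventBuffer as in the tests above:

```rust
// Hypothetical usage inside a test; assumes `client.events` is a populated
// EventBuffer<Event> as in the tests in this commit.

// The immutable accessor copies matching events out of the buffer, so repeated
// calls observe the same data and no clone or `mut` binding is needed.
let first = client.events.get_command_events(&["bulkWrite", "getMore"]);
let second = client.events.get_command_events(&["bulkWrite", "getMore"]);
assert_eq!(first.len(), second.len());

// The draining variant survives under its new name but now requires a mutable
// clone and an explicit #[allow(deprecated)] opt-in.
#[allow(deprecated)]
let drained = {
    let mut events = client.events.clone();
    events.get_command_events_mut(&["bulkWrite", "getMore"])
};
assert_eq!(drained.len(), first.len());
```

This split is why the change_stream, cursor, retryable_reads, and retryable_writes tests in this commit only needed a mechanical rename to get_command_events_mut.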