Skip to content

Commit

Permalink
cr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
isabelatkinson committed Mar 24, 2021
1 parent d154470 commit 7385687
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 94 deletions.
6 changes: 3 additions & 3 deletions src/client/session/cluster_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use crate::bson::{Document, Timestamp};
#[derive(Debug, Deserialize, Clone, Serialize, Derivative)]
#[derivative(PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ClusterTime {
cluster_time: Timestamp,
pub struct ClusterTime {
pub cluster_time: Timestamp,

#[derivative(PartialEq = "ignore")]
signature: Document,
pub signature: Document,
}

impl std::cmp::Ord for ClusterTime {
Expand Down
30 changes: 18 additions & 12 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ lazy_static! {
};
}

/// Session to be used with client operations. This acts as a handle to a server session.
/// This keeps the details of how server sessions are pooled opaque to users.
/// A MongoDB client session. This struct represents a logical session used for ordering sequential
/// operations. To create a `ClientSession`, call `start_session` on a `Client`.
///
/// `ClientSession` instances are not thread safe or fork safe. They can only be used by one thread
/// or process at a time.
#[derive(Debug)]
pub struct ClientSession {
cluster_time: Option<ClusterTime>,
Expand All @@ -57,8 +60,13 @@ impl ClientSession {
}
}

/// The client used to create this session.
pub fn client(&self) -> Client {
self.client.clone()
}

/// The id of this session.
pub(crate) fn id(&self) -> &Document {
pub fn id(&self) -> &Document {
&self.server_session.id
}

Expand All @@ -69,13 +77,18 @@ impl ClientSession {

/// The highest seen cluster time this session has seen so far.
/// This will be `None` if this session has not been used in an operation yet.
pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
pub fn cluster_time(&self) -> Option<&ClusterTime> {
self.cluster_time.as_ref()
}

/// The options used to create this session.
pub fn options(&self) -> Option<&SessionOptions> {
self.options.as_ref()
}

/// Set the cluster time to the provided one if it is greater than this session's highest seen
/// cluster time or if this session's cluster time is `None`.
pub(crate) fn advance_cluster_time(&mut self, to: &ClusterTime) {
pub fn advance_cluster_time(&mut self, to: &ClusterTime) {
if self.cluster_time().map(|ct| ct < to).unwrap_or(true) {
self.cluster_time = Some(to.clone());
}
Expand Down Expand Up @@ -103,13 +116,6 @@ impl ClientSession {
pub(crate) fn is_dirty(&self) -> bool {
self.server_session.dirty
}

/// Returns a mutable reference to this session wrapped in an option. This method is necessary
/// to bypass the construction of an option taking ownership of a mutable reference.
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn as_mut_ref_option(&mut self) -> Option<&mut Self> {
Some(self)
}
}

impl Drop for ClientSession {
Expand Down
6 changes: 2 additions & 4 deletions src/coll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,10 @@ where
filter: impl Into<Option<Document>>,
options: impl Into<Option<FindOneOptions>>,
) -> Result<Option<T>> {
let mut options: FindOptions = options
let options: FindOptions = options
.into()
.map(Into::into)
.unwrap_or_else(Default::default);
options.limit = Some(-1);
let mut cursor = self.find(filter, Some(options)).await?;
cursor.next().await.transpose()
}
Expand All @@ -477,11 +476,10 @@ where
options: impl Into<Option<FindOneOptions>>,
session: &mut ClientSession,
) -> Result<Option<T>> {
let mut options: FindOptions = options
let options: FindOptions = options
.into()
.map(Into::into)
.unwrap_or_else(Default::default);
options.limit = Some(-1);
let mut cursor = self
.find_with_session(filter, Some(options), session)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl<'session> GetMoreProvider for ExplicitSessionGetMoreProvider<'session> {
let future = Box::pin(async move {
let get_more = GetMore::new(info);
let get_more_result = client
.execute_operation(get_more, session.reference.as_mut_ref_option())
.execute_operation(get_more, Some(&mut *session.reference))
.await;
ExecutionResult {
get_more_result,
Expand Down
2 changes: 1 addition & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Database {
ListCollections::new(self.name().to_string(), filter.into(), true, None);
let mut cursor: SessionCursor<Document> = self
.client()
.execute_operation(list_collections, session.as_mut_ref_option())
.execute_operation(list_collections, Some(&mut *session))
.await
.map(|spec| SessionCursor::new(self.client().clone(), spec))?;

Expand Down
24 changes: 7 additions & 17 deletions src/test/spec/v2_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,20 @@ pub mod test_file;
use std::{ops::Deref, time::Duration};

use semver::VersionReq;
use tokio::sync::RwLockWriteGuard;

use crate::{
bson::doc,
coll::options::DropCollectionOptions,
concern::{Acknowledgment, WriteConcern},
options::{CreateCollectionOptions, InsertManyOptions},
test::{assert_matches, util::get_default_name, EventClient, TestClient, LOCK},
test::{assert_matches, util::get_default_name, EventClient, TestClient},
RUNTIME,
};

use operation::{OperationObject, OperationResult};
use test_event::CommandStartedEvent;
use test_file::{TestData, TestFile};

use super::run_spec_test;

const SKIPPED_OPERATIONS: &[&str] = &[
"bulkWrite",
"count",
Expand All @@ -35,13 +32,6 @@ const SKIPPED_OPERATIONS: &[&str] = &[
"watch",
];

#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn run_crud_v2_tests() {
let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await;
run_spec_test(&["crud", "v2"], run_v2_test).await;
}

pub async fn run_v2_test(test_file: TestFile) {
let client = TestClient::new().await;

Expand Down Expand Up @@ -300,15 +290,15 @@ fn majority_write_concern() -> WriteConcern {
}

fn assert_different_lsid_on_last_two_commands(client: &EventClient) {
let (e1, e2) = client.get_last_two_command_started_events();
let lsid1 = e1.command.get("lsid").unwrap();
let lsid2 = e2.command.get("lsid").unwrap();
let events = client.get_all_command_started_events();
let lsid1 = events[events.len() - 1].command.get("lsid").unwrap();
let lsid2 = events[events.len() - 2].command.get("lsid").unwrap();
assert_ne!(lsid1, lsid2);
}

fn assert_same_lsid_on_last_two_commands(client: &EventClient) {
let (e1, e2) = client.get_last_two_command_started_events();
let lsid1 = e1.command.get("lsid").unwrap();
let lsid2 = e2.command.get("lsid").unwrap();
let events = client.get_all_command_started_events();
let lsid1 = events[events.len() - 1].command.get("lsid").unwrap();
let lsid2 = events[events.len() - 2].command.get("lsid").unwrap();
assert_eq!(lsid1, lsid2);
}
36 changes: 9 additions & 27 deletions src/test/spec/v2_runner/test_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,19 @@ impl CommandStartedEvent {
if expected.database_name.is_some() && self.database_name != expected.database_name {
return false;
}
for (k, v) in &expected.command {
if k == "lsid" {
match v.as_str().unwrap() {
"session0" => {
if self.command.get(k).unwrap().as_document().unwrap() != session0_lsid {
return false;
}
}
"session1" => {
if self.command.get(k).unwrap().as_document().unwrap() != session1_lsid {
return false;
}
}
other => panic!("unknown session name: {}", other),
let mut expected = expected.command.clone();
if let Some(Bson::String(session)) = expected.remove("lsid") {
match session.as_str() {
"session0" => {
expected.insert("lsid", session0_lsid.clone());
}
} else {
match self.command.get(k) {
Some(actual_v) => {
if !actual_v.matches(v) {
return false;
}
}
None => {
if v != &Bson::Null {
return false;
}
}
"session1" => {
expected.insert("lsid", session1_lsid.clone());
}
other => panic!("unknown session name: {}", other),
}
}
true
self.command.content_matches(&expected)
}
}

Expand Down
28 changes: 4 additions & 24 deletions src/test/util/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,10 @@ impl EventClient {
events
.iter()
.filter_map(|event| match event {
CommandEvent::CommandStartedEvent(event) => {
if event.command_name != "configureFailPoint" {
Some(event.clone())
} else {
None
}
CommandEvent::CommandStartedEvent(event)
if event.command_name != "configureFailPoint" =>
{
Some(event.clone())
}
_ => None,
})
Expand Down Expand Up @@ -414,24 +412,6 @@ impl EventClient {
self.handler.command_events.write().unwrap().clear();
self.handler.pool_cleared_events.write().unwrap().clear();
}

/// Gets the last two command started events.
pub fn get_last_two_command_started_events(
&self,
) -> (CommandStartedEvent, CommandStartedEvent) {
let events = self.handler.command_events.read().unwrap();
let events: Vec<CommandStartedEvent> = events
.iter()
.filter_map(|event| match event {
CommandEvent::CommandStartedEvent(event) => Some(event.clone()),
_ => None,
})
.collect();
(
events[events.len() - 2].clone(),
events[events.len() - 1].clone(),
)
}
}

#[cfg_attr(feature = "tokio-runtime", tokio::test)]
Expand Down
15 changes: 10 additions & 5 deletions src/test/util/matchable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,17 @@ impl Matchable for Document {
if k == "upsertedCount" {
continue;
}
if let Some(actual_v) = self.get(k) {
if !actual_v.matches(v) {
return false;
match self.get(k) {
Some(actual_v) => {
if !actual_v.matches(v) {
return false;
}
}
None => {
if v != &Bson::Null {
return false;
}
}
} else {
return false;
}
}
true
Expand Down

0 comments on commit 7385687

Please sign in to comment.