Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/schema_agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{bail, Result};
use futures::TryStreamExt as _;
use scylla::client::session::Session;
use scylla::client::session_builder::SessionBuilder;
use scylla::errors::ExecutionError;
use scylla::errors::SchemaAgreementError;
use std::env;
use std::time::Duration;

Expand All @@ -27,7 +27,9 @@ async fn main() -> Result<()> {

match session.await_schema_agreement().await {
Ok(_schema_version) => println!("Schema is in agreement in time"),
Err(ExecutionError::RequestTimeout(_)) => println!("Schema is NOT in agreement in time"),
Err(SchemaAgreementError::Timeout(_)) => {
println!("Schema is NOT in agreement in time")
}
Err(err) => bail!(err),
};
session
Expand Down
42 changes: 42 additions & 0 deletions scylla-cql/src/frame/response/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl Error {
/// An error sent from the database in response to a query
/// as described in the [specification](https://github.com/apache/cassandra/blob/5ed5e84613ef0e9664a774493db7d2604e3596e0/doc/native_protocol_v4.spec#L1029)\
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DbError {
/// The submitted query has a syntax error
#[error("The submitted query has a syntax error")]
Expand Down Expand Up @@ -386,6 +387,47 @@ impl DbError {
} => protocol_features.rate_limit_error.unwrap(),
}
}

/// Decides whether the error can be ignored. If true, the driver can perform
/// a speculative retry to the next target.
pub fn can_speculative_retry(&self) -> bool {
// Do not remove this lint!
// It's there for a reason - we don't want new variants
// automatically fall under `_` pattern when they are introduced.
#[deny(clippy::wildcard_enum_match_arm)]
match self {
// Errors that will almost certainly appear on other nodes as well
DbError::SyntaxError
| DbError::Invalid
| DbError::AlreadyExists { .. }
| DbError::Unauthorized
| DbError::ProtocolError => false,

// Errors that should not appear there - thus, should not be ignored.
DbError::AuthenticationError | DbError::Other(_) => false,

// For now, let's assume that UDF failure is not transient - don't ignore it
// TODO: investigate
DbError::FunctionFailure { .. } => false,

// Not sure when these can appear - don't ignore them
// TODO: Investigate these errors
DbError::ConfigError | DbError::TruncateError => false,

// Errors that we can ignore and perform a retry on some other node
DbError::Unavailable { .. }
| DbError::Overloaded
| DbError::IsBootstrapping
| DbError::ReadTimeout { .. }
| DbError::WriteTimeout { .. }
| DbError::ReadFailure { .. }
| DbError::WriteFailure { .. }
// Preparation may succeed on some other node.
| DbError::Unprepared { .. }
| DbError::ServerError
| DbError::RateLimitReached { .. } => true,
}
}
}

/// Type of the operation rejected by rate limiting
Expand Down
26 changes: 13 additions & 13 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
use crate::errors::{
BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
ProtocolError, RequestAttemptError, RequestError, TracingError, UseKeyspaceError,
RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
};
use crate::frame::response::result;
#[cfg(feature = "ssl")]
Expand Down Expand Up @@ -867,7 +867,9 @@ impl Session {
.await?;
if !paging_state_response.finished() {
error!("Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
return Err(ProtocolError::NonfinishedPagingState.into());
return Err(ExecutionError::LastAttemptError(
RequestAttemptError::NonfinishedPagingState,
));
}
Ok(result)
}
Expand Down Expand Up @@ -988,9 +990,7 @@ impl Session {
self.handle_set_keyspace_response(&response).await?;
self.handle_auto_await_schema_agreement(&response).await?;

let (result, paging_state_response) = response
.into_query_result_and_paging_state()
.map_err(RequestAttemptError::into_execution_error)?;
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
span.record_result_fields(&result);

Ok((result, paging_state_response))
Expand Down Expand Up @@ -1175,7 +1175,9 @@ impl Session {
.await?;
if !paging_state.finished() {
error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
return Err(ProtocolError::NonfinishedPagingState.into());
return Err(ExecutionError::LastAttemptError(
RequestAttemptError::NonfinishedPagingState,
));
}
Ok(result)
}
Expand Down Expand Up @@ -1295,9 +1297,7 @@ impl Session {
self.handle_set_keyspace_response(&response).await?;
self.handle_auto_await_schema_agreement(&response).await?;

let (result, paging_state_response) = response
.into_query_result_and_paging_state()
.map_err(RequestAttemptError::into_execution_error)?;
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
span.record_result_fields(&result);

Ok((result, paging_state_response))
Expand Down Expand Up @@ -1938,7 +1938,7 @@ impl Session {
last_error.map(Result::Err)
}

async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, ExecutionError> {
async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, SchemaAgreementError> {
loop {
tokio::time::sleep(self.schema_agreement_interval).await;
if let Some(agreed_version) = self.check_schema_agreement().await? {
Expand All @@ -1947,18 +1947,18 @@ impl Session {
}
}

pub async fn await_schema_agreement(&self) -> Result<Uuid, ExecutionError> {
pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
timeout(
self.schema_agreement_timeout,
self.await_schema_agreement_indefinitely(),
)
.await
.unwrap_or(Err(ExecutionError::SchemaAgreementTimeout(
.unwrap_or(Err(SchemaAgreementError::Timeout(
self.schema_agreement_timeout,
)))
}

pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, ExecutionError> {
pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
let cluster_state = self.get_cluster_state();
let connections_iter = cluster_state.iter_working_connections()?;

Expand Down
17 changes: 12 additions & 5 deletions scylla/src/client/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use crate::cluster::metadata::{
CollectionType, ColumnKind, ColumnType, NativeType, UserDefinedType,
};
use crate::deserialize::DeserializeOwnedValue;
use crate::errors::{BadKeyspaceName, DbError, ExecutionError, UseKeyspaceError};
use crate::errors::{
BadKeyspaceName, DbError, ExecutionError, RequestAttemptError, UseKeyspaceError,
};
use crate::observability::tracing::TracingInfo;
use crate::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
use crate::prepared_statement::PreparedStatement;
Expand Down Expand Up @@ -948,15 +950,17 @@ async fn test_db_errors() {
// SyntaxError on bad query
assert!(matches!(
session.query_unpaged("gibberish", &[]).await,
Err(ExecutionError::DbError(DbError::SyntaxError, _))
Err(ExecutionError::LastAttemptError(
RequestAttemptError::DbError(DbError::SyntaxError, _)
))
));

// AlreadyExists when creating a keyspace for the second time
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();

let create_keyspace_res = session.ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await;
let keyspace_exists_error: DbError = match create_keyspace_res {
Err(ExecutionError::DbError(e, _)) => e,
Err(ExecutionError::LastAttemptError(RequestAttemptError::DbError(e, _))) => e,
_ => panic!("Second CREATE KEYSPACE didn't return an error!"),
};

Expand All @@ -981,7 +985,7 @@ async fn test_db_errors() {
.ddl(format!("CREATE TABLE {}.tab (a text primary key)", ks))
.await;
let create_tab_error: DbError = match create_table_res {
Err(ExecutionError::DbError(e, _)) => e,
Err(ExecutionError::LastAttemptError(RequestAttemptError::DbError(e, _))) => e,
_ => panic!("Second CREATE TABLE didn't return an error!"),
};

Expand Down Expand Up @@ -2621,7 +2625,10 @@ async fn test_rate_limit_exceeded_exception() {
use crate::errors::OperationType;

match maybe_err.expect("Rate limit error didn't occur") {
ExecutionError::DbError(DbError::RateLimitReached { op_type, .. }, _) => {
ExecutionError::LastAttemptError(RequestAttemptError::DbError(
DbError::RateLimitReached { op_type, .. },
_,
)) => {
assert_eq!(op_type, OperationType::Write);
}
err => panic!("Unexpected error type received: {:?}", err),
Expand Down
12 changes: 4 additions & 8 deletions scylla/src/cluster/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors::{BadQuery, ConnectionPoolError};
use crate::errors::ConnectionPoolError;
use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
use crate::policies::host_filter::HostFilter;
use crate::prepared_statement::TokenCalculationError;
Expand Down Expand Up @@ -204,7 +204,7 @@ impl ClusterState {
keyspace: &str,
table: &str,
partition_key: &SerializedValues,
) -> Result<Token, BadQuery> {
) -> Result<Token, TokenCalculationError> {
let partitioner = self
.keyspaces
.get(keyspace)
Expand All @@ -213,11 +213,7 @@ impl ClusterState {
.and_then(PartitionerName::from_str)
.unwrap_or_default();

calculate_token_for_partition_key(partition_key, &partitioner).map_err(|err| match err {
TokenCalculationError::ValueTooLong(values_len) => {
BadQuery::ValuesTooLongForKey(values_len, u16::MAX.into())
}
})
calculate_token_for_partition_key(partition_key, &partitioner)
}

/// Access to replicas owning a given token
Expand Down Expand Up @@ -255,7 +251,7 @@ impl ClusterState {
keyspace: &str,
table: &str,
partition_key: &SerializedValues,
) -> Result<Vec<(Arc<Node>, Shard)>, BadQuery> {
) -> Result<Vec<(Arc<Node>, Shard)>, TokenCalculationError> {
let token = self.compute_token(keyspace, table, partition_key)?;
Ok(self.get_token_endpoints(keyspace, table, token))
}
Expand Down
Loading