Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 4 additions & 29 deletions src/commands/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,10 @@ impl Buffer {
_ => write_attr |= INFO2_WRITE,
}

let each_op =
matches!(operation.data, OperationData::CdtMapOp(_) | OperationData::CdtBitOp(_));
let each_op = matches!(
operation.data,
OperationData::CdtMapOp(_) | OperationData::CdtBitOp(_)
);

if policy.respond_per_each_op || each_op {
write_attr |= INFO2_RESPOND_ALL_OPS;
Expand Down Expand Up @@ -612,10 +614,6 @@ impl Buffer {
field_count += 1;
}

// Estimate scan options size.
self.data_offset += 2 + FIELD_HEADER_SIZE as usize;
field_count += 1;

// Estimate scan timeout size.
self.data_offset += 4 + FIELD_HEADER_SIZE as usize;
field_count += 1;
Expand Down Expand Up @@ -661,18 +659,6 @@ impl Buffer {
self.write_filter_expression(filter, filter_size)?;
}

self.write_field_header(2, FieldType::ScanOptions)?;

let mut priority: u8 = policy.base_policy.priority.clone() as u8;
priority <<= 4;

if policy.fail_on_cluster_change {
priority |= 0x08;
}

self.write_u8(priority)?;
self.write_u8(policy.scan_percent)?;

// Write scan timeout
self.write_field_header(4, FieldType::ScanTimeout)?;
self.write_u32(policy.socket_timeout)?;
Expand Down Expand Up @@ -751,11 +737,6 @@ impl Buffer {
self.data_offset += bin_name_size;
field_count += 1;
}
} else {
// Calling query with no filters is more efficiently handled by a primary index scan.
// Estimate scan options size.
self.data_offset += 2 + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
let filter_exp_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_exp_size > 0 {
Expand Down Expand Up @@ -847,12 +828,6 @@ impl Buffer {
}
}
}
} else {
// Calling query with no filters is more efficiently handled by a primary index scan.
self.write_field_header(2, FieldType::ScanOptions)?;
let priority: u8 = (policy.base_policy.priority.clone() as u8) << 4;
self.write_u8(priority)?;
self.write_u8(100)?;
}

if let Some(filter_exp) = policy.filter_expression() {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/field_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum FieldType {
// GUID = 5,
// DigestRipeArray = 6,
TranId = 7, // user supplied transaction id, which is simply passed back,
ScanOptions = 8,
// ScanOptions = 8,
ScanTimeout = 9,
IndexName = 21,
IndexRange = 22,
Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ pub use net::Host;
pub use operations::{MapPolicy, MapReturnType, MapWriteMode};
pub use policy::{
BatchPolicy, ClientPolicy, CommitLevel, Concurrency, ConsistencyLevel, Expiration,
GenerationPolicy, Policy, Priority, QueryPolicy, ReadPolicy, RecordExistsAction, ScanPolicy,
WritePolicy,
GenerationPolicy, Policy, QueryPolicy, ReadPolicy, RecordExistsAction, ScanPolicy, WritePolicy,
};
pub use query::{CollectionIndexType, IndexType, Recordset, Statement, UDFLang};
pub use record::Record;
Expand Down
17 changes: 0 additions & 17 deletions src/policy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ mod concurrency;
mod consistency_level;
mod expiration;
mod generation_policy;
mod priority;
mod query_policy;
mod read_policy;
mod record_exists_action;
Expand All @@ -39,7 +38,6 @@ pub use self::concurrency::Concurrency;
pub use self::consistency_level::ConsistencyLevel;
pub use self::expiration::Expiration;
pub use self::generation_policy::GenerationPolicy;
pub use self::priority::Priority;
pub use self::query_policy::QueryPolicy;
pub use self::read_policy::ReadPolicy;
pub use self::record_exists_action::RecordExistsAction;
Expand All @@ -53,9 +51,6 @@ use std::time::{Duration, Instant};
/// Trait implemented by most policy types; policies that implement this trait typically encompass
/// an instance of `BasePolicy`.
pub trait Policy {
/// Transaction priority.
fn priority(&self) -> &Priority;

#[doc(hidden)]
/// Deadline for current transaction based on specified timeout. For internal use only.
fn deadline(&self) -> Option<Instant>;
Expand Down Expand Up @@ -92,10 +87,6 @@ impl<T> Policy for T
where
T: PolicyLike,
{
fn priority(&self) -> &Priority {
self.base().priority()
}

fn consistency_level(&self) -> &ConsistencyLevel {
self.base().consistency_level()
}
Expand All @@ -120,10 +111,6 @@ where
/// Common parameters shared by all policy types.
#[derive(Debug, Clone)]
pub struct BasePolicy {
/// Priority of request relative to other transactions.
/// Currently, only used for scans.
pub priority: Priority,

/// How replicas should be consulted in a read operation to provide the desired
/// consistency guarantee. Default to allowing one replica to be used in the
/// read operation.
Expand All @@ -150,10 +137,6 @@ pub struct BasePolicy {
}

impl Policy for BasePolicy {
fn priority(&self) -> &Priority {
&self.priority
}

fn deadline(&self) -> Option<Instant> {
match self.timeout {
Some(timeout) => Some(Instant::now() + timeout),
Expand Down
35 changes: 0 additions & 35 deletions src/policy/priority.rs

This file was deleted.

3 changes: 1 addition & 2 deletions src/policy/read_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use crate::expressions::FilterExpression;
use crate::policy::BasePolicy;
use crate::{ConsistencyLevel, Priority};
use crate::ConsistencyLevel;
use std::time::Duration;

/// `ReadPolicy` excapsulates parameters for transaction policy attributes
Expand All @@ -25,7 +25,6 @@ pub type ReadPolicy = BasePolicy;
impl Default for ReadPolicy {
fn default() -> ReadPolicy {
ReadPolicy {
priority: Priority::Default,
timeout: Some(Duration::new(30, 0)),
max_retries: Some(2),
sleep_between_retries: Some(Duration::new(0, 500_000_000)),
Expand Down
8 changes: 0 additions & 8 deletions src/policy/scan_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ pub struct ScanPolicy {
/// Base policy instance
pub base_policy: BasePolicy,

/// Percent of data to scan. Valid integer range is 1 to 100. Default is 100.
pub scan_percent: u8,

/// Maximum number of concurrent requests to server nodes at any point in time. If there are 16
/// nodes in the cluster and `max_concurrent_nodes` is 8, then scan requests will be made to 8
/// nodes in parallel. When a scan completes, a new scan request will be issued until all 16
Expand All @@ -36,9 +33,6 @@ pub struct ScanPolicy {
/// the queue is full, the producer threads will block until records are consumed.
pub record_queue_size: usize,

/// Terminate scan if cluster is in fluctuating state.
pub fail_on_cluster_change: bool,

/// Maximum time in milliseconds to wait when polling socket for availability prior to
/// performing an operation on the socket on the server side. Zero means there is no socket
/// timeout. Default: 10,000 ms.
Expand All @@ -64,10 +58,8 @@ impl Default for ScanPolicy {
fn default() -> Self {
ScanPolicy {
base_policy: BasePolicy::default(),
scan_percent: 100,
max_concurrent_nodes: 0,
record_queue_size: 1024,
fail_on_cluster_change: true,
socket_timeout: 10000,
filter_expression: None,
}
Expand Down