diff --git a/crates/rattler_index/src/cache.rs b/crates/rattler_index/src/cache.rs index 46875d56da..e62c92157b 100644 --- a/crates/rattler_index/src/cache.rs +++ b/crates/rattler_index/src/cache.rs @@ -228,6 +228,8 @@ pub async fn read_package_with_retry( metadata = RepodataFileMetadata { etag: fresh_metadata.etag().map(str::to_owned), last_modified: fresh_metadata.last_modified(), + file_existed: true, + precondition_checks: crate::PreconditionChecks::Enabled, }; // Loop continues to next iteration with fresh metadata } diff --git a/crates/rattler_index/src/lib.rs b/crates/rattler_index/src/lib.rs index 7cb655799f..1aef74a648 100644 --- a/crates/rattler_index/src/lib.rs +++ b/crates/rattler_index/src/lib.rs @@ -44,6 +44,29 @@ use tokio::sync::Semaphore; use tracing::Instrument; use url::Url; +/// Configuration for precondition checks during file operations. +/// +/// Precondition checks use `ETags` and timestamps to detect concurrent modifications +/// and prevent race conditions when multiple processes are indexing simultaneously. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum PreconditionChecks { + /// Enable precondition checks (default behavior). + /// This provides protection against concurrent modifications. + #[default] + Enabled, + /// Disable precondition checks. + /// Use this when working with S3 implementations that don't fully support + /// conditional requests, or when you're certain no concurrent indexing occurs. + Disabled, +} + +impl PreconditionChecks { + /// Returns true if precondition checks are enabled + pub fn is_enabled(self) -> bool { + matches!(self, PreconditionChecks::Enabled) + } +} + /// Statistics for a single subdir indexing operation #[derive(Debug, Clone, Default)] pub struct SubdirIndexStats { @@ -300,6 +323,8 @@ async fn read_and_parse_package( RepodataFileMetadata { etag, last_modified, + file_existed: true, // File exists since we got its metadata from stat + precondition_checks: PreconditionChecks::Enabled, // Always enabled for cache reads }, ) .await @@ -340,20 +365,42 @@ pub struct RepodataFileMetadata { pub etag: Option, /// The last modified timestamp of the file, if available pub last_modified: Option>, + /// Whether the file existed when metadata was collected + pub file_existed: bool, + /// The precondition checks configuration when this metadata was collected + pub precondition_checks: PreconditionChecks, } impl RepodataFileMetadata { /// Collect metadata for a file without reading its contents. - /// Returns metadata with None values if the file doesn't exist. - pub async fn new(op: &Operator, path: &str) -> opendal::Result { + /// Returns metadata with None values if the file doesn't exist or if precondition checks are disabled. + pub async fn new( + op: &Operator, + path: &str, + precondition_checks: PreconditionChecks, + ) -> opendal::Result { + // If precondition checks are disabled, return empty metadata + if !precondition_checks.is_enabled() { + return Ok(Self { + etag: None, + last_modified: None, + file_existed: false, + precondition_checks, + }); + } + match op.stat(path).await { Ok(metadata) => Ok(Self { etag: metadata.etag().map(str::to_owned), last_modified: metadata.last_modified(), + file_existed: true, + precondition_checks, }), Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(Self { etag: None, last_modified: None, + file_existed: false, + precondition_checks, }), Err(e) => Err(e), } @@ -382,28 +429,49 @@ impl RepodataMetadataCollection { has_patch: bool, write_zst: bool, write_shards: bool, + precondition_checks: PreconditionChecks, ) -> opendal::Result { // Always track repodata.json - let repodata = RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA}")).await?; + let repodata = + RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA}"), precondition_checks) + .await?; // Track repodata_from_packages.json if patches are used let repodata_from_packages = if has_patch { Some( - RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA_FROM_PACKAGES}")) - .await?, + RepodataFileMetadata::new( + op, + &format!("{subdir}/{REPODATA_FROM_PACKAGES}"), + precondition_checks, + ) + .await?, ) } else { None }; let repodata_zst = if write_zst { - Some(RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA}.zst")).await?) + Some( + RepodataFileMetadata::new( + op, + &format!("{subdir}/{REPODATA}.zst"), + precondition_checks, + ) + .await?, + ) } else { None }; let repodata_shards = if write_shards { - Some(RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA_SHARDS}")).await?) + Some( + RepodataFileMetadata::new( + op, + &format!("{subdir}/{REPODATA_SHARDS}"), + precondition_checks, + ) + .await?, + ) } else { None }; @@ -428,6 +496,7 @@ async fn index_subdir( progress: Option, semaphore: Arc, cache: cache::PackageRecordCache, + precondition_checks: PreconditionChecks, ) -> Result { // Use write_retry_policy for handling lock contention during repodata writes // This will retry for 10 minutes with longer backoff durations (10s, 30s, 60s, etc.) @@ -447,6 +516,7 @@ async fn index_subdir( progress.clone(), semaphore.clone(), cache.clone(), + precondition_checks, ) .await { @@ -455,18 +525,37 @@ async fn index_subdir( return Ok(stats); } Err(e) => { - // Check if this is a race condition error + // Check if this is a race condition error that we should retry if let Some(opendal_err) = e.downcast_ref::() { - if opendal_err.kind() == opendal::ErrorKind::ConditionNotMatch { + let is_retryable_condition_error = matches!( + opendal_err.kind(), + opendal::ErrorKind::ConditionNotMatch | opendal::ErrorKind::Unexpected + ) && { + // For Unexpected errors, check if it's the HTTP 409 ConditionalRequestConflict + let error_str = format!("{opendal_err:?}"); + error_str.contains("ConditionalRequestConflict") + || error_str.contains("status: 409") + || opendal_err.kind() == opendal::ErrorKind::ConditionNotMatch + }; + + if is_retryable_condition_error { // Race condition detected - should we retry? match retry_policy.should_retry(request_start_time, current_try as u32) { RetryDecision::Retry { execute_after } => { let duration = execute_after .duration_since(SystemTime::now()) .unwrap_or_default(); + + // Log with more context to help diagnose the issue tracing::warn!( - "Detected concurrent modification of repodata for {}, retrying in {:?}", + "Detected concurrent modification of repodata for {} (attempt {}/max). \ + Error: {:?}. Retrying in {:?}. \ + This may indicate multiple indexing processes running simultaneously, \ + or an S3 backend with incomplete precondition support. \ + Consider using PreconditionChecks::Disabled if this persists.", subdir, + current_try + 1, + opendal_err, duration ); tokio::time::sleep(duration).await; @@ -475,8 +564,13 @@ async fn index_subdir( } RetryDecision::DoNotRetry => { tracing::error!( - "Max retries exceeded for {} due to concurrent modifications", - subdir + "Max retries exceeded for {} due to concurrent modifications. \ + Final error: {:?}. \ + If you're not running concurrent indexing, your S3 backend may not \ + fully support conditional requests. Consider disabling precondition \ + checks by setting precondition_checks to PreconditionChecks::Disabled.", + subdir, + opendal_err ); return Err(e); } @@ -501,6 +595,7 @@ async fn index_subdir_inner( progress: Option, semaphore: Arc, cache: cache::PackageRecordCache, + precondition_checks: PreconditionChecks, ) -> Result { // Step 1: Collect ETags/metadata for all critical files upfront let metadata = RepodataMetadataCollection::new( @@ -509,6 +604,7 @@ async fn index_subdir_inner( repodata_patch.is_some(), write_zst, write_shards, + precondition_checks, ) .await?; @@ -938,6 +1034,7 @@ pub async fn index_fs( force, max_parallel, multi_progress, + PreconditionChecks::Disabled, ) .await .map(|_| ()) @@ -963,6 +1060,8 @@ pub struct IndexS3Config { pub max_parallel: usize, /// The multi-progress bar to use for the index. pub multi_progress: Option, + /// Configuration for precondition checks during file operations. + pub precondition_checks: PreconditionChecks, } /// Create a new `repodata.json` for all packages in the channel at the given S3 @@ -978,6 +1077,7 @@ pub async fn index_s3( force, max_parallel, multi_progress, + precondition_checks, }: IndexS3Config, ) -> anyhow::Result<()> { // Create the S3 configuration for opendal. @@ -1008,6 +1108,7 @@ pub async fn index_s3( force, max_parallel, multi_progress, + precondition_checks, ) .await .map(|_| ()) @@ -1039,6 +1140,7 @@ pub async fn index( force: bool, max_parallel: usize, multi_progress: Option, + precondition_checks: PreconditionChecks, ) -> anyhow::Result { let entries = op.list_with("").await?; @@ -1116,6 +1218,7 @@ pub async fn index( multi_progress.clone(), semaphore.clone(), cache, + precondition_checks, ) .instrument(tracing::info_span!("index_subdir", subdir = %subdir)); tasks.push((*subdir, task)); diff --git a/crates/rattler_index/src/main.rs b/crates/rattler_index/src/main.rs index 8c17160b80..b25e53fe60 100644 --- a/crates/rattler_index/src/main.rs +++ b/crates/rattler_index/src/main.rs @@ -5,7 +5,7 @@ use clap::{arg, Parser, Subcommand}; use clap_verbosity_flag::Verbosity; use rattler_conda_types::Platform; use rattler_config::config::concurrency::default_max_concurrent_solves; -use rattler_index::{index_fs, index_s3, IndexFsConfig, IndexS3Config}; +use rattler_index::{index_fs, index_s3, IndexFsConfig, IndexS3Config, PreconditionChecks}; use rattler_networking::AuthenticationStorage; use rattler_s3::S3Credentials; use url::Url; @@ -60,6 +60,13 @@ struct Cli { #[arg(long, global = true)] repodata_patch: Option, + /// Disable precondition checks (`ETags`, timestamps) during file operations. + /// Use this flag if your S3 backend doesn't fully support conditional requests, + /// or if you're certain no concurrent indexing processes are running. + /// Warning: Disabling this removes protection against concurrent modifications. + #[arg(long, default_value = "false", global = true)] + disable_precondition_checks: bool, + /// The path to the config file to use to configure rattler-index. /// Uses the same configuration format as pixi, see `https://pixi.sh/latest/reference/pixi_configuration`. #[arg(long)] @@ -115,6 +122,12 @@ async fn main() -> anyhow::Result<()> { .or(config.as_ref().map(|c| c.concurrency.downloads)) .unwrap_or_else(default_max_concurrent_solves); + let precondition_checks = if cli.disable_precondition_checks { + PreconditionChecks::Disabled + } else { + PreconditionChecks::Enabled + }; + match cli.command { Commands::FileSystem { channel } => { index_fs(IndexFsConfig { @@ -163,6 +176,7 @@ async fn main() -> anyhow::Result<()> { force: cli.force, max_parallel, multi_progress: Some(multi_progress), + precondition_checks, }) .await } diff --git a/crates/rattler_index/src/utils.rs b/crates/rattler_index/src/utils.rs index 09736e467d..3e5ce4b3ba 100644 --- a/crates/rattler_index/src/utils.rs +++ b/crates/rattler_index/src/utils.rs @@ -6,7 +6,11 @@ use crate::RepodataFileMetadata; /// /// This function reads a file and validates that it hasn't been modified since /// the metadata was collected. If the file has been modified (`ETag` or -/// last-modified doesn't match), it returns a `ConditionNotMatch` error. +/// `last_modified` doesn't match), it returns a `ConditionNotMatch` error. +/// +/// If metadata has no `ETag` or `last_modified` (either because the file didn't exist, +/// precondition checks are disabled, or the backend doesn't support it), the file +/// is read without conditional checks. /// /// # Parameters /// - `op`: A reference to the `Operator`, which facilitates file system @@ -23,15 +27,21 @@ pub async fn read_with_metadata_check( path: &str, metadata: &RepodataFileMetadata, ) -> opendal::Result { - let reader = op.read_with(path); - let reader = if let Some(etag) = &metadata.etag { - reader.if_match(etag) - } else if let Some(last_modified) = metadata.last_modified { - reader.if_unmodified_since(last_modified) - } else { - // If no metadata available, just read without conditions - reader - }; + let mut reader = op.read_with(path); + + // Only apply precondition checks if they're enabled + if metadata.precondition_checks.is_enabled() { + // Prefer ETag for precise change detection + if let Some(etag) = &metadata.etag { + reader = reader.if_match(etag); + } + // Fall back to last_modified timestamp + else if let Some(last_modified) = metadata.last_modified { + reader = reader.if_unmodified_since(last_modified); + } + // else: no metadata available, proceed without conditions + } + reader.await } @@ -45,6 +55,10 @@ pub async fn read_with_metadata_check( /// function uses `if_not_exists` to ensure the file still doesn't exist, /// preventing race conditions where another process creates it first. /// +/// If metadata has no etag (either because the file didn't exist, precondition +/// checks are disabled, or the backend doesn't support it), the file is written +/// without conditional checks. +/// /// # Parameters /// - `op`: A reference to the `Operator`, which facilitates file system /// operations. @@ -61,15 +75,20 @@ pub async fn write_with_metadata_check( path: &str, data: Vec, metadata: &RepodataFileMetadata, -) -> opendal::Result<()> { - let writer = op.write_with(path, data); - let writer = if let Some(etag) = &metadata.etag { - // File existed - verify it hasn't changed - writer.if_match(etag) - } else { - // File didn't exist - ensure it still doesn't exist - writer.if_not_exists(true) - }; - writer.await?; - Ok(()) +) -> opendal::Result { + let mut writer = op.write_with(path, data); + + // Only apply precondition checks if they're enabled + if metadata.precondition_checks.is_enabled() { + if let Some(etag) = &metadata.etag { + // File existed - verify it hasn't changed + writer = writer.if_match(etag); + } else if !metadata.file_existed { + // File didn't exist - ensure it still doesn't (prevents race conditions) + writer = writer.if_not_exists(true); + } + // else: file existed but no etag support, proceed without conditions + } + + writer.await } diff --git a/crates/rattler_index/tests/integration/cache_tests.rs b/crates/rattler_index/tests/integration/cache_tests.rs index 6658015944..24a6decf82 100644 --- a/crates/rattler_index/tests/integration/cache_tests.rs +++ b/crates/rattler_index/tests/integration/cache_tests.rs @@ -45,6 +45,8 @@ async fn test_read_package_with_retry_success_after_etag_change() { let old_metadata = RepodataFileMetadata { etag: etag_v1, last_modified: metadata_v1.last_modified(), + file_existed: true, + precondition_checks: rattler_index::PreconditionChecks::Enabled, }; let result = read_package_with_retry(&op, path, old_metadata).await; @@ -77,6 +79,8 @@ async fn test_read_package_with_retry_propagates_other_errors() { let metadata = RepodataFileMetadata { etag: Some("fake-etag".to_string()), last_modified: None, + file_existed: true, + precondition_checks: rattler_index::PreconditionChecks::Enabled, }; let result = read_package_with_retry(&op, path, metadata).await; diff --git a/crates/rattler_index/tests/integration/concurrent_indexing.rs b/crates/rattler_index/tests/integration/concurrent_indexing.rs index 59779e0e70..be90ca882b 100644 --- a/crates/rattler_index/tests/integration/concurrent_indexing.rs +++ b/crates/rattler_index/tests/integration/concurrent_indexing.rs @@ -12,7 +12,7 @@ use std::path::Path; use opendal::Operator; use rattler_conda_types::Platform; -use rattler_index::index; +use rattler_index::{index, PreconditionChecks}; use tracing::Instrument; use super::etag_memory_backend::ETagMemoryBuilder; @@ -110,6 +110,7 @@ async fn test_concurrent_index_with_race_condition_and_retry() { false, 1, None, + PreconditionChecks::Enabled, ) .await; result @@ -129,6 +130,7 @@ async fn test_concurrent_index_with_race_condition_and_retry() { false, 1, None, + PreconditionChecks::Enabled, ) .await; result diff --git a/py-rattler/Cargo.lock b/py-rattler/Cargo.lock index 2706f99600..90db83c7d5 100644 --- a/py-rattler/Cargo.lock +++ b/py-rattler/Cargo.lock @@ -1870,15 +1870,6 @@ dependencies = [ "slab", ] -[[package]] -name = "fxhash" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" -dependencies = [ - "byteorder", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -3173,8 +3164,8 @@ checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" name = "path_resolver" version = "0.2.1" dependencies = [ + "ahash", "fs-err", - "fxhash", "indexmap 2.12.0", "itertools 0.14.0", "proptest", @@ -3729,7 +3720,7 @@ dependencies = [ [[package]] name = "rattler" -version = "0.38.2" +version = "0.38.3" dependencies = [ "anyhow", "console", @@ -3771,8 +3762,9 @@ dependencies = [ [[package]] name = "rattler_cache" -version = "0.3.41" +version = "0.4.0" dependencies = [ + "ahash", "anyhow", "dashmap", "digest", @@ -3780,7 +3772,6 @@ dependencies = [ "fs-err", "fs4", "futures", - "fxhash", "itertools 0.14.0", "parking_lot", "rattler_conda_types", @@ -3804,12 +3795,12 @@ dependencies = [ name = "rattler_conda_types" version = "0.40.3" dependencies = [ + "ahash", "chrono", "core-foundation 0.10.1", "dirs", "file_url", "fs-err", - "fxhash", "glob", "hex", "indexmap 2.12.0", @@ -3874,8 +3865,9 @@ dependencies = [ [[package]] name = "rattler_index" -version = "0.26.4" +version = "0.26.5" dependencies = [ + "ahash", "anyhow", "bytes", "chrono", @@ -3884,7 +3876,7 @@ dependencies = [ "console", "fs-err", "futures", - "fxhash", + "indexmap 2.12.0", "indicatif", "opendal", "rattler_conda_types", @@ -3909,11 +3901,11 @@ dependencies = [ [[package]] name = "rattler_lock" -version = "0.25.3" +version = "0.26.0" dependencies = [ + "ahash", "chrono", "file_url", - "fxhash", "indexmap 2.12.0", "itertools 0.14.0", "pep440_rs", @@ -3941,7 +3933,7 @@ dependencies = [ [[package]] name = "rattler_menuinst" -version = "0.2.32" +version = "0.2.33" dependencies = [ "chrono", "configparser", @@ -3999,7 +3991,7 @@ dependencies = [ [[package]] name = "rattler_package_streaming" -version = "0.23.11" +version = "0.23.12" dependencies = [ "bzip2", "chrono", @@ -4046,7 +4038,7 @@ dependencies = [ [[package]] name = "rattler_repodata_gateway" -version = "0.24.12" +version = "0.24.13" dependencies = [ "anyhow", "async-compression", @@ -4121,7 +4113,7 @@ dependencies = [ [[package]] name = "rattler_shell" -version = "0.25.5" +version = "0.25.6" dependencies = [ "anyhow", "enum_dispatch", diff --git a/py-rattler/rattler/index/index.py b/py-rattler/rattler/index/index.py index d93d3bb464..68b18d8d8e 100644 --- a/py-rattler/rattler/index/index.py +++ b/py-rattler/rattler/index/index.py @@ -77,6 +77,7 @@ async def index_s3( write_shards: bool = True, force: bool = False, max_parallel: int | None = None, + precondition_checks: bool = True, ) -> None: """ Indexes dependencies in the `channel_url` for one or more subdirectories in the S3 directory. @@ -96,6 +97,7 @@ async def index_s3( write_shards: Whether to write sharded repodata. force: Whether to forcefully re-index all subdirs. max_parallel: The maximum number of packages to process in-memory simultaneously. + precondition_checks: Whether to perform precondition checks before indexing on S3 buckets which helps to prevent data corruption when indexing with multiple processes at the same time. Defaults to True. """ await py_index_s3( channel_url, @@ -106,4 +108,5 @@ async def index_s3( write_shards, force, max_parallel, + precondition_checks, ) diff --git a/py-rattler/src/index.rs b/py-rattler/src/index.rs index 7b74a37707..691fe18209 100644 --- a/py-rattler/src/index.rs +++ b/py-rattler/src/index.rs @@ -44,7 +44,7 @@ pub fn py_index_fs( #[pyfunction] #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)] -#[pyo3(signature = (channel_url, credentials=None, target_platform=None, repodata_patch=None, write_zst=true, write_shards=true, force=false, max_parallel=None))] +#[pyo3(signature = (channel_url, credentials=None, target_platform=None, repodata_patch=None, write_zst=true, write_shards=true, force=false, max_parallel=None, precondition_checks=true))] pub fn py_index_s3<'py>( py: Python<'py>, channel_url: String, @@ -55,6 +55,7 @@ pub fn py_index_s3<'py>( write_shards: bool, force: bool, max_parallel: Option, + precondition_checks: bool, ) -> PyResult> { let channel_url = Url::parse(&channel_url).map_err(PyRattlerError::from)?; let credentials = match credentials { @@ -89,6 +90,11 @@ pub fn py_index_s3<'py>( force, max_parallel: max_parallel.unwrap_or_else(default_max_concurrent_solves), multi_progress: None, + precondition_checks: if precondition_checks { + rattler_index::PreconditionChecks::Enabled + } else { + rattler_index::PreconditionChecks::Disabled + }, }) .await .map_err(|e| PyRattlerError::from(e).into())