Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/rattler_index/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ pub async fn read_package_with_retry(
metadata = RepodataFileMetadata {
etag: fresh_metadata.etag().map(str::to_owned),
last_modified: fresh_metadata.last_modified(),
file_existed: true,
precondition_checks: crate::PreconditionChecks::Enabled,
};
// Loop continues to next iteration with fresh metadata
}
Expand Down
127 changes: 115 additions & 12 deletions crates/rattler_index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,29 @@ use tokio::sync::Semaphore;
use tracing::Instrument;
use url::Url;

/// Configuration for precondition checks during file operations.
///
/// Precondition checks use `ETags` and timestamps to detect concurrent modifications
/// and prevent race conditions when multiple processes are indexing simultaneously.
///
/// The default value is [`PreconditionChecks::Enabled`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PreconditionChecks {
    /// Enable precondition checks (default behavior).
    ///
    /// This provides protection against concurrent modifications.
    #[default]
    Enabled,
    /// Disable precondition checks.
    ///
    /// Use this when working with S3 implementations that don't fully support
    /// conditional requests, or when you're certain no concurrent indexing occurs.
    Disabled,
}

impl PreconditionChecks {
    /// Returns `true` if precondition checks are enabled.
    ///
    /// This is a pure predicate, so it is marked `#[must_use]`: calling it and
    /// discarding the result is almost certainly a mistake. It is also a
    /// `const fn` so it can be used when building constant configuration.
    #[must_use]
    pub const fn is_enabled(self) -> bool {
        matches!(self, Self::Enabled)
    }
}

/// Statistics for a single subdir indexing operation
#[derive(Debug, Clone, Default)]
pub struct SubdirIndexStats {
Expand Down Expand Up @@ -300,6 +323,8 @@ async fn read_and_parse_package(
RepodataFileMetadata {
etag,
last_modified,
file_existed: true, // File exists since we got its metadata from stat
precondition_checks: PreconditionChecks::Enabled, // Always enabled for cache reads
},
)
.await
Expand Down Expand Up @@ -340,20 +365,42 @@ pub struct RepodataFileMetadata {
pub etag: Option<String>,
/// The last modified timestamp of the file, if available
pub last_modified: Option<DateTime<Utc>>,
/// Whether the file existed when metadata was collected
pub file_existed: bool,
/// The precondition checks configuration when this metadata was collected
pub precondition_checks: PreconditionChecks,
}

impl RepodataFileMetadata {
/// Collect metadata for a file without reading its contents.
/// Returns metadata with None values if the file doesn't exist.
pub async fn new(op: &Operator, path: &str) -> opendal::Result<Self> {
/// Returns metadata with None values if the file doesn't exist or if precondition checks are disabled.
pub async fn new(
op: &Operator,
path: &str,
precondition_checks: PreconditionChecks,
) -> opendal::Result<Self> {
// If precondition checks are disabled, return empty metadata
if !precondition_checks.is_enabled() {
return Ok(Self {
etag: None,
last_modified: None,
file_existed: false,
precondition_checks,
});
}

match op.stat(path).await {
Ok(metadata) => Ok(Self {
etag: metadata.etag().map(str::to_owned),
last_modified: metadata.last_modified(),
file_existed: true,
precondition_checks,
}),
Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(Self {
etag: None,
last_modified: None,
file_existed: false,
precondition_checks,
}),
Err(e) => Err(e),
}
Expand Down Expand Up @@ -382,28 +429,49 @@ impl RepodataMetadataCollection {
has_patch: bool,
write_zst: bool,
write_shards: bool,
precondition_checks: PreconditionChecks,
) -> opendal::Result<Self> {
// Always track repodata.json
let repodata = RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA}")).await?;
let repodata =
RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA}"), precondition_checks)
.await?;

// Track repodata_from_packages.json if patches are used
let repodata_from_packages = if has_patch {
Some(
RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA_FROM_PACKAGES}"))
.await?,
RepodataFileMetadata::new(
op,
&format!("{subdir}/{REPODATA_FROM_PACKAGES}"),
precondition_checks,
)
.await?,
)
} else {
None
};

let repodata_zst = if write_zst {
Some(RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA}.zst")).await?)
Some(
RepodataFileMetadata::new(
op,
&format!("{subdir}/{REPODATA}.zst"),
precondition_checks,
)
.await?,
)
} else {
None
};

let repodata_shards = if write_shards {
Some(RepodataFileMetadata::new(op, &format!("{subdir}/{REPODATA_SHARDS}")).await?)
Some(
RepodataFileMetadata::new(
op,
&format!("{subdir}/{REPODATA_SHARDS}"),
precondition_checks,
)
.await?,
)
} else {
None
};
Expand All @@ -428,6 +496,7 @@ async fn index_subdir(
progress: Option<MultiProgress>,
semaphore: Arc<Semaphore>,
cache: cache::PackageRecordCache,
precondition_checks: PreconditionChecks,
) -> Result<SubdirIndexStats> {
// Use write_retry_policy for handling lock contention during repodata writes
// This will retry for 10 minutes with longer backoff durations (10s, 30s, 60s, etc.)
Expand All @@ -447,6 +516,7 @@ async fn index_subdir(
progress.clone(),
semaphore.clone(),
cache.clone(),
precondition_checks,
)
.await
{
Expand All @@ -455,18 +525,37 @@ async fn index_subdir(
return Ok(stats);
}
Err(e) => {
// Check if this is a race condition error
// Check if this is a race condition error that we should retry
if let Some(opendal_err) = e.downcast_ref::<opendal::Error>() {
if opendal_err.kind() == opendal::ErrorKind::ConditionNotMatch {
let is_retryable_condition_error = matches!(
opendal_err.kind(),
opendal::ErrorKind::ConditionNotMatch | opendal::ErrorKind::Unexpected
) && {
// For Unexpected errors, check if it's the HTTP 409 ConditionalRequestConflict
let error_str = format!("{opendal_err:?}");
error_str.contains("ConditionalRequestConflict")
|| error_str.contains("status: 409")
|| opendal_err.kind() == opendal::ErrorKind::ConditionNotMatch
};

if is_retryable_condition_error {
// Race condition detected - should we retry?
match retry_policy.should_retry(request_start_time, current_try as u32) {
RetryDecision::Retry { execute_after } => {
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_default();

// Log with more context to help diagnose the issue
tracing::warn!(
"Detected concurrent modification of repodata for {}, retrying in {:?}",
"Detected concurrent modification of repodata for {} (attempt {}/max). \
Error: {:?}. Retrying in {:?}. \
This may indicate multiple indexing processes running simultaneously, \
or an S3 backend with incomplete precondition support. \
Consider using PreconditionChecks::Disabled if this persists.",
subdir,
current_try + 1,
opendal_err,
duration
);
tokio::time::sleep(duration).await;
Expand All @@ -475,8 +564,13 @@ async fn index_subdir(
}
RetryDecision::DoNotRetry => {
tracing::error!(
"Max retries exceeded for {} due to concurrent modifications",
subdir
"Max retries exceeded for {} due to concurrent modifications. \
Final error: {:?}. \
If you're not running concurrent indexing, your S3 backend may not \
fully support conditional requests. Consider disabling precondition \
checks by setting precondition_checks to PreconditionChecks::Disabled.",
subdir,
opendal_err
);
return Err(e);
}
Expand All @@ -501,6 +595,7 @@ async fn index_subdir_inner(
progress: Option<MultiProgress>,
semaphore: Arc<Semaphore>,
cache: cache::PackageRecordCache,
precondition_checks: PreconditionChecks,
) -> Result<SubdirIndexStats> {
// Step 1: Collect ETags/metadata for all critical files upfront
let metadata = RepodataMetadataCollection::new(
Expand All @@ -509,6 +604,7 @@ async fn index_subdir_inner(
repodata_patch.is_some(),
write_zst,
write_shards,
precondition_checks,
)
.await?;

Expand Down Expand Up @@ -938,6 +1034,7 @@ pub async fn index_fs(
force,
max_parallel,
multi_progress,
PreconditionChecks::Disabled,
)
.await
.map(|_| ())
Expand All @@ -963,6 +1060,8 @@ pub struct IndexS3Config {
pub max_parallel: usize,
/// The multi-progress bar to use for the index.
pub multi_progress: Option<MultiProgress>,
/// Configuration for precondition checks during file operations.
pub precondition_checks: PreconditionChecks,
}

/// Create a new `repodata.json` for all packages in the channel at the given S3
Expand All @@ -978,6 +1077,7 @@ pub async fn index_s3(
force,
max_parallel,
multi_progress,
precondition_checks,
}: IndexS3Config,
) -> anyhow::Result<()> {
// Create the S3 configuration for opendal.
Expand Down Expand Up @@ -1008,6 +1108,7 @@ pub async fn index_s3(
force,
max_parallel,
multi_progress,
precondition_checks,
)
.await
.map(|_| ())
Expand Down Expand Up @@ -1039,6 +1140,7 @@ pub async fn index(
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
precondition_checks: PreconditionChecks,
) -> anyhow::Result<IndexStats> {
let entries = op.list_with("").await?;

Expand Down Expand Up @@ -1116,6 +1218,7 @@ pub async fn index(
multi_progress.clone(),
semaphore.clone(),
cache,
precondition_checks,
)
.instrument(tracing::info_span!("index_subdir", subdir = %subdir));
tasks.push((*subdir, task));
Expand Down
16 changes: 15 additions & 1 deletion crates/rattler_index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use clap::{arg, Parser, Subcommand};
use clap_verbosity_flag::Verbosity;
use rattler_conda_types::Platform;
use rattler_config::config::concurrency::default_max_concurrent_solves;
use rattler_index::{index_fs, index_s3, IndexFsConfig, IndexS3Config};
use rattler_index::{index_fs, index_s3, IndexFsConfig, IndexS3Config, PreconditionChecks};
use rattler_networking::AuthenticationStorage;
use rattler_s3::S3Credentials;
use url::Url;
Expand Down Expand Up @@ -60,6 +60,13 @@ struct Cli {
#[arg(long, global = true)]
repodata_patch: Option<String>,

/// Disable precondition checks (`ETags`, timestamps) during file operations.
/// Use this flag if your S3 backend doesn't fully support conditional requests,
/// or if you're certain no concurrent indexing processes are running.
/// Warning: Disabling this removes protection against concurrent modifications.
#[arg(long, default_value = "false", global = true)]
disable_precondition_checks: bool,

/// The path to the config file to use to configure rattler-index.
/// Uses the same configuration format as pixi, see `https://pixi.sh/latest/reference/pixi_configuration`.
#[arg(long)]
Expand Down Expand Up @@ -115,6 +122,12 @@ async fn main() -> anyhow::Result<()> {
.or(config.as_ref().map(|c| c.concurrency.downloads))
.unwrap_or_else(default_max_concurrent_solves);

let precondition_checks = if cli.disable_precondition_checks {
PreconditionChecks::Disabled
} else {
PreconditionChecks::Enabled
};

match cli.command {
Commands::FileSystem { channel } => {
index_fs(IndexFsConfig {
Expand Down Expand Up @@ -163,6 +176,7 @@ async fn main() -> anyhow::Result<()> {
force: cli.force,
max_parallel,
multi_progress: Some(multi_progress),
precondition_checks,
})
.await
}
Expand Down
Loading