Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bump opendal and switch prometheus layer to the upstream impl #5179

Merged
merged 6 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/datasource/src/object_store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.finish();
Ok(object_store)
}
2 changes: 1 addition & 1 deletion src/common/datasource/src/object_store/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn build_s3_backend(
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.finish())
}

Expand Down
8 changes: 6 additions & 2 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use object_store::ObjectStore;
use object_store::{EntryMode, ObjectStore};
use tokio::sync::mpsc;

use super::*;
Expand Down Expand Up @@ -578,7 +578,11 @@ mod tests {
) {
let dir = proc_path!(procedure_store, "{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect();
let mut files_in_dir: Vec<_> = lister
.into_iter()
.filter(|x| x.metadata().mode() == EntryMode::FILE)
.map(|de| de.name().to_string())
.collect();
files_in_dir.sort_unstable();
assert_eq!(files, files_in_dir);
}
Expand Down
15 changes: 12 additions & 3 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to build http client"))]
BuildHttpClient {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: reqwest::Error,
},

#[snafu(display("Missing required field: {}", name))]
MissingRequiredField {
name: String,
Expand Down Expand Up @@ -406,9 +414,10 @@ impl ErrorExt for Error {
| MissingKvBackend { .. }
| TomlFormat { .. } => StatusCode::InvalidArguments,

PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => {
StatusCode::Unexpected
}
PayloadNotExist { .. }
| Unexpected { .. }
| WatchAsyncTaskChange { .. }
| BuildHttpClient { .. } => StatusCode::Unexpected,

AsyncTaskExecute { source, .. } => source.status_code(),

Expand Down
5 changes: 3 additions & 2 deletions src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, O
use snafu::prelude::*;

use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, CreateDirSnafu, Result};
use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};

pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
Expand Down Expand Up @@ -236,7 +236,8 @@ pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient>
builder.timeout(config.timeout)
};

HttpClient::build(http_builder).context(error::InitBackendSnafu)
let client = http_builder.build().context(BuildHttpClientSnafu)?;
Ok(HttpClient::with(client))
}
struct PrintDetailedError;

Expand Down
2 changes: 1 addition & 1 deletion src/file-engine/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl FileRegionManifest {
pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> {
let path = &region_manifest_path(region_dir);
let exist = object_store
.is_exist(path)
.exists(path)
.await
.context(CheckObjectSnafu { path })?;
ensure!(!exist, ManifestExistsSnafu { path });
Expand Down
6 changes: 3 additions & 3 deletions src/file-engine/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ mod tests {
assert_eq!(region.metadata.primary_key, vec![1]);

assert!(object_store
.is_exist("create_region_dir/manifest/_file_manifest")
.exists("create_region_dir/manifest/_file_manifest")
.await
.unwrap());

Expand Down Expand Up @@ -198,13 +198,13 @@ mod tests {
.unwrap();

assert!(object_store
.is_exist("drop_region_dir/manifest/_file_manifest")
.exists("drop_region_dir/manifest/_file_manifest")
.await
.unwrap());

FileRegion::drop(&region, &object_store).await.unwrap();
assert!(!object_store
.is_exist("drop_region_dir/manifest/_file_manifest")
.exists("drop_region_dir/manifest/_file_manifest")
.await
.unwrap());

Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ mod test {
let region_dir = "test_metric_region";
// assert metadata region's dir
let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);
let exist = object_store.is_exist(&metadata_region_dir).await.unwrap();
let exist = object_store.exists(&metadata_region_dir).await.unwrap();
assert!(exist);

// assert data region's dir
let data_region_dir = join_dir(region_dir, DATA_REGION_SUBDIR);
let exist = object_store.is_exist(&data_region_dir).await.unwrap();
let exist = object_store.exists(&data_region_dir).await.unwrap();
assert!(exist);

// check mito engine
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl FileCache {
}

async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
if self.local_store.is_exist(file_path).await? {
if self.local_store.exists(file_path).await? {
Ok(Some(self.local_store.reader(file_path).await?))
} else {
Ok(None)
Expand Down Expand Up @@ -480,7 +480,7 @@ mod tests {
cache.memory_index.run_pending_tasks().await;

// The file also not exists.
assert!(!local_store.is_exist(&file_path).await.unwrap());
assert!(!local_store.exists(&file_path).await.unwrap());
assert_eq!(0, cache.memory_index.weighted_size());
}

Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/create_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ async fn test_engine_create_with_custom_store() {
assert!(object_store_manager
.find("Gcs")
.unwrap()
.is_exist(region_dir)
.exists(region_dir)
.await
.unwrap());
assert!(!object_store_manager
.default_object_store()
.is_exist(region_dir)
.exists(region_dir)
.await
.unwrap());
}
Expand Down
12 changes: 6 additions & 6 deletions src/mito2/src/engine/drop_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn test_engine_drop_region() {
assert!(!env
.get_object_store()
.unwrap()
.is_exist(&join_path(&region_dir, DROPPING_MARKER_FILE))
.exists(&join_path(&region_dir, DROPPING_MARKER_FILE))
.await
.unwrap());

Expand All @@ -93,7 +93,7 @@ async fn test_engine_drop_region() {
listener.wait().await;

let object_store = env.get_object_store().unwrap();
assert!(!object_store.is_exist(&region_dir).await.unwrap());
assert!(!object_store.exists(&region_dir).await.unwrap());
}

#[tokio::test]
Expand Down Expand Up @@ -167,13 +167,13 @@ async fn test_engine_drop_region_for_custom_store() {
assert!(object_store_manager
.find("Gcs")
.unwrap()
.is_exist(&custom_region_dir)
.exists(&custom_region_dir)
.await
.unwrap());
assert!(object_store_manager
.find("default")
.unwrap()
.is_exist(&global_region_dir)
.exists(&global_region_dir)
.await
.unwrap());

Expand All @@ -190,13 +190,13 @@ async fn test_engine_drop_region_for_custom_store() {
assert!(!object_store_manager
.find("Gcs")
.unwrap()
.is_exist(&custom_region_dir)
.exists(&custom_region_dir)
.await
.unwrap());
assert!(object_store_manager
.find("default")
.unwrap()
.is_exist(&global_region_dir)
.exists(&global_region_dir)
.await
.unwrap());
}
4 changes: 2 additions & 2 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ async fn test_engine_region_open_with_custom_store() {
let object_store_manager = env.get_object_store_manager().unwrap();
assert!(!object_store_manager
.default_object_store()
.is_exist(region.access_layer.region_dir())
.exists(region.access_layer.region_dir())
.await
.unwrap());
assert!(object_store_manager
.find("Gcs")
.unwrap()
.is_exist(region.access_layer.region_dir())
.exists(region.access_layer.region_dir())
.await
.unwrap());
}
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/manifest/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async fn manager_without_checkpoint() {

// check files
let mut expected = vec![
"/",
"00000000000000000010.json",
"00000000000000000009.json",
"00000000000000000008.json",
Expand Down Expand Up @@ -130,6 +131,7 @@ async fn manager_with_checkpoint_distance_1() {

// check files
let mut expected = vec![
"/",
"00000000000000000009.checkpoint",
"00000000000000000010.checkpoint",
"00000000000000000010.json",
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/sst/file_purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod tests {

scheduler.stop(true).await.unwrap();

assert!(!object_store.is_exist(&path).await.unwrap());
assert!(!object_store.exists(&path).await.unwrap());
}

#[tokio::test]
Expand Down Expand Up @@ -247,7 +247,7 @@ mod tests {

scheduler.stop(true).await.unwrap();

assert!(!object_store.is_exist(&path).await.unwrap());
assert!(!object_store.is_exist(&index_path).await.unwrap());
assert!(!object_store.exists(&path).await.unwrap());
assert!(!object_store.exists(&index_path).await.unwrap());
}
}
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Check if this region is pending drop. And clean the entire dir if so.
if !self.dropping_regions.is_region_exists(region_id)
&& object_store
.is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
.exists(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
.await
.context(OpenDalSnafu)?
{
Expand Down
3 changes: 2 additions & 1 deletion src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.49", features = [
opendal = { version = "0.50", features = [
"layers-tracing",
"layers-prometheus",
"services-azblob",
"services-fs",
"services-gcs",
Expand Down
33 changes: 31 additions & 2 deletions src/object-store/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,37 @@
// limitations under the License.

mod lru_cache;
mod prometheus;

pub use lru_cache::*;
pub use opendal::layers::*;
pub use prometheus::PrometheusMetricsLayer;
pub use prometheus::build_prometheus_metrics_layer;

mod prometheus {
use std::sync::{Mutex, OnceLock};

use opendal::layers::PrometheusLayer;

static PROMETHEUS_LAYER: OnceLock<Mutex<PrometheusLayer>> = OnceLock::new();

pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer {
PROMETHEUS_LAYER
.get_or_init(|| {
// This logical tries to extract parent path from the object storage operation
// the function also relies on assumption that the region path is built from
// pattern `<data|index>/catalog/schema/table_id/....`
//
// We'll get the data/catalog/schema from path.
let path_level = if with_path_label { 3 } else { 0 };

let layer = PrometheusLayer::builder()
.path_label(path_level)
.register_default()
.unwrap();

Mutex::new(layer)
})
.lock()
.unwrap()
.clone()
}
}
Loading
Loading