Skip to content

Commit 54186dc

Browse files
waynexiaevenyag
authored andcommitted
feat: bump opendal and switch prometheus layer to the upstream impl (GreptimeTeam#5179)
* feat: bump opendal and switch prometheus layer to the upstream impl Signed-off-by: Ruihang Xia <[email protected]> * remove unused files Signed-off-by: Ruihang Xia <[email protected]> * fix tests Signed-off-by: Ruihang Xia <[email protected]> * remove unused things Signed-off-by: Ruihang Xia <[email protected]> * remove root dir on recovering cache Signed-off-by: Ruihang Xia <[email protected]> * filter out non-files entry in test Signed-off-by: Ruihang Xia <[email protected]> --------- Signed-off-by: Ruihang Xia <[email protected]>
1 parent 17d75c7 commit 54186dc

File tree

22 files changed

+134
-708
lines changed

22 files changed

+134
-708
lines changed

Cargo.lock

+7-18
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/datasource/src/object_store/fs.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
2727
DefaultLoggingInterceptor,
2828
))
2929
.layer(object_store::layers::TracingLayer)
30-
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
30+
.layer(object_store::layers::build_prometheus_metrics_layer(true))
3131
.finish();
3232
Ok(object_store)
3333
}

src/common/datasource/src/object_store/s3.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ pub fn build_s3_backend(
8989
DefaultLoggingInterceptor,
9090
))
9191
.layer(object_store::layers::TracingLayer)
92-
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
92+
.layer(object_store::layers::build_prometheus_metrics_layer(true))
9393
.finish())
9494
}
9595

src/common/procedure/src/local/runner.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ mod tests {
544544
use common_test_util::temp_dir::create_temp_dir;
545545
use futures_util::future::BoxFuture;
546546
use futures_util::FutureExt;
547-
use object_store::ObjectStore;
547+
use object_store::{EntryMode, ObjectStore};
548548
use tokio::sync::mpsc;
549549

550550
use super::*;
@@ -578,7 +578,11 @@ mod tests {
578578
) {
579579
let dir = proc_path!(procedure_store, "{procedure_id}/");
580580
let lister = object_store.list(&dir).await.unwrap();
581-
let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect();
581+
let mut files_in_dir: Vec<_> = lister
582+
.into_iter()
583+
.filter(|x| x.metadata().mode() == EntryMode::FILE)
584+
.map(|de| de.name().to_string())
585+
.collect();
582586
files_in_dir.sort_unstable();
583587
assert_eq!(files, files_in_dir);
584588
}

src/datanode/src/error.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,14 @@ pub enum Error {
193193
location: Location,
194194
},
195195

196+
#[snafu(display("Failed to build http client"))]
197+
BuildHttpClient {
198+
#[snafu(implicit)]
199+
location: Location,
200+
#[snafu(source)]
201+
error: reqwest::Error,
202+
},
203+
196204
#[snafu(display("Missing required field: {}", name))]
197205
MissingRequiredField {
198206
name: String,
@@ -406,9 +414,10 @@ impl ErrorExt for Error {
406414
| MissingKvBackend { .. }
407415
| TomlFormat { .. } => StatusCode::InvalidArguments,
408416

409-
PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => {
410-
StatusCode::Unexpected
411-
}
417+
PayloadNotExist { .. }
418+
| Unexpected { .. }
419+
| WatchAsyncTaskChange { .. }
420+
| BuildHttpClient { .. } => StatusCode::Unexpected,
412421

413422
AsyncTaskExecute { source, .. } => source.status_code(),
414423

src/datanode/src/store.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, O
3232
use snafu::prelude::*;
3333

3434
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
35-
use crate::error::{self, CreateDirSnafu, Result};
35+
use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};
3636

3737
pub(crate) async fn new_raw_object_store(
3838
store: &ObjectStoreConfig,
@@ -236,7 +236,8 @@ pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient>
236236
builder.timeout(config.timeout)
237237
};
238238

239-
HttpClient::build(http_builder).context(error::InitBackendSnafu)
239+
let client = http_builder.build().context(BuildHttpClientSnafu)?;
240+
Ok(HttpClient::with(client))
240241
}
241242
struct PrintDetailedError;
242243

src/file-engine/src/manifest.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl FileRegionManifest {
4646
pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> {
4747
let path = &region_manifest_path(region_dir);
4848
let exist = object_store
49-
.is_exist(path)
49+
.exists(path)
5050
.await
5151
.context(CheckObjectSnafu { path })?;
5252
ensure!(!exist, ManifestExistsSnafu { path });

src/file-engine/src/region.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ mod tests {
130130
assert_eq!(region.metadata.primary_key, vec![1]);
131131

132132
assert!(object_store
133-
.is_exist("create_region_dir/manifest/_file_manifest")
133+
.exists("create_region_dir/manifest/_file_manifest")
134134
.await
135135
.unwrap());
136136

@@ -198,13 +198,13 @@ mod tests {
198198
.unwrap();
199199

200200
assert!(object_store
201-
.is_exist("drop_region_dir/manifest/_file_manifest")
201+
.exists("drop_region_dir/manifest/_file_manifest")
202202
.await
203203
.unwrap());
204204

205205
FileRegion::drop(&region, &object_store).await.unwrap();
206206
assert!(!object_store
207-
.is_exist("drop_region_dir/manifest/_file_manifest")
207+
.exists("drop_region_dir/manifest/_file_manifest")
208208
.await
209209
.unwrap());
210210

src/metric-engine/src/test_util.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,12 @@ mod test {
313313
let region_dir = "test_metric_region";
314314
// assert metadata region's dir
315315
let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);
316-
let exist = object_store.is_exist(&metadata_region_dir).await.unwrap();
316+
let exist = object_store.exists(&metadata_region_dir).await.unwrap();
317317
assert!(exist);
318318

319319
// assert data region's dir
320320
let data_region_dir = join_dir(region_dir, DATA_REGION_SUBDIR);
321-
let exist = object_store.is_exist(&data_region_dir).await.unwrap();
321+
let exist = object_store.exists(&data_region_dir).await.unwrap();
322322
assert!(exist);
323323

324324
// check mito engine

src/mito2/src/cache/file_cache.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ impl FileCache {
286286
}
287287

288288
async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
289-
if self.local_store.is_exist(file_path).await? {
289+
if self.local_store.exists(file_path).await? {
290290
Ok(Some(self.local_store.reader(file_path).await?))
291291
} else {
292292
Ok(None)
@@ -480,7 +480,7 @@ mod tests {
480480
cache.memory_index.run_pending_tasks().await;
481481

482482
// The file also not exists.
483-
assert!(!local_store.is_exist(&file_path).await.unwrap());
483+
assert!(!local_store.exists(&file_path).await.unwrap());
484484
assert_eq!(0, cache.memory_index.weighted_size());
485485
}
486486

src/mito2/src/engine/create_test.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,12 @@ async fn test_engine_create_with_custom_store() {
192192
assert!(object_store_manager
193193
.find("Gcs")
194194
.unwrap()
195-
.is_exist(region_dir)
195+
.exists(region_dir)
196196
.await
197197
.unwrap());
198198
assert!(!object_store_manager
199199
.default_object_store()
200-
.is_exist(region_dir)
200+
.exists(region_dir)
201201
.await
202202
.unwrap());
203203
}

src/mito2/src/engine/drop_test.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ async fn test_engine_drop_region() {
7171
assert!(!env
7272
.get_object_store()
7373
.unwrap()
74-
.is_exist(&join_path(&region_dir, DROPPING_MARKER_FILE))
74+
.exists(&join_path(&region_dir, DROPPING_MARKER_FILE))
7575
.await
7676
.unwrap());
7777

@@ -93,7 +93,7 @@ async fn test_engine_drop_region() {
9393
listener.wait().await;
9494

9595
let object_store = env.get_object_store().unwrap();
96-
assert!(!object_store.is_exist(&region_dir).await.unwrap());
96+
assert!(!object_store.exists(&region_dir).await.unwrap());
9797
}
9898

9999
#[tokio::test]
@@ -167,13 +167,13 @@ async fn test_engine_drop_region_for_custom_store() {
167167
assert!(object_store_manager
168168
.find("Gcs")
169169
.unwrap()
170-
.is_exist(&custom_region_dir)
170+
.exists(&custom_region_dir)
171171
.await
172172
.unwrap());
173173
assert!(object_store_manager
174174
.find("default")
175175
.unwrap()
176-
.is_exist(&global_region_dir)
176+
.exists(&global_region_dir)
177177
.await
178178
.unwrap());
179179

@@ -190,13 +190,13 @@ async fn test_engine_drop_region_for_custom_store() {
190190
assert!(!object_store_manager
191191
.find("Gcs")
192192
.unwrap()
193-
.is_exist(&custom_region_dir)
193+
.exists(&custom_region_dir)
194194
.await
195195
.unwrap());
196196
assert!(object_store_manager
197197
.find("default")
198198
.unwrap()
199-
.is_exist(&global_region_dir)
199+
.exists(&global_region_dir)
200200
.await
201201
.unwrap());
202202
}

src/mito2/src/engine/open_test.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,13 @@ async fn test_engine_region_open_with_custom_store() {
228228
let object_store_manager = env.get_object_store_manager().unwrap();
229229
assert!(!object_store_manager
230230
.default_object_store()
231-
.is_exist(region.access_layer.region_dir())
231+
.exists(region.access_layer.region_dir())
232232
.await
233233
.unwrap());
234234
assert!(object_store_manager
235235
.find("Gcs")
236236
.unwrap()
237-
.is_exist(region.access_layer.region_dir())
237+
.exists(region.access_layer.region_dir())
238238
.await
239239
.unwrap());
240240
}

src/mito2/src/manifest/tests/checkpoint.rs

+2
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ async fn manager_without_checkpoint() {
8484

8585
// check files
8686
let mut expected = vec![
87+
"/",
8788
"00000000000000000010.json",
8889
"00000000000000000009.json",
8990
"00000000000000000008.json",
@@ -130,6 +131,7 @@ async fn manager_with_checkpoint_distance_1() {
130131

131132
// check files
132133
let mut expected = vec![
134+
"/",
133135
"00000000000000000009.checkpoint",
134136
"00000000000000000010.checkpoint",
135137
"00000000000000000010.json",

src/mito2/src/sst/file_purger.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ mod tests {
185185

186186
scheduler.stop(true).await.unwrap();
187187

188-
assert!(!object_store.is_exist(&path).await.unwrap());
188+
assert!(!object_store.exists(&path).await.unwrap());
189189
}
190190

191191
#[tokio::test]
@@ -247,7 +247,7 @@ mod tests {
247247

248248
scheduler.stop(true).await.unwrap();
249249

250-
assert!(!object_store.is_exist(&path).await.unwrap());
251-
assert!(!object_store.is_exist(&index_path).await.unwrap());
250+
assert!(!object_store.exists(&path).await.unwrap());
251+
assert!(!object_store.exists(&index_path).await.unwrap());
252252
}
253253
}

src/mito2/src/worker/handle_open.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
5151
// Check if this region is pending drop. And clean the entire dir if so.
5252
if !self.dropping_regions.is_region_exists(region_id)
5353
&& object_store
54-
.is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
54+
.exists(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
5555
.await
5656
.context(OpenDalSnafu)?
5757
{

src/object-store/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ futures.workspace = true
1717
lazy_static.workspace = true
1818
md5 = "0.7"
1919
moka = { workspace = true, features = ["future"] }
20-
opendal = { version = "0.49", features = [
20+
opendal = { version = "0.50", features = [
2121
"layers-tracing",
22+
"layers-prometheus",
2223
"services-azblob",
2324
"services-fs",
2425
"services-gcs",

src/object-store/src/layers.rs

+31-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,37 @@
1313
// limitations under the License.
1414

1515
mod lru_cache;
16-
mod prometheus;
1716

1817
pub use lru_cache::*;
1918
pub use opendal::layers::*;
20-
pub use prometheus::PrometheusMetricsLayer;
19+
pub use prometheus::build_prometheus_metrics_layer;
20+
21+
mod prometheus {
22+
use std::sync::{Mutex, OnceLock};
23+
24+
use opendal::layers::PrometheusLayer;
25+
26+
static PROMETHEUS_LAYER: OnceLock<Mutex<PrometheusLayer>> = OnceLock::new();
27+
28+
pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer {
29+
PROMETHEUS_LAYER
30+
.get_or_init(|| {
31+
// This logical tries to extract parent path from the object storage operation
32+
// the function also relies on assumption that the region path is built from
33+
// pattern `<data|index>/catalog/schema/table_id/....`
34+
//
35+
// We'll get the data/catalog/schema from path.
36+
let path_level = if with_path_label { 3 } else { 0 };
37+
38+
let layer = PrometheusLayer::builder()
39+
.path_label(path_level)
40+
.register_default()
41+
.unwrap();
42+
43+
Mutex::new(layer)
44+
})
45+
.lock()
46+
.unwrap()
47+
.clone()
48+
}
49+
}

0 commit comments

Comments
 (0)