From 8a2ea68ea485ef6f90944fc9de7c5ea371cd22c2 Mon Sep 17 00:00:00 2001 From: Huanbing Date: Sat, 17 Aug 2024 19:52:55 +0800 Subject: [PATCH] feat(io): Implement base IO Module (#51) --- crates/paimon/Cargo.toml | 11 + crates/paimon/src/error.rs | 20 + crates/paimon/src/io/file_io.rs | 534 +++++++++++++++++++++---- crates/paimon/src/io/mod.rs | 13 + crates/paimon/src/io/storage.rs | 81 ++++ crates/paimon/src/io/storage_fs.rs | 29 ++ crates/paimon/src/io/storage_memory.rs | 25 ++ crates/paimon/src/spec/types.rs | 2 +- 8 files changed, 646 insertions(+), 69 deletions(-) create mode 100644 crates/paimon/src/io/storage.rs create mode 100644 crates/paimon/src/io/storage_fs.rs create mode 100644 crates/paimon/src/io/storage_memory.rs diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 9e741fa..9e22e5b 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -26,8 +26,19 @@ edition.workspace = true license.workspace = true version.workspace = true +[features] +default = ["storage-memory", "storage-fs"] +storage-all = ["storage-memory", "storage-fs"] + +storage-memory = ["opendal/services-memory"] +storage-fs = ["opendal/services-fs"] + [dependencies] +url = "2.5.2" +async-trait = "0.1.81" +bytes = "1.7.1" bitflags = "2.6.0" +tokio = { version = "1.39.2", features = ["macros"] } chrono = { version = "0.4.38", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_bytes = "0.11.15" diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs index 93404da..f42b465 100644 --- a/crates/paimon/src/error.rs +++ b/crates/paimon/src/error.rs @@ -42,4 +42,24 @@ pub enum Error { message: String, source: opendal::Error, }, + #[snafu( + visibility(pub(crate)), + display("Paimon hitting unsupported io error {}", message) + )] + IoUnsupported { message: String }, + #[snafu( + visibility(pub(crate)), + display("Paimon hitting invalid config: {}", message) + )] + ConfigInvalid { message: String }, +} + +impl From for Error { + fn from(source: opendal::Error) -> Self { + // TODO: Simple use IoUnexpected for now + Error::IoUnexpected { + message: "IO operation failed on underlying storage".to_string(), + source, + } + } } diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index 0d31af7..d9ebc87 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -17,59 +17,69 @@ use crate::error::*; use std::collections::HashMap; +use std::ops::Range; +use std::sync::Arc; -use chrono::offset::Utc; -use chrono::DateTime; -use opendal::services::Fs; -use opendal::{Metakey, Operator}; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use opendal::Operator; use snafu::ResultExt; +use url::Url; + +use super::Storage; #[derive(Clone, Debug)] pub struct FileIO { - op: Operator, + storage: Arc, } impl FileIO { - /// Create a new FileIO. + /// Try to infer file io scheme from path. /// /// The input HashMap is paimon-java's [`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60) - /// - /// TODO: Support building Operator from HashMap via options. - pub fn new(_: HashMap) -> Result { - let op = Operator::new(Fs::default().root("/")) - .context(IoUnexpectedSnafu { - message: "Failed to create operator".to_string(), - })? - .finish(); - Ok(Self { op }) + pub fn from_url(path: &str) -> crate::Result { + let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { + message: format!("Invalid URL: {}", path), + })?; + + Ok(FileIOBuilder::new(url.scheme())) } /// Create a new input file to read data. /// /// Reference: - pub fn new_input(&self, path: &str) -> InputFile { - InputFile { - _op: self.op.clone(), - path: path.to_string(), - } + pub fn new_input(&self, path: &str) -> crate::Result { + let (op, relative_path) = self.storage.create(path)?; + let path = path.to_string(); + let relative_path_pos = path.len() - relative_path.len(); + Ok(InputFile { + op, + path, + relative_path_pos, + }) } /// Create a new output file to write data. /// /// Reference: - pub fn new_output(&self, path: &str) -> OutputFile { - OutputFile { - _op: self.op.clone(), - path: path.to_string(), - } + pub fn new_output(&self, path: &str) -> Result { + let (op, relative_path) = self.storage.create(path)?; + let path = path.to_string(); + let relative_path_pos = path.len() - relative_path.len(); + Ok(OutputFile { + op, + path, + relative_path_pos, + }) } /// Return a file status object that represents the path. /// /// Reference: pub async fn get_status(&self, path: &str) -> Result { - let meta = self.op.stat(path).await.context(IoUnexpectedSnafu { - message: "Failed to get file status".to_string(), + let (op, relative_path) = self.storage.create(path)?; + let meta = op.stat(relative_path).await.context(IoUnexpectedSnafu { + message: format!("Failed to get file status for '{}'", path), })?; Ok(FileStatus { @@ -86,32 +96,35 @@ impl FileIO { /// /// FIXME: how to handle large dir? Better to return a stream instead? pub async fn list_status(&self, path: &str) -> Result> { - let entries = self - .op - .list_with(path) - .metakey(Metakey::ContentLength | Metakey::LastModified) - .await - .context(IoUnexpectedSnafu { - message: "Failed to list file status".to_string(), - })?; + let (op, relative_path) = self.storage.create(path)?; + + let entries = op.list(relative_path).await.context(IoUnexpectedSnafu { + message: format!("Failed to list files in '{}'", path), + })?; + + let mut statuses = Vec::new(); + + for entry in entries { + let meta = entry.metadata(); + statuses.push(FileStatus { + size: meta.content_length(), + is_dir: meta.is_dir(), + path: path.to_string(), + last_modified: meta.last_modified(), + }); + } - Ok(entries - .into_iter() - .map(|meta| FileStatus { - size: meta.metadata().content_length(), - is_dir: meta.metadata().is_dir(), - last_modified: meta.metadata().last_modified(), - path: format!("{}{}", path, meta.name()), - }) - .collect()) + Ok(statuses) } /// Check if exists. /// /// References: pub async fn exists(&self, path: &str) -> Result { - self.op.is_exist(path).await.context(IoUnexpectedSnafu { - message: "Failed to check file existence".to_string(), + let (op, relative_path) = self.storage.create(path)?; + + op.is_exist(relative_path).await.context(IoUnexpectedSnafu { + message: format!("Failed to check existence of '{}'", path), }) } @@ -119,8 +132,10 @@ impl FileIO { /// /// Reference: pub async fn delete_file(&self, path: &str) -> Result<()> { - self.op.delete(path).await.context(IoUnexpectedSnafu { - message: "Failed to delete file".to_string(), + let (op, relative_path) = self.storage.create(path)?; + + op.delete(relative_path).await.context(IoUnexpectedSnafu { + message: format!("Failed to delete file '{}'", path), })?; Ok(()) @@ -130,9 +145,14 @@ impl FileIO { /// /// Reference: pub async fn delete_dir(&self, path: &str) -> Result<()> { - self.op.remove_all(path).await.context(IoUnexpectedSnafu { - message: "Failed to delete dir".to_string(), - })?; + let (op, relative_path) = self.storage.create(path)?; + + op.remove_all(relative_path) + .await + .context(IoUnexpectedSnafu { + message: format!("Failed to delete directory '{}'", path), + })?; + Ok(()) } @@ -142,9 +162,14 @@ impl FileIO { /// /// Reference: pub async fn mkdirs(&self, path: &str) -> Result<()> { - self.op.create_dir(path).await.context(IoUnexpectedSnafu { - message: "Failed to create dir".to_string(), - })?; + let (op, relative_path) = self.storage.create(path)?; + + op.create_dir(relative_path) + .await + .context(IoUnexpectedSnafu { + message: format!("Failed to create directory '{}'", path), + })?; + Ok(()) } @@ -152,14 +177,101 @@ impl FileIO { /// /// Reference: pub async fn rename(&self, src: &str, dst: &str) -> Result<()> { - self.op.rename(src, dst).await.context(IoUnexpectedSnafu { - message: "Failed to rename file".to_string(), - })?; + let (op_src, relative_path_src) = self.storage.create(src)?; + let (_, relative_path_dst) = self.storage.create(dst)?; + + op_src + .rename(relative_path_src, relative_path_dst) + .await + .context(IoUnexpectedSnafu { + message: format!("Failed to rename '{}' to '{}'", src, dst), + })?; + Ok(()) } } -/// FileStatus represents the status of a file. +#[derive(Debug)] +pub struct FileIOBuilder { + scheme_str: Option, + props: HashMap, +} + +impl FileIOBuilder { + pub fn new(scheme_str: impl ToString) -> Self { + Self { + scheme_str: Some(scheme_str.to_string()), + props: HashMap::default(), + } + } + + pub fn new_fs_io_builder() -> Self { + Self { + scheme_str: None, + props: HashMap::default(), + } + } + + pub(crate) fn into_parts(self) -> (String, HashMap) { + (self.scheme_str.unwrap_or_default(), self.props) + } + + pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self { + self.props.insert(key.to_string(), value.to_string()); + self + } + + pub fn with_props( + mut self, + args: impl IntoIterator, + ) -> Self { + self.props + .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string()))); + self + } + + pub fn build(self) -> crate::Result { + let storage = Storage::build(self)?; + Ok(FileIO { + storage: Arc::new(storage), + }) + } +} + +#[async_trait::async_trait] +pub trait FileRead: Send + Unpin + 'static { + async fn read(&self, range: Range) -> crate::Result; +} + +#[async_trait::async_trait] +impl FileRead for opendal::Reader { + async fn read(&self, range: Range) -> crate::Result { + // TODO: build a error type + Ok(opendal::Reader::read(self, range) + .await + .expect("read error") + .to_bytes()) + } +} + +#[async_trait::async_trait] +pub trait FileWrite: Send + Unpin + 'static { + async fn write(&mut self, bs: Bytes) -> crate::Result<()>; + + async fn close(&mut self) -> crate::Result<()>; +} + +#[async_trait::async_trait] +impl FileWrite for opendal::Writer { + async fn write(&mut self, bs: Bytes) -> crate::Result<()> { + Ok(opendal::Writer::write(self, bs).await?) + } + + async fn close(&mut self) -> crate::Result<()> { + Ok(opendal::Writer::close(self).await?) + } +} + #[derive(Clone, Debug)] pub struct FileStatus { pub size: u64, @@ -168,30 +280,316 @@ pub struct FileStatus { pub last_modified: Option>, } -/// Input file represents a file that can be read from. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct InputFile { - _op: Operator, + op: Operator, path: String, + relative_path_pos: usize, } impl InputFile { - /// Get the path of given input file. - pub fn path(&self) -> &str { + pub fn location(&self) -> &str { &self.path } + + pub async fn exists(&self) -> crate::Result { + Ok(self + .op + .is_exist(&self.path[self.relative_path_pos..]) + .await?) + } + + pub async fn metadata(&self) -> crate::Result { + let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?; + + Ok(FileStatus { + size: meta.content_length(), + is_dir: meta.is_dir(), + path: self.path.clone(), + last_modified: meta.last_modified(), + }) + } + + pub async fn read(&self) -> crate::Result { + Ok(self + .op + .read(&self.path[self.relative_path_pos..]) + .await? + .to_bytes()) + } + + pub async fn reader(&self) -> crate::Result { + Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?) + } } -/// Output file represents a file that can be written to. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct OutputFile { - _op: Operator, + op: Operator, path: String, + relative_path_pos: usize, } impl OutputFile { - /// Get the path of given output file. - pub fn path(&self) -> &str { + pub fn location(&self) -> &str { &self.path } + + pub async fn exists(&self) -> crate::Result { + Ok(self + .op + .is_exist(&self.path[self.relative_path_pos..]) + .await?) + } + + pub fn to_input_file(self) -> InputFile { + InputFile { + op: self.op, + path: self.path, + relative_path_pos: self.relative_path_pos, + } + } + + pub async fn write(&self, bs: Bytes) -> crate::Result<()> { + let mut writer = self.writer().await?; + writer.write(bs).await?; + writer.close().await + } + + pub async fn writer(&self) -> crate::Result> { + Ok(Box::new( + self.op.writer(&self.path[self.relative_path_pos..]).await?, + )) + } +} + +#[cfg(test)] +mod file_action_test { + use std::fs; + + use super::*; + use bytes::Bytes; + + fn setup_memory_file_io() -> FileIO { + let storage = Storage::Memory; + FileIO { + storage: Arc::new(storage), + } + } + + fn setup_fs_file_io() -> FileIO { + let storage = Storage::LocalFs; + FileIO { + storage: Arc::new(storage), + } + } + + async fn common_test_get_status(file_io: &FileIO, path: &str) { + let output = file_io.new_output(path).unwrap(); + output.write(Bytes::from("hello world")).await.unwrap(); + + let status = file_io.get_status(path).await.unwrap(); + assert_eq!(status.size, 11); + + file_io.delete_file(path).await.unwrap(); + } + + async fn common_test_exists(file_io: &FileIO, path: &str) { + let output = file_io.new_output(path).unwrap(); + output.write(Bytes::from("hello world")).await.unwrap(); + + let exists = file_io.exists(path).await.unwrap(); + assert!(exists); + + file_io.delete_file(path).await.unwrap(); + } + + async fn common_test_delete_file(file_io: &FileIO, path: &str) { + let output = file_io.new_output(path).unwrap(); + output.write(Bytes::from("hello world")).await.unwrap(); + + file_io.delete_file(path).await.unwrap(); + + let exists = file_io.exists(path).await.unwrap(); + assert!(!exists); + } + + async fn common_test_mkdirs(file_io: &FileIO, dir_path: &str) { + file_io.mkdirs(dir_path).await.unwrap(); + + let exists = file_io.exists(dir_path).await.unwrap(); + assert!(exists); + + let _ = fs::remove_dir_all(dir_path.strip_prefix("file:/").unwrap()); + } + + async fn common_test_rename(file_io: &FileIO, src: &str, dst: &str) { + let output = file_io.new_output(src).unwrap(); + output.write(Bytes::from("hello world")).await.unwrap(); + + file_io.rename(src, dst).await.unwrap(); + + let exists_old = file_io.exists(src).await.unwrap(); + let exists_new = file_io.exists(dst).await.unwrap(); + assert!(!exists_old); + assert!(exists_new); + + file_io.delete_file(dst).await.unwrap(); + } + + #[tokio::test] + async fn test_delete_file_memory() { + let file_io = setup_memory_file_io(); + common_test_delete_file(&file_io, "memory:/test_file_delete_mem").await; + } + + #[tokio::test] + async fn test_get_status_fs() { + let file_io = setup_fs_file_io(); + common_test_get_status(&file_io, "file:/tmp/test_file_get_status_fs").await; + } + + #[tokio::test] + async fn test_exists_fs() { + let file_io = setup_fs_file_io(); + common_test_exists(&file_io, "file:/tmp/test_file_exists_fs").await; + } + + #[tokio::test] + async fn test_delete_file_fs() { + let file_io = setup_fs_file_io(); + common_test_delete_file(&file_io, "file:/tmp/test_file_delete_fs").await; + } + + #[tokio::test] + async fn test_mkdirs_fs() { + let file_io = setup_fs_file_io(); + common_test_mkdirs(&file_io, "file:/tmp/test_fs_dir/").await; + } + + #[tokio::test] + async fn test_rename_fs() { + let file_io = setup_fs_file_io(); + common_test_rename( + &file_io, + "file:/tmp/test_file_fs_z", + "file:/tmp/new_test_file_fs_o", + ) + .await; + } +} + +#[cfg(test)] +mod input_output_test { + use super::*; + use bytes::Bytes; + + fn setup_memory_file_io() -> FileIO { + let storage = Storage::Memory; + FileIO { + storage: Arc::new(storage), + } + } + + fn setup_fs_file_io() -> FileIO { + let storage = Storage::LocalFs; + FileIO { + storage: Arc::new(storage), + } + } + + async fn common_test_output_file_write_and_read(file_io: &FileIO, path: &str) { + let output = file_io.new_output(path).unwrap(); + output.write(Bytes::from("hello world")).await.unwrap(); + + let input = output.to_input_file(); + let content = input.read().await.unwrap(); + + assert_eq!(&content[..], b"hello world"); + + file_io.delete_file(path).await.unwrap(); + } + + async fn common_test_output_file_exists(file_io: &FileIO, path: &str) { + let output = file_io.new_output(path).unwrap(); + output.write(Bytes::from("hello world")).await.unwrap(); + + let exists = output.exists().await.unwrap(); + assert!(exists); + + file_io.delete_file(path).await.unwrap(); + } + + async fn common_test_input_file_metadata(file_io: &FileIO, path: &str) { + let output = file_io.new_output(path).unwrap(); + output.write(Bytes::from("hello world")).await.unwrap(); + + let input = output.to_input_file(); + let metadata = input.metadata().await.unwrap(); + + assert_eq!(metadata.size, 11); + + file_io.delete_file(path).await.unwrap(); + } + + async fn common_test_input_file_partial_read(file_io: &FileIO, path: &str) { + let output = file_io.new_output(path).unwrap(); + output.write(Bytes::from("hello world")).await.unwrap(); + + let input = output.to_input_file(); + let reader = input.reader().await.unwrap(); + let partial_content = reader.read(0..5).await.unwrap(); // 读取 "hello" + + assert_eq!(&partial_content[..], b"hello"); + + file_io.delete_file(path).await.unwrap(); + } + + #[tokio::test] + async fn test_output_file_write_and_read_memory() { + let file_io = setup_memory_file_io(); + common_test_output_file_write_and_read(&file_io, "memory:/test_file_rw_mem").await; + } + + #[tokio::test] + async fn test_output_file_exists_memory() { + let file_io = setup_memory_file_io(); + common_test_output_file_exists(&file_io, "memory:/test_file_exist_mem").await; + } + + #[tokio::test] + async fn test_input_file_metadata_memory() { + let file_io = setup_memory_file_io(); + common_test_input_file_metadata(&file_io, "memory:/test_file_meta_mem").await; + } + + #[tokio::test] + async fn test_input_file_partial_read_memory() { + let file_io = setup_memory_file_io(); + common_test_input_file_partial_read(&file_io, "memory:/test_file_part_read_mem").await; + } + + #[tokio::test] + async fn test_output_file_write_and_read_fs() { + let file_io = setup_fs_file_io(); + common_test_output_file_write_and_read(&file_io, "file:/tmp/test_file_fs_rw").await; + } + + #[tokio::test] + async fn test_output_file_exists_fs() { + let file_io = setup_fs_file_io(); + common_test_output_file_exists(&file_io, "file:/tmp/test_file_exists").await; + } + + #[tokio::test] + async fn test_input_file_metadata_fs() { + let file_io = setup_fs_file_io(); + common_test_input_file_metadata(&file_io, "file:/tmp/test_file_meta").await; + } + + #[tokio::test] + async fn test_input_file_partial_read_fs() { + let file_io = setup_fs_file_io(); + common_test_input_file_partial_read(&file_io, "file:/tmp/test_file_read_fs").await; + } } diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs index a9d049b..a216946 100644 --- a/crates/paimon/src/io/mod.rs +++ b/crates/paimon/src/io/mod.rs @@ -17,3 +17,16 @@ mod file_io; pub use file_io::*; + +mod storage; +pub use storage::*; + +#[cfg(feature = "storage-fs")] +mod storage_fs; +#[cfg(feature = "storage-fs")] +use storage_fs::*; + +#[cfg(feature = "storage-memory")] +mod storage_memory; +#[cfg(feature = "storage-memory")] +use storage_memory::*; diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs new file mode 100644 index 0000000..14c528c --- /dev/null +++ b/crates/paimon/src/io/storage.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use opendal::{Operator, Scheme}; + +use crate::error; + +use super::FileIOBuilder; + +/// The storage carries all supported storage services in paimon +#[derive(Debug)] +pub enum Storage { + #[cfg(feature = "storage-memory")] + Memory, + #[cfg(feature = "storage-fs")] + LocalFs, +} + +impl Storage { + pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result { + let (scheme_str, _) = file_io_builder.into_parts(); + let scheme = Self::parse_scheme(&scheme_str)?; + + match scheme { + #[cfg(feature = "storage-memory")] + Scheme::Memory => Ok(Self::Memory), + #[cfg(feature = "storage-fs")] + Scheme::Fs => Ok(Self::LocalFs), + _ => Err(error::Error::IoUnsupported { + message: "Unsupported storage feature".to_string(), + }), + } + } + + pub(crate) fn create<'a>(&self, path: &'a str) -> crate::Result<(Operator, &'a str)> { + match self { + #[cfg(feature = "storage-memory")] + Storage::Memory => { + let op = super::memory_config_build()?; + + if let Some(stripped) = path.strip_prefix("memory:/") { + Ok((op, stripped)) + } else { + Ok((op, &path[1..])) + } + } + #[cfg(feature = "storage-fs")] + Storage::LocalFs => { + let op = super::fs_config_build()?; + + if let Some(stripped) = path.strip_prefix("file:/") { + Ok((op, stripped)) + } else { + Ok((op, &path[1..])) + } + } + } + } + + fn parse_scheme(scheme: &str) -> crate::Result { + match scheme { + "memory" => Ok(Scheme::Memory), + "file" | "" => Ok(Scheme::Fs), + s => Ok(s.parse::()?), + } + } +} diff --git a/crates/paimon/src/io/storage_fs.rs b/crates/paimon/src/io/storage_fs.rs new file mode 100644 index 0000000..ff38d76 --- /dev/null +++ b/crates/paimon/src/io/storage_fs.rs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use opendal::services::FsConfig; +use opendal::Operator; + +use crate::Result; + +/// Build new opendal operator from give path. +pub(crate) fn fs_config_build() -> Result { + let mut cfg = FsConfig::default(); + cfg.root = Some("/".to_string()); + + Ok(Operator::from_config(cfg)?.finish()) +} diff --git a/crates/paimon/src/io/storage_memory.rs b/crates/paimon/src/io/storage_memory.rs new file mode 100644 index 0000000..ffc082d --- /dev/null +++ b/crates/paimon/src/io/storage_memory.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use opendal::services::MemoryConfig; +use opendal::Operator; + +use crate::Result; + +pub(crate) fn memory_config_build() -> Result { + Ok(Operator::from_config(MemoryConfig::default())?.finish()) +} diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs index 65ec59e..b253ea4 100644 --- a/crates/paimon/src/spec/types.rs +++ b/crates/paimon/src/spec/types.rs @@ -1175,7 +1175,7 @@ mod serde_utils { { let s = String::deserialize(deserializer)?; - let (name, nullable) = s.split_once(" ").unwrap_or((s.as_str(), "")); + let (name, nullable) = s.split_once(' ').unwrap_or((s.as_str(), "")); if name == T::NAME && nullable.is_empty() { Ok(NullableType::from(true))