-
Notifications
You must be signed in to change notification settings - Fork 180
feat(ibis): introduce Local file connector #1029
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
goldmedal
merged 13 commits into
Canner:main
from
goldmedal:feature/file-connecotr-duckdb
Jan 7, 2025
Merged
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
8e846b4
add default alias for source plan
goldmedal 1968898
add local file data source
goldmedal 99cbbd7
add metadata api for file connector
goldmedal df6a2e6
add test for v3 api
goldmedal 53921d1
fix wren-core-py test
goldmedal b1d398c
address comments
goldmedal db091dc
add negative test and handle unsupported error
goldmedal 65559b2
fix format
goldmedal f0ffc03
add path info in table properties
goldmedal 23e4360
enhance the test for type and format
goldmedal 98b9ad4
Merge branch 'main' into feature/file-connecotr-duckdb
goldmedal 318e11f
poetry lock
goldmedal 12d1f1c
fix test
goldmedal File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| import os | ||
|
|
||
| import duckdb | ||
| import opendal | ||
| from loguru import logger | ||
|
|
||
| from app.model import LocalFileConnectionInfo | ||
| from app.model.metadata.dto import ( | ||
| Column, | ||
| RustWrenEngineColumnType, | ||
| Table, | ||
| TableProperties, | ||
| ) | ||
| from app.model.metadata.metadata import Metadata | ||
|
|
||
|
|
||
class ObjectStorageMetadata(Metadata):
    """Metadata provider that discovers file-based tables under a storage root.

    Each top-level entry below the configured root URL becomes one table:
    - a directory maps to ``<root>/<dir>/*.<format>`` (a table made of many
      homogeneous files) and is named after the directory;
    - a plain file maps to ``<root>/<file>`` and is named after the file stem.

    Entries that cannot be read with the configured format are skipped
    silently (logged at debug level).
    """

    def __init__(self, connection_info):
        super().__init__(connection_info)

    def get_table_list(self) -> list[Table]:
        """Scan the storage root and return one Table per readable entry.

        Returns:
            A list of Table objects with column types inferred by DuckDB.
        """
        root = self.connection_info.url.get_secret_value()
        op = opendal.Operator("fs", root=root)
        conn = self._get_connection()
        unique_tables: dict[str, Table] = {}
        try:
            for file in op.list("/"):
                if file.path == "/":
                    continue
                stat = op.stat(file.path)
                if stat.mode.is_dir():
                    # A directory is a table of homogeneous files; use the
                    # directory name as the table name and glob its contents.
                    table_name = os.path.basename(os.path.normpath(file.path))
                    full_path = f"{root}/{table_name}/*.{self.connection_info.format}"
                else:
                    # A single file is a table named after the file stem.
                    table_name = os.path.splitext(os.path.basename(file.path))[0]
                    full_path = f"{root}/{file.path}"

                # Probe the entry with the configured format; skip it if it
                # cannot be read (wrong format, corrupt file, ...).
                df = self._read_df(conn, full_path)
                if df is None:
                    continue
                try:
                    columns = [
                        Column(
                            name=col,
                            type=self._to_column_type(str(df[col].dtypes[0])),
                            notNull=False,
                        )
                        for col in df.columns
                    ]
                except Exception as e:
                    logger.debug(f"Failed to read column types: {e}")
                    continue

                unique_tables[table_name] = Table(
                    name=table_name,
                    description=None,
                    columns=columns,
                    properties=TableProperties(
                        table=table_name,
                        schema=None,
                        catalog=None,
                        path=full_path,
                    ),
                    primaryKey=None,
                )
        finally:
            # _get_connection opens a fresh in-memory DuckDB connection for
            # this scan; release it even when listing/probing raises.
            conn.close()

        return list(unique_tables.values())

    def get_constraints(self):
        """File-based sources expose no constraints."""
        return []

    def get_version(self):
        raise NotImplementedError("Subclasses must implement `get_version` method")

    def _read_df(self, conn, path):
        """Read *path* with the configured format; return None if unreadable.

        Raises:
            NotImplementedError: if the configured format is unsupported.
        """
        fmt = self.connection_info.format
        if fmt == "parquet":
            try:
                return conn.read_parquet(path)
            except Exception as e:
                logger.debug(f"Failed to read parquet file: {e}")
                return None
        elif fmt == "csv":
            try:
                logger.debug(f"Reading csv file: {path}")
                return conn.read_csv(path)
            except Exception as e:
                logger.debug(f"Failed to read csv file: {e}")
                return None
        elif fmt == "json":
            try:
                return conn.read_json(path)
            except Exception as e:
                logger.debug(f"Failed to read json file: {e}")
                return None
        else:
            raise NotImplementedError(f"Unsupported format: {fmt}")

    def _to_column_type(self, col_type: str) -> RustWrenEngineColumnType:
        """Map a DuckDB type name to the engine's column type enum."""
        if col_type.startswith("DECIMAL"):
            return RustWrenEngineColumnType.DECIMAL

        # TODO: support struct
        if col_type.startswith("STRUCT"):
            return RustWrenEngineColumnType.UNKNOWN

        # TODO: support array
        if col_type.endswith("[]"):
            return RustWrenEngineColumnType.UNKNOWN

        # refer to https://duckdb.org/docs/sql/data_types/overview#general-purpose-data-types
        switcher = {
            "BIGINT": RustWrenEngineColumnType.INT64,
            "BIT": RustWrenEngineColumnType.INT2,
            "BLOB": RustWrenEngineColumnType.BYTES,
            "BOOLEAN": RustWrenEngineColumnType.BOOL,
            "DATE": RustWrenEngineColumnType.DATE,
            "DOUBLE": RustWrenEngineColumnType.DOUBLE,
            "FLOAT": RustWrenEngineColumnType.FLOAT,
            "INTEGER": RustWrenEngineColumnType.INT,
            # TODO: Wren engine does not support HUGEINT. Map to INT64 for now.
            "HUGEINT": RustWrenEngineColumnType.INT64,
            "INTERVAL": RustWrenEngineColumnType.INTERVAL,
            "JSON": RustWrenEngineColumnType.JSON,
            "SMALLINT": RustWrenEngineColumnType.INT2,
            "TIME": RustWrenEngineColumnType.TIME,
            "TIMESTAMP": RustWrenEngineColumnType.TIMESTAMP,
            "TIMESTAMP WITH TIME ZONE": RustWrenEngineColumnType.TIMESTAMPTZ,
            "TINYINT": RustWrenEngineColumnType.INT2,
            "UBIGINT": RustWrenEngineColumnType.INT64,
            # TODO: Wren engine does not support UHUGEINT. Map to INT64 for now.
            "UHUGEINT": RustWrenEngineColumnType.INT64,
            "UINTEGER": RustWrenEngineColumnType.INT,
            "USMALLINT": RustWrenEngineColumnType.INT2,
            "UTINYINT": RustWrenEngineColumnType.INT2,
            "UUID": RustWrenEngineColumnType.UUID,
            "VARCHAR": RustWrenEngineColumnType.STRING,
        }
        return switcher.get(col_type, RustWrenEngineColumnType.UNKNOWN)

    def _get_connection(self):
        # A fresh in-memory DuckDB connection per scan; callers own closing it.
        return duckdb.connect()
|
|
||
|
|
||
class LocalFileMetadata(ObjectStorageMetadata):
    """Object-storage metadata backed by the local filesystem."""

    def __init__(self, connection_info: LocalFileConnectionInfo):
        super().__init__(connection_info)

    def get_version(self):
        # Local files have no backing server, so report a descriptive label
        # instead of a version number.
        return "Local File System"
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.