This repository was archived by the owner on May 7, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 199
feat(ibis): introduce Local file connector #1029
Merged
goldmedal
merged 13 commits into
Canner:main
from
goldmedal:feature/file-connecotr-duckdb
Jan 7, 2025
Merged
Changes from 8 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
8e846b4
add default alias for source plan
goldmedal 1968898
add local file data source
goldmedal 99cbbd7
add metadata api for file connector
goldmedal df6a2e6
add test for v3 api
goldmedal 53921d1
fix wren-core-py test
goldmedal b1d398c
address comments
goldmedal db091dc
add negative test and handle unsupported error
goldmedal 65559b2
fix format
goldmedal f0ffc03
add path info in table properties
goldmedal 23e4360
enhance the test for type and format
goldmedal 98b9ad4
Merge branch 'main' into feature/file-connecotr-duckdb
goldmedal 318e11f
poetry lock
goldmedal 12d1f1c
fix test
goldmedal File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| import os | ||
|
|
||
| import duckdb | ||
| import opendal | ||
|
|
||
| from app.model import LocalFileConnectionInfo | ||
| from app.model.metadata.dto import ( | ||
| Column, | ||
| RustWrenEngineColumnType, | ||
| Table, | ||
| TableProperties, | ||
| ) | ||
| from app.model.metadata.metadata import Metadata | ||
|
|
||
|
|
||
| class ObjectStorageMetadata(Metadata): | ||
| def __init__(self, connection_info): | ||
| super().__init__(connection_info) | ||
|
|
||
| def get_table_list(self) -> list[Table]: | ||
| op = opendal.Operator("fs", root=self.connection_info.url.get_secret_value()) | ||
| conn = self._get_connection() | ||
| unique_tables = {} | ||
| for file in op.list("/"): | ||
| if file.path != "/": | ||
| stat = op.stat(file.path) | ||
| if stat.mode.is_dir(): | ||
| # if the file is a directory, use the directory name as the table name | ||
| table_name = os.path.basename(os.path.normpath(file.path)) | ||
| else: | ||
| # if the file is a file, use the file name as the table name | ||
| table_name = os.path.splitext(os.path.basename(file.path))[0] | ||
| unique_tables[table_name] = Table( | ||
|
goldmedal marked this conversation as resolved.
|
||
| name=table_name, | ||
| description=None, | ||
| columns=[], | ||
| properties=TableProperties( | ||
| table=table_name, | ||
| schema=None, | ||
| catalog=None, | ||
| ), | ||
| primaryKey=None, | ||
| ) | ||
| df = self._read_df(conn, file.path) | ||
| for col in df.columns: | ||
| duckdb_type = df[col].dtypes[0] | ||
|
goldmedal marked this conversation as resolved.
Outdated
|
||
| unique_tables[table_name].columns.append( | ||
| Column( | ||
| name=col, | ||
| type=self._to_column_type(duckdb_type.__str__()), | ||
| notNull=False, | ||
| ) | ||
| ) | ||
|
|
||
| return list(unique_tables.values()) | ||
|
|
||
| def get_constraints(self): | ||
| return [] | ||
|
|
||
| def get_version(self): | ||
| raise NotImplementedError("Subclasses must implement `get_version` method") | ||
|
|
||
| def _read_df(self, conn, path): | ||
| if self.connection_info.format == "parquet": | ||
| return conn.read_parquet( | ||
| f"{self.connection_info.url.get_secret_value()}/{path}" | ||
| ) | ||
| elif self.connection_info.format == "csv": | ||
| return conn.read_csv( | ||
| f"{self.connection_info.url.get_secret_value()}/{path}" | ||
| ) | ||
| elif self.connection_info.format == "json": | ||
| return conn.read_json( | ||
| f"{self.connection_info.url.get_secret_value()}/{path}" | ||
| ) | ||
| else: | ||
| raise NotImplementedError( | ||
| f"Unsupported format: {self.connection_info.format}" | ||
| ) | ||
|
goldmedal marked this conversation as resolved.
|
||
|
|
||
| def _to_column_type(self, col_type: str) -> RustWrenEngineColumnType: | ||
| if col_type.startswith("DECIMAL"): | ||
| return RustWrenEngineColumnType.DECIMAL | ||
|
|
||
| # TODO: support struct | ||
| if col_type.startswith("STRUCT"): | ||
| return RustWrenEngineColumnType.UNKNOWN | ||
|
|
||
| # TODO: support array | ||
| if col_type.endswith("[]"): | ||
| return RustWrenEngineColumnType.UNKNOWN | ||
|
|
||
| # refer to https://duckdb.org/docs/sql/data_types/overview#general-purpose-data-types | ||
| switcher = { | ||
| "BIGINT": RustWrenEngineColumnType.INT64, | ||
| "BIT": RustWrenEngineColumnType.INT2, | ||
| "BLOB": RustWrenEngineColumnType.BYTES, | ||
| "BOOLEAN": RustWrenEngineColumnType.BOOL, | ||
| "DATE": RustWrenEngineColumnType.DATE, | ||
| "DOUBLE": RustWrenEngineColumnType.FLOAT64, | ||
| "FLOAT": RustWrenEngineColumnType.FLOAT, | ||
| "INTEGER": RustWrenEngineColumnType.INT, | ||
| # TODO: Wren engine does not support HUGEINT. Map to INT64 for now. | ||
| "HUGEINT": RustWrenEngineColumnType.INT64, | ||
| "INTERVAL": RustWrenEngineColumnType.INTERVAL, | ||
| "JSON": RustWrenEngineColumnType.JSON, | ||
| "SMALLINT": RustWrenEngineColumnType.INT2, | ||
| "TIME": RustWrenEngineColumnType.TIME, | ||
| "TIMESTAMP": RustWrenEngineColumnType.TIMESTAMP, | ||
| "TIMESTAMP WITH TIME ZONE": RustWrenEngineColumnType.TIMESTAMPTZ, | ||
| "TINYINT": RustWrenEngineColumnType.INT2, | ||
| "UBIGINT": RustWrenEngineColumnType.INT64, | ||
| # TODO: Wren engine does not support UHUGEINT. Map to INT64 for now. | ||
| "UHUGEINT": RustWrenEngineColumnType.INT64, | ||
| "UINTEGER": RustWrenEngineColumnType.INT, | ||
| "USMALLINT": RustWrenEngineColumnType.INT2, | ||
| "UTINYINT": RustWrenEngineColumnType.INT2, | ||
| "UUID": RustWrenEngineColumnType.UUID, | ||
| "VARCHAR": RustWrenEngineColumnType.STRING, | ||
| } | ||
| return switcher.get(col_type, RustWrenEngineColumnType.UNKNOWN) | ||
|
|
||
| def _get_connection(self): | ||
| return duckdb.connect() | ||
|
|
||
|
|
||
| class LocalFileMetadata(ObjectStorageMetadata): | ||
| def __init__(self, connection_info: LocalFileConnectionInfo): | ||
| super().__init__(connection_info) | ||
|
|
||
| def get_version(self): | ||
| return "Local File System" | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.