-
Notifications
You must be signed in to change notification settings. Forks: 3.6k
Iceberg: use LocationProvider instead of hardcoded path #8573
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -145,7 +145,6 @@ | |
| import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation; | ||
| import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; | ||
| import static io.trino.plugin.iceberg.IcebergUtil.getColumns; | ||
| import static io.trino.plugin.iceberg.IcebergUtil.getDataPath; | ||
| import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat; | ||
| import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; | ||
| import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; | ||
|
|
@@ -174,6 +173,9 @@ | |
| import static org.apache.iceberg.TableMetadata.newTableMetadata; | ||
| import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; | ||
| import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; | ||
| import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; | ||
| import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; | ||
| import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; | ||
| import static org.apache.iceberg.Transactions.createTableTransaction; | ||
|
|
||
| public class IcebergMetadata | ||
|
|
@@ -607,7 +609,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con | |
| propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get()); | ||
| } | ||
|
|
||
| TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, propertiesBuilder.build()); | ||
| Map<String, String> properties = propertiesBuilder.build(); | ||
| TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, properties); | ||
|
|
||
| transaction = createTableTransaction(tableName, operations, metadata); | ||
|
|
||
|
|
@@ -618,7 +621,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con | |
| PartitionSpecParser.toJson(metadata.spec()), | ||
| getColumns(metadata.schema(), typeManager), | ||
| targetPath, | ||
| fileFormat); | ||
| fileFormat, | ||
| properties); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -641,8 +645,9 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto | |
| SchemaParser.toJson(icebergTable.schema()), | ||
| PartitionSpecParser.toJson(icebergTable.spec()), | ||
| getColumns(icebergTable.schema(), typeManager), | ||
| getDataPath(icebergTable.location()), | ||
| getFileFormat(icebergTable)); | ||
| icebergTable.location(), | ||
| getFileFormat(icebergTable), | ||
| icebergTable.properties()); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -702,6 +707,13 @@ public Optional<Object> getInfo(ConnectorTableHandle tableHandle) | |
| public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) | ||
| { | ||
| IcebergTableHandle handle = (IcebergTableHandle) tableHandle; | ||
| // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861 | ||
| org.apache.iceberg.Table table = getIcebergTable(session, handle.getSchemaTableName()); | ||
|
||
| if (table.properties().containsKey(OBJECT_STORE_PATH) || | ||
| table.properties().containsKey(WRITE_NEW_DATA_LOCATION) || | ||
| table.properties().containsKey(WRITE_METADATA_LOCATION)) { | ||
| throw new TrinoException(NOT_SUPPORTED, "Table " + handle.getSchemaTableName() + " contains Iceberg path override properties and cannot be dropped from Trino"); | ||
| } | ||
jackye1995 marked this conversation as resolved. (Outdated — Show resolved / Hide resolved)
|
||
| metastore.dropTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), true); | ||
| } | ||
|
|
||
|
|
@@ -1147,8 +1159,9 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession | |
| SchemaParser.toJson(icebergTable.schema()), | ||
| PartitionSpecParser.toJson(icebergTable.spec()), | ||
| getColumns(icebergTable.schema(), typeManager), | ||
| getDataPath(icebergTable.location()), | ||
| getFileFormat(icebergTable)); | ||
| icebergTable.location(), | ||
| getFileFormat(icebergTable), | ||
| icebergTable.properties()); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,13 +23,16 @@ | |
| import io.trino.spi.connector.ConnectorPageSinkProvider; | ||
| import io.trino.spi.connector.ConnectorSession; | ||
| import io.trino.spi.connector.ConnectorTransactionHandle; | ||
| import io.trino.spi.connector.SchemaTableName; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.PartitionSpecParser; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.SchemaParser; | ||
| import org.apache.iceberg.io.LocationProvider; | ||
|
|
||
| import javax.inject.Inject; | ||
|
|
||
| import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class IcebergPageSinkProvider | ||
|
|
@@ -74,10 +77,12 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab | |
| HdfsContext hdfsContext = new HdfsContext(session); | ||
| Schema schema = SchemaParser.fromJson(tableHandle.getSchemaAsJson()); | ||
| PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, tableHandle.getPartitionSpecAsJson()); | ||
| LocationProvider locationProvider = getLocationProvider(new SchemaTableName(tableHandle.getSchemaName(), tableHandle.getTableName()), | ||
| tableHandle.getOutputPath(), tableHandle.getStorageProperties()); | ||
| return new IcebergPageSink( | ||
| schema, | ||
| partitionSpec, | ||
| tableHandle.getOutputPath(), | ||
| locationProvider, | ||
|
||
| fileWriterFactory, | ||
| pageIndexerFactory, | ||
| hdfsEnvironment, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,7 @@ | |
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.TableMetadata; | ||
| import org.apache.iceberg.TableOperations; | ||
| import org.apache.iceberg.io.LocationProvider; | ||
| import org.apache.iceberg.types.Type.PrimitiveType; | ||
| import org.apache.iceberg.types.Types; | ||
|
|
||
|
|
@@ -64,6 +65,7 @@ | |
| import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; | ||
| import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros; | ||
| import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; | ||
| import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; | ||
| import static io.trino.spi.type.BigintType.BIGINT; | ||
| import static io.trino.spi.type.BooleanType.BOOLEAN; | ||
| import static io.trino.spi.type.DateType.DATE; | ||
|
|
@@ -84,8 +86,10 @@ | |
| import static java.nio.charset.StandardCharsets.UTF_8; | ||
| import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; | ||
| import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; | ||
| import static org.apache.iceberg.LocationProviders.locationsFor; | ||
| import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; | ||
| import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; | ||
| import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; | ||
| import static org.apache.iceberg.types.Type.TypeID.BINARY; | ||
| import static org.apache.iceberg.types.Type.TypeID.FIXED; | ||
|
|
||
|
|
@@ -190,14 +194,6 @@ private static Stream<Entry<Integer, PrimitiveType>> primitiveFieldTypes(Types.N | |
| throw new IllegalStateException("Unsupported field type: " + nestedField); | ||
| } | ||
|
|
||
| public static String getDataPath(String location) | ||
|
||
| { | ||
| if (!location.endsWith("/")) { | ||
| location += "/"; | ||
| } | ||
| return location + "data"; | ||
| } | ||
|
|
||
| public static FileFormat getFileFormat(Table table) | ||
| { | ||
| return FileFormat.valueOf(table.properties() | ||
|
|
@@ -327,4 +323,13 @@ public static Map<Integer, String> getPartitionKeys(FileScanTask scanTask) | |
|
|
||
| return Collections.unmodifiableMap(partitionKeys); | ||
| } | ||
|
|
||
| public static LocationProvider getLocationProvider(SchemaTableName schemaTableName, String tableLocation, Map<String, String> storageProperties) | ||
| { | ||
| if (storageProperties.containsKey(WRITE_LOCATION_PROVIDER_IMPL)) { | ||
| throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " specifies " + storageProperties.get(WRITE_LOCATION_PROVIDER_IMPL) + | ||
| " as a location provider. Writing to Iceberg tables with custom location provider is not supported."); | ||
| } | ||
| return locationsFor(tableLocation, storageProperties); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.