-
Notifications
You must be signed in to change notification settings - Fork 3k
Support for file paths in SparkCatalogs via HadoopTables #1843
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c9589f8
36b31ef
78bfda9
71c69de
2a1d19a
d725841
a5530ce
a5c6f95
94e6b63
8b39276
068a4f9
8710c15
dfb9e2c
fbba2c1
46d5fb7
e5be0da
f7eb6b9
78071f3
211b070
793a117
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,9 @@ | |
| import org.apache.iceberg.TableMetadata; | ||
| import org.apache.iceberg.TableOperations; | ||
| import org.apache.iceberg.Tables; | ||
| import org.apache.iceberg.Transaction; | ||
| import org.apache.iceberg.Transactions; | ||
| import org.apache.iceberg.catalog.Catalog; | ||
| import org.apache.iceberg.exceptions.AlreadyExistsException; | ||
| import org.apache.iceberg.exceptions.NoSuchTableException; | ||
| import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; | ||
|
|
@@ -131,20 +134,10 @@ private Table loadMetadataTable(String location, String metadataTableName, Metad | |
| @Override | ||
| public Table create(Schema schema, PartitionSpec spec, SortOrder order, | ||
| Map<String, String> properties, String location) { | ||
| Preconditions.checkNotNull(schema, "A table schema is required"); | ||
|
|
||
| TableOperations ops = newTableOps(location); | ||
| if (ops.current() != null) { | ||
| throw new AlreadyExistsException("Table already exists at location: %s", location); | ||
| } | ||
|
|
||
| Map<String, String> tableProps = properties == null ? ImmutableMap.of() : properties; | ||
| PartitionSpec partitionSpec = spec == null ? PartitionSpec.unpartitioned() : spec; | ||
| SortOrder sortOrder = order == null ? SortOrder.unsorted() : order; | ||
| TableMetadata metadata = TableMetadata.newTableMetadata(schema, partitionSpec, sortOrder, location, tableProps); | ||
| ops.commit(null, metadata); | ||
|
|
||
| return new BaseTable(ops, location); | ||
| return buildTable(location, schema).withPartitionSpec(spec) | ||
| .withSortOrder(order) | ||
| .withProperties(properties) | ||
| .create(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -200,6 +193,165 @@ TableOperations newTableOps(String location) { | |
| } | ||
| } | ||
|
|
||
| private TableMetadata tableMetadata(Schema schema, PartitionSpec spec, SortOrder order, | ||
| Map<String, String> properties, String location) { | ||
| Preconditions.checkNotNull(schema, "A table schema is required"); | ||
|
|
||
| Map<String, String> tableProps = properties == null ? ImmutableMap.of() : properties; | ||
| PartitionSpec partitionSpec = spec == null ? PartitionSpec.unpartitioned() : spec; | ||
| SortOrder sortOrder = order == null ? SortOrder.unsorted() : order; | ||
| return TableMetadata.newTableMetadata(schema, partitionSpec, sortOrder, location, tableProps); | ||
| } | ||
|
|
||
| /** | ||
| * Start a transaction to create a table. | ||
| * | ||
| * @param location a location for the table | ||
| * @param schema a schema | ||
| * @param spec a partition spec | ||
| * @param properties a string map of table properties | ||
| * @return a {@link Transaction} to create the table | ||
| * @throws AlreadyExistsException if the table already exists | ||
| */ | ||
| public Transaction newCreateTableTransaction( | ||
| String location, | ||
| Schema schema, | ||
| PartitionSpec spec, | ||
| Map<String, String> properties) { | ||
| return buildTable(location, schema).withPartitionSpec(spec).withProperties(properties).createTransaction(); | ||
| } | ||
|
|
||
| /** | ||
| * Start a transaction to replace a table. | ||
| * | ||
| * @param location a location for the table | ||
| * @param schema a schema | ||
| * @param spec a partition spec | ||
| * @param properties a string map of table properties | ||
| * @param orCreate whether to create the table if not exists | ||
| * @return a {@link Transaction} to replace the table | ||
| * @throws NoSuchTableException if the table doesn't exist and orCreate is false | ||
| */ | ||
| public Transaction newReplaceTableTransaction( | ||
| String location, | ||
| Schema schema, | ||
| PartitionSpec spec, | ||
| Map<String, String> properties, | ||
| boolean orCreate) { | ||
|
|
||
|
|
||
| Catalog.TableBuilder builder = buildTable(location, schema).withPartitionSpec(spec).withProperties(properties); | ||
| return orCreate ? builder.createOrReplaceTransaction() : builder.replaceTransaction(); | ||
| } | ||
|
|
||
| public Catalog.TableBuilder buildTable(String location, Schema schema) { | ||
| return new HadoopTableBuilder(location, schema); | ||
| } | ||
|
|
||
| private class HadoopTableBuilder implements Catalog.TableBuilder { | ||
| private final String location; | ||
| private final Schema schema; | ||
| private final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder(); | ||
| private PartitionSpec spec = PartitionSpec.unpartitioned(); | ||
| private SortOrder sortOrder = SortOrder.unsorted(); | ||
|
|
||
|
|
||
| HadoopTableBuilder(String location, Schema schema) { | ||
| this.location = location; | ||
| this.schema = schema; | ||
| } | ||
|
|
||
| @Override | ||
| public Catalog.TableBuilder withPartitionSpec(PartitionSpec newSpec) { | ||
| this.spec = newSpec != null ? newSpec : PartitionSpec.unpartitioned(); | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public Catalog.TableBuilder withSortOrder(SortOrder newSortOrder) { | ||
| this.sortOrder = newSortOrder != null ? newSortOrder : SortOrder.unsorted(); | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public Catalog.TableBuilder withLocation(String newLocation) { | ||
| Preconditions.checkArgument(newLocation == null || location.equals(newLocation), | ||
| String.format("Table location %s differs from the table location (%s) from the PathIdentifier", | ||
| newLocation, location)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: `Preconditions.checkArgument` already supports argument formatting, so the explicit `String.format` call is unnecessary here. |
||
| return this; | ||
rdblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| @Override | ||
| public Catalog.TableBuilder withProperties(Map<String, String> properties) { | ||
| if (properties != null) { | ||
| propertiesBuilder.putAll(properties); | ||
| } | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public Catalog.TableBuilder withProperty(String key, String value) { | ||
| propertiesBuilder.put(key, value); | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public Table create() { | ||
| TableOperations ops = newTableOps(location); | ||
| if (ops.current() != null) { | ||
| throw new AlreadyExistsException("Table already exists at location: %s", location); | ||
| } | ||
|
|
||
| Map<String, String> properties = propertiesBuilder.build(); | ||
| TableMetadata metadata = tableMetadata(schema, spec, sortOrder, properties, location); | ||
| ops.commit(null, metadata); | ||
| return new BaseTable(ops, location); | ||
| } | ||
|
|
||
| @Override | ||
| public Transaction createTransaction() { | ||
| TableOperations ops = newTableOps(location); | ||
| if (ops.current() != null) { | ||
| throw new AlreadyExistsException("Table already exists: %s", location); | ||
| } | ||
|
|
||
| Map<String, String> properties = propertiesBuilder.build(); | ||
| TableMetadata metadata = tableMetadata(schema, spec, null, properties, location); | ||
| return Transactions.createTableTransaction(location, ops, metadata); | ||
| } | ||
|
|
||
| @Override | ||
| public Transaction replaceTransaction() { | ||
| return newReplaceTableTransaction(false); | ||
| } | ||
|
|
||
| @Override | ||
| public Transaction createOrReplaceTransaction() { | ||
| return newReplaceTableTransaction(true); | ||
| } | ||
|
|
||
| private Transaction newReplaceTableTransaction(boolean orCreate) { | ||
| TableOperations ops = newTableOps(location); | ||
| if (!orCreate && ops.current() == null) { | ||
| throw new NoSuchTableException("No such table: %s", location); | ||
| } | ||
|
|
||
| Map<String, String> properties = propertiesBuilder.build(); | ||
| TableMetadata metadata; | ||
| if (ops.current() != null) { | ||
| metadata = ops.current().buildReplacement(schema, spec, SortOrder.unsorted(), location, properties); | ||
| } else { | ||
| metadata = tableMetadata(schema, spec, null, properties, location); | ||
|
Comment on lines
+342
to
+344
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
|
|
||
| if (orCreate) { | ||
| return Transactions.createOrReplaceTableTransaction(location, ops, metadata); | ||
| } else { | ||
| return Transactions.replaceTableTransaction(location, ops, metadata); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void setConf(Configuration conf) { | ||
| this.conf = conf; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.spark; | ||
|
|
||
| import java.util.List; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Joiner; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Splitter; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Iterables; | ||
| import org.apache.spark.sql.connector.catalog.Identifier; | ||
|
|
||
| public class PathIdentifier implements Identifier { | ||
| private static final Splitter SPLIT = Splitter.on("/"); | ||
| private static final Joiner JOIN = Joiner.on("/"); | ||
| private final String[] namespace; | ||
| private final String location; | ||
| private final String name; | ||
|
|
||
| public PathIdentifier(String location) { | ||
| this.location = location; | ||
| List<String> pathParts = SPLIT.splitToList(location); | ||
| name = Iterables.getLast(pathParts); | ||
| namespace = pathParts.size() > 1 ? | ||
| new String[]{JOIN.join(pathParts.subList(0, pathParts.size() - 1))} : | ||
| new String[0]; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that the namespace should be the location, not an array of path parts. This would create really weird namespaces, like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agreed that it shouldn't be
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't namespace be just
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer @aokolnychyi's suggestion over splitting filename from directories. Then we dont have the extra
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't think so. Those identifiers would not be passed to Iceberg as
I'm fine either way. I do think that it makes sense to omit it from the namespace (["s3://bucket/path/to"] and "table"). The main requirement is that the location should be available unmodified when this is used. Sounds like we agree that the table name should be the last directory name to make the subquery alias work.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cool, have left it as is. Thanks for the pointer to subquery alias! |
||
| } | ||
|
|
||
| @Override | ||
| public String[] namespace() { | ||
| return namespace; | ||
| } | ||
|
|
||
| @Override | ||
| public String name() { | ||
| return name; | ||
| } | ||
|
|
||
| public String location() { | ||
| return location; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this use `TableBuilder` instead? That would make the `SparkCatalog` implementation a lot cleaner because the code to configure the builder could be shared. I opened #1879 because tests would fail unless the builder is also supported in CachingCatalog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed. Will rebase once #1879 is merged. I also prefer the `TableBuilder` pattern, much cleaner.