Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,9 @@ public interface PolarisCatalog {
boolean dropGenericTable(TableIdentifier identifier);

GenericTable createGenericTable(
TableIdentifier identifier, String format, String doc, Map<String, String> props);
TableIdentifier identifier,
String format,
String baseLocation,
String doc,
Map<String, String> props);
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,18 @@ public boolean dropGenericTable(TableIdentifier identifier) {

@Override
public GenericTable createGenericTable(
TableIdentifier identifier, String format, String doc, Map<String, String> props) {
TableIdentifier identifier,
String format,
String baseLocation,
String doc,
Map<String, String> props) {
Endpoint.check(endpoints, PolarisEndpoints.V1_CREATE_GENERIC_TABLE);
CreateGenericTableRESTRequest request =
new CreateGenericTableRESTRequest(
CreateGenericTableRequest.builder()
.setName(identifier.name())
.setFormat(format)
.setBaseLocation(baseLocation)
.setDoc(doc)
.setProperties(props)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A spark TableCatalog Implementation interacts with Polaris specific APIs only. The APIs it
* interacts with is generic table APIs, and all table operations performed in this class are
* expected to be for non-iceberg tables.
*/
public class PolarisSparkCatalog implements TableCatalog {
private static final Logger LOGGER = LoggerFactory.getLogger(PolarisSparkCatalog.class);

private PolarisCatalog polarisCatalog = null;
private String catalogName = null;
Expand Down Expand Up @@ -83,9 +86,30 @@ public Table createTable(
throws TableAlreadyExistsException, NoSuchNamespaceException {
try {
String format = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);

String baseLocation;
// Extract the base table location from the spark properties.
// Spark pass the table base location either with the
// TableCatalog.PROP_LOCATION key, or with "path" key if created
// with the path option.
if (properties.get(TableCatalog.PROP_LOCATION) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If location and path are both set, it might be worth a debug log explaining that location is taking precedence

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sg! Added debug log under the case if both are configured

baseLocation = properties.get(TableCatalog.PROP_LOCATION);
if (properties.get(PolarisCatalogUtils.TABLE_PATH_KEY) != null) {
LOGGER.debug(
"Both location and path are propagated in the table properties, location {}, path {}",
baseLocation,
properties.get(PolarisCatalogUtils.TABLE_PATH_KEY));
}
} else {
baseLocation = properties.get(PolarisCatalogUtils.TABLE_PATH_KEY);
}
GenericTable genericTable =
this.polarisCatalog.createGenericTable(
Spark3Util.identifierToTableIdentifier(identifier), format, null, properties);
Spark3Util.identifierToTableIdentifier(identifier),
format,
baseLocation,
null,
properties);
return PolarisCatalogUtils.loadSparkTable(genericTable);
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,16 @@ public static Table loadSparkTable(GenericTable genericTable) {
boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null;
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.putAll(properties);
if (!hasPathClause && hasLocationClause) {
if (!hasPathClause) {
// DataSourceV2 requires the path property on table loading. However, spark today
// doesn't create the corresponding path property if the path keyword is not
// provided by user when location is provided. Here, we duplicate the location
// property as path to make sure the table can be loaded.
tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION));
if (genericTable.getBaseLocation() != null && !genericTable.getBaseLocation().isEmpty()) {
tableProperties.put(TABLE_PATH_KEY, genericTable.getBaseLocation());
} else if (hasLocationClause) {
tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION));
}
}
return DataSourceV2Utils.getTableFromProvider(
provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ public boolean dropGenericTable(TableIdentifier identifier) {

@Override
public GenericTable createGenericTable(
TableIdentifier identifier, String format, String doc, Map<String, String> props) {
TableIdentifier identifier,
String format,
String baseLocation,
String doc,
Map<String, String> props) {
if (!namespaceExists(identifier.namespace())) {
throw new NoSuchNamespaceException(
"Cannot create generic table %s. Namespace does not exist: %s",
Expand All @@ -78,6 +82,7 @@ public GenericTable createGenericTable(
GenericTable.builder()
.setName(identifier.name())
.setFormat(format)
.setBaseLocation(baseLocation)
.setProperties(props)
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,19 @@ public void setUp() {

@ParameterizedTest
@MethodSource("genericTableTestCases")
public void testLoadGenericTableRESTResponse(String doc, Map<String, String> properties)
public void testLoadGenericTableRESTResponse(
String baseLocation, String doc, Map<String, String> properties)
throws JsonProcessingException {
GenericTable table =
GenericTable.Builder tableBuilder =
GenericTable.builder()
.setFormat("delta")
.setName("test-table")
.setProperties(properties)
.setDoc(doc)
.build();
.setDoc(doc);
if (baseLocation != null) {
tableBuilder.setBaseLocation(baseLocation);
}
GenericTable table = tableBuilder.build();
LoadGenericTableRESTResponse response = new LoadGenericTableRESTResponse(table);
String json = mapper.writeValueAsString(response);
LoadGenericTableRESTResponse deserializedResponse =
Expand All @@ -82,18 +86,21 @@ public void testLoadGenericTableRESTResponse(String doc, Map<String, String> pro
assertThat(deserializedResponse.getTable().getName()).isEqualTo("test-table");
assertThat(deserializedResponse.getTable().getDoc()).isEqualTo(doc);
assertThat(deserializedResponse.getTable().getProperties().size()).isEqualTo(properties.size());
assertThat(deserializedResponse.getTable().getBaseLocation()).isEqualTo(baseLocation);
}

@ParameterizedTest
@MethodSource("genericTableTestCases")
public void testCreateGenericTableRESTRequest(String doc, Map<String, String> properties)
public void testCreateGenericTableRESTRequest(
String baseLocation, String doc, Map<String, String> properties)
throws JsonProcessingException {
CreateGenericTableRESTRequest request =
new CreateGenericTableRESTRequest(
CreateGenericTableRequest.builder()
.setName("test-table")
.setFormat("delta")
.setDoc(doc)
.setBaseLocation(baseLocation)
.setProperties(properties)
.build());
String json = mapper.writeValueAsString(request);
Expand All @@ -103,6 +110,7 @@ public void testCreateGenericTableRESTRequest(String doc, Map<String, String> pr
assertThat(deserializedRequest.getFormat()).isEqualTo("delta");
assertThat(deserializedRequest.getDoc()).isEqualTo(doc);
assertThat(deserializedRequest.getProperties().size()).isEqualTo(properties.size());
assertThat(deserializedRequest.getBaseLocation()).isEqualTo(baseLocation);
}

@Test
Expand Down Expand Up @@ -150,10 +158,12 @@ private static Stream<Arguments> genericTableTestCases() {
var doc = "table for testing";
var properties = Maps.newHashMap();
properties.put("location", "s3://path/to/table/");
var baseLocation = "s3://path/to/table/";
return Stream.of(
Arguments.of(doc, properties),
Arguments.of(null, Maps.newHashMap()),
Arguments.of(doc, Maps.newHashMap()),
Arguments.of(null, properties));
Arguments.of(null, doc, properties),
Arguments.of(baseLocation, doc, properties),
Arguments.of(null, null, Maps.newHashMap()),
Arguments.of(baseLocation, doc, Maps.newHashMap()),
Arguments.of(baseLocation, null, properties));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.polaris.core.entity.NamespaceEntity;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntityConstants;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;

Expand Down Expand Up @@ -58,6 +59,12 @@ public String getDoc() {
return getInternalPropertiesAsMap().get(GenericTableEntity.DOC_KEY);
}

@Override
@JsonIgnore
public String getBaseLocation() {
return getInternalPropertiesAsMap().get(PolarisEntityConstants.ENTITY_BASE_LOCATION);
}

public static class Builder
extends PolarisEntity.BaseBuilder<GenericTableEntity, GenericTableEntity.Builder> {
public Builder(TableIdentifier tableIdentifier, String format) {
Expand All @@ -79,6 +86,11 @@ public GenericTableEntity.Builder setDoc(String doc) {
return this;
}

public GenericTableEntity.Builder setBaseLocation(String location) {
internalProperties.put(PolarisEntityConstants.ENTITY_BASE_LOCATION, location);
return this;
}

public GenericTableEntity.Builder setTableIdentifier(TableIdentifier identifier) {
Namespace namespace = identifier.namespace();
setParentNamespace(namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.polaris.core.entity.LocationBasedEntity;
import org.apache.polaris.core.entity.NamespaceEntity;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntity;
Expand All @@ -34,7 +33,7 @@
* An entity type for {@link TableLikeEntity} instances that conform to iceberg semantics around
* locations. This includes both Iceberg tables and Iceberg views.
*/
public class IcebergTableLikeEntity extends TableLikeEntity implements LocationBasedEntity {
public class IcebergTableLikeEntity extends TableLikeEntity {
// For applicable types, this key on the "internalProperties" map will return the location
// of the internalProperties JSON file.
public static final String METADATA_LOCATION_KEY = "metadata-location";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.polaris.core.entity.LocationBasedEntity;
import org.apache.polaris.core.entity.NamespaceEntity;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntityType;

/**
* An entity type for all table-like entities including Iceberg tables, Iceberg views, and generic
* tables. This entity maps to {@link PolarisEntityType#TABLE_LIKE}
*/
public abstract class TableLikeEntity extends PolarisEntity {
public abstract class TableLikeEntity extends PolarisEntity implements LocationBasedEntity {

public TableLikeEntity(@Nonnull PolarisBaseEntity sourceEntity) {
super(sourceEntity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ public void before(TestInfo testInfo) {
baseCatalog.buildTable(TABLE_NS1B_1, SCHEMA).create();
baseCatalog.buildTable(TABLE_NS2_1, SCHEMA).create();

genericTableCatalog.createGenericTable(TABLE_NS1_1_GENERIC, "format", "doc", Map.of());
genericTableCatalog.createGenericTable(
TABLE_NS1_1_GENERIC, "format", "file:///tmp/test_table", "doc", Map.of());

policyCatalog.createPolicy(
POLICY_NS1_1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
Expand Down Expand Up @@ -277,7 +280,7 @@ public void testCreateGenericTableDoesNotThrow() {
Assertions.assertThatCode(
() ->
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "t1"), "test-format", "doc", Map.of()))
TableIdentifier.of("ns", "t1"), "test-format", null, "doc", Map.of()))
.doesNotThrowAnyException();
}

Expand All @@ -286,12 +289,12 @@ public void testGenericTableAlreadyExists() {
Namespace namespace = Namespace.of("ns");
icebergCatalog.createNamespace(namespace);
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "t1"), "format1", "doc", Map.of());
TableIdentifier.of("ns", "t1"), "format1", null, "doc", Map.of());

Assertions.assertThatCode(
() ->
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "t1"), "format2", "doc", Map.of()))
TableIdentifier.of("ns", "t1"), "format2", null, "doc", Map.of()))
.hasMessageContaining("already exists");

Assertions.assertThatCode(
Expand All @@ -308,16 +311,18 @@ public void testIcebergTableAlreadyExists() {
Assertions.assertThatCode(
() ->
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "t1"), "format2", "doc", Map.of()))
TableIdentifier.of("ns", "t1"), "format2", null, "doc", Map.of()))
.hasMessageContaining("already exists");

Assertions.assertThatCode(
() -> icebergCatalog.createTable(TableIdentifier.of("ns", "t1"), SCHEMA))
.hasMessageContaining("already exists");
}

@Test
public void testGenericTableRoundTrip() {
@ParameterizedTest
@NullSource
@ValueSource(strings = {"", "file://path/to/my/table"})
public void testGenericTableRoundTrip(String baseLocation) {
Namespace namespace = Namespace.of("ns");
icebergCatalog.createNamespace(namespace);

Expand All @@ -327,14 +332,15 @@ public void testGenericTableRoundTrip() {
String doc = "round-trip-doc";

genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", tableName), format, doc, properties);
TableIdentifier.of("ns", tableName), format, baseLocation, doc, properties);

GenericTableEntity resultEntity =
genericTableCatalog.loadGenericTable(TableIdentifier.of("ns", tableName));

Assertions.assertThat(resultEntity.getFormat()).isEqualTo(format);
Assertions.assertThat(resultEntity.getPropertiesAsMap()).isEqualTo(properties);
Assertions.assertThat(resultEntity.getName()).isEqualTo(tableName);
Assertions.assertThat(resultEntity.getBaseLocation()).isEqualTo(baseLocation);
}

@Test
Expand Down Expand Up @@ -381,7 +387,7 @@ public void testReadGenericAsIcebergTable() {
String tableName = "t1";

genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", tableName), "format", "doc", Map.of());
TableIdentifier.of("ns", tableName), "format", null, "doc", Map.of());
Assertions.assertThatCode(() -> icebergCatalog.loadTable(TableIdentifier.of("ns", tableName)))
.hasMessageContaining("does not exist: ns.t1");
}
Expand All @@ -394,7 +400,7 @@ public void testReadGenericAsIcebergView() {
String tableName = "t1";

genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", tableName), "format", "doc", Map.of());
TableIdentifier.of("ns", tableName), "format", null, "doc", Map.of());
Assertions.assertThatCode(() -> icebergCatalog.loadView(TableIdentifier.of("ns", tableName)))
.hasMessageContaining("does not exist: ns.t1");
}
Expand All @@ -406,7 +412,7 @@ public void testListTables() {

for (int i = 0; i < 10; i++) {
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "t" + i), "format", "doc", Map.of());
TableIdentifier.of("ns", "t" + i), "format", null, "doc", Map.of());
}

List<TableIdentifier> listResult = genericTableCatalog.listGenericTables(namespace);
Expand Down Expand Up @@ -468,7 +474,7 @@ public void testListMixedTables() {

for (int i = 0; i < 10; i++) {
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "g" + i), "format", "doc", Map.of());
TableIdentifier.of("ns", "g" + i), "format", null, "doc", Map.of());
}

Assertions.assertThat(genericTableCatalog.listGenericTables(namespace).size()).isEqualTo(10);
Expand Down Expand Up @@ -514,7 +520,7 @@ public void testDropViaIceberg() {
Namespace namespace = Namespace.of("ns");
icebergCatalog.createNamespace(namespace);
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "t1"), "format", "doc", Map.of());
TableIdentifier.of("ns", "t1"), "format", null, "doc", Map.of());

Assertions.assertThat(icebergCatalog.dropTable(TableIdentifier.of("ns", "t1"))).isFalse();
Assertions.assertThat(genericTableCatalog.loadGenericTable(TableIdentifier.of("ns", "t1")))
Expand All @@ -526,7 +532,7 @@ public void testDropViaIcebergView() {
Namespace namespace = Namespace.of("ns");
icebergCatalog.createNamespace(namespace);
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "t1"), "format", "doc", Map.of());
TableIdentifier.of("ns", "t1"), "format", null, "doc", Map.of());

Assertions.assertThat(icebergCatalog.dropView(TableIdentifier.of("ns", "t1"))).isFalse();
Assertions.assertThat(genericTableCatalog.loadGenericTable(TableIdentifier.of("ns", "t1")))
Expand Down
Loading