Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 167 additions & 11 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -37,7 +38,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
Expand All @@ -49,6 +49,7 @@
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
Expand All @@ -60,16 +61,21 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.ViewOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcCatalog extends BaseMetastoreCatalog
public class JdbcCatalog extends BaseMetastoreViewCatalog
implements Configurable<Object>, SupportsNamespaces {

public static final String PROPERTY_PREFIX = "jdbc.";
private static final String NAMESPACE_EXISTS_PROPERTY = "exists";
private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class);
private static final Joiner SLASH = Joiner.on("/");
static final String VIEW_WARNING_LOG_MESSAGE =
"JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.add-view-support=true";

private FileIO io;
private String catalogName = "jdbc";
Expand All @@ -81,6 +87,7 @@ public class JdbcCatalog extends BaseMetastoreCatalog
private final Function<Map<String, String>, JdbcClientPool> clientPoolBuilder;
private final boolean initializeCatalogTables;
private CloseableGroup closeableGroup;
private JdbcUtil.SchemaVersion schemaVersion = JdbcUtil.SchemaVersion.V1;

public JdbcCatalog() {
this(null, null, true);
Expand Down Expand Up @@ -158,14 +165,17 @@ private void initializeCatalogTables() throws InterruptedException, SQLException
dbMeta.getTables(
null /* catalog name */,
null /* schemaPattern */,
JdbcUtil.CATALOG_TABLE_NAME /* tableNamePattern */,
JdbcUtil.CATALOG_TABLE_VIEW_NAME /* tableNamePattern */,
null /* types */);
if (tableExists.next()) {
updateCatalogTables(conn);
return true;
}

LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
LOG.debug(
"Creating table {} to store iceberg catalog tables",
JdbcUtil.CATALOG_TABLE_VIEW_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_SQL).execute();
});

connections.run(
Expand All @@ -185,14 +195,39 @@ private void initializeCatalogTables() throws InterruptedException, SQLException
LOG.debug(
"Creating table {} to store iceberg catalog namespace properties",
JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE).execute();
return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE_SQL).execute();
});
}

private void updateCatalogTables(Connection connection) throws SQLException {
DatabaseMetaData dbMeta = connection.getMetaData();
ResultSet typeColumn =
dbMeta.getColumns(null, null, JdbcUtil.CATALOG_TABLE_VIEW_NAME, JdbcUtil.RECORD_TYPE);
if (typeColumn.next()) {
LOG.debug("{} already supports views", JdbcUtil.CATALOG_TABLE_VIEW_NAME);
} else {
if (PropertyUtil.propertyAsBoolean(
catalogProperties, JdbcUtil.ADD_VIEW_SUPPORT_PROPERTY, false)) {
connection.prepareStatement(JdbcUtil.UPDATE_CATALOG_SQL).execute();
} else {
LOG.warn(VIEW_WARNING_LOG_MESSAGE);
schemaVersion = JdbcUtil.SchemaVersion.V0;
}
}
}

@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
return new JdbcTableOperations(
connections, io, catalogName, tableIdentifier, catalogProperties);
connections, io, catalogName, tableIdentifier, catalogProperties, schemaVersion);
}

@Override
protected ViewOperations newViewOps(TableIdentifier viewIdentifier) {
if (schemaVersion != JdbcUtil.SchemaVersion.V1) {
throw new UnsupportedOperationException(VIEW_WARNING_LOG_MESSAGE);
}
return new JdbcViewOperations(connections, io, catalogName, viewIdentifier, catalogProperties);
}

@Override
Expand All @@ -217,7 +252,9 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {

int deletedRecords =
execute(
JdbcUtil.DROP_TABLE_SQL,
(schemaVersion == JdbcUtil.SchemaVersion.V1)
? JdbcUtil.V1_DROP_TABLE_SQL
: JdbcUtil.V0_DROP_TABLE_SQL,
catalogName,
JdbcUtil.namespaceToString(identifier.namespace()),
identifier.name());
Expand Down Expand Up @@ -245,13 +282,35 @@ public List<TableIdentifier> listTables(Namespace namespace) {
row ->
JdbcUtil.stringToTableIdentifier(
row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)),
JdbcUtil.LIST_TABLES_SQL,
(schemaVersion == JdbcUtil.SchemaVersion.V1)
? JdbcUtil.V1_LIST_TABLE_SQL
: JdbcUtil.V0_LIST_TABLE_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace));
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
if (from.equals(to)) {
return;
}

if (!tableExists(from)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably slightly change the ordering here to align with other catalog impls:

  • check if namespace exists
  • check if from exists

I think we should also align renameView(..) to that as well

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to test the table first to make TestJdbcCatalog#testRenameTableMissingSourceTable() happy, else I get:

    Expecting actual throwable to be an instance of:
      org.apache.iceberg.exceptions.NoSuchTableException
    but was:
      org.apache.iceberg.exceptions.NoSuchNamespaceException: Namespace does not exist: newdb

The reason is because, the test expects the NoSuchTableException because the NoSuchNamespaceException one.

throw new NoSuchTableException("Table does not exist: %s", from);
}

if (!namespaceExists(to.namespace())) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", to.namespace());
}

if (viewExists(to)) {
throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
}

if (tableExists(to)) {
throw new AlreadyExistsException("Table already exists: %s", to);
}

int updatedRecords =
execute(
err -> {
Expand All @@ -261,7 +320,9 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
throw new AlreadyExistsException("Table already exists: %s", to);
}
},
JdbcUtil.RENAME_TABLE_SQL,
(schemaVersion == JdbcUtil.SchemaVersion.V1)
? JdbcUtil.V1_RENAME_TABLE_SQL
: JdbcUtil.V0_RENAME_TABLE_SQL,
JdbcUtil.namespaceToString(to.namespace()),
to.name(),
catalogName,
Expand Down Expand Up @@ -315,7 +376,7 @@ public List<Namespace> listNamespaces() {
namespaces.addAll(
fetch(
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)),
JdbcUtil.LIST_ALL_TABLE_NAMESPACES_SQL,
JdbcUtil.LIST_ALL_NAMESPACES_SQL,
catalogName));
namespaces.addAll(
fetch(
Expand Down Expand Up @@ -503,6 +564,101 @@ public boolean namespaceExists(Namespace namespace) {
return JdbcUtil.namespaceExists(catalogName, connections, namespace);
}

@Override
public boolean dropView(TableIdentifier identifier) {
if (schemaVersion != JdbcUtil.SchemaVersion.V1) {
throw new UnsupportedOperationException(VIEW_WARNING_LOG_MESSAGE);
}

int deletedRecords =
execute(
JdbcUtil.DROP_VIEW_SQL,
catalogName,
JdbcUtil.namespaceToString(identifier.namespace()),
identifier.name());

if (deletedRecords == 0) {
LOG.info("Skipping drop, view does not exist: {}", identifier);
return false;
}

LOG.info("Dropped view: {}", identifier);
return true;
}

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
if (schemaVersion != JdbcUtil.SchemaVersion.V1) {
throw new UnsupportedOperationException(VIEW_WARNING_LOG_MESSAGE);
}

if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}

return fetch(
row ->
JdbcUtil.stringToTableIdentifier(
row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)),
JdbcUtil.LIST_VIEW_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace));
}

@Override
public void renameView(TableIdentifier from, TableIdentifier to) {
if (schemaVersion != JdbcUtil.SchemaVersion.V1) {
throw new UnsupportedOperationException(VIEW_WARNING_LOG_MESSAGE);
}

if (from.equals(to)) {
return;
}

if (!namespaceExists(to.namespace())) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", to.namespace());
}

if (!viewExists(from)) {
throw new NoSuchViewException("View does not exist");
}

if (tableExists(to)) {
throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
}

if (viewExists(to)) {
throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
}

int updatedRecords =
execute(
err -> {
// SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException
if (err instanceof SQLIntegrityConstraintViolationException
|| (err.getMessage() != null && err.getMessage().contains("constraint failed"))) {
throw new AlreadyExistsException(
"Cannot rename %s to %s. View already exists", from, to);
}
},
JdbcUtil.RENAME_VIEW_SQL,
JdbcUtil.namespaceToString(to.namespace()),
to.name(),
catalogName,
JdbcUtil.namespaceToString(from.namespace()),
from.name());

if (updatedRecords == 1) {
LOG.info("Renamed view from {}, to {}", from, to);
} else if (updatedRecords == 0) {
throw new NoSuchViewException("View does not exist: %s", from);
} else {
LOG.warn(
"Rename operation affected {} rows: the catalog view's primary key assumption has been violated",
updatedRecords);
}
}

private int execute(String sql, String... args) {
return execute(err -> {}, sql, args);
}
Expand Down
Loading