diff --git a/docs/src/main/sphinx/connector/accumulo.rst b/docs/src/main/sphinx/connector/accumulo.rst index 20e50b5023cd..82ffcd5e0efe 100644 --- a/docs/src/main/sphinx/connector/accumulo.rst +++ b/docs/src/main/sphinx/connector/accumulo.rst @@ -708,3 +708,5 @@ statements, the connector supports the following features: * :doc:`/sql/create-table` * :doc:`/sql/create-table-as` * :doc:`/sql/drop-table` +* :doc:`/sql/create-schema` +* :doc:`/sql/drop-schema` diff --git a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloClient.java b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloClient.java index 5e4bc730d72e..efbf3b4647c2 100644 --- a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloClient.java +++ b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloClient.java @@ -36,6 +36,7 @@ import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.predicate.Domain; @@ -71,6 +72,7 @@ import static io.trino.plugin.accumulo.AccumuloErrorCode.ACCUMULO_TABLE_DNE; import static io.trino.plugin.accumulo.AccumuloErrorCode.ACCUMULO_TABLE_EXISTS; import static io.trino.plugin.accumulo.AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR; +import static io.trino.plugin.accumulo.metadata.ZooKeeperMetadataManager.DEFAULT_SCHEMA; import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; @@ -111,6 +113,23 @@ public AccumuloClient( this.indexLookup = requireNonNull(indexLookup, "indexLookup is null"); this.auths = connector.securityOperations().getUserAuthorizations(username); + + // The default namespace is created in ZooKeeperMetadataManager's constructor + if (!tableManager.namespaceExists(DEFAULT_SCHEMA)) { + tableManager.createNamespace(DEFAULT_SCHEMA); + } + } + + public void createSchema(String schemaName) + { + metaManager.createSchema(schemaName); + tableManager.createNamespace(schemaName); + } + + public void dropSchema(String schemaName) + { + metaManager.dropSchema(schemaName); + tableManager.dropNamespace(schemaName); } public AccumuloTable createTable(ConnectorTableMetadata meta) @@ -134,12 +153,14 @@ public AccumuloTable createTable(ConnectorTableMetadata meta) AccumuloTableProperties.getSerializerClass(tableProperties), AccumuloTableProperties.getScanAuthorizations(tableProperties)); + // Make sure the namespace exists + if (!tableManager.namespaceExists(table.getSchema())) { + throw new SchemaNotFoundException(table.getSchema()); + } + // First, create the metadata metaManager.createTableMetadata(table); - // Make sure the namespace exists - tableManager.ensureNamespace(table.getSchema()); - // Create the Accumulo table if it does not exist (for 'external' table) if (!tableManager.exists(table.getFullTableName())) { tableManager.createAccumuloTable(table.getFullTableName()); diff --git a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloMetadata.java b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloMetadata.java index 4756126540cc..a48a946091b2 100644 --- a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloMetadata.java +++ b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloMetadata.java @@ -42,6 +42,7 @@ import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.statistics.ComputedStatistics; import javax.inject.Inject; @@ -54,6 +55,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static io.trino.plugin.accumulo.AccumuloErrorCode.ACCUMULO_TABLE_EXISTS; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -79,6 +81,19 @@ public AccumuloMetadata(AccumuloClient client) this.client = requireNonNull(client, "client is null"); } + @Override + public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner) + { + checkArgument(properties.isEmpty(), "Can't have properties for schema creation"); + client.createSchema(schemaName); + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName) + { + client.dropSchema(schemaName); + } + @Override public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout) { diff --git a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloTableManager.java b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloTableManager.java index 25dbf3fa5ad5..7b5462cd3169 100644 --- a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloTableManager.java +++ b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloTableManager.java @@ -20,6 +20,8 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.NamespaceExistsException; +import org.apache.accumulo.core.client.NamespaceNotEmptyException; +import org.apache.accumulo.core.client.NamespaceNotFoundException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; @@ -43,7 +45,6 @@ public class AccumuloTableManager { private static final Logger LOG = Logger.get(AccumuloTableManager.class); - private static final String DEFAULT = "default"; private final Connector connector; @Inject @@ -52,25 +53,33 @@ public AccumuloTableManager(Connector connector) this.connector = requireNonNull(connector, "connector is null"); } - /** - * Ensures the given Accumulo namespace exist, creating it if necessary - * - * @param schema Trino schema (Accumulo namespace) - */ - public void ensureNamespace(String schema) + public void createNamespace(String schema) { try { - // If the table schema is not "default" and the namespace does not exist, create it - if (!schema.equals(DEFAULT) && !connector.namespaceOperations().exists(schema)) { - connector.namespaceOperations().create(schema); - } + connector.namespaceOperations().create(schema); } - catch (AccumuloException | AccumuloSecurityException e) { - throw new TrinoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to check for existence or create Accumulo namespace", e); + catch (AccumuloException | AccumuloSecurityException | NamespaceExistsException e) { + throw new TrinoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to create Accumulo namespace: " + schema, e); + } + } + + public void dropNamespace(String schema) + { + try { + connector.namespaceOperations().delete(schema); } - catch (NamespaceExistsException e) { - // Suppress race condition between test for existence and creation - LOG.warn("NamespaceExistsException suppressed when creating %s", schema); + catch (AccumuloException | AccumuloSecurityException | NamespaceNotFoundException | NamespaceNotEmptyException e) { + throw new TrinoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to delete Accumulo namespace: " + schema, e); + } + } + + public boolean namespaceExists(String schema) + { + try { + return connector.namespaceOperations().exists(schema); + } + catch (AccumuloException | AccumuloSecurityException e) { + throw new TrinoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to check for existence Accumulo namespace: " + schema, e); } } diff --git a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/metadata/ZooKeeperMetadataManager.java b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/metadata/ZooKeeperMetadataManager.java index 6d9fef88c2c2..d546cc21cbd7 100644 --- a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/metadata/ZooKeeperMetadataManager.java +++ b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/metadata/ZooKeeperMetadataManager.java @@ -44,7 +44,7 @@ public class ZooKeeperMetadataManager { - private static final String DEFAULT_SCHEMA = "default"; + public static final String DEFAULT_SCHEMA = "default"; private final CuratorFramework curator; private final ObjectMapper mapper; @@ -92,6 +92,26 @@ public ZooKeeperMetadataManager(AccumuloConfig config, TypeManager typeManager) } } + public void createSchema(String schemaName) + { + try { + curator.create().forPath("/" + schemaName); + } + catch (Exception e) { + throw new TrinoException(ZOOKEEPER_ERROR, "ZK error creating schema: " + schemaName, e); + } + } + + public void dropSchema(String schemaName) + { + try { + curator.delete().forPath("/" + schemaName); + } + catch (Exception e) { + throw new TrinoException(ZOOKEEPER_ERROR, "ZK error deleting schema: " + schemaName, e); + } + } + public Set getSchemaNames() { try { diff --git a/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/AccumuloQueryRunner.java b/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/AccumuloQueryRunner.java index ea4a3576c1d4..884cf4432c76 100644 --- a/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/AccumuloQueryRunner.java +++ b/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/AccumuloQueryRunner.java @@ -68,6 +68,7 @@ public static synchronized DistributedQueryRunner createAccumuloQueryRunner(Map< queryRunner.createCatalog("accumulo", "accumulo", accumuloProperties); if (!tpchLoaded) { + queryRunner.execute("CREATE SCHEMA accumulo.tpch"); copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), TpchTable.getTables()); server.getConnector().tableOperations().addSplits("tpch.orders", ImmutableSortedSet.of(new Text(new LexicoderRowSerializer().encode(BIGINT, 7500L)))); tpchLoaded = true; diff --git a/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/TestAccumuloConnectorTest.java b/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/TestAccumuloConnectorTest.java index d906cc965e39..6404b1049d5c 100644 --- a/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/TestAccumuloConnectorTest.java +++ b/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/TestAccumuloConnectorTest.java @@ -58,7 +58,7 @@ protected QueryRunner createQueryRunner() protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) { switch (connectorBehavior) { - case SUPPORTS_CREATE_SCHEMA: + case SUPPORTS_RENAME_SCHEMA: return false; case SUPPORTS_RENAME_TABLE_ACROSS_SCHEMAS: diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java index ba806f48455b..1e17f762c864 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java @@ -29,6 +29,7 @@ import io.trino.spi.connector.FixedSplitSource; import io.trino.spi.connector.JoinStatistics; import io.trino.spi.connector.JoinType; +import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.predicate.TupleDomain; @@ -73,7 +74,6 @@ import static io.trino.plugin.jdbc.StandardColumnMappings.varcharReadFunction; import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling; import static io.trino.plugin.jdbc.UnsupportedTypeHandling.IGNORE; -import static io.trino.spi.StandardErrorCode.NOT_FOUND; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static java.lang.String.CASE_INSENSITIVE_ORDER; @@ -501,7 +501,7 @@ protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorT ConnectorIdentity identity = session.getIdentity(); if (!getSchemaNames(session).contains(schemaTableName.getSchemaName())) { - throw new TrinoException(NOT_FOUND, "Schema not found: " + schemaTableName.getSchemaName()); + throw new SchemaNotFoundException(schemaTableName.getSchemaName()); } try (Connection connection = connectionFactory.openConnection(session)) { diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java index a6beea84dfad..55199a5d332d 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java @@ -369,6 +369,11 @@ private void createTable(ConnectorSession session, ConnectorTableMetadata tableM SchemaTableName schemaTableName = tableMetadata.getTable(); String schemaName = schemaTableName.getSchemaName(); String tableName = schemaTableName.getTableName(); + + if (!schemaExists(session, schemaName)) { + throw new SchemaNotFoundException(schemaName); + } + List fields = tableMetadata.getColumns().stream() .map(column -> toField(column.getName(), column.getType())) .collect(toImmutableList()); diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java index 7a8e64148fe0..8346f74928a0 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java @@ -201,6 +201,9 @@ public MongoTable getTable(SchemaTableName tableName) public void createTable(SchemaTableName name, List columns, Optional comment) { + if (!getAllSchemas().contains(name.getSchemaName())) { + throw new SchemaNotFoundException(name.getSchemaName()); + } createTableMetadata(name, columns, comment); // collection is created implicitly } diff --git a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/MongoQueryRunner.java b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/MongoQueryRunner.java index 49451e1a5880..ccb433041df4 100644 --- a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/MongoQueryRunner.java +++ b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/MongoQueryRunner.java @@ -60,6 +60,7 @@ public static DistributedQueryRunner createMongoQueryRunner(MongoServer server, queryRunner.installPlugin(new MongoPlugin()); queryRunner.createCatalog("mongodb", "mongodb", properties); + queryRunner.execute("CREATE SCHEMA mongodb." + TPCH_SCHEMA); copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), tables); return queryRunner; diff --git a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/BaseRaptorConnectorTest.java b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/BaseRaptorConnectorTest.java index 4621e39b4d8f..22a3d89c6448 100644 --- a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/BaseRaptorConnectorTest.java +++ b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/BaseRaptorConnectorTest.java @@ -659,6 +659,24 @@ public void testShowCreateTable() assertEquals(getOnlyElement(actualResult.getOnlyColumnAsSet()), createTableSql); } + @Test + @Override + public void testCreateTableSchemaNotFound() + { + // TODO (https://github.com/trinodb/trino/issues/11110) Raptor connector can create new tables in a schema where it doesn't exist + assertThatThrownBy(super::testCreateTableSchemaNotFound) + .hasMessageContaining("Expected query to fail: CREATE TABLE test_schema_"); + } + + @Test + @Override + public void testCreateTableAsSelectSchemaNotFound() + { + // TODO (https://github.com/trinodb/trino/issues/11110) Raptor connector can create new tables in a schema where it doesn't exist + assertThatThrownBy(super::testCreateTableAsSelectSchemaNotFound) + .hasMessageContaining("Expected query to fail: CREATE TABLE test_schema_"); + } + @Test public void testTablesSystemTable() { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index 4a6e07440dd9..393f26c46fe8 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -1868,6 +1868,23 @@ public void testCreateTable() assertFalse(getQueryRunner().tableExists(getSession(), tableNameLike)); } + @Test + public void testCreateTableSchemaNotFound() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE)); + + String schemaName = "test_schema_" + randomTableSuffix(); + String tableName = "test_create_no_schema_" + randomTableSuffix(); + try { + assertQueryFails( + format("CREATE TABLE %s.%s (a bigint)", schemaName, tableName), + format("Schema %s not found", schemaName)); + } + finally { + assertUpdate(format("DROP TABLE IF EXISTS %s.%s", schemaName, tableName)); + } + } + @Test public void testCreateTableAsSelect() { @@ -1940,6 +1957,23 @@ public void testCreateTableAsSelect() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testCreateTableAsSelectSchemaNotFound() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA)); + + String schemaName = "test_schema_" + randomTableSuffix(); + String tableName = "test_ctas_no_schema_" + randomTableSuffix(); + try { + assertQueryFails( + format("CREATE TABLE %s.%s AS SELECT name FROM nation", schemaName, tableName), + format("Schema %s not found", schemaName)); + } + finally { + assertUpdate(format("DROP TABLE IF EXISTS %s.%s", schemaName, tableName)); + } + } + @Test public void testCreateTableAsSelectWithUnicode() {