Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/accumulo.rst
Original file line number Diff line number Diff line change
Expand Up @@ -708,3 +708,5 @@ statements, the connector supports the following features:
* :doc:`/sql/create-table`
* :doc:`/sql/create-table-as`
* :doc:`/sql/drop-table`
* :doc:`/sql/create-schema`
* :doc:`/sql/drop-schema`
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.Domain;
Expand Down Expand Up @@ -71,6 +72,7 @@
import static io.trino.plugin.accumulo.AccumuloErrorCode.ACCUMULO_TABLE_DNE;
import static io.trino.plugin.accumulo.AccumuloErrorCode.ACCUMULO_TABLE_EXISTS;
import static io.trino.plugin.accumulo.AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR;
import static io.trino.plugin.accumulo.metadata.ZooKeeperMetadataManager.DEFAULT_SCHEMA;
import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
Expand Down Expand Up @@ -111,6 +113,23 @@ public AccumuloClient(
this.indexLookup = requireNonNull(indexLookup, "indexLookup is null");

this.auths = connector.securityOperations().getUserAuthorizations(username);

// The default namespace is created in ZooKeeperMetadataManager's constructor
if (!tableManager.namespaceExists(DEFAULT_SCHEMA)) {
tableManager.createNamespace(DEFAULT_SCHEMA);
}
}

public void createSchema(String schemaName)
{
metaManager.createSchema(schemaName);
tableManager.createNamespace(schemaName);
}

public void dropSchema(String schemaName)
{
metaManager.dropSchema(schemaName);
tableManager.dropNamespace(schemaName);
}

public AccumuloTable createTable(ConnectorTableMetadata meta)
Expand All @@ -134,12 +153,14 @@ public AccumuloTable createTable(ConnectorTableMetadata meta)
AccumuloTableProperties.getSerializerClass(tableProperties),
AccumuloTableProperties.getScanAuthorizations(tableProperties));

// Make sure the namespace exists
if (!tableManager.namespaceExists(table.getSchema())) {
throw new SchemaNotFoundException(table.getSchema());
}

// First, create the metadata
metaManager.createTableMetadata(table);

// Make sure the namespace exists
tableManager.ensureNamespace(table.getSchema());

// Create the Accumulo table if it does not exist (for 'external' table)
if (!tableManager.exists(table.getFullTableName())) {
tableManager.createAccumuloTable(table.getFullTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;

import javax.inject.Inject;
Expand All @@ -54,6 +55,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.accumulo.AccumuloErrorCode.ACCUMULO_TABLE_EXISTS;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand All @@ -79,6 +81,19 @@ public AccumuloMetadata(AccumuloClient client)
this.client = requireNonNull(client, "client is null");
}

@Override
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties, TrinoPrincipal owner)
{
checkArgument(properties.isEmpty(), "Can't have properties for schema creation");
client.createSchema(schemaName);
}

@Override
public void dropSchema(ConnectorSession session, String schemaName)
{
client.dropSchema(schemaName);
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.NamespaceExistsException;
import org.apache.accumulo.core.client.NamespaceNotEmptyException;
import org.apache.accumulo.core.client.NamespaceNotFoundException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
Expand All @@ -43,7 +45,6 @@
public class AccumuloTableManager
{
private static final Logger LOG = Logger.get(AccumuloTableManager.class);
private static final String DEFAULT = "default";
private final Connector connector;

@Inject
Expand All @@ -52,25 +53,33 @@ public AccumuloTableManager(Connector connector)
this.connector = requireNonNull(connector, "connector is null");
}

/**
* Ensures the given Accumulo namespace exist, creating it if necessary
*
* @param schema Trino schema (Accumulo namespace)
*/
public void ensureNamespace(String schema)
public void createNamespace(String schema)
{
try {
// If the table schema is not "default" and the namespace does not exist, create it
if (!schema.equals(DEFAULT) && !connector.namespaceOperations().exists(schema)) {
connector.namespaceOperations().create(schema);
}
connector.namespaceOperations().create(schema);
}
catch (AccumuloException | AccumuloSecurityException e) {
throw new TrinoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to check for existence or create Accumulo namespace", e);
catch (AccumuloException | AccumuloSecurityException | NamespaceExistsException e) {
throw new TrinoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to create Accumulo namespace: " + schema, e);
}
}

public void dropNamespace(String schema)
{
try {
connector.namespaceOperations().delete(schema);
}
catch (NamespaceExistsException e) {
// Suppress race condition between test for existence and creation
LOG.warn("NamespaceExistsException suppressed when creating %s", schema);
catch (AccumuloException | AccumuloSecurityException | NamespaceNotFoundException | NamespaceNotEmptyException e) {
throw new TrinoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to delete Accumulo namespace: " + schema, e);
}
}

public boolean namespaceExists(String schema)
{
try {
return connector.namespaceOperations().exists(schema);
}
catch (AccumuloException | AccumuloSecurityException e) {
throw new TrinoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to check for existence Accumulo namespace: " + schema, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

public class ZooKeeperMetadataManager
{
private static final String DEFAULT_SCHEMA = "default";
public static final String DEFAULT_SCHEMA = "default";

private final CuratorFramework curator;
private final ObjectMapper mapper;
Expand Down Expand Up @@ -92,6 +92,26 @@ public ZooKeeperMetadataManager(AccumuloConfig config, TypeManager typeManager)
}
}

public void createSchema(String schemaName)
{
try {
curator.create().forPath("/" + schemaName);
}
catch (Exception e) {
throw new TrinoException(ZOOKEEPER_ERROR, "ZK error creating schema: " + schemaName, e);
}
}

public void dropSchema(String schemaName)
{
try {
curator.delete().forPath("/" + schemaName);
}
catch (Exception e) {
throw new TrinoException(ZOOKEEPER_ERROR, "ZK error deleting schema: " + schemaName, e);
}
}

public Set<String> getSchemaNames()
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public static synchronized DistributedQueryRunner createAccumuloQueryRunner(Map<
queryRunner.createCatalog("accumulo", "accumulo", accumuloProperties);

if (!tpchLoaded) {
queryRunner.execute("CREATE SCHEMA accumulo.tpch");
copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), TpchTable.getTables());
server.getConnector().tableOperations().addSplits("tpch.orders", ImmutableSortedSet.of(new Text(new LexicoderRowSerializer().encode(BIGINT, 7500L))));
tpchLoaded = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected QueryRunner createQueryRunner()
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
switch (connectorBehavior) {
case SUPPORTS_CREATE_SCHEMA:
case SUPPORTS_RENAME_SCHEMA:
return false;

case SUPPORTS_RENAME_TABLE_ACROSS_SCHEMAS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
Expand Down Expand Up @@ -73,7 +74,6 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharReadFunction;
import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling;
import static io.trino.plugin.jdbc.UnsupportedTypeHandling.IGNORE;
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
Expand Down Expand Up @@ -501,7 +501,7 @@ protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorT

ConnectorIdentity identity = session.getIdentity();
if (!getSchemaNames(session).contains(schemaTableName.getSchemaName())) {
throw new TrinoException(NOT_FOUND, "Schema not found: " + schemaTableName.getSchemaName());
throw new SchemaNotFoundException(schemaTableName.getSchemaName());
}

try (Connection connection = connectionFactory.openConnection(session)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ private void createTable(ConnectorSession session, ConnectorTableMetadata tableM
SchemaTableName schemaTableName = tableMetadata.getTable();
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();

if (!schemaExists(session, schemaName)) {
throw new SchemaNotFoundException(schemaName);
}

List<Field> fields = tableMetadata.getColumns().stream()
.map(column -> toField(column.getName(), column.getType()))
.collect(toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ public MongoTable getTable(SchemaTableName tableName)

public void createTable(SchemaTableName name, List<MongoColumnHandle> columns, Optional<String> comment)
{
if (!getAllSchemas().contains(name.getSchemaName())) {
throw new SchemaNotFoundException(name.getSchemaName());
}
createTableMetadata(name, columns, comment);
// collection is created implicitly
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static DistributedQueryRunner createMongoQueryRunner(MongoServer server,

queryRunner.installPlugin(new MongoPlugin());
queryRunner.createCatalog("mongodb", "mongodb", properties);
queryRunner.execute("CREATE SCHEMA mongodb." + TPCH_SCHEMA);

copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), tables);
return queryRunner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,24 @@ public void testShowCreateTable()
assertEquals(getOnlyElement(actualResult.getOnlyColumnAsSet()), createTableSql);
}

@Test
@Override
public void testCreateTableSchemaNotFound()
{
// TODO (https://github.com/trinodb/trino/issues/11110) Raptor connector can create new tables in a schema where it doesn't exist
assertThatThrownBy(super::testCreateTableSchemaNotFound)
.hasMessageContaining("Expected query to fail: CREATE TABLE test_schema_");
}

@Test
@Override
public void testCreateTableAsSelectSchemaNotFound()
{
// TODO (https://github.com/trinodb/trino/issues/11110) Raptor connector can create new tables in a schema where it doesn't exist
assertThatThrownBy(super::testCreateTableAsSelectSchemaNotFound)
.hasMessageContaining("Expected query to fail: CREATE TABLE test_schema_");
}

@Test
public void testTablesSystemTable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,23 @@ public void testCreateTable()
assertFalse(getQueryRunner().tableExists(getSession(), tableNameLike));
}

@Test
public void testCreateTableSchemaNotFound()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE));

String schemaName = "test_schema_" + randomTableSuffix();
String tableName = "test_create_no_schema_" + randomTableSuffix();
try {
assertQueryFails(
format("CREATE TABLE %s.%s (a bigint)", schemaName, tableName),
format("Schema %s not found", schemaName));
}
finally {
assertUpdate(format("DROP TABLE IF EXISTS %s.%s", schemaName, tableName));
}
}

@Test
public void testCreateTableAsSelect()
{
Expand Down Expand Up @@ -1940,6 +1957,23 @@ public void testCreateTableAsSelect()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testCreateTableAsSelectSchemaNotFound()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA));

String schemaName = "test_schema_" + randomTableSuffix();
String tableName = "test_ctas_no_schema_" + randomTableSuffix();
try {
assertQueryFails(
format("CREATE TABLE %s.%s AS SELECT name FROM nation", schemaName, tableName),
format("Schema %s not found", schemaName));
}
finally {
assertUpdate(format("DROP TABLE IF EXISTS %s.%s", schemaName, tableName));
}
}

@Test
public void testCreateTableAsSelectWithUnicode()
{
Expand Down