diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/MyRaptorConnector.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/MyRaptorConnector.java new file mode 100644 index 000000000000..d2dde520aaa9 --- /dev/null +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/MyRaptorConnector.java @@ -0,0 +1,242 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.raptor.legacy; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; +import io.airlift.bootstrap.LifeCycleManager; +import io.airlift.log.Logger; +import io.trino.plugin.raptor.legacy.metadata.ForMetadata; +import io.trino.plugin.raptor.legacy.metadata.MetadataDao; +import io.trino.spi.NodeManager; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorAccessControl; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorNodePartitioningProvider; +import io.trino.spi.connector.ConnectorPageSinkProvider; +import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.SystemTable; +import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.transaction.IsolationLevel; +import org.skife.jdbi.v2.IDBI; + +import javax.annotation.PostConstruct; +import javax.annotation.concurrent.GuardedBy; +import javax.inject.Inject; + +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.trino.plugin.raptor.legacy.util.DatabaseUtil.onDemandDao; +import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED; +import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class MyRaptorConnector + implements Connector +{ + private static final Logger log = Logger.get(RaptorConnector.class); + + private final LifeCycleManager lifeCycleManager; + private final RaptorMetadataFactory metadataFactory; + private final RaptorSplitManager splitManager; + private final RaptorPageSourceProvider pageSourceProvider; + private final RaptorPageSinkProvider pageSinkProvider; + private final RaptorNodePartitioningProvider nodePartitioningProvider; + private final List> sessionProperties; + private final List> tableProperties; + private final Set systemTables; + private final MetadataDao dao; + private final ConnectorAccessControl accessControl; + private final boolean coordinator; + + private final ConcurrentMap transactions = new ConcurrentHashMap<>(); + + private final ScheduledExecutorService unblockMaintenanceExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("raptor-unblock-maintenance")); + + @GuardedBy("this") + private final SetMultimap deletions = HashMultimap.create(); + + @Inject + public MyRaptorConnector( + LifeCycleManager lifeCycleManager, + NodeManager nodeManager, + RaptorMetadataFactory metadataFactory, + RaptorSplitManager splitManager, + RaptorPageSourceProvider pageSourceProvider, + RaptorPageSinkProvider pageSinkProvider, + RaptorNodePartitioningProvider nodePartitioningProvider, + RaptorSessionProperties sessionProperties, + RaptorTableProperties tableProperties, + Set systemTables, + ConnectorAccessControl accessControl, + @ForMetadata IDBI dbi) + { + this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); + this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null"); + this.splitManager = requireNonNull(splitManager, "splitManager is null"); + this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); + this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null"); + this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null"); + this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null").getSessionProperties(); + this.tableProperties = requireNonNull(tableProperties, "tableProperties is null").getTableProperties(); + this.systemTables = requireNonNull(systemTables, "systemTables is null"); + this.accessControl = requireNonNull(accessControl, "accessControl is null"); + this.dao = onDemandDao(dbi, MetadataDao.class); + this.coordinator = nodeManager.getCurrentNode().isCoordinator(); + } + + @PostConstruct + public void start() + { + if (coordinator) { + dao.unblockAllMaintenance(); + } + } + + @Override + public boolean isSingleStatementWritesOnly() + { + return true; + } + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) + { + checkConnectorSupports(READ_COMMITTED, isolationLevel); + RaptorTransactionHandle transaction = new RaptorTransactionHandle(); + transactions.put(transaction, metadataFactory.create(tableId -> beginDelete(tableId, transaction.getUuid()))); + return transaction; + } + + @Override + public void commit(ConnectorTransactionHandle transaction) + { + checkArgument(transactions.remove(transaction) != null, "no such transaction: %s", transaction); + finishDelete(((RaptorTransactionHandle) transaction).getUuid()); + } + + @Override + public void rollback(ConnectorTransactionHandle transaction) + { + RaptorMetadata metadata = transactions.remove(transaction); + checkArgument(metadata != null, "no such transaction: %s", transaction); + finishDelete(((RaptorTransactionHandle) transaction).getUuid()); + metadata.rollback(); + } + + @Override + public ConnectorPageSourceProvider getPageSourceProvider() + { + return pageSourceProvider; + } + + @Override + public ConnectorPageSinkProvider getPageSinkProvider() + { + return pageSinkProvider; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorTransactionHandle transaction) + { + RaptorMetadata metadata = transactions.get(transaction); + checkArgument(metadata != null, "no such transaction: %s", transaction); + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public ConnectorNodePartitioningProvider getNodePartitioningProvider() + { + return nodePartitioningProvider; + } + + @Override + public List> getSessionProperties() + { + return sessionProperties; + } + + @Override + public List> getTableProperties() + { + return tableProperties; + } + + @Override + public Set getSystemTables() + { + return systemTables; + } + + @Override + public ConnectorAccessControl getAccessControl() + { + return accessControl; + } + + @Override + public final void shutdown() + { + lifeCycleManager.stop(); + } + + private synchronized void beginDelete(long tableId, UUID transactionId) + { + dao.blockMaintenance(tableId); + verify(deletions.put(tableId, transactionId)); + } + + private synchronized void finishDelete(UUID transactionId) + { + deletions.entries().stream() + .filter(entry -> entry.getValue().equals(transactionId)) + .findFirst() + .ifPresent(entry -> { + long tableId = entry.getKey(); + deletions.remove(tableId, transactionId); + if (!deletions.containsKey(tableId)) { + unblockMaintenance(tableId); + } + }); + } + + private void unblockMaintenance(long tableId) + { + try { + dao.unblockMaintenance(tableId); + } + catch (Throwable t) { + log.warn(t, "Failed to unblock maintenance for table ID %s, will retry", tableId); + unblockMaintenanceExecutor.schedule(() -> unblockMaintenance(tableId), 2, SECONDS); + } + } +} diff --git a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorBucketedConnectorTest.java b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorBucketedConnectorTest.java new file mode 100644 index 000000000000..86e93e0fb3d8 --- /dev/null +++ b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorBucketedConnectorTest.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.raptor.legacy; + +import com.google.common.collect.ImmutableMap; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.testng.annotations.Test; + +import static io.trino.plugin.raptor.legacy.RaptorQueryRunner.createRaptorQueryRunner; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestRaptorBucketedConnectorTest + extends TestRaptorConnectorTest +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return createRaptorQueryRunner(ImmutableMap.of(), TpchTable.getTables(), true, ImmutableMap.of()); + } + + @Test + @Override + public void testShowCreateTable() + { + assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue()) + .isEqualTo("CREATE TABLE raptor.tpch.orders (\n" + + " orderkey bigint,\n" + + " custkey bigint,\n" + + " orderstatus varchar(1),\n" + + " totalprice double,\n" + + " orderdate date,\n" + + " orderpriority varchar(15),\n" + + " clerk varchar(15),\n" + + " shippriority integer,\n" + + " comment varchar(79)\n" + + ")\n" + + "WITH (\n" + + " bucket_count = 25,\n" + + " bucketed_on = ARRAY['orderkey'],\n" + + " distribution_name = 'order'\n" + + ")"); + } + + @Test + public void testShardsSystemTableBucketNumber() + { + assertQuery("" + + "SELECT count(DISTINCT bucket_number)\n" + + "FROM system.shards\n" + + "WHERE table_schema = 'tpch'\n" + + " AND table_name = 'orders'", + "SELECT 25"); + } +} diff --git a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTest.java b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorConnectorTest.java similarity index 94% rename from plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTest.java rename to plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorConnectorTest.java index 6d8aacc91388..b6f192a84384 100644 --- a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTest.java +++ b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorConnectorTest.java @@ -18,19 +18,22 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.SetMultimap; import io.trino.spi.type.ArrayType; -import io.trino.testing.AbstractTestIntegrationSmokeTest; +import io.trino.testing.BaseConnectorTest; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import io.trino.testing.sql.TestTable; import io.trino.testng.services.Flaky; -import io.trino.tpch.TpchTable; import org.intellij.lang.annotations.Language; +import org.testng.SkipException; import org.testng.annotations.Test; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.StringJoiner; import java.util.UUID; @@ -57,15 +60,26 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; -public class TestRaptorIntegrationSmokeTest - // TODO extend BaseConnectorTest - extends AbstractTestIntegrationSmokeTest +public class TestRaptorConnectorTest + extends BaseConnectorTest { + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + switch (connectorBehavior) { + case SUPPORTS_COMMENT_ON_TABLE: + case SUPPORTS_COMMENT_ON_COLUMN: + return false; + default: + return super.hasBehavior(connectorBehavior); + } + } + @Override protected QueryRunner createQueryRunner() throws Exception { - return createRaptorQueryRunner(ImmutableMap.of(), TpchTable.getTables(), false, ImmutableMap.of()); + return createRaptorQueryRunner(ImmutableMap.of(), REQUIRED_TPCH_TABLES, false, ImmutableMap.of()); } @Test @@ -778,4 +792,44 @@ public void testAlterTable() assertUpdate("DROP TABLE test_alter_table"); } + + @Override + protected TestTable createTableWithDefaultColumns() + { + throw new SkipException("Raptor connector does not support column default values"); + } + + @Override + public void testCreateSchema() + { + throw new SkipException("Raptor connector does not support creating schemas"); + } + + @Override + protected Optional filterDataMappingSmokeTestData(DataMappingTestSetup dataMappingTestSetup) + { + String typeName = dataMappingTestSetup.getTrinoTypeName(); + if (typeName.equals("tinyint") + || typeName.equals("real") + || typeName.startsWith("decimal(") + || typeName.equals("time") + || typeName.equals("timestamp(3) with time zone") + || typeName.startsWith("char(")) { + // TODO this should either work or fail cleanly + return Optional.empty(); + } + + return Optional.of(dataMappingTestSetup); + } + + @Override + protected Optional filterCaseSensitiveDataMappingTestData(DataMappingTestSetup dataMappingTestSetup) + { + String typeName = dataMappingTestSetup.getTrinoTypeName(); + if (typeName.equals("char(1)")) { + // TODO this should either work or fail cleanly + return Optional.empty(); + } + return Optional.of(dataMappingTestSetup); + } } diff --git a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTestBucketed.java b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTestBucketed.java index f5275fbc88fd..f7a9e8300389 100644 --- a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTestBucketed.java +++ b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTestBucketed.java @@ -22,7 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat; public class TestRaptorIntegrationSmokeTestBucketed - extends TestRaptorIntegrationSmokeTest + extends TestRaptorConnectorTest { @Override protected QueryRunner createQueryRunner() diff --git a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTestMySql.java b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTestMySql.java index de132da1bf33..1d5c2e4d9d86 100644 --- a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTestMySql.java +++ b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorIntegrationSmokeTestMySql.java @@ -31,7 +31,7 @@ @Test public class TestRaptorIntegrationSmokeTestMySql - extends TestRaptorIntegrationSmokeTest + extends TestRaptorConnectorTest { private MySQLContainer mysqlContainer; diff --git a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorMysqlConnectorTest.java b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorMysqlConnectorTest.java new file mode 100644 index 000000000000..39a775f32c14 --- /dev/null +++ b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/TestRaptorMysqlConnectorTest.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.raptor.legacy; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.tpch.TpchPlugin; +import io.trino.testing.BaseConnectorTest; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.testcontainers.containers.MySQLContainer; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Map; + +import static io.trino.plugin.raptor.legacy.RaptorQueryRunner.copyTables; +import static io.trino.plugin.raptor.legacy.RaptorQueryRunner.createSession; +import static java.lang.String.format; + +@Test +public class TestRaptorMysqlConnectorTest + extends BaseConnectorTest +{ + private MySQLContainer mysqlContainer; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + mysqlContainer = new MySQLContainer<>("mysql:8.0.12"); + mysqlContainer.start(); + return createRaptorMySqlQueryRunner(getJdbcUrl(mysqlContainer)); + } + + @AfterClass(alwaysRun = true) + public final void destroy() + { + mysqlContainer.close(); + } + + private static String getJdbcUrl(MySQLContainer container) + { + return format("%s?user=%s&password=%s&useSSL=false&allowPublicKeyRetrieval=true", + container.getJdbcUrl(), + container.getUsername(), + container.getPassword()); + } + + private static QueryRunner createRaptorMySqlQueryRunner(String mysqlUrl) + throws Exception + { + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(createSession("tpch")).build(); + + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + queryRunner.installPlugin(new RaptorPlugin()); + File baseDir = queryRunner.getCoordinator().getBaseDataDir().toFile(); + Map raptorProperties = ImmutableMap.builder() + .put("metadata.db.type", "mysql") + .put("metadata.db.url", mysqlUrl) + .put("storage.data-directory", new File(baseDir, "data").getAbsolutePath()) + .put("storage.max-shard-rows", "2000") + .put("backup.provider", "file") + .put("backup.directory", new File(baseDir, "backup").getAbsolutePath()) + .build(); + + queryRunner.createCatalog("raptor", "raptor-legacy", raptorProperties); + + copyTables(queryRunner, "tpch", createSession(), false, TpchTable.getTables()); + + return queryRunner; + } +}