From 659349196d94d3512acf618b55c99237cc6107c0 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 15 Dec 2021 10:10:26 +0900 Subject: [PATCH 1/4] Allow extending Cassandra connector test This commit is preparation for the next commit. --- ...t.java => BaseCassandraConnectorTest.java} | 106 ++++++++++++------ 1 file changed, 71 insertions(+), 35 deletions(-) rename plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/{TestCassandraConnectorTest.java => BaseCassandraConnectorTest.java} (93%) diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java similarity index 93% rename from plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java rename to plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java index 9d6e7a7086c1..d30afbf1b073 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java @@ -75,16 +75,16 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -public class TestCassandraConnectorTest +public abstract class BaseCassandraConnectorTest extends BaseConnectorTest { - private static final String KEYSPACE = "smoke_test"; - private static final Session SESSION = createCassandraSession(KEYSPACE); + protected static final String KEYSPACE = "smoke_test"; + protected static final Session SESSION = createCassandraSession(KEYSPACE); - private static final ZonedDateTime TIMESTAMP_VALUE = ZonedDateTime.of(1970, 1, 1, 3, 4, 5, 0, ZoneId.of("UTC")); + protected static final ZonedDateTime TIMESTAMP_VALUE = ZonedDateTime.of(1970, 1, 1, 3, 4, 5, 0, ZoneId.of("UTC")); - private CassandraServer server; - private CassandraSession session; + protected CassandraServer server; + protected CassandraSession session; @SuppressWarnings("DuplicateBranchesInSwitch") @Override @@ -223,7 +223,7 @@ protected MaterializedResult getDescribeOrdersResult() public void testShowCreateTable() { assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue()) - .isEqualTo("CREATE TABLE cassandra.tpch.orders (\n" + + .isEqualTo("CREATE TABLE " + getSession().getCatalog().orElseThrow() + ".tpch.orders (\n" + " orderkey bigint,\n" + " custkey bigint,\n" + " orderstatus varchar,\n" + @@ -612,16 +612,17 @@ public void testCreateTableAs() @Test public void testIdentifiers() { + String catalogName = getSession().getCatalog().orElseThrow(); session.execute("DROP KEYSPACE IF EXISTS \"_keyspace\""); session.execute("CREATE KEYSPACE \"_keyspace\" WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}"); - assertContainsEventually(() -> execute("SHOW SCHEMAS FROM cassandra"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute("SHOW SCHEMAS FROM " + catalogName), resultBuilder(getSession(), createUnboundedVarcharType()) .row("_keyspace") .build(), new Duration(1, MINUTES)); execute("CREATE TABLE _keyspace._table AS SELECT 1 AS \"_col\", 2 AS \"2col\""); - assertQuery("SHOW TABLES FROM cassandra._keyspace", "VALUES ('_table')"); - assertQuery("SELECT * FROM cassandra._keyspace._table", "VALUES (1, 2)"); - assertUpdate("DROP TABLE cassandra._keyspace._table"); + assertQuery(format("SHOW TABLES FROM %s._keyspace", catalogName), "VALUES ('_table')"); + assertQuery(format("SELECT * FROM %s._keyspace._table", catalogName), "VALUES (1, 2)"); + assertUpdate(format("DROP TABLE %s._keyspace._table", catalogName)); session.execute("DROP KEYSPACE \"_keyspace\""); } @@ -855,23 +856,24 @@ public void testUpperCaseNameUnescapedInCassandra() * * http://docs.datastax.com/en/cql/3.1/cql/cql_reference/ucase-lcase_r.html */ + String catalogName = getSession().getCatalog().orElseThrow(); session.execute("CREATE KEYSPACE KEYSPACE_1 WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}"); - assertContainsEventually(() -> execute("SHOW SCHEMAS FROM cassandra"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute("SHOW SCHEMAS FROM " + catalogName), resultBuilder(getSession(), createUnboundedVarcharType()) .row("keyspace_1") .build(), new Duration(1, MINUTES)); session.execute("CREATE TABLE KEYSPACE_1.TABLE_1 (COLUMN_1 bigint PRIMARY KEY)"); - assertContainsEventually(() -> execute("SHOW TABLES FROM cassandra.keyspace_1"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute(format("SHOW TABLES FROM %s.keyspace_1", catalogName)), resultBuilder(getSession(), createUnboundedVarcharType()) .row("table_1") .build(), new Duration(1, MINUTES)); - assertContains(execute("SHOW COLUMNS FROM cassandra.keyspace_1.table_1"), resultBuilder(getSession(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType()) + assertContains(execute(format("SHOW COLUMNS FROM %s.keyspace_1.table_1", catalogName)), resultBuilder(getSession(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType()) .row("column_1", "bigint", "", "") .build()); execute("INSERT INTO keyspace_1.table_1 (column_1) VALUES (1)"); - assertEquals(execute("SELECT column_1 FROM cassandra.keyspace_1.table_1").getRowCount(), 1); - assertUpdate("DROP TABLE cassandra.keyspace_1.table_1"); + assertEquals(execute(format("SELECT column_1 FROM %s.keyspace_1.table_1", catalogName)).getRowCount(), 1); + assertUpdate(format("DROP TABLE %s.keyspace_1.table_1", catalogName)); // when an identifier is unquoted the lowercase and uppercase spelling may be used interchangeable session.execute("DROP KEYSPACE keyspace_1"); @@ -885,23 +887,24 @@ public void testUppercaseNameEscaped() * * http://docs.datastax.com/en/cql/3.1/cql/cql_reference/ucase-lcase_r.html */ + String catalogName = getSession().getCatalog().orElseThrow(); session.execute("CREATE KEYSPACE \"KEYSPACE_2\" WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}"); - assertContainsEventually(() -> execute("SHOW SCHEMAS FROM cassandra"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute("SHOW SCHEMAS FROM " + catalogName), resultBuilder(getSession(), createUnboundedVarcharType()) .row("keyspace_2") .build(), new Duration(1, MINUTES)); session.execute("CREATE TABLE \"KEYSPACE_2\".\"TABLE_2\" (\"COLUMN_2\" bigint PRIMARY KEY)"); - assertContainsEventually(() -> execute("SHOW TABLES FROM cassandra.keyspace_2"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute(format("SHOW TABLES FROM %s.keyspace_2", catalogName)), resultBuilder(getSession(), createUnboundedVarcharType()) .row("table_2") .build(), new Duration(1, MINUTES)); - assertContains(execute("SHOW COLUMNS FROM cassandra.keyspace_2.table_2"), resultBuilder(getSession(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType()) + assertContains(execute(format("SHOW COLUMNS FROM %s.keyspace_2.table_2", catalogName)), resultBuilder(getSession(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType()) .row("column_2", "bigint", "", "") .build()); execute("INSERT INTO \"KEYSPACE_2\".\"TABLE_2\" (\"COLUMN_2\") VALUES (1)"); - assertEquals(execute("SELECT column_2 FROM cassandra.keyspace_2.table_2").getRowCount(), 1); - assertUpdate("DROP TABLE cassandra.keyspace_2.table_2"); + assertEquals(execute(format("SELECT column_2 FROM %s.keyspace_2.table_2", catalogName)).getRowCount(), 1); + assertUpdate(format("DROP TABLE %s.keyspace_2.table_2", catalogName)); // when an identifier is unquoted the lowercase and uppercase spelling may be used interchangeable session.execute("DROP KEYSPACE \"KEYSPACE_2\""); @@ -912,11 +915,12 @@ public void testKeyspaceNameAmbiguity() { // Identifiers enclosed in double quotes are stored in Cassandra verbatim. It is possible to create 2 keyspaces with names // that have differences only in letters case. + String catalogName = getSession().getCatalog().orElseThrow(); session.execute("CREATE KEYSPACE \"KeYsPaCe_3\" WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}"); session.execute("CREATE KEYSPACE \"kEySpAcE_3\" WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}"); // Although in Trino all the schema and table names are always displayed as lowercase - assertContainsEventually(() -> execute("SHOW SCHEMAS FROM cassandra"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute("SHOW SCHEMAS FROM " + catalogName), resultBuilder(getSession(), createUnboundedVarcharType()) .row("keyspace_3") .row("keyspace_3") .build(), new Duration(1, MINUTES)); @@ -934,8 +938,9 @@ public void testKeyspaceNameAmbiguity() @Test public void testTableNameAmbiguity() { + String catalogName = getSession().getCatalog().orElseThrow(); session.execute("CREATE KEYSPACE keyspace_4 WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}"); - assertContainsEventually(() -> execute("SHOW SCHEMAS FROM cassandra"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute("SHOW SCHEMAS FROM " + catalogName), resultBuilder(getSession(), createUnboundedVarcharType()) .row("keyspace_4") .build(), new Duration(1, MINUTES)); @@ -945,18 +950,18 @@ public void testTableNameAmbiguity() session.execute("CREATE TABLE keyspace_4.\"tAbLe_4\" (column_4 bigint PRIMARY KEY)"); // Although in Trino all the schema and table names are always displayed as lowercase - assertContainsEventually(() -> execute("SHOW TABLES FROM cassandra.keyspace_4"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute(format("SHOW TABLES FROM %s.keyspace_4", catalogName)), resultBuilder(getSession(), createUnboundedVarcharType()) .row("table_4") .row("table_4") .build(), new Duration(1, MINUTES)); // There is no way to figure out what the exactly table is being queried assertQueryFailsEventually( - "SHOW COLUMNS FROM cassandra.keyspace_4.table_4", + format("SHOW COLUMNS FROM %s.keyspace_4.table_4", catalogName), "More than one table has been found for the case insensitive table name: table_4 -> \\(TaBlE_4, tAbLe_4\\)", new Duration(1, MINUTES)); assertQueryFailsEventually( - "SELECT * FROM cassandra.keyspace_4.table_4", + format("SELECT * FROM %s.keyspace_4.table_4", catalogName), "More than one table has been found for the case insensitive table name: table_4 -> \\(TaBlE_4, tAbLe_4\\)", new Duration(1, MINUTES)); session.execute("DROP KEYSPACE keyspace_4"); @@ -965,22 +970,24 @@ public void testTableNameAmbiguity() @Test public void testColumnNameAmbiguity() { + String catalogName = getSession().getCatalog().orElseThrow(); + session.execute("CREATE KEYSPACE keyspace_5 WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}"); - assertContainsEventually(() -> execute("SHOW SCHEMAS FROM cassandra"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute("SHOW SCHEMAS FROM " + catalogName), resultBuilder(getSession(), createUnboundedVarcharType()) .row("keyspace_5") .build(), new Duration(1, MINUTES)); session.execute("CREATE TABLE keyspace_5.table_5 (\"CoLuMn_5\" bigint PRIMARY KEY, \"cOlUmN_5\" bigint)"); - assertContainsEventually(() -> execute("SHOW TABLES FROM cassandra.keyspace_5"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute(format("SHOW TABLES FROM %s.keyspace_5", catalogName)), resultBuilder(getSession(), createUnboundedVarcharType()) .row("table_5") .build(), new Duration(1, MINUTES)); assertQueryFailsEventually( - "SHOW COLUMNS FROM cassandra.keyspace_5.table_5", + format("SHOW COLUMNS FROM %s.keyspace_5.table_5", catalogName), "More than one column has been found for the case insensitive column name: column_5 -> \\(CoLuMn_5, cOlUmN_5\\)", new Duration(1, MINUTES)); assertQueryFailsEventually( - "SELECT * FROM cassandra.keyspace_5.table_5", + format("SELECT * FROM %s.keyspace_5.table_5", catalogName), "More than one column has been found for the case insensitive column name: column_5 -> \\(CoLuMn_5, cOlUmN_5\\)", new Duration(1, MINUTES)); @@ -1072,6 +1079,27 @@ public void testNullAndEmptyTimestamp() assertThat(query(format("SELECT id FROM %s WHERE timestamp_column_with_empty IS NULL", tableName))) .matches("VALUES 1"); } + String catalogName = getSession().getCatalog().orElseThrow(); + String tableName = "test_empty_timestamp"; + + session.execute(format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, tableName)); + session.execute(format("CREATE TABLE %s.%s (id int PRIMARY KEY, timestamp_column_with_null timestamp, timestamp_column_with_empty timestamp)", KEYSPACE, tableName)); + session.execute(format("INSERT INTO %s.%s (id, timestamp_column_with_null, timestamp_column_with_empty) VALUES (1, NULL, '')", KEYSPACE, tableName)); + assertContainsEventually(() -> execute(format("SHOW TABLES FROM %s.%s LIKE '%s'", catalogName, KEYSPACE, tableName)), resultBuilder(getSession(), createUnboundedVarcharType()) + .row(tableName) + .build(), new Duration(1, MINUTES)); + + assertThat(query(format("SELECT timestamp_column_with_null FROM %s.%s", KEYSPACE, tableName))) + .matches("VALUES CAST(NULL AS timestamp(3) with time zone)"); + assertThat(query(format("SELECT timestamp_column_with_empty FROM %s.%s", KEYSPACE, tableName))) + .matches("VALUES CAST(NULL AS timestamp(3) with time zone)"); + + assertThat(query(format("SELECT id FROM %s.%s WHERE timestamp_column_with_null IS NULL", KEYSPACE, tableName))) + .matches("VALUES 1"); + assertThat(query(format("SELECT id FROM %s.%s WHERE timestamp_column_with_empty IS NULL", KEYSPACE, tableName))) + .matches("VALUES 1"); + + session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); } @Test @@ -1096,8 +1124,10 @@ public void testEmptyTimestampClusteringKey() @Test public void testNestedCollectionType() { + String catalogName = getSession().getCatalog().orElseThrow(); + session.execute("CREATE KEYSPACE keyspace_test_nested_collection WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}"); - assertContainsEventually(() -> execute("SHOW SCHEMAS FROM cassandra"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute("SHOW SCHEMAS FROM " + catalogName), resultBuilder(getSession(), createUnboundedVarcharType()) .row("keyspace_test_nested_collection") .build(), new Duration(1, MINUTES)); @@ -1105,22 +1135,22 @@ public void testNestedCollectionType() session.execute("CREATE TABLE keyspace_test_nested_collection.table_list (column_5 bigint PRIMARY KEY, nested_collection frozen>>)"); session.execute("CREATE TABLE keyspace_test_nested_collection.table_map (column_5 bigint PRIMARY KEY, nested_collection frozen>>)"); - assertContainsEventually(() -> execute("SHOW TABLES FROM cassandra.keyspace_test_nested_collection"), resultBuilder(getSession(), createUnboundedVarcharType()) + assertContainsEventually(() -> execute(format("SHOW TABLES FROM %s.keyspace_test_nested_collection", catalogName)), resultBuilder(getSession(), createUnboundedVarcharType()) .row("table_set") .row("table_list") .row("table_map") .build(), new Duration(1, MINUTES)); session.execute("INSERT INTO keyspace_test_nested_collection.table_set (column_5, nested_collection) VALUES (1, {{1, 2, 3}})"); - assertEquals(execute("SELECT nested_collection FROM cassandra.keyspace_test_nested_collection.table_set").getMaterializedRows().get(0), + assertEquals(execute(format("SELECT nested_collection FROM %s.keyspace_test_nested_collection.table_set", catalogName)).getMaterializedRows().get(0), new MaterializedRow(DEFAULT_PRECISION, "[[1,2,3]]")); session.execute("INSERT INTO keyspace_test_nested_collection.table_list (column_5, nested_collection) VALUES (1, [[4, 5, 6]])"); - assertEquals(execute("SELECT nested_collection FROM cassandra.keyspace_test_nested_collection.table_list").getMaterializedRows().get(0), + assertEquals(execute(format("SELECT nested_collection FROM %s.keyspace_test_nested_collection.table_list", catalogName)).getMaterializedRows().get(0), new MaterializedRow(DEFAULT_PRECISION, "[[4,5,6]]")); session.execute("INSERT INTO keyspace_test_nested_collection.table_map (column_5, nested_collection) VALUES (1, {7:{8:9}})"); - assertEquals(execute("SELECT nested_collection FROM cassandra.keyspace_test_nested_collection.table_map").getMaterializedRows().get(0), + assertEquals(execute(format("SELECT nested_collection FROM %s.keyspace_test_nested_collection.table_map", catalogName)).getMaterializedRows().get(0), new MaterializedRow(DEFAULT_PRECISION, "{7:{8:9}}")); session.execute("DROP KEYSPACE keyspace_test_nested_collection"); @@ -1591,6 +1621,12 @@ private void assertSelect(String tableName) } } + protected void refreshSizeEstimates(String keyspace, String tableName) + throws Exception + { + server.refreshSizeEstimates(KEYSPACE, tableName); + } + private MaterializedResult execute(@Language("SQL") String sql) { return getQueryRunner().execute(SESSION, sql); From bb43a9d5984f7fd3a596654b2e53e3c580a7f9af Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 15 Dec 2021 10:12:27 +0900 Subject: [PATCH 2/4] Restore Cassandra connector test --- .../cassandra/TestCassandraConnectorTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java new file mode 100644 index 000000000000..41280f646c79 --- /dev/null +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.cassandra; + +import com.google.common.collect.ImmutableMap; +import io.trino.testing.QueryRunner; + +import java.sql.Timestamp; + +import static io.trino.plugin.cassandra.CassandraQueryRunner.createCassandraQueryRunner; +import static io.trino.plugin.cassandra.CassandraTestingUtils.createTestTables; + +public class TestCassandraConnectorTest + extends BaseCassandraConnectorTest +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + server = closeAfterClass(new CassandraServer()); + session = server.getSession(); + createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant())); + return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES); + } +} From 8e0d65ad227c1a001828e736e38affd4e21dfcc8 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 25 Feb 2022 11:20:57 +0900 Subject: [PATCH 3/4] Rename CassandraServer to TestingCassandraServer --- .../cassandra/BaseCassandraConnectorTest.java | 2 +- .../cassandra/CassandraQueryRunner.java | 2 +- .../plugin/cassandra/CassandraServer.java | 194 +--------------- .../cassandra/TestCassandraConnector.java | 4 +- .../cassandra/TestCassandraConnectorTest.java | 2 +- ...TestCassandraLatestConnectorSmokeTest.java | 2 +- ...raProtocolVersionV3ConnectorSmokeTest.java | 2 +- .../TestCassandraTokenSplitManager.java | 4 +- .../cassandra/TestCassandraTypeMapping.java | 2 +- .../TestDatastaxConnectorSmokeTest.java | 2 +- .../cassandra/TestingCassandraServer.java | 212 ++++++++++++++++++ 11 files changed, 230 insertions(+), 198 deletions(-) create mode 100644 plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingCassandraServer.java diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java index d30afbf1b073..978cf9fe2ae4 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java @@ -139,7 +139,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) protected QueryRunner createQueryRunner() throws Exception { - server = closeAfterClass(new CassandraServer()); + server = closeAfterClass(new TestingCassandraServer()); session = server.getSession(); session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}"); return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES); diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraQueryRunner.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraQueryRunner.java index 79afa781aaf2..bdcc6c28b7ee 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraQueryRunner.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraQueryRunner.java @@ -85,7 +85,7 @@ public static void main(String[] args) throws Exception { DistributedQueryRunner queryRunner = createCassandraQueryRunner( - new CassandraServer(), + new TestingCassandraServer(), ImmutableMap.of("http-server.http.port", "8080"), ImmutableMap.of(), TpchTable.getTables()); diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java index 3011bbca208d..72faa07d0b39 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java @@ -13,197 +13,17 @@ */ package io.trino.plugin.cassandra; -import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.CqlSessionBuilder; -import com.datastax.oss.driver.api.core.ProtocolVersion; -import com.datastax.oss.driver.api.core.config.DriverConfigLoader; -import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; -import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.Row; -import com.google.common.collect.ImmutableMap; -import com.google.common.io.Resources; -import io.airlift.json.JsonCodec; -import io.airlift.log.Logger; -import io.airlift.units.Duration; -import io.trino.testing.ResourcePresence; -import org.testcontainers.containers.GenericContainer; - import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.file.Path; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeoutException; - -import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT; -import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES; -import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.PROTOCOL_VERSION; -import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_TIMEOUT; -import static com.google.common.io.Resources.getResource; -import static io.trino.plugin.cassandra.CassandraTestingUtils.CASSANDRA_TYPE_MANAGER; -import static java.lang.String.format; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.nio.file.Files.createDirectory; -import static java.nio.file.Files.createTempDirectory; -import static java.nio.file.Files.writeString; -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.testcontainers.utility.MountableFile.forHostPath; -import static org.testng.Assert.assertEquals; -public class CassandraServer - implements Closeable +public interface CassandraServer + extends Closeable { - private static final Logger log = Logger.get(CassandraServer.class); - - private static final int PORT = 9142; - - private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES); - - private final GenericContainer dockerContainer; - private final CassandraSession session; - - public CassandraServer() - throws Exception - { - this("cassandra:3.0", "cu-cassandra.yaml"); - } - - public CassandraServer(String imageName, String configFileName) - throws Exception - { - this(imageName, ImmutableMap.of(), "/etc/cassandra/cassandra.yaml", configFileName); - } - - public CassandraServer(String imageName, Map environmentVariables, String configPath, String configFileName) - throws Exception - { - log.info("Starting cassandra..."); - - this.dockerContainer = new GenericContainer<>(imageName) - .withExposedPorts(PORT) - .withCopyFileToContainer(forHostPath(prepareCassandraYaml(configFileName)), configPath) - .withEnv(environmentVariables) - .withStartupTimeout(java.time.Duration.ofMinutes(10)); - this.dockerContainer.start(); - - ProgrammaticDriverConfigLoaderBuilder driverConfigLoaderBuilder = DriverConfigLoader.programmaticBuilder(); - driverConfigLoaderBuilder.withDuration(REQUEST_TIMEOUT, java.time.Duration.ofSeconds(30)); - driverConfigLoaderBuilder.withString(PROTOCOL_VERSION, ProtocolVersion.V3.name()); - driverConfigLoaderBuilder.withDuration(CONTROL_CONNECTION_AGREEMENT_TIMEOUT, java.time.Duration.ofSeconds(30)); - // allow the retrieval of metadata for the system keyspaces - driverConfigLoaderBuilder.withStringList(METADATA_SCHEMA_REFRESHED_KEYSPACES, List.of()); - - CqlSessionBuilder cqlSessionBuilder = CqlSession.builder() - .withApplicationName("TestCluster") - .addContactPoint(new InetSocketAddress(this.dockerContainer.getHost(), this.dockerContainer.getMappedPort(PORT))) - .withLocalDatacenter("datacenter1") - .withConfigLoader(driverConfigLoaderBuilder.build()); - - CassandraSession session = new CassandraSession( - CASSANDRA_TYPE_MANAGER, - JsonCodec.listJsonCodec(ExtraColumnMetadata.class), - cqlSessionBuilder::build, - new Duration(1, MINUTES)); - - try { - checkConnectivity(session); - } - catch (RuntimeException e) { - session.close(); - this.dockerContainer.stop(); - throw e; - } - - this.session = session; - } - - private static String prepareCassandraYaml(String fileName) - throws IOException - { - String original = Resources.toString(getResource(fileName), UTF_8); - - Path tmpDirPath = createTempDirectory(null); - Path dataDir = tmpDirPath.resolve("data"); - createDirectory(dataDir); - - String modified = original.replaceAll("\\$\\{data_directory\\}", dataDir.toAbsolutePath().toString()); - - File yamlFile = tmpDirPath.resolve(fileName).toFile(); - yamlFile.deleteOnExit(); - writeString(yamlFile.toPath(), modified, UTF_8); - - return yamlFile.getAbsolutePath(); - } - - public CassandraSession getSession() - { - return requireNonNull(session, "session is null"); - } - - public String getHost() - { - return dockerContainer.getHost(); - } - - public int getPort() - { - return dockerContainer.getMappedPort(PORT); - } - - private static void checkConnectivity(CassandraSession session) - { - ResultSet result = session.execute("SELECT release_version FROM system.local"); - List rows = result.all(); - assertEquals(rows.size(), 1); - String version = rows.get(0).getString(0); - log.info("Cassandra version: %s", version); - } - - public void refreshSizeEstimates(String keyspace, String table) - throws Exception - { - long deadline = System.nanoTime() + REFRESH_SIZE_ESTIMATES_TIMEOUT.roundTo(NANOSECONDS); - while (System.nanoTime() - deadline < 0) { - flushTable(keyspace, table); - refreshSizeEstimates(); - List sizeEstimates = getSession().getSizeEstimates(keyspace, table); - if (!sizeEstimates.isEmpty()) { - log.info("Size estimates for the table %s.%s have been refreshed successfully: %s", keyspace, table, sizeEstimates); - return; - } - log.info("Size estimates haven't been refreshed as expected. Retrying ..."); - SECONDS.sleep(1); - } - throw new TimeoutException(format("Attempting to refresh size estimates for table %s.%s has timed out after %s", keyspace, table, REFRESH_SIZE_ESTIMATES_TIMEOUT)); - } - - private void flushTable(String keyspace, String table) - throws Exception - { - dockerContainer.execInContainer("nodetool", "flush", keyspace, table); - } + CassandraSession getSession(); - private void refreshSizeEstimates() - throws Exception - { - dockerContainer.execInContainer("nodetool", "refreshsizeestimates"); - } + String getHost(); - @Override - public void close() - { - session.close(); - dockerContainer.close(); - } + int getPort(); - @ResourcePresence - public boolean isRunning() - { - return dockerContainer.getContainerId() != null; - } + void refreshSizeEstimates(String keyspace, String table) + throws Exception; } diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java index 821c1818acea..05ecfc4a551a 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java @@ -99,7 +99,7 @@ public class TestCassandraConnector private static final ConnectorSession SESSION = TestingConnectorSession.builder() .setPropertyMetadata(new CassandraSessionProperties(new CassandraClientConfig()).getSessionProperties()) .build(); - private CassandraServer server; + private TestingCassandraServer server; protected String database; protected SchemaTableName table; protected SchemaTableName tableForDelete; @@ -113,7 +113,7 @@ public class TestCassandraConnector public void setup() throws Exception { - this.server = new CassandraServer(); + this.server = new TestingCassandraServer(); String keyspace = "test_connector"; createTestTables(server.getSession(), keyspace, DATE); diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java index 41280f646c79..20d8f99b87f1 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java @@ -28,7 +28,7 @@ public class TestCassandraConnectorTest protected QueryRunner createQueryRunner() throws Exception { - server = closeAfterClass(new CassandraServer()); + server = closeAfterClass(new TestingCassandraServer()); session = server.getSession(); createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant())); return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES); diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraLatestConnectorSmokeTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraLatestConnectorSmokeTest.java index 7486739c8bb8..6c739399ab16 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraLatestConnectorSmokeTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraLatestConnectorSmokeTest.java @@ -28,7 +28,7 @@ public class TestCassandraLatestConnectorSmokeTest protected QueryRunner createQueryRunner() throws Exception { - CassandraServer server = closeAfterClass(new CassandraServer("cassandra:4.1", "cu-cassandra-latest.yaml")); + TestingCassandraServer server = closeAfterClass(new TestingCassandraServer("cassandra:4.1", "cu-cassandra-latest.yaml")); CassandraSession session = server.getSession(); createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant())); return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES); diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraProtocolVersionV3ConnectorSmokeTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraProtocolVersionV3ConnectorSmokeTest.java index 7dd1953b3ab7..a6ce5b2e62fb 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraProtocolVersionV3ConnectorSmokeTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraProtocolVersionV3ConnectorSmokeTest.java @@ -31,7 +31,7 @@ public class TestCassandraProtocolVersionV3ConnectorSmokeTest protected QueryRunner createQueryRunner() throws Exception { - CassandraServer server = closeAfterClass(new CassandraServer()); + CassandraServer server = closeAfterClass(new TestingCassandraServer()); CassandraSession session = server.getSession(); createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant())); return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of("cassandra.protocol-version", "V3"), REQUIRED_TPCH_TABLES); diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTokenSplitManager.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTokenSplitManager.java index 3ebc230fbc78..0d6d7f549411 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTokenSplitManager.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTokenSplitManager.java @@ -32,7 +32,7 @@ public class TestCassandraTokenSplitManager private static final String KEYSPACE = "test_cassandra_token_split_manager_keyspace"; private static final int PARTITION_COUNT = 1000; - private CassandraServer server; + private TestingCassandraServer server; private CassandraSession session; private CassandraTokenSplitManager splitManager; @@ -40,7 +40,7 @@ public class TestCassandraTokenSplitManager public void setUp() throws Exception { - server = new CassandraServer(); + server = new TestingCassandraServer(); session = server.getSession(); createKeyspace(session, KEYSPACE); splitManager = new CassandraTokenSplitManager(session, SPLIT_SIZE, Optional.empty()); diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTypeMapping.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTypeMapping.java index 7081bcaca1fb..e6b84b497e4c 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTypeMapping.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTypeMapping.java @@ -138,7 +138,7 @@ private static void checkIsDoubled(ZoneId zone, LocalDateTime dateTime) protected QueryRunner createQueryRunner() throws Exception { - server = closeAfterClass(new CassandraServer()); + server = closeAfterClass(new TestingCassandraServer()); session = server.getSession(); return createCassandraQueryRunner( server, diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestDatastaxConnectorSmokeTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestDatastaxConnectorSmokeTest.java index a784aeee5f2f..f83b784a24e8 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestDatastaxConnectorSmokeTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestDatastaxConnectorSmokeTest.java @@ -29,7 +29,7 @@ public class TestDatastaxConnectorSmokeTest protected QueryRunner createQueryRunner() throws Exception { - CassandraServer server = closeAfterClass(new CassandraServer( + TestingCassandraServer server = closeAfterClass(new TestingCassandraServer( "datastax/dse-server:6.8.25", Map.of( "DS_LICENSE", "accept", diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingCassandraServer.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingCassandraServer.java new file mode 100644 index 000000000000..fe72c6daaee5 --- /dev/null +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingCassandraServer.java @@ -0,0 +1,212 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; +import io.airlift.json.JsonCodec; +import io.airlift.log.Logger; +import io.airlift.units.Duration; +import io.trino.testing.ResourcePresence; +import org.testcontainers.containers.GenericContainer; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.PROTOCOL_VERSION; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_TIMEOUT; +import static com.google.common.io.Resources.getResource; +import static io.trino.plugin.cassandra.CassandraTestingUtils.CASSANDRA_TYPE_MANAGER; +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.nio.file.Files.createDirectory; +import static java.nio.file.Files.createTempDirectory; +import static java.nio.file.Files.writeString; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.testcontainers.utility.MountableFile.forHostPath; +import static org.testng.Assert.assertEquals; + +public class TestingCassandraServer + implements CassandraServer +{ + private static Logger log = Logger.get(TestingCassandraServer.class); + + private static final int PORT = 9142; + + private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES); + + private final GenericContainer dockerContainer; + private final CassandraSession session; + + public TestingCassandraServer() + throws Exception + { + this("cassandra:3.0", "cu-cassandra.yaml"); + } + + public TestingCassandraServer(String imageName, String configFileName) + throws Exception + { + this(imageName, ImmutableMap.of(), "/etc/cassandra/cassandra.yaml", configFileName); + } + + public TestingCassandraServer(String imageName, Map environmentVariables, String configPath, String configFileName) + throws Exception + { + log.info("Starting cassandra..."); + + this.dockerContainer = new GenericContainer<>(imageName) + .withExposedPorts(PORT) + .withCopyFileToContainer(forHostPath(prepareCassandraYaml(configFileName)), configPath) + .withEnv(environmentVariables) + .withStartupTimeout(java.time.Duration.ofMinutes(10)); + this.dockerContainer.start(); + + ProgrammaticDriverConfigLoaderBuilder driverConfigLoaderBuilder = DriverConfigLoader.programmaticBuilder(); + driverConfigLoaderBuilder.withDuration(REQUEST_TIMEOUT, java.time.Duration.ofSeconds(30)); + driverConfigLoaderBuilder.withString(PROTOCOL_VERSION, ProtocolVersion.V3.name()); + driverConfigLoaderBuilder.withDuration(CONTROL_CONNECTION_AGREEMENT_TIMEOUT, java.time.Duration.ofSeconds(30)); + // allow the retrieval of metadata for the system keyspaces + driverConfigLoaderBuilder.withStringList(METADATA_SCHEMA_REFRESHED_KEYSPACES, List.of()); + + CqlSessionBuilder cqlSessionBuilder = CqlSession.builder() + .withApplicationName("TestCluster") + .addContactPoint(new InetSocketAddress(this.dockerContainer.getHost(), this.dockerContainer.getMappedPort(PORT))) + .withLocalDatacenter("datacenter1") + .withConfigLoader(driverConfigLoaderBuilder.build()); + + CassandraSession session = new CassandraSession( + CASSANDRA_TYPE_MANAGER, + JsonCodec.listJsonCodec(ExtraColumnMetadata.class), + cqlSessionBuilder::build, + new Duration(1, MINUTES)); + + try { + checkConnectivity(session); + } + catch (RuntimeException e) { + session.close(); + this.dockerContainer.stop(); + throw e; + } + + this.session = session; + } + + private static String prepareCassandraYaml(String fileName) + throws IOException + { + String original = Resources.toString(getResource(fileName), UTF_8); + + Path tmpDirPath = createTempDirectory(null); + Path dataDir = tmpDirPath.resolve("data"); + createDirectory(dataDir); + + String modified = original.replaceAll("\\$\\{data_directory\\}", dataDir.toAbsolutePath().toString()); + + File yamlFile = tmpDirPath.resolve(fileName).toFile(); + yamlFile.deleteOnExit(); + writeString(yamlFile.toPath(), modified, UTF_8); + + return yamlFile.getAbsolutePath(); + } + + @Override + public CassandraSession getSession() + { + return requireNonNull(session, "session is null"); + } + + @Override + public String getHost() + { + return dockerContainer.getHost(); + } + + @Override + public int getPort() + { + return dockerContainer.getMappedPort(PORT); + } + + private static void checkConnectivity(CassandraSession session) + { + ResultSet result = session.execute("SELECT release_version FROM system.local"); + List rows = result.all(); + assertEquals(rows.size(), 1); + String version = rows.get(0).getString(0); + log.info("Cassandra version: %s", version); + } + + @Override + public void refreshSizeEstimates(String keyspace, String table) + throws Exception + { + long deadline = System.nanoTime() + REFRESH_SIZE_ESTIMATES_TIMEOUT.roundTo(NANOSECONDS); + while (System.nanoTime() - deadline < 0) { + flushTable(keyspace, table); + refreshSizeEstimates(); + List sizeEstimates = getSession().getSizeEstimates(keyspace, table); + if (!sizeEstimates.isEmpty()) { + log.info("Size estimates for the table %s.%s have been refreshed successfully: %s", keyspace, table, sizeEstimates); + return; + } + log.info("Size estimates haven't been refreshed as expected. Retrying ..."); + SECONDS.sleep(1); + } + throw new TimeoutException(format("Attempting to refresh size estimates for table %s.%s has timed out after %s", keyspace, table, REFRESH_SIZE_ESTIMATES_TIMEOUT)); + } + + private void flushTable(String keyspace, String table) + throws Exception + { + dockerContainer.execInContainer("nodetool", "flush", keyspace, table); + } + + private void refreshSizeEstimates() + throws Exception + { + dockerContainer.execInContainer("nodetool", "refreshsizeestimates"); + } + + @Override + public void close() + { + session.close(); + dockerContainer.close(); + } + + @ResourcePresence + public boolean isRunning() + { + return dockerContainer.getContainerId() != null; + } +} From e08ec9c683218d503935cf5f122a7571ab7d0a2a Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 15 Dec 2021 10:12:44 +0900 Subject: [PATCH 4/4] Add ScyllaDB connector Set minimum required version as 3.0.0 because ScyllaDB version 2.x can't detect partition correctly and it leads to query failure. --- .github/workflows/ci.yml | 5 + core/trino-server/src/main/provisio/trino.xml | 6 + docs/src/main/sphinx/connector.md | 1 + docs/src/main/sphinx/connector/scylla.rst | 38 ++++ plugin/trino-scylla/pom.xml | 192 ++++++++++++++++++ .../plugin/scylla/ScyllaConnectorFactory.java | 26 +++ .../io/trino/plugin/scylla/ScyllaPlugin.java | 28 +++ .../plugin/scylla}/ScyllaQueryRunner.java | 11 +- .../scylla}/TestScyllaConnectorSmokeTest.java | 9 +- .../scylla/TestScyllaConnectorTest.java | 42 ++++ .../TestScyllaLatestConnectorSmokeTest.java | 39 ++++ .../plugin/scylla}/TestingScyllaServer.java | 19 +- pom.xml | 1 + .../EnvMultinodeAllConnectors.java | 1 + .../env/environment/EnvMultinodeScylla.java | 74 +++++++ .../launcher/suite/suites/SuiteScylla.java | 37 ++++ .../multinode-all/scylla.properties | 3 + .../multinode-scylla/scylla.properties | 5 + .../tempto-configuration.yaml | 3 + .../io/trino/tests/product/TestGroups.java | 1 + .../tests/product/scylla/TestScylla.java | 70 +++++++ .../main/resources/tempto-configuration.yaml | 5 + .../etc/catalog/scylla.properties | 5 + .../trino-server-dev/etc/config.properties | 1 + 24 files changed, 609 insertions(+), 13 deletions(-) create mode 100644 docs/src/main/sphinx/connector/scylla.rst create mode 100644 plugin/trino-scylla/pom.xml create mode 100644 plugin/trino-scylla/src/main/java/io/trino/plugin/scylla/ScyllaConnectorFactory.java create mode 100644 plugin/trino-scylla/src/main/java/io/trino/plugin/scylla/ScyllaPlugin.java rename plugin/{trino-cassandra/src/test/java/io/trino/plugin/cassandra => trino-scylla/src/test/java/io/trino/plugin/scylla}/ScyllaQueryRunner.java (92%) rename plugin/{trino-cassandra/src/test/java/io/trino/plugin/cassandra => trino-scylla/src/test/java/io/trino/plugin/scylla}/TestScyllaConnectorSmokeTest.java (80%) create mode 100644 plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaConnectorTest.java create mode 100644 plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaLatestConnectorSmokeTest.java rename plugin/{trino-cassandra/src/test/java/io/trino/plugin/cassandra => trino-scylla/src/test/java/io/trino/plugin/scylla}/TestingScyllaServer.java (92%) create mode 100644 testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeScylla.java create mode 100644 testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteScylla.java create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-all/scylla.properties create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-scylla/scylla.properties create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-scylla/tempto-configuration.yaml create mode 100644 testing/trino-product-tests/src/main/java/io/trino/tests/product/scylla/TestScylla.java create mode 100644 testing/trino-server-dev/etc/catalog/scylla.properties diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b5fab70899c9..324bef4791bc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -449,6 +449,7 @@ jobs: !:trino-accumulo, !:trino-bigquery, !:trino-cassandra, + !:trino-scylla, !:trino-clickhouse, !:trino-delta-lake, !:trino-docs,!:trino-server,!:trino-server-rpm, @@ -554,6 +555,7 @@ jobs: - { modules: plugin/trino-bigquery } - { modules: plugin/trino-bigquery, profile: cloud-tests-arrow } - { modules: plugin/trino-cassandra } + - { modules: plugin/trino-scylla } - { modules: plugin/trino-clickhouse } - { modules: plugin/trino-delta-lake } - { modules: plugin/trino-delta-lake, profile: cloud-tests } @@ -983,6 +985,9 @@ jobs: - config: default suite: suite-cassandra # this suite is not meant to be run with different configs + - config: default + suite: suite-scylla + # this suite is not meant to be run with different configs - config: default suite: suite-clickhouse # this suite is not meant to be run with different configs diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml index 8e014d0b9a57..c747cf584498 100644 --- a/core/trino-server/src/main/provisio/trino.xml +++ b/core/trino-server/src/main/provisio/trino.xml @@ -306,4 +306,10 @@ + + + + + + diff --git a/docs/src/main/sphinx/connector.md b/docs/src/main/sphinx/connector.md index 3b86e28f3d56..1ada1a9e7d60 100644 --- a/docs/src/main/sphinx/connector.md +++ b/docs/src/main/sphinx/connector.md @@ -36,6 +36,7 @@ PostgreSQL Prometheus Redis Redshift +Scylla SingleStore SQL Server System diff --git a/docs/src/main/sphinx/connector/scylla.rst b/docs/src/main/sphinx/connector/scylla.rst new file mode 100644 index 000000000000..d3b0498a8ead --- /dev/null +++ b/docs/src/main/sphinx/connector/scylla.rst @@ -0,0 +1,38 @@ +================ +Scylla connector +================ + +The Scylla connector allows querying data stored in +`Scylla `_. + +Requirements +------------ + +To connect to Scylla, you need: + +* Scylla version 3.0.0 or higher. +* Network access from the Trino coordinator and workers to Scylla. + Port 9042 is the default port. + +Configuration +------------- + +To configure the Scylla connector, create a catalog properties file +``etc/catalog/example.properties`` with the following contents, +replacing ``host1,host2`` with a comma-separated list of the Scylla +nodes, used to discovery the cluster topology: + +.. code-block:: text + + connector.name=scylla + cassandra.contact-points=host1,host2 + +You also need to set ``cassandra.native-protocol-port``, if your +Scylla nodes are not using the default port 9042. + +Compatibility with Cassandra connector +-------------------------------------- + +The Scylla connector is very similar to the Cassandra connector with the +only difference being the underlying driver. +See :doc:`Cassandra connector ` for more details. diff --git a/plugin/trino-scylla/pom.xml b/plugin/trino-scylla/pom.xml new file mode 100644 index 000000000000..cd217ad90109 --- /dev/null +++ b/plugin/trino-scylla/pom.xml @@ -0,0 +1,192 @@ + + + 4.0.0 + + + io.trino + trino-root + 423-SNAPSHOT + ../../pom.xml + + + trino-scylla + trino-plugin + Trino - Scylla Connector + + + ${project.parent.basedir} + + + + + com.google.guava + guava + + + + io.trino + trino-cassandra + ${project.version} + + + com.datastax.oss + java-driver-core + + + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + io.airlift + slice + provided + + + + io.opentelemetry + opentelemetry-api + provided + + + + io.opentelemetry + opentelemetry-context + provided + + + + io.trino + trino-spi + provided + + + + org.openjdk.jol + jol-core + provided + + + + com.scylladb + java-driver-core + 4.13.0.0 + runtime + + + + io.airlift + json + runtime + + + + io.airlift + log + runtime + + + + io.airlift + log-manager + runtime + + + + io.airlift + units + runtime + + + + io.airlift + testing + test + + + + + io.trino + trino-cassandra + ${project.version} + test-jar + test + + + + io.trino + trino-main + test-jar + test + + + + io.trino + trino-main + test + + + + io.trino + trino-testing + test + + + + io.trino + trino-tpch + test + + + + io.trino.tpch + tpch + test + + + + org.apache.thrift + libthrift + test + + + + org.assertj + assertj-core + test + + + + org.testcontainers + testcontainers + test + + + + org.testng + testng + test + + + + + + + org.basepom.maven + duplicate-finder-maven-plugin + + + + com.datastax.oss + java-driver-core + + + + + + + diff --git a/plugin/trino-scylla/src/main/java/io/trino/plugin/scylla/ScyllaConnectorFactory.java b/plugin/trino-scylla/src/main/java/io/trino/plugin/scylla/ScyllaConnectorFactory.java new file mode 100644 index 000000000000..9260ca48e65b --- /dev/null +++ b/plugin/trino-scylla/src/main/java/io/trino/plugin/scylla/ScyllaConnectorFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.scylla; + +import io.trino.plugin.cassandra.CassandraConnectorFactory; + +public class ScyllaConnectorFactory + extends CassandraConnectorFactory +{ + @Override + public String getName() + { + return "scylla"; + } +} diff --git a/plugin/trino-scylla/src/main/java/io/trino/plugin/scylla/ScyllaPlugin.java b/plugin/trino-scylla/src/main/java/io/trino/plugin/scylla/ScyllaPlugin.java new file mode 100644 index 000000000000..6c9b549123ca --- /dev/null +++ b/plugin/trino-scylla/src/main/java/io/trino/plugin/scylla/ScyllaPlugin.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.scylla; + +import com.google.common.collect.ImmutableList; +import io.trino.spi.Plugin; +import io.trino.spi.connector.ConnectorFactory; + +public class ScyllaPlugin + implements Plugin +{ + @Override + public Iterable getConnectorFactories() + { + return ImmutableList.of(new ScyllaConnectorFactory()); + } +} diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/ScyllaQueryRunner.java b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/ScyllaQueryRunner.java similarity index 92% rename from plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/ScyllaQueryRunner.java rename to plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/ScyllaQueryRunner.java index 52de69544c78..81e63d75c47e 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/ScyllaQueryRunner.java +++ b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/ScyllaQueryRunner.java @@ -11,12 +11,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.cassandra; +package io.trino.plugin.scylla; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; import io.airlift.log.Logging; import io.trino.Session; +import io.trino.plugin.cassandra.CassandraServer; import io.trino.plugin.tpch.TpchPlugin; import io.trino.testing.DistributedQueryRunner; import io.trino.tpch.TpchTable; @@ -35,7 +36,7 @@ public final class ScyllaQueryRunner private ScyllaQueryRunner() {} public static DistributedQueryRunner createScyllaQueryRunner( - TestingScyllaServer server, + CassandraServer server, Map extraProperties, Map connectorProperties, Iterable> tables) @@ -57,8 +58,8 @@ public static DistributedQueryRunner createScyllaQueryRunner( connectorProperties.putIfAbsent("cassandra.load-policy.use-dc-aware", "true"); connectorProperties.putIfAbsent("cassandra.load-policy.dc-aware.local-dc", "datacenter1"); - queryRunner.installPlugin(new CassandraPlugin()); - queryRunner.createCatalog("cassandra", "cassandra", connectorProperties); + queryRunner.installPlugin(new ScyllaPlugin()); + queryRunner.createCatalog("scylla", "scylla", connectorProperties); createKeyspace(server.getSession(), "tpch"); copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession("tpch"), tables); @@ -76,7 +77,7 @@ public static DistributedQueryRunner createScyllaQueryRunner( public static Session createSession(String schema) { return testSessionBuilder() - .setCatalog("cassandra") + .setCatalog("scylla") .setSchema(schema) .build(); } diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestScyllaConnectorSmokeTest.java b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaConnectorSmokeTest.java similarity index 80% rename from plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestScyllaConnectorSmokeTest.java rename to plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaConnectorSmokeTest.java index 3c9c992d1100..8d29cff27edd 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestScyllaConnectorSmokeTest.java +++ b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaConnectorSmokeTest.java @@ -11,15 +11,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.cassandra; +package io.trino.plugin.scylla; import com.google.common.collect.ImmutableMap; +import io.trino.plugin.cassandra.BaseCassandraConnectorSmokeTest; +import io.trino.plugin.cassandra.CassandraSession; import io.trino.testing.QueryRunner; import java.sql.Timestamp; import static io.trino.plugin.cassandra.CassandraTestingUtils.createTestTables; -import static io.trino.plugin.cassandra.ScyllaQueryRunner.createScyllaQueryRunner; +import static io.trino.plugin.scylla.ScyllaQueryRunner.createScyllaQueryRunner; +import static io.trino.plugin.scylla.TestingScyllaServer.V3_TAG; public class TestScyllaConnectorSmokeTest extends BaseCassandraConnectorSmokeTest @@ -28,7 +31,7 @@ public class TestScyllaConnectorSmokeTest protected QueryRunner createQueryRunner() throws Exception { - TestingScyllaServer server = closeAfterClass(new TestingScyllaServer("3.3.4")); + TestingScyllaServer server = closeAfterClass(new TestingScyllaServer(V3_TAG)); CassandraSession session = server.getSession(); createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant())); return createScyllaQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES); diff --git a/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaConnectorTest.java b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaConnectorTest.java new file mode 100644 index 000000000000..6b31f8674bed --- /dev/null +++ b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaConnectorTest.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.scylla; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.cassandra.BaseCassandraConnectorTest; +import io.trino.testing.QueryRunner; + +import java.sql.Timestamp; + +import static io.trino.plugin.cassandra.CassandraTestingUtils.createTestTables; +import static io.trino.plugin.scylla.ScyllaQueryRunner.createScyllaQueryRunner; +import static io.trino.plugin.scylla.TestingScyllaServer.V3_TAG; + +public class TestScyllaConnectorTest + extends BaseCassandraConnectorTest +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + server = closeAfterClass(new TestingScyllaServer(V3_TAG)); + session = server.getSession(); + createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant())); + return createScyllaQueryRunner( + server, + ImmutableMap.of(), + ImmutableMap.of("cassandra.batch-size", "50"), // The default 100 causes 'Batch too large' error + REQUIRED_TPCH_TABLES); + } +} diff --git a/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaLatestConnectorSmokeTest.java b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaLatestConnectorSmokeTest.java new file mode 100644 index 000000000000..a714b8d6fa98 --- /dev/null +++ b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestScyllaLatestConnectorSmokeTest.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.scylla; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.cassandra.BaseCassandraConnectorSmokeTest; +import io.trino.plugin.cassandra.CassandraSession; +import io.trino.testing.QueryRunner; + +import java.sql.Timestamp; + +import static io.trino.plugin.cassandra.CassandraTestingUtils.createTestTables; +import static io.trino.plugin.scylla.ScyllaQueryRunner.createScyllaQueryRunner; +import static io.trino.plugin.scylla.TestingScyllaServer.V4_TAG; + +public class TestScyllaLatestConnectorSmokeTest + extends BaseCassandraConnectorSmokeTest +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + TestingScyllaServer server = closeAfterClass(new TestingScyllaServer(V4_TAG)); + CassandraSession session = server.getSession(); + createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant())); + return createScyllaQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES); + } +} diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingScyllaServer.java b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestingScyllaServer.java similarity index 92% rename from plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingScyllaServer.java rename to plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestingScyllaServer.java index 5888408b06d5..dcdd7977aede 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingScyllaServer.java +++ b/plugin/trino-scylla/src/test/java/io/trino/plugin/scylla/TestingScyllaServer.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.cassandra; +package io.trino.plugin.scylla; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSessionBuilder; @@ -21,9 +21,12 @@ import io.airlift.json.JsonCodec; import io.airlift.log.Logger; import io.airlift.units.Duration; +import io.trino.plugin.cassandra.CassandraServer; +import io.trino.plugin.cassandra.CassandraSession; +import io.trino.plugin.cassandra.ExtraColumnMetadata; +import io.trino.plugin.cassandra.SizeEstimate; import org.testcontainers.containers.GenericContainer; -import java.io.Closeable; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.TimeoutException; @@ -40,12 +43,14 @@ import static java.util.concurrent.TimeUnit.SECONDS; public class TestingScyllaServer - implements Closeable + implements CassandraServer { private static final Logger log = Logger.get(TestingScyllaServer.class); - private static final int PORT = 9042; + public static final String V4_TAG = "4.5.3"; + public static final String V3_TAG = "3.0.0"; + private static final int PORT = 9042; private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES); private final GenericContainer container; @@ -53,7 +58,7 @@ public class TestingScyllaServer public TestingScyllaServer() { - this("2.2.0"); + this(V3_TAG); } public TestingScyllaServer(String version) @@ -83,21 +88,25 @@ public TestingScyllaServer(String version) new Duration(1, MINUTES)); } + @Override public CassandraSession getSession() { return requireNonNull(session, "session is null"); } + @Override public String getHost() { return container.getHost(); } + @Override public int getPort() { return container.getMappedPort(PORT); } + @Override public void refreshSizeEstimates(String keyspace, String table) throws Exception { diff --git a/pom.xml b/pom.xml index a9662c95012f..1a419636545c 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ plugin/trino-redis plugin/trino-redshift plugin/trino-resource-group-managers + plugin/trino-scylla plugin/trino-session-property-managers plugin/trino-singlestore plugin/trino-sqlserver diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeAllConnectors.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeAllConnectors.java index 0ba9cf6edd05..91e71489f803 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeAllConnectors.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeAllConnectors.java @@ -77,6 +77,7 @@ public void extendEnvironment(Environment.Builder builder) "raptor_legacy", "redis", "redshift", + "scylla", "sqlserver", "trino_thrift", "tpcds") diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeScylla.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeScylla.java new file mode 100644 index 000000000000..61d85450126a --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeScylla.java @@ -0,0 +1,74 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.inject.Inject; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.docker.DockerFiles.ResourceProvider; +import io.trino.tests.product.launcher.env.DockerContainer; +import io.trino.tests.product.launcher.env.Environment.Builder; +import io.trino.tests.product.launcher.env.EnvironmentProvider; +import io.trino.tests.product.launcher.env.common.StandardMultinode; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; +import io.trino.tests.product.launcher.testcontainers.PortBinder; +import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; + +import java.time.Duration; + +import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.configureTempto; +import static java.util.Objects.requireNonNull; +import static org.testcontainers.utility.MountableFile.forHostPath; + +@TestsEnvironment +public class EnvMultinodeScylla + extends EnvironmentProvider +{ + public static final int SCYLLA_PORT = 9042; + + private final ResourceProvider configDir; + private final PortBinder portBinder; + + @Inject + public EnvMultinodeScylla(StandardMultinode standardMultinode, DockerFiles dockerFiles, PortBinder portBinder) + { + super(standardMultinode); + this.configDir = requireNonNull(dockerFiles, "dockerFiles is null").getDockerFilesHostDirectory("conf/environment/multinode-scylla/"); + this.portBinder = requireNonNull(portBinder, "portBinder is null"); + } + + @Override + public void extendEnvironment(Builder builder) + { + builder.addConnector("scylla", forHostPath(configDir.getPath("scylla.properties"))); + builder.addContainer(createScylla()); + configureTempto(builder, configDir); + } + + private DockerContainer createScylla() + { + DockerContainer container = new DockerContainer("scylladb/scylla:4.6.2", "scylla") + .withEnv("HEAP_NEWSIZE", "128M") + .withEnv("MAX_HEAP_SIZE", "512M") + // Limit SMP to run in a machine having many cores https://github.com/scylladb/scylla/issues/5638 + .withCommand("--smp", "1") + .withStartupCheckStrategy(new IsRunningStartupCheckStrategy()) + .waitingFor(forSelectedPorts(SCYLLA_PORT)) + .withStartupTimeout(Duration.ofMinutes(5)); + + portBinder.exposePort(container, SCYLLA_PORT); + + return container; + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteScylla.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteScylla.java new file mode 100644 index 000000000000..87eddd7d0a60 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteScylla.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.suite.suites; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.env.EnvironmentConfig; +import io.trino.tests.product.launcher.env.environment.EnvMultinodeScylla; +import io.trino.tests.product.launcher.suite.Suite; +import io.trino.tests.product.launcher.suite.SuiteTestRun; + +import java.util.List; + +import static io.trino.tests.product.launcher.suite.SuiteTestRun.testOnEnvironment; + +public class SuiteScylla + extends Suite +{ + @Override + public List getTestRuns(EnvironmentConfig config) + { + return ImmutableList.of( + testOnEnvironment(EnvMultinodeScylla.class) + .withGroups("configured_features", "scylla") + .build()); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-all/scylla.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-all/scylla.properties new file mode 100644 index 000000000000..48d4ab86786e --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-all/scylla.properties @@ -0,0 +1,3 @@ +connector.name=scylla +cassandra.contact-points=host1.invalid,host2.invalid +cassandra.load-policy.dc-aware.local-dc=datacenter1 diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-scylla/scylla.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-scylla/scylla.properties new file mode 100644 index 000000000000..883f48a5452b --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-scylla/scylla.properties @@ -0,0 +1,5 @@ +connector.name=scylla +cassandra.contact-points=scylla +cassandra.allow-drop-table=true +cassandra.load-policy.use-dc-aware=true +cassandra.load-policy.dc-aware.local-dc=datacenter1 diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-scylla/tempto-configuration.yaml b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-scylla/tempto-configuration.yaml new file mode 100644 index 000000000000..cefae6c56579 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-scylla/tempto-configuration.yaml @@ -0,0 +1,3 @@ +databases: + presto: + jdbc_url: "jdbc:trino://${databases.presto.host}:${databases.presto.port}/scylla/test" diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java index 9bb8110f04a1..802e8960d396 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java @@ -58,6 +58,7 @@ public final class TestGroups public static final String HIVE_COERCION = "hive_coercion"; public static final String AZURE = "azure"; public static final String CASSANDRA = "cassandra"; + public static final String SCYLLA = "scylla"; public static final String SQL_SERVER = "sqlserver"; public static final String LDAP = "ldap"; public static final String LDAP_AND_FILE = "ldap_and_file"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/scylla/TestScylla.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/scylla/TestScylla.java new file mode 100644 index 000000000000..99c9bd45f970 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/scylla/TestScylla.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.scylla; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.google.inject.Inject; +import io.trino.tempto.BeforeMethodWithContext; +import io.trino.tempto.ProductTest; +import io.trino.tempto.configuration.Configuration; +import io.trino.tempto.query.QueryResult; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.Test; + +import java.net.InetSocketAddress; + +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.TestGroups.SCYLLA; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestScylla + extends ProductTest +{ + @Inject + private Configuration configuration; + + @BeforeMethodWithContext + public void setUp() + { + onScylla("DROP KEYSPACE IF EXISTS test"); + onScylla("CREATE KEYSPACE test WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"); + } + + @Test(groups = {SCYLLA, PROFILE_SPECIFIC_TESTS}) + public void testCreateTableAsSelect() + { + onTrino().executeQuery("DROP TABLE IF EXISTS scylla.test.nation"); + QueryResult result = onTrino().executeQuery("CREATE TABLE scylla.test.nation AS SELECT * FROM tpch.tiny.nation"); + try { + assertThat(result).updatedRowsCountIsEqualTo(25); + assertThat(onTrino().executeQuery("SELECT COUNT(*) FROM scylla.test.nation")) + .containsOnly(row(25)); + } + finally { + onTrino().executeQuery("DROP TABLE scylla.test.nation"); + } + } + + private void onScylla(@Language("SQL") String query) + { + try (CqlSession session = CqlSession.builder() + .addContactPoint(new InetSocketAddress(configuration.getStringMandatory("databases.scylla.host"), configuration.getIntMandatory("databases.scylla.port"))) + .withLocalDatacenter(configuration.getStringMandatory("databases.scylla.local_datacenter")) + .build()) { + session.execute(query); + } + } +} diff --git a/testing/trino-product-tests/src/main/resources/tempto-configuration.yaml b/testing/trino-product-tests/src/main/resources/tempto-configuration.yaml index 839c23c7b112..fdea1ffa5a17 100644 --- a/testing/trino-product-tests/src/main/resources/tempto-configuration.yaml +++ b/testing/trino-product-tests/src/main/resources/tempto-configuration.yaml @@ -136,6 +136,11 @@ databases: request: timeout_seconds: 30 + scylla: + host: scylla + port: 9042 + local_datacenter: datacenter1 + sqlserver: jdbc_driver_class: com.microsoft.sqlserver.jdbc.SQLServerDriver jdbc_url: jdbc:sqlserver://sqlserver;encrypt=false diff --git a/testing/trino-server-dev/etc/catalog/scylla.properties b/testing/trino-server-dev/etc/catalog/scylla.properties new file mode 100644 index 000000000000..e264b15f0548 --- /dev/null +++ b/testing/trino-server-dev/etc/catalog/scylla.properties @@ -0,0 +1,5 @@ +connector.name=scylla +cassandra.contact-points=scylla +cassandra.load-policy.use-dc-aware=true +cassandra.load-policy.dc-aware.local-dc=datacenter1 +cassandra.allow-drop-table=true diff --git a/testing/trino-server-dev/etc/config.properties b/testing/trino-server-dev/etc/config.properties index b786657e8d2b..815d068bc6f9 100644 --- a/testing/trino-server-dev/etc/config.properties +++ b/testing/trino-server-dev/etc/config.properties @@ -34,6 +34,7 @@ plugin.bundles=\ ../../plugin/trino-delta-lake/pom.xml,\ ../../plugin/trino-blackhole/pom.xml,\ ../../plugin/trino-cassandra/pom.xml,\ + ../../plugin/trino-scylla/pom.xml,\ ../../plugin/trino-memory/pom.xml,\ ../../plugin/trino-jmx/pom.xml,\ ../../plugin/trino-raptor-legacy/pom.xml,\