diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c029e316dd05..f367138e106e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -313,6 +313,7 @@ jobs:
!:trino-raptor-legacy,
!:trino-accumulo,
!:trino-cassandra,
+ !:trino-scylla,
!:trino-clickhouse,
!:trino-delta-lake,
!:trino-hive,
@@ -386,6 +387,7 @@ jobs:
- { modules: plugin/trino-raptor-legacy }
- { modules: plugin/trino-accumulo }
- { modules: plugin/trino-cassandra }
+ - { modules: plugin/trino-scylla }
- { modules: plugin/trino-clickhouse }
- { modules: plugin/trino-delta-lake }
- { modules: plugin/trino-delta-lake, profile: test-failure-recovery }
diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml
index f7f8c165bc02..935217b42105 100644
--- a/core/trino-server/src/main/provisio/trino.xml
+++ b/core/trino-server/src/main/provisio/trino.xml
@@ -288,4 +288,10 @@
+
+    <artifactSet to="plugin/scylla">
+        <artifact id="${project.groupId}:trino-scylla:zip:${project.version}">
+            <unpack />
+        </artifact>
+    </artifactSet>
diff --git a/docs/src/main/sphinx/connector.rst b/docs/src/main/sphinx/connector.rst
index 54cca76b3874..79d3b3804e34 100644
--- a/docs/src/main/sphinx/connector.rst
+++ b/docs/src/main/sphinx/connector.rst
@@ -36,6 +36,7 @@ from different data sources.
Prometheus
Redis
Redshift
+ Scylla
SingleStore (MemSQL)
SQL Server
System
diff --git a/docs/src/main/sphinx/connector/scylla.rst b/docs/src/main/sphinx/connector/scylla.rst
new file mode 100644
index 000000000000..6c36597bd5b9
--- /dev/null
+++ b/docs/src/main/sphinx/connector/scylla.rst
@@ -0,0 +1,38 @@
+================
+Scylla connector
+================
+
+The Scylla connector allows querying data stored in
+`Scylla <https://www.scylladb.com/>`_.
+
+Requirements
+------------
+
+To connect to Scylla, you need:
+
+* Scylla version 3.0.0 or higher.
+* Network access from the Trino coordinator and workers to Scylla.
+ Port 9042 is the default port.
+
+Configuration
+-------------
+
+To configure the Scylla connector, create a catalog properties file
+``etc/catalog/scylla.properties`` with the following contents,
+replacing ``host1,host2`` with a comma-separated list of the Scylla
+nodes, used to discover the cluster topology:
+
+.. code-block:: text
+
+ connector.name=scylla
+ cassandra.contact-points=host1,host2
+
+You also need to set ``cassandra.native-protocol-port``, if your
+Scylla nodes are not using the default port 9042.
+
+Compatibility with Cassandra connector
+--------------------------------------
+
+The Scylla connector is very similar to the Cassandra connector with the
+only difference being the underlying driver.
+See :doc:`Cassandra connector <cassandra>` for more details.
diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java
new file mode 100644
index 000000000000..2fcb7a467a4a
--- /dev/null
+++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/BaseCassandraConnectorTest.java
@@ -0,0 +1,1440 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.cassandra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Ints;
+import io.airlift.units.Duration;
+import io.trino.Session;
+import io.trino.spi.type.Type;
+import io.trino.testing.BaseConnectorTest;
+import io.trino.testing.Bytes;
+import io.trino.testing.MaterializedResult;
+import io.trino.testing.MaterializedRow;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
+import io.trino.testing.assertions.Assert;
+import io.trino.testing.sql.TestTable;
+import org.testng.SkipException;
+import org.testng.annotations.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Optional;
+
+import static com.datastax.oss.driver.api.core.data.ByteUtils.toHexString;
+import static com.google.common.io.BaseEncoding.base16;
+import static io.trino.plugin.cassandra.CassandraQueryRunner.createCassandraQueryRunner;
+import static io.trino.plugin.cassandra.CassandraQueryRunner.createCassandraSession;
+import static io.trino.plugin.cassandra.TestCassandraTable.clusterColumn;
+import static io.trino.plugin.cassandra.TestCassandraTable.columnsValue;
+import static io.trino.plugin.cassandra.TestCassandraTable.generalColumn;
+import static io.trino.plugin.cassandra.TestCassandraTable.partitionColumn;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
+import static io.trino.spi.type.UuidType.UUID;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
+import static io.trino.spi.type.VarcharType.createVarcharType;
+import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION;
+import static io.trino.testing.MaterializedResult.resultBuilder;
+import static io.trino.testing.QueryAssertions.assertContains;
+import static io.trino.testing.QueryAssertions.assertContainsEventually;
+import static java.lang.String.format;
+import static java.util.Comparator.comparing;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.stream.Collectors.toList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.testng.Assert.assertEquals;
+
+public abstract class BaseCassandraConnectorTest
+ extends BaseConnectorTest
+{
+ protected static final String KEYSPACE = "smoke_test";
+ protected static final Session SESSION = createCassandraSession(KEYSPACE);
+
+ protected static final ZonedDateTime TIMESTAMP_VALUE = ZonedDateTime.of(1970, 1, 1, 3, 4, 5, 0, ZoneId.of("UTC"));
+
+ protected CassandraServer server;
+ protected CassandraSession session;
+
+ @Override
+ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+ {
+ switch (connectorBehavior) {
+ case SUPPORTS_TRUNCATE:
+ return true;
+
+ case SUPPORTS_CREATE_SCHEMA:
+ return false;
+
+ case SUPPORTS_CREATE_VIEW:
+ return false;
+
+ case SUPPORTS_CREATE_TABLE_WITH_TABLE_COMMENT:
+ case SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT:
+ return false;
+
+ case SUPPORTS_RENAME_TABLE:
+ return false;
+
+ case SUPPORTS_ARRAY:
+ case SUPPORTS_ROW_TYPE:
+ return false;
+
+ case SUPPORTS_ADD_COLUMN:
+ case SUPPORTS_DROP_COLUMN:
+ case SUPPORTS_RENAME_COLUMN:
+ return false;
+
+ case SUPPORTS_COMMENT_ON_TABLE:
+ case SUPPORTS_COMMENT_ON_COLUMN:
+ return false;
+
+ case SUPPORTS_TOPN_PUSHDOWN:
+ return false;
+
+ case SUPPORTS_NOT_NULL_CONSTRAINT:
+ return false;
+
+ case SUPPORTS_DELETE:
+ return true;
+
+ default:
+ return super.hasBehavior(connectorBehavior);
+ }
+ }
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ server = closeAfterClass(new TestingCassandraServer());
+ session = server.getSession();
+ session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}");
+ return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES);
+ }
+
+ @Override
+ protected TestTable createTableWithDefaultColumns()
+ {
+ throw new SkipException("Cassandra connector does not support column default values");
+ }
+
+ @Override
+ protected Optional<DataMappingTestSetup> filterDataMappingSmokeTestData(DataMappingTestSetup dataMappingTestSetup)
+ {
+ String typeName = dataMappingTestSetup.getTrinoTypeName();
+ if (typeName.equals("time")
+ || typeName.equals("timestamp")
+ || typeName.equals("decimal(5,3)")
+ || typeName.equals("decimal(15,3)")
+ || typeName.equals("char(3)")) {
+ // TODO this should either work or fail cleanly
+ return Optional.empty();
+ }
+ return Optional.of(dataMappingTestSetup);
+ }
+
+ @Override
+ protected Optional<DataMappingTestSetup> filterCaseSensitiveDataMappingTestData(DataMappingTestSetup dataMappingTestSetup)
+ {
+ String typeName = dataMappingTestSetup.getTrinoTypeName();
+ if (typeName.equals("char(1)")) {
+ // TODO this should either work or fail cleanly
+ return Optional.empty();
+ }
+ return Optional.of(dataMappingTestSetup);
+ }
+
+ @Override
+ protected String dataMappingTableName(String trinoTypeName)
+ {
+ return "tmp_trino_" + System.nanoTime();
+ }
+
+ @Test
+ @Override
+ public void testShowColumns()
+ {
+ MaterializedResult actual = computeActual("SHOW COLUMNS FROM orders");
+
+ MaterializedResult expectedParametrizedVarchar = resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
+ .row("orderkey", "bigint", "", "")
+ .row("custkey", "bigint", "", "")
+ .row("orderstatus", "varchar", "", "")
+ .row("totalprice", "double", "", "")
+ .row("orderdate", "date", "", "")
+ .row("orderpriority", "varchar", "", "")
+ .row("clerk", "varchar", "", "")
+ .row("shippriority", "integer", "", "")
+ .row("comment", "varchar", "", "")
+ .build();
+
+ Assert.assertEquals(actual, expectedParametrizedVarchar);
+ }
+
+ @Test
+ @Override
+ public void testDescribeTable()
+ {
+ MaterializedResult expectedColumns = resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
+ .row("orderkey", "bigint", "", "")
+ .row("custkey", "bigint", "", "")
+ .row("orderstatus", "varchar", "", "")
+ .row("totalprice", "double", "", "")
+ .row("orderdate", "date", "", "")
+ .row("orderpriority", "varchar", "", "")
+ .row("clerk", "varchar", "", "")
+ .row("shippriority", "integer", "", "")
+ .row("comment", "varchar", "", "")
+ .build();
+ MaterializedResult actualColumns = computeActual("DESCRIBE orders");
+ Assert.assertEquals(actualColumns, expectedColumns);
+ }
+
+ @Test
+ @Override
+ public void testShowCreateTable()
+ {
+ assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue())
+ .isEqualTo("CREATE TABLE " + getSession().getCatalog().orElseThrow() + ".tpch.orders (\n" +
+ " orderkey bigint,\n" +
+ " custkey bigint,\n" +
+ " orderstatus varchar,\n" +
+ " totalprice double,\n" +
+ " orderdate date,\n" +
+ " orderpriority varchar,\n" +
+ " clerk varchar,\n" +
+ " shippriority integer,\n" +
+ " comment varchar\n" +
+ ")");
+ }
+
+ @Override
+ public void testCharVarcharComparison()
+ {
+ assertThatThrownBy(super::testCharVarcharComparison)
+ .hasMessage("Unsupported type: char(3)");
+ }
+
+ @Test
+ public void testPushdownUuidPartitionKeyPredicate()
+ {
+ try (TestCassandraTable testCassandraTable = testTable(
+ "table_pushdown_uuid_partition_key",
+ ImmutableList.of(partitionColumn("col_uuid", "uuid"), generalColumn("col_text", "text")),
+ ImmutableList.of("00000000-0000-0000-0000-000000000001, 'Trino'"))) {
+ assertThat(query(format("SELECT col_text FROM %s WHERE col_uuid = UUID '00000000-0000-0000-0000-000000000001'", testCassandraTable.getTableName())))
+ .matches("VALUES CAST('Trino' AS varchar)");
+ }
+ }
+
+ @Test
+ public void testPushdownAllTypesPartitionKeyPredicate()
+ {
+ // TODO partition key predicate pushdown for decimal types does not work https://github.com/trinodb/trino/issues/10927
+ try (TestCassandraTable testCassandraTable = testTable(
+ "table_pushdown_all_types_partition_key",
+ ImmutableList.of(
+ partitionColumn("key", "text"),
+ partitionColumn("typeuuid", "uuid"),
+ partitionColumn("typetinyint", "tinyint"),
+ partitionColumn("typesmallint", "smallint"),
+ partitionColumn("typeinteger", "int"),
+ partitionColumn("typelong", "bigint"),
+ generalColumn("typebytes", "blob"),
+ partitionColumn("typedate", "date"),
+ partitionColumn("typetimestamp", "timestamp"),
+ partitionColumn("typeansi", "ascii"),
+ partitionColumn("typeboolean", "boolean"),
+ generalColumn("typedecimal", "decimal"),
+ partitionColumn("typedouble", "double"),
+ partitionColumn("typefloat", "float"),
+ partitionColumn("typeinet", "inet"),
+ partitionColumn("typevarchar", "varchar"),
+ generalColumn("typevarint", "varint"),
+ partitionColumn("typetimeuuid", "timeuuid"),
+ generalColumn("typelist", "frozen <list<text>>"),
+ generalColumn("typemap", "frozen <map<timestamp, timeuuid>>"),