diff --git a/plugin/trino-cassandra/pom.xml b/plugin/trino-cassandra/pom.xml index e125d36ecf2b..303e37525b9a 100644 --- a/plugin/trino-cassandra/pom.xml +++ b/plugin/trino-cassandra/pom.xml @@ -240,6 +240,12 @@ test + + org.testcontainers + cassandra + test + + org.testcontainers testcontainers diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java index 7c2ee1b906be..611efa200a9d 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java @@ -18,20 +18,19 @@ import com.datastax.oss.driver.api.core.ProtocolVersion; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; -import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.Row; import com.google.common.collect.ImmutableMap; import com.google.common.io.Resources; import io.airlift.json.JsonCodec; import io.airlift.log.Logger; import io.airlift.units.Duration; import io.trino.testing.ResourcePresence; -import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.containers.wait.CassandraQueryWaitStrategy; +import org.testcontainers.utility.DockerImageName; import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.file.Path; import java.util.List; import java.util.Map; @@ -52,7 +51,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThat; +import static org.testcontainers.containers.CassandraContainer.CQL_PORT; import static org.testcontainers.utility.MountableFile.forHostPath; public class CassandraServer @@ -60,11 +59,9 @@ public class CassandraServer { private static final Logger log = Logger.get(CassandraServer.class); - private static final int PORT = 9142; - private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES); - private final GenericContainer dockerContainer; + private final CassandraContainer dockerContainer; private final CassandraSession session; public CassandraServer() @@ -76,18 +73,18 @@ public CassandraServer() public CassandraServer(String imageName, String configFileName) throws Exception { - this(imageName, ImmutableMap.of(), "/etc/cassandra/cassandra.yaml", configFileName); + this(DockerImageName.parse(imageName), ImmutableMap.of(), "/etc/cassandra/cassandra.yaml", configFileName); } - public CassandraServer(String imageName, Map environmentVariables, String configPath, String configFileName) + public CassandraServer(DockerImageName imageName, Map environmentVariables, String configPath, String configFileName) throws Exception { log.debug("Starting cassandra..."); - this.dockerContainer = new GenericContainer<>(imageName) - .withExposedPorts(PORT) + this.dockerContainer = new CassandraContainer<>(imageName) .withCopyFileToContainer(forHostPath(prepareCassandraYaml(configFileName)), configPath) .withEnv(environmentVariables) + .waitingFor(new CassandraQueryWaitStrategy()) .withStartupTimeout(java.time.Duration.ofMinutes(10)); this.dockerContainer.start(); @@ -99,27 +96,15 @@ public CassandraServer(String imageName, Map environmentVariable driverConfigLoaderBuilder.withStringList(METADATA_SCHEMA_REFRESHED_KEYSPACES, List.of()); CqlSessionBuilder cqlSessionBuilder = CqlSession.builder() - .withApplicationName("TestCluster") - .addContactPoint(new InetSocketAddress(this.dockerContainer.getHost(), this.dockerContainer.getMappedPort(PORT))) - .withLocalDatacenter("datacenter1") + .addContactPoint(dockerContainer.getContactPoint()) + .withLocalDatacenter(dockerContainer.getLocalDatacenter()) .withConfigLoader(driverConfigLoaderBuilder.build()); - CassandraSession session = new CassandraSession( + this.session = new CassandraSession( CASSANDRA_TYPE_MANAGER, JsonCodec.listJsonCodec(ExtraColumnMetadata.class), cqlSessionBuilder::build, new Duration(1, MINUTES)); - - try { - checkConnectivity(session); - } - catch (RuntimeException e) { - session.close(); - this.dockerContainer.stop(); - throw e; - } - - this.session = session; } private static String prepareCassandraYaml(String fileName) @@ -152,16 +137,7 @@ public String getHost() public int getPort() { - return dockerContainer.getMappedPort(PORT); - } - - private static void checkConnectivity(CassandraSession session) - { - ResultSet result = session.execute("SELECT release_version FROM system.local"); - List rows = result.all(); - assertThat(rows.size()).isEqualTo(1); - String version = rows.get(0).getString(0); - log.debug("Cassandra version: %s", version); + return dockerContainer.getMappedPort(CQL_PORT); } public void refreshSizeEstimates(String keyspace, String table) diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestDatastaxConnectorSmokeTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestDatastaxConnectorSmokeTest.java index a784aeee5f2f..032639e31995 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestDatastaxConnectorSmokeTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestDatastaxConnectorSmokeTest.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.testing.QueryRunner; +import org.testcontainers.utility.DockerImageName; import java.sql.Timestamp; import java.util.Map; @@ -30,7 +31,7 @@ protected QueryRunner createQueryRunner() throws Exception { CassandraServer server = closeAfterClass(new CassandraServer( - "datastax/dse-server:6.8.25", + DockerImageName.parse("datastax/dse-server:6.8.25").asCompatibleSubstituteFor("cassandra"), Map.of( "DS_LICENSE", "accept", "DC", "datacenter1"), diff --git a/plugin/trino-cassandra/src/test/resources/cassandra-dse.yaml b/plugin/trino-cassandra/src/test/resources/cassandra-dse.yaml index d92c3c73d413..d9e3e6976cd0 100644 --- a/plugin/trino-cassandra/src/test/resources/cassandra-dse.yaml +++ b/plugin/trino-cassandra/src/test/resources/cassandra-dse.yaml @@ -10,8 +10,6 @@ seed_provider: parameters: - seeds: "127.0.0.1" -native_transport_port: 9142 - read_request_timeout_in_ms: 30000 range_request_timeout_in_ms: 30000 write_request_timeout_in_ms: 30000 diff --git a/plugin/trino-cassandra/src/test/resources/cu-cassandra-latest.yaml b/plugin/trino-cassandra/src/test/resources/cu-cassandra-latest.yaml index 5123f75c0f5e..9013e04645c9 100644 --- a/plugin/trino-cassandra/src/test/resources/cu-cassandra-latest.yaml +++ b/plugin/trino-cassandra/src/test/resources/cu-cassandra-latest.yaml @@ -22,7 +22,6 @@ concurrent_counter_writes: 2 concurrent_materialized_view_writes: 2 listen_address: 127.0.0.1 -native_transport_port: 9142 rpc_address: localhost broadcast_rpc_address: localhost diff --git a/plugin/trino-cassandra/src/test/resources/cu-cassandra.yaml b/plugin/trino-cassandra/src/test/resources/cu-cassandra.yaml index 1bb73b383190..122daa13db27 100644 --- a/plugin/trino-cassandra/src/test/resources/cu-cassandra.yaml +++ b/plugin/trino-cassandra/src/test/resources/cu-cassandra.yaml @@ -260,8 +260,6 @@ ssl_storage_port: 7011 listen_address: 127.0.0.1 start_native_transport: true -# port for the CQL native transport to listen for clients on -native_transport_port: 9142 # Whether to start the thrift rpc server. start_rpc: false