Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugin/trino-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.testing.ResourcePresence;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
import org.testcontainers.utility.DockerImageName;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
Expand All @@ -52,19 +51,17 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.containers.CassandraContainer.CQL_PORT;
import static org.testcontainers.utility.MountableFile.forHostPath;

public class CassandraServer
implements Closeable
{
private static final Logger log = Logger.get(CassandraServer.class);

private static final int PORT = 9142;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CassandraContainer expects 9042 as the default port.


private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES);

private final GenericContainer<?> dockerContainer;
private final CassandraContainer<?> dockerContainer;
private final CassandraSession session;

public CassandraServer()
Expand All @@ -76,18 +73,18 @@ public CassandraServer()
public CassandraServer(String imageName, String configFileName)
throws Exception
{
this(imageName, ImmutableMap.of(), "/etc/cassandra/cassandra.yaml", configFileName);
this(DockerImageName.parse(imageName), ImmutableMap.of(), "/etc/cassandra/cassandra.yaml", configFileName);
}

public CassandraServer(String imageName, Map<String, String> environmentVariables, String configPath, String configFileName)
public CassandraServer(DockerImageName imageName, Map<String, String> environmentVariables, String configPath, String configFileName)
throws Exception
{
log.debug("Starting cassandra...");

this.dockerContainer = new GenericContainer<>(imageName)
.withExposedPorts(PORT)
this.dockerContainer = new CassandraContainer<>(imageName)
.withCopyFileToContainer(forHostPath(prepareCassandraYaml(configFileName)), configPath)
.withEnv(environmentVariables)
.waitingFor(new CassandraQueryWaitStrategy())
.withStartupTimeout(java.time.Duration.ofMinutes(10));
this.dockerContainer.start();

Expand All @@ -99,27 +96,15 @@ public CassandraServer(String imageName, Map<String, String> environmentVariable
driverConfigLoaderBuilder.withStringList(METADATA_SCHEMA_REFRESHED_KEYSPACES, List.of());

CqlSessionBuilder cqlSessionBuilder = CqlSession.builder()
.withApplicationName("TestCluster")
.addContactPoint(new InetSocketAddress(this.dockerContainer.getHost(), this.dockerContainer.getMappedPort(PORT)))
.withLocalDatacenter("datacenter1")
.addContactPoint(dockerContainer.getContactPoint())
.withLocalDatacenter(dockerContainer.getLocalDatacenter())
.withConfigLoader(driverConfigLoaderBuilder.build());

CassandraSession session = new CassandraSession(
this.session = new CassandraSession(
CASSANDRA_TYPE_MANAGER,
JsonCodec.listJsonCodec(ExtraColumnMetadata.class),
cqlSessionBuilder::build,
new Duration(1, MINUTES));

try {
checkConnectivity(session);
}
catch (RuntimeException e) {
session.close();
this.dockerContainer.stop();
throw e;
}

this.session = session;
}

private static String prepareCassandraYaml(String fileName)
Expand Down Expand Up @@ -152,16 +137,7 @@ public String getHost()

public int getPort()
{
return dockerContainer.getMappedPort(PORT);
}

private static void checkConnectivity(CassandraSession session)
Copy link
Copy Markdown
Member Author

@ebyhr ebyhr Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is replaced with CassandraQueryWaitStrategy

{
ResultSet result = session.execute("SELECT release_version FROM system.local");
List<Row> rows = result.all();
assertThat(rows.size()).isEqualTo(1);
String version = rows.get(0).getString(0);
log.debug("Cassandra version: %s", version);
return dockerContainer.getMappedPort(CQL_PORT);
}

public void refreshSizeEstimates(String keyspace, String table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableMap;
import io.trino.testing.QueryRunner;
import org.testcontainers.utility.DockerImageName;

import java.sql.Timestamp;
import java.util.Map;
Expand All @@ -30,7 +31,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
CassandraServer server = closeAfterClass(new CassandraServer(
"datastax/dse-server:6.8.25",
DockerImageName.parse("datastax/dse-server:6.8.25").asCompatibleSubstituteFor("cassandra"),
Map.of(
"DS_LICENSE", "accept",
"DC", "datacenter1"),
Expand Down
2 changes: 0 additions & 2 deletions plugin/trino-cassandra/src/test/resources/cassandra-dse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ seed_provider:
parameters:
- seeds: "127.0.0.1"

native_transport_port: 9142

read_request_timeout_in_ms: 30000
range_request_timeout_in_ms: 30000
write_request_timeout_in_ms: 30000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ concurrent_counter_writes: 2
concurrent_materialized_view_writes: 2

listen_address: 127.0.0.1
native_transport_port: 9142
rpc_address: localhost
broadcast_rpc_address: localhost

Expand Down
2 changes: 0 additions & 2 deletions plugin/trino-cassandra/src/test/resources/cu-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,6 @@ ssl_storage_port: 7011
listen_address: 127.0.0.1

start_native_transport: true
# port for the CQL native transport to listen for clients on
native_transport_port: 9142

# Whether to start the thrift rpc server.
start_rpc: false
Expand Down