diff --git a/pom.xml b/pom.xml index 5494a3d4..80470136 100644 --- a/pom.xml +++ b/pom.xml @@ -188,9 +188,9 @@ - com.datastax.cassandra - cassandra-driver-core - 3.8.0 + com.datastax.oss + java-driver-core + 4.14.0 diff --git a/tempto-core/pom.xml b/tempto-core/pom.xml index f2686d97..94f53e27 100644 --- a/tempto-core/pom.xml +++ b/tempto-core/pom.xml @@ -142,8 +142,8 @@ - com.datastax.cassandra - cassandra-driver-core + com.datastax.oss + java-driver-core diff --git a/tempto-core/src/main/java/io/trino/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java b/tempto-core/src/main/java/io/trino/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java index a7a04c2a..48e99988 100644 --- a/tempto-core/src/main/java/io/trino/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java +++ b/tempto-core/src/main/java/io/trino/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java @@ -13,9 +13,10 @@ */ package io.trino.tempto.internal.fulfillment.table.cassandra; -import com.datastax.driver.core.BatchStatement; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; +import com.datastax.oss.driver.api.core.cql.BatchType; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; import java.util.Iterator; import java.util.List; @@ -28,12 +29,12 @@ public class CassandraBatchLoader { - private final Session session; + private final CqlSession session; private final String insertQuery; private final int columnsCount; private final int batchRowsCount; - public CassandraBatchLoader(Session session, String tableName, List columnNames, int batchRowsCount) + public CassandraBatchLoader(CqlSession session, String tableName, List columnNames, int batchRowsCount) { this.session = requireNonNull(session, "session is null"); requireNonNull(tableName, "tableName is null"); @@ -67,24 +68,25 @@ public void load(Iterator> rows) { PreparedStatement statement = session.prepare(insertQuery); - BatchStatement batch = createBatchStatement(); + BatchStatementBuilder batch = createBatchStatement(); while (rows.hasNext()) { - if (batch.size() >= batchRowsCount) { - session.execute(batch); - batch = createBatchStatement(); + if (batch.getStatementsCount() >= batchRowsCount) { + session.execute(batch.build()); + batch.clearStatements(); } List row = rows.next(); checkState(row.size() == columnsCount, "values count in a row is expected to be %d, but found: %d", columnsCount, row.size()); - batch.add(statement.bind(row.toArray())); + batch.addStatement(statement.bind(row.toArray())); } - if (batch.size() > 0) { - session.execute(batch); + if (batch.getStatementsCount() > 0) { + session.execute(batch.build()); + batch.clearStatements(); } } - private static BatchStatement createBatchStatement() + private static BatchStatementBuilder createBatchStatement() { - return new BatchStatement(BatchStatement.Type.UNLOGGED); + return new BatchStatementBuilder(BatchType.UNLOGGED); } } diff --git a/tempto-core/src/main/java/io/trino/tempto/internal/query/CassandraQueryExecutor.java b/tempto-core/src/main/java/io/trino/tempto/internal/query/CassandraQueryExecutor.java index 3e91935d..82e25d55 100644 --- a/tempto-core/src/main/java/io/trino/tempto/internal/query/CassandraQueryExecutor.java +++ b/tempto-core/src/main/java/io/trino/tempto/internal/query/CassandraQueryExecutor.java @@ -13,27 +13,30 @@ */ package io.trino.tempto.internal.query; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ColumnDefinitions; -import com.datastax.driver.core.ColumnMetadata; -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.Metadata; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.TableMetadata; +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.DataTypes; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.trino.tempto.configuration.Configuration; import io.trino.tempto.query.QueryExecutionException; import io.trino.tempto.query.QueryResult; +import java.net.InetSocketAddress; import java.sql.JDBCType; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.StreamSupport; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Lists.newArrayList; import static java.lang.String.format; import static java.util.stream.Collectors.toList; @@ -41,30 +44,25 @@ public class CassandraQueryExecutor implements AutoCloseable { - private static final Map typeMapping; - private final Cluster cluster; - private Session session; - - static { - typeMapping = ImmutableMap.builder() - .put(DataType.ascii(), JDBCType.VARCHAR) - .put(DataType.bigint(), JDBCType.BIGINT) - .put(DataType.blob(), JDBCType.BLOB) - .put(DataType.cboolean(), JDBCType.BOOLEAN) - .put(DataType.counter(), JDBCType.BIGINT) - .put(DataType.date(), JDBCType.DATE) - .put(DataType.decimal(), JDBCType.DECIMAL) - .put(DataType.cdouble(), JDBCType.DOUBLE) - .put(DataType.cfloat(), JDBCType.REAL) - .put(DataType.cint(), JDBCType.INTEGER) - .put(DataType.smallint(), JDBCType.SMALLINT) - //.put(DataType.text(), JDBCType.NVARCHAR) - .put(DataType.time(), JDBCType.TIME) - .put(DataType.timestamp(), JDBCType.TIMESTAMP) - .put(DataType.tinyint(), JDBCType.TINYINT) - .put(DataType.varchar(), JDBCType.VARCHAR) - .build(); - } + private static final Map typeMapping = ImmutableMap.builder() + .put(DataTypes.ASCII, JDBCType.VARCHAR) + .put(DataTypes.BIGINT, JDBCType.BIGINT) + .put(DataTypes.BLOB, JDBCType.BLOB) + .put(DataTypes.BOOLEAN, JDBCType.BOOLEAN) + .put(DataTypes.COUNTER, JDBCType.BIGINT) + .put(DataTypes.DATE, JDBCType.DATE) + .put(DataTypes.DECIMAL, JDBCType.DECIMAL) + .put(DataTypes.DOUBLE, JDBCType.DOUBLE) + .put(DataTypes.FLOAT, JDBCType.REAL) + .put(DataTypes.INT, JDBCType.INTEGER) + .put(DataTypes.SMALLINT, JDBCType.SMALLINT) + .put(DataTypes.TIME, JDBCType.TIME) + .put(DataTypes.TIMESTAMP, JDBCType.TIMESTAMP) + .put(DataTypes.TINYINT, JDBCType.TINYINT) + .put(DataTypes.TEXT, JDBCType.VARCHAR) + .build(); + private final CqlSession session; + public static class TypeNotSupportedException extends IllegalStateException @@ -77,25 +75,23 @@ public static class TypeNotSupportedException public CassandraQueryExecutor(Configuration configuration) { - cluster = Cluster.builder() - .addContactPoint(configuration.getStringMandatory("databases.cassandra.host")) - .withPort(configuration.getIntMandatory("databases.cassandra.port")) - .build(); + CqlSessionBuilder sessionBuilder = CqlSession.builder() + .addContactPoint(new InetSocketAddress(configuration.getStringMandatory("databases.cassandra.host"), configuration.getIntMandatory("databases.cassandra.port"))); + configuration.getString("databases.cassandra.local_datacenter").ifPresent(sessionBuilder::withLocalDatacenter); + session = sessionBuilder.build(); } public QueryResult executeQuery(String sql) throws QueryExecutionException { - ensureConnected(); - ResultSet rs = session.execute(sql); - List definitions = rs.getColumnDefinitions().asList(); - List types = definitions.stream() + ColumnDefinitions definitions = rs.getColumnDefinitions(); + List types = StreamSupport.stream(definitions.spliterator(), false) .map(definition -> getJDBCType(definition.getType())) .collect(toList()); - List columnNames = definitions.stream() - .map(ColumnDefinitions.Definition::getName) + List columnNames = StreamSupport.stream(definitions.spliterator(), false) + .map(columnDefinition -> columnDefinition.getName().asInternal()) .collect(toList()); QueryResult.QueryResultBuilder resultBuilder = new QueryResult.QueryResultBuilder(types, columnNames); @@ -103,7 +99,7 @@ public QueryResult executeQuery(String sql) for (Row row : rs) { List builderRow = newArrayList(); for (int i = 0; i < types.size(); ++i) { - builderRow.add(row.getToken(i).getValue()); + builderRow.add(row.getObject(i)); } resultBuilder.addRow(builderRow); } @@ -111,53 +107,39 @@ public QueryResult executeQuery(String sql) return resultBuilder.build(); } - public Session getSession() + public CqlSession getSession() { return session; } public List getColumnNames(String keySpace, String tableName) { - checkState(tableExists(keySpace, tableName), "table %s.%s does not exist", keySpace, tableName); - KeyspaceMetadata keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(keySpace); - TableMetadata tableMetadata = keyspaceMetadata.getTable(tableName); - return tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(toList()); + KeyspaceMetadata keyspaceMetadata = session.getMetadata().getKeyspace(keySpace) + .orElseThrow(() -> new IllegalArgumentException(format("keyspace %s does not exist", keySpace))); + TableMetadata tableMetadata = keyspaceMetadata.getTable(tableName) + .orElseThrow(() -> new IllegalArgumentException(format("table %s.%s does not exist", keySpace, tableName))); + return tableMetadata.getColumns().keySet().stream().map(CqlIdentifier::asInternal).collect(toList()); } public boolean tableExists(String keySpace, String tableName) { - KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keySpace); - if (keyspaceMetadata == null) { - return false; - } - return keyspaceMetadata.getTable(tableName) != null; + Optional keyspaceMetadata = session.getMetadata().getKeyspace(keySpace); + return keyspaceMetadata.map(metadata -> metadata.getTable(tableName).isPresent()).orElse(false); } public List getTableNames(String keySpace) { - Metadata clusterMetadata = cluster.getMetadata(); - KeyspaceMetadata keyspaceMetadata = clusterMetadata.getKeyspace(keySpace); - if (keyspaceMetadata == null) { - return ImmutableList.of(); - } - return keyspaceMetadata.getTables().stream() - .map(TableMetadata::getName) - .collect(toList()); + Metadata clusterMetadata = session.getMetadata(); + Optional keyspaceMetadata = clusterMetadata.getKeyspace(keySpace); + return keyspaceMetadata.map(metadata -> metadata.getTables().keySet().stream() + .map(CqlIdentifier::asInternal) + .collect(toList())).orElseGet(ImmutableList::of); } @Override public void close() { - cluster.close(); - } - - private void ensureConnected() - { - checkState(!cluster.isClosed(), "Trying to connect using closed Cluster"); - - if (session == null || session.isClosed()) { - session = cluster.connect(); - } + session.close(); } private static JDBCType getJDBCType(DataType type) diff --git a/tempto-examples/docker/trino-server/etc/catalog/cassandra.properties b/tempto-examples/docker/trino-server/etc/catalog/cassandra.properties index 3c51fbcb..7cfa98be 100644 --- a/tempto-examples/docker/trino-server/etc/catalog/cassandra.properties +++ b/tempto-examples/docker/trino-server/etc/catalog/cassandra.properties @@ -1,2 +1,3 @@ connector.name=cassandra cassandra.contact-points=cassandra +cassandra.load-policy.dc-aware.local-dc=datacenter1 \ No newline at end of file diff --git a/tempto-examples/src/main/resources/tempto-configuration.yaml b/tempto-examples/src/main/resources/tempto-configuration.yaml index d3cf78cd..1908b676 100644 --- a/tempto-examples/src/main/resources/tempto-configuration.yaml +++ b/tempto-examples/src/main/resources/tempto-configuration.yaml @@ -69,6 +69,7 @@ databases: cassandra: host: ${cluster.cassandra} port: 9042 + local_datacenter: datacenter1 default_schema: test skip_create_schema: false table_manager_type: cassandra