Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/cassandra.rst
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ DATE DATE
DECIMAL DOUBLE
DOUBLE DOUBLE
FLOAT REAL
INET VARCHAR(45)
INET IPADDRESS
INT INTEGER
LIST<?> VARCHAR
MAP<?, ?> VARCHAR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void configure(Binder binder)
binder.bind(CassandraPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(CassandraPartitionManager.class).in(Scopes.SINGLETON);
binder.bind(CassandraSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(CassandraTypeManager.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(CassandraClientConfig.class);

Expand Down Expand Up @@ -105,7 +106,7 @@ protected Type _deserialize(String value, DeserializationContext context)

@Singleton
@Provides
public static CassandraSession createCassandraSession(CassandraClientConfig config, JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec)
public static CassandraSession createCassandraSession(CassandraTypeManager cassandraTypeManager, CassandraClientConfig config, JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec)
{
requireNonNull(config, "config is null");
requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null");
Expand Down Expand Up @@ -168,6 +169,7 @@ public static CassandraSession createCassandraSession(CassandraClientConfig conf
cqlSessionBuilder.withConfigLoader(driverConfigLoaderBuilder.build());

return new CassandraSession(
cassandraTypeManager,
extraColumnMetadataCodec,
() -> {
contactPoints.forEach(contactPoint -> cqlSessionBuilder.addContactPoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@

public class CassandraClusteringPredicatesExtractor
{
private final CassandraTypeManager cassandraTypeManager;
private final ClusteringPushDownResult clusteringPushDownResult;
private final TupleDomain<ColumnHandle> predicates;

public CassandraClusteringPredicatesExtractor(List<CassandraColumnHandle> clusteringColumns, TupleDomain<ColumnHandle> predicates, Version cassandraVersion)
public CassandraClusteringPredicatesExtractor(CassandraTypeManager cassandraTypeManager, List<CassandraColumnHandle> clusteringColumns, TupleDomain<ColumnHandle> predicates, Version cassandraVersion)
{
this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null");
this.predicates = requireNonNull(predicates, "predicates is null");
this.clusteringPushDownResult = getClusteringKeysSet(clusteringColumns, predicates, requireNonNull(cassandraVersion, "cassandraVersion is null"));
}
Expand All @@ -52,7 +54,7 @@ public TupleDomain<ColumnHandle> getUnenforcedConstraints()
return predicates.filter(((columnHandle, domain) -> !clusteringPushDownResult.hasBeenFullyPushed(columnHandle)));
}

private static ClusteringPushDownResult getClusteringKeysSet(List<CassandraColumnHandle> clusteringColumns, TupleDomain<ColumnHandle> predicates, Version cassandraVersion)
private ClusteringPushDownResult getClusteringKeysSet(List<CassandraColumnHandle> clusteringColumns, TupleDomain<ColumnHandle> predicates, Version cassandraVersion)
{
ImmutableSet.Builder<ColumnHandle> fullyPushedColumnPredicates = ImmutableSet.builder();
ImmutableList.Builder<String> clusteringColumnSql = ImmutableList.builder();
Expand Down Expand Up @@ -101,7 +103,7 @@ private static ClusteringPushDownResult getClusteringKeysSet(List<CassandraColum
}

String inValues = discreteValues.getValues().stream()
.map(columnHandle.getCassandraType()::toCqlLiteral)
.map(value -> cassandraTypeManager.toCqlLiteral(columnHandle.getCassandraType(), value))
.collect(joining(","));
fullyPushedColumnPredicates.add(columnHandle);
return CassandraCqlUtils.validColumnName(columnHandle.getName()) + " IN (" + inValues + " )";
Expand Down Expand Up @@ -132,12 +134,12 @@ private static boolean isInExpressionNotAllowed(List<CassandraColumnHandle> clus
return cassandraVersion.compareTo(Version.parse("2.2.0")) < 0 && currentlyProcessedClusteringColumn != (clusteringColumns.size() - 1);
}

private static String toCqlLiteral(CassandraColumnHandle columnHandle, Object value)
private String toCqlLiteral(CassandraColumnHandle columnHandle, Object value)
{
return columnHandle.getCassandraType().toCqlLiteral(value);
return cassandraTypeManager.toCqlLiteral(columnHandle.getCassandraType(), value);
}

private static String translateRangeIntoCql(CassandraColumnHandle columnHandle, Range range)
private String translateRangeIntoCql(CassandraColumnHandle columnHandle, Range range)
{
if (columnHandle.getCassandraType().getKind() == CassandraType.Kind.TUPLE || columnHandle.getCassandraType().getKind() == CassandraType.Kind.UDT) {
// Building CQL literals for TUPLE and UDT type is not supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.MoreCollectors.toOptional;
import static io.trino.plugin.cassandra.CassandraType.toCassandraType;
import static io.trino.plugin.cassandra.util.CassandraCqlUtils.ID_COLUMN_NAME;
import static io.trino.plugin.cassandra.util.CassandraCqlUtils.cqlNameToSqlName;
import static io.trino.plugin.cassandra.util.CassandraCqlUtils.quoteStringLiteral;
Expand All @@ -80,16 +79,19 @@ public class CassandraMetadata
private final boolean allowDropTable;

private final JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec;
private final CassandraTypeManager cassandraTypeManager;

@Inject
public CassandraMetadata(
CassandraSession cassandraSession,
CassandraPartitionManager partitionManager,
JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec,
CassandraTypeManager cassandraTypeManager,
CassandraClientConfig config)
{
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null");
this.allowDropTable = requireNonNull(config, "config is null").getAllowDropTable();
this.extraColumnMetadataCodec = requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null");
}
Expand Down Expand Up @@ -221,6 +223,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
}
else {
CassandraClusteringPredicatesExtractor clusteringPredicatesExtractor = new CassandraClusteringPredicatesExtractor(
cassandraTypeManager,
cassandraSession.getTable(handle.getSchemaTableName()).getClusteringKeyColumns(),
partitionResult.getUnenforcedConstraint(),
cassandraSession.getCassandraVersion());
Expand Down Expand Up @@ -316,7 +319,7 @@ private CassandraOutputTableHandle createTable(ConnectorTableMetadata tableMetad
queryBuilder.append(", ")
.append(validColumnName(name))
.append(" ")
.append(toCassandraType(type, cassandraSession.getProtocolVersion()).getName().toLowerCase(ENGLISH));
.append(cassandraTypeManager.toCassandraType(type, cassandraSession.getProtocolVersion()).getName().toLowerCase(ENGLISH));
}
queryBuilder.append(") ");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.datastax.oss.driver.api.querybuilder.term.Term;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.InetAddresses;
import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -71,6 +72,7 @@
public class CassandraPageSink
implements ConnectorPageSink
{
private final CassandraTypeManager cassandraTypeManager;
private final CassandraSession cassandraSession;
private final PreparedStatement insert;
private final List<Type> columnTypes;
Expand All @@ -80,6 +82,7 @@ public class CassandraPageSink
private final BatchStatementBuilder batchStatement = BatchStatement.builder(DefaultBatchType.LOGGED);

public CassandraPageSink(
CassandraTypeManager cassandraTypeManager,
CassandraSession cassandraSession,
ProtocolVersion protocolVersion,
String schemaName,
Expand All @@ -89,6 +92,7 @@ public CassandraPageSink(
boolean generateUuid,
int batchSize)
{
this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null");
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession");
requireNonNull(schemaName, "schemaName is null");
requireNonNull(tableName, "tableName is null");
Expand Down Expand Up @@ -185,6 +189,9 @@ else if (VARBINARY.equals(type)) {
else if (UuidType.UUID.equals(type)) {
values.add(trinoUuidToJavaUuid(type.getSlice(block, position)));
}
else if (cassandraTypeManager.isIpAddressType(type)) {
values.add(InetAddresses.forString((String) type.getObjectValue(null, block, position)));
}
else {
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@
public class CassandraPageSinkProvider
implements ConnectorPageSinkProvider
{
private final CassandraTypeManager cassandraTypeManager;
private final CassandraSession cassandraSession;
private final int batchSize;

@Inject
public CassandraPageSinkProvider(CassandraSession cassandraSession, CassandraClientConfig cassandraClientConfig)
public CassandraPageSinkProvider(
CassandraTypeManager cassandraTypeManager,
CassandraSession cassandraSession,
CassandraClientConfig cassandraClientConfig)
{
this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null");
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
this.batchSize = requireNonNull(cassandraClientConfig, "cassandraClientConfig is null").getBatchSize();
}
Expand All @@ -46,6 +51,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
CassandraOutputTableHandle handle = (CassandraOutputTableHandle) tableHandle;

return new CassandraPageSink(
cassandraTypeManager,
cassandraSession,
cassandraSession.getProtocolVersion(),
handle.getSchemaName(),
Expand All @@ -64,6 +70,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
CassandraInsertTableHandle handle = (CassandraInsertTableHandle) tableHandle;

return new CassandraPageSink(
cassandraTypeManager,
cassandraSession,
cassandraSession.getProtocolVersion(),
handle.getSchemaName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ public class CassandraPartitionManager
private static final Logger log = Logger.get(CassandraPartitionManager.class);

private final CassandraSession cassandraSession;
private final CassandraTypeManager cassandraTypeManager;

@Inject
public CassandraPartitionManager(CassandraSession cassandraSession)
public CassandraPartitionManager(CassandraSession cassandraSession, CassandraTypeManager cassandraTypeManager)
{
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null");
}

public CassandraPartitionResult getPartitions(CassandraTableHandle cassandraTableHandle, TupleDomain<ColumnHandle> tupleDomain)
Expand Down Expand Up @@ -98,7 +100,7 @@ public CassandraPartitionResult getPartitions(CassandraTableHandle cassandraTabl
if (column.isIndexed() && domain.isSingleValue()) {
sb.append(CassandraCqlUtils.validColumnName(column.getName()))
.append(" = ")
.append(column.getCassandraType().toCqlLiteral(entry.getValue().getSingleValue()));
.append(cassandraTypeManager.toCqlLiteral(column.getCassandraType(), entry.getValue().getSingleValue()));
indexedColumns.add(column);
// Only one indexed column predicate can be pushed down.
break;
Expand Down Expand Up @@ -132,7 +134,7 @@ private List<CassandraPartition> getCassandraPartitions(CassandraTable table, Tu
return cassandraSession.getPartitions(table, partitionKeysList);
}

private static List<Set<Object>> getPartitionKeysList(CassandraTable table, TupleDomain<ColumnHandle> tupleDomain)
private List<Set<Object>> getPartitionKeysList(CassandraTable table, TupleDomain<ColumnHandle> tupleDomain)
{
ImmutableList.Builder<Set<Object>> partitionColumnValues = ImmutableList.builder();
for (CassandraColumnHandle columnHandle : table.getPartitionKeyColumns()) {
Expand All @@ -159,7 +161,7 @@ private static List<Set<Object>> getPartitionKeysList(CassandraTable table, Tupl
Object value = range.getSingleValue();

CassandraType valueType = columnHandle.getCassandraType();
if (valueType.isSupportedPartitionKey()) {
if (cassandraTypeManager.isSupportedPartitionKey(valueType.getKind())) {
columnValues.add(value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ public class CassandraRecordCursor
implements RecordCursor
{
private final List<CassandraType> cassandraTypes;
private final CassandraTypeManager cassandraTypeManager;
private final ResultSet rs;
private Row currentRow;

public CassandraRecordCursor(CassandraSession cassandraSession, List<CassandraType> cassandraTypes, String cql)
public CassandraRecordCursor(CassandraSession cassandraSession, CassandraTypeManager cassandraTypeManager, List<CassandraType> cassandraTypes, String cql)
{
this.cassandraTypes = cassandraTypes;
this.cassandraTypeManager = cassandraTypeManager;
rs = cassandraSession.execute(cql);
currentRow = null;
}
Expand Down Expand Up @@ -126,7 +128,7 @@ public Slice getSlice(int i)
if (getCassandraType(i).getKind() == Kind.TIMESTAMP) {
throw new IllegalArgumentException("Timestamp column can not be accessed with getSlice");
}
NullableValue value = cassandraTypes.get(i).getColumnValue(currentRow, i);
NullableValue value = cassandraTypeManager.getColumnValue(cassandraTypes.get(i), currentRow, i);
if (value.getValue() instanceof Slice) {
return (Slice) value.getValue();
}
Expand All @@ -140,7 +142,7 @@ public Object getObject(int i)
switch (cassandraType.getKind()) {
case TUPLE:
case UDT:
return cassandraType.getColumnValue(currentRow, i).getValue();
return cassandraTypeManager.getColumnValue(cassandraType, currentRow, i).getValue();
default:
throw new IllegalArgumentException("getObject cannot be called for " + cassandraType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ public class CassandraRecordSet
implements RecordSet
{
private final CassandraSession cassandraSession;
private final CassandraTypeManager cassandraTypeManager;
private final String cql;
private final List<CassandraType> cassandraTypes;
private final List<Type> columnTypes;

public CassandraRecordSet(CassandraSession cassandraSession, String cql, List<CassandraColumnHandle> cassandraColumns)
public CassandraRecordSet(CassandraSession cassandraSession, CassandraTypeManager cassandraTypeManager, String cql, List<CassandraColumnHandle> cassandraColumns)
{
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null");
this.cql = requireNonNull(cql, "cql is null");

requireNonNull(cassandraColumns, "cassandraColumns is null");
Expand All @@ -51,7 +53,7 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new CassandraRecordCursor(cassandraSession, cassandraTypes, cql);
return new CassandraRecordCursor(cassandraSession, cassandraTypeManager, cassandraTypes, cql);
}

private static <T, R> List<R> transformList(List<T> list, Function<T, R> function)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ public class CassandraRecordSetProvider
private static final Logger log = Logger.get(CassandraRecordSetProvider.class);

private final CassandraSession cassandraSession;
private final CassandraTypeManager cassandraTypeManager;

@Inject
public CassandraRecordSetProvider(CassandraSession cassandraSession)
public CassandraRecordSetProvider(CassandraSession cassandraSession, CassandraTypeManager cassandraTypeManager)
{
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
this.cassandraTypeManager = requireNonNull(cassandraTypeManager);
}

@Override
Expand All @@ -62,6 +64,6 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS
String cql = sb.toString();
log.debug("Creating record set: %s", cql);

return new CassandraRecordSet(cassandraSession, cql, cassandraColumns);
return new CassandraRecordSet(cassandraSession, cassandraTypeManager, cql, cassandraColumns);
}
}
Loading