Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTable
return Optional.empty();
}

CassandraTableHandle tableHandle = queryHandle.getTableHandle();
CassandraTableHandle tableHandle = queryHandle.tableHandle();
List<ColumnHandle> columnHandles = getColumnHandles(((CassandraQueryRelationHandle) tableHandle.relationHandle()).getQuery());
return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,43 +88,29 @@ public long getReadTimeNanos()
public double getDouble(int i)
{
String columnName = validColumnName(columnNames.get(i));
switch (getCassandraType(i).kind()) {
case DOUBLE:
return currentRow.getDouble(columnName);
case FLOAT:
return currentRow.getFloat(columnName);
case DECIMAL:
return currentRow.getBigDecimal(columnName).doubleValue();
default:
throw new IllegalStateException("Cannot retrieve double for " + getCassandraType(i));
}
return switch (getCassandraType(i).kind()) {
case DOUBLE -> currentRow.getDouble(columnName);
case FLOAT -> currentRow.getFloat(columnName);
case DECIMAL -> currentRow.getBigDecimal(columnName).doubleValue();
default -> throw new IllegalStateException("Cannot retrieve double for " + getCassandraType(i));
};
}

@Override
public long getLong(int i)
{
String columnName = validColumnName(columnNames.get(i));
switch (getCassandraType(i).kind()) {
case INT:
return currentRow.getInt(columnName);
case SMALLINT:
return currentRow.getShort(columnName);
case TINYINT:
return currentRow.getByte(columnName);
case BIGINT:
case COUNTER:
return currentRow.getLong(columnName);
case TIME:
return currentRow.getLocalTime(columnName).toNanoOfDay() * PICOSECONDS_PER_NANOSECOND;
case TIMESTAMP:
return packDateTimeWithZone(currentRow.getInstant(columnName).toEpochMilli(), TimeZoneKey.UTC_KEY);
case DATE:
return currentRow.getLocalDate(columnName).toEpochDay();
case FLOAT:
return floatToRawIntBits(currentRow.getFloat(columnName));
default:
throw new IllegalStateException("Cannot retrieve long for " + getCassandraType(i));
}
return switch (getCassandraType(i).kind()) {
case INT -> currentRow.getInt(columnName);
case SMALLINT -> currentRow.getShort(columnName);
case TINYINT -> currentRow.getByte(columnName);
case BIGINT, COUNTER -> currentRow.getLong(columnName);
case TIME -> currentRow.getLocalTime(columnName).toNanoOfDay() * PICOSECONDS_PER_NANOSECOND;
case TIMESTAMP -> packDateTimeWithZone(currentRow.getInstant(columnName).toEpochMilli(), TimeZoneKey.UTC_KEY);
case DATE -> currentRow.getLocalDate(columnName).toEpochDay();
case FLOAT -> floatToRawIntBits(currentRow.getFloat(columnName));
default -> throw new IllegalStateException("Cannot retrieve long for " + getCassandraType(i));
};
}

private CassandraType getCassandraType(int i)
Expand All @@ -149,13 +135,10 @@ public Slice getSlice(int i)
public Object getObject(int i)
{
CassandraType cassandraType = cassandraTypes.get(i);
switch (cassandraType.kind()) {
case TUPLE:
case UDT:
return cassandraTypeManager.getColumnValue(cassandraType, currentRow, currentRow.firstIndexOf(validColumnName(columnNames.get(i)))).getValue();
default:
throw new IllegalArgumentException("getObject cannot be called for " + cassandraType);
}
return switch (cassandraType.kind()) {
case TUPLE, UDT -> cassandraTypeManager.getColumnValue(cassandraType, currentRow, currentRow.firstIndexOf(validColumnName(columnNames.get(i)))).getValue();
default -> throw new IllegalArgumentException("getObject cannot be called for " + cassandraType);
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
*/
package io.trino.plugin.cassandra;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.HostAddress;
Expand All @@ -29,46 +27,15 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

public class CassandraSplit
public record CassandraSplit(String partitionId, String splitCondition, List<HostAddress> addresses)
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = instanceSize(CassandraSplit.class);

private final String partitionId;
private final List<HostAddress> addresses;
private final String splitCondition;

@JsonCreator
public CassandraSplit(
@JsonProperty("partitionId") String partitionId,
@JsonProperty("splitCondition") String splitCondition,
@JsonProperty("addresses") List<HostAddress> addresses)
public CassandraSplit
{
requireNonNull(partitionId, "partitionId is null");
requireNonNull(addresses, "addresses is null");

this.partitionId = partitionId;
this.addresses = ImmutableList.copyOf(addresses);
this.splitCondition = splitCondition;
}

@JsonProperty
public String getSplitCondition()
{
return splitCondition;
}

@JsonProperty
public String getPartitionId()
{
return partitionId;
}

@JsonProperty
@Override
public List<HostAddress> getAddresses()
{
return addresses;
addresses = ImmutableList.copyOf(addresses);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ private List<ConnectorSplit> getSplitsByTokenRange(CassandraTable table, String
ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder();
List<CassandraTokenSplitManager.TokenSplit> tokenSplits = tokenSplitMgr.getSplits(schema, tableName, sessionSplitsPerNode);
for (CassandraTokenSplitManager.TokenSplit tokenSplit : tokenSplits) {
String condition = buildTokenCondition(tokenExpression, tokenSplit.getTokenRange());
List<HostAddress> addresses = new HostAddressFactory().hostAddressNamesToHostAddressList(tokenSplit.getHosts());
String condition = buildTokenCondition(tokenExpression, tokenSplit.tokenRange());
List<HostAddress> addresses = new HostAddressFactory().hostAddressNamesToHostAddressList(tokenSplit.hosts());
CassandraSplit split = new CassandraSplit(partitionId, condition, addresses);
builder.add(split);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,25 +149,12 @@ private static TokenSplit createSplit(TokenRange range, List<String> endpoints)
return new TokenSplit(range, endpoints);
}

public static class TokenSplit
public record TokenSplit(TokenRange tokenRange, List<String> hosts)
{
private final TokenRange tokenRange;
private final List<String> hosts;

public TokenSplit(TokenRange tokenRange, List<String> hosts)
{
this.tokenRange = requireNonNull(tokenRange, "tokenRange is null");
this.hosts = ImmutableList.copyOf(requireNonNull(hosts, "hosts is null"));
}

public TokenRange getTokenRange()
{
return tokenRange;
}

public List<String> getHosts()
public TokenSplit
{
return hosts;
requireNonNull(tokenRange, "tokenRange is null");
hosts = ImmutableList.copyOf(requireNonNull(hosts, "hosts is null"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,30 +493,14 @@ public boolean isFullySupported(DataType dataType)
return false;
}

if (dataType instanceof UserDefinedType userDefinedType) {
return userDefinedType.getFieldTypes().stream()
.allMatch(this::isFullySupported);
}

if (dataType instanceof MapType mapType) {
return Arrays.stream(new DataType[] {mapType.getKeyType(), mapType.getValueType()})
.allMatch(this::isFullySupported);
}

if (dataType instanceof ListType listType) {
return isFullySupported(listType.getElementType());
}

if (dataType instanceof TupleType tupleType) {
return tupleType.getComponentTypes().stream()
.allMatch(this::isFullySupported);
}

if (dataType instanceof SetType setType) {
return isFullySupported(setType.getElementType());
}

return true;
return switch (dataType) {
case UserDefinedType userDefinedType -> userDefinedType.getFieldTypes().stream().allMatch(this::isFullySupported);
case MapType mapType -> Arrays.stream(new DataType[] {mapType.getKeyType(), mapType.getValueType()}).allMatch(this::isFullySupported);
case ListType listType -> isFullySupported(listType.getElementType());
case TupleType tupleType -> tupleType.getComponentTypes().stream().allMatch(this::isFullySupported);
case SetType setType -> isFullySupported(setType.getElementType());
default -> true;
};
}

public CassandraType toCassandraType(Type type, ProtocolVersion protocolVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
*/
package io.trino.plugin.cassandra.ptf;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
Expand Down Expand Up @@ -125,21 +123,12 @@ public TableFunctionAnalysis analyze(
}
}

public static class QueryHandle
public record QueryHandle(CassandraTableHandle tableHandle)
implements ConnectorTableFunctionHandle
{
private final CassandraTableHandle tableHandle;

@JsonCreator
public QueryHandle(@JsonProperty("tableHandle") CassandraTableHandle tableHandle)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
}

@JsonProperty
public CassandraTableHandle getTableHandle()
public QueryHandle
{
return tableHandle;
requireNonNull(tableHandle, "tableHandle is null");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,19 @@ protected MaterializedResult getDescribeOrdersResult()
public void testShowCreateTable()
{
assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue())
.isEqualTo("CREATE TABLE cassandra.tpch.orders (\n" +
" orderkey bigint,\n" +
" custkey bigint,\n" +
" orderstatus varchar,\n" +
" totalprice double,\n" +
" orderdate date,\n" +
" orderpriority varchar,\n" +
" clerk varchar,\n" +
" shippriority integer,\n" +
" comment varchar\n" +
")");
.isEqualTo(
"""
CREATE TABLE cassandra.tpch.orders (
orderkey bigint,
custkey bigint,
orderstatus varchar,
totalprice double,
orderdate date,
orderpriority varchar,
clerk varchar,
shippriority integer,
comment varchar
)""");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public void testJsonRoundTrip()
String json = codec.toJson(expected);
CassandraSplit actual = codec.fromJson(json);

assertThat(actual.getSplitCondition()).isEqualTo(expected.getSplitCondition());
assertThat(actual.getAddresses()).isEqualTo(expected.getAddresses());
assertThat(actual.splitCondition()).isEqualTo(expected.splitCondition());
assertThat(actual.addresses()).isEqualTo(expected.addresses());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ PRIMARY KEY(partition_key, clustering_key))
try (ConnectorSplitSource splitSource = splitManager.getSplits(null, null, tableHandle, null, null)) {
List<ConnectorSplit> splits = splitSource.getNextBatch(100).get().getSplits();
assertThat(splits).hasSize(2);
assertThat(((CassandraSplit) splits.get(0)).getPartitionId()).isEqualTo("\"partition_key\" in (0,1)");
assertThat(((CassandraSplit) splits.get(1)).getPartitionId()).isEqualTo("\"partition_key\" in (2)");
assertThat(((CassandraSplit) splits.get(0)).partitionId()).isEqualTo("\"partition_key\" in (0,1)");
assertThat(((CassandraSplit) splits.get(1)).partitionId()).isEqualTo("\"partition_key\" in (2)");
}

session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName));
Expand Down