Skip to content

Commit 0acd038

Browse files
committed
Add support for tuple type in the Cassandra connector
This adds support to read columns of type tuple from Cassandra as VARCHAR. Remaining unsupported types will now throw a NotSupportedException error instead of failing a null pointer check.
1 parent ba56db9 commit 0acd038

File tree

7 files changed

+49
-20
lines changed

7 files changed

+49
-20
lines changed

presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraType.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
import com.datastax.driver.core.LocalDate;
1818
import com.datastax.driver.core.ProtocolVersion;
1919
import com.datastax.driver.core.Row;
20+
import com.datastax.driver.core.TupleValue;
2021
import com.datastax.driver.core.utils.Bytes;
2122
import com.facebook.presto.cassandra.util.CassandraCqlUtils;
23+
import com.facebook.presto.common.NotSupportedException;
2224
import com.facebook.presto.common.predicate.NullableValue;
2325
import com.facebook.presto.common.type.BigintType;
2426
import com.facebook.presto.common.type.BooleanType;
@@ -54,6 +56,7 @@
5456
import static io.airlift.slice.Slices.wrappedBuffer;
5557
import static java.lang.Float.floatToRawIntBits;
5658
import static java.lang.Float.intBitsToFloat;
59+
import static java.lang.String.format;
5760
import static java.util.Objects.requireNonNull;
5861

5962
public enum CassandraType
@@ -81,7 +84,8 @@ public enum CassandraType
8184
VARINT(createUnboundedVarcharType(), BigInteger.class),
8285
LIST(createUnboundedVarcharType(), null),
8386
MAP(createUnboundedVarcharType(), null),
84-
SET(createUnboundedVarcharType(), null);
87+
SET(createUnboundedVarcharType(), null),
88+
TUPLE(createUnboundedVarcharType(), null);
8589

8690
private static class Constants
8791
{
@@ -162,14 +166,16 @@ public static CassandraType getCassandraType(DataType.Name name)
162166
return TIMEUUID;
163167
case TINYINT:
164168
return TINYINT;
169+
case TUPLE:
170+
return TUPLE;
165171
case UUID:
166172
return UUID;
167173
case VARCHAR:
168174
return VARCHAR;
169175
case VARINT:
170176
return VARINT;
171177
default:
172-
return null;
178+
throw new NotSupportedException(format("Unsupported Cassandra type: %s", name));
173179
}
174180
}
175181

@@ -231,6 +237,9 @@ public static NullableValue getColumnValue(Row row, int position, CassandraType
231237
case MAP:
232238
checkTypeArguments(cassandraType, 2, typeArguments);
233239
return NullableValue.of(nativeType, utf8Slice(buildMapValue(row, position, typeArguments.get(0), typeArguments.get(1))));
240+
case TUPLE:
241+
TupleValue tupleValue = row.getTupleValue(position);
242+
return NullableValue.of(nativeType, utf8Slice(tupleValue.toString()));
234243
default:
235244
throw new IllegalStateException("Handling of type " + cassandraType
236245
+ " is not implemented");

presto-cassandra/src/main/java/com/facebook/presto/cassandra/NativeCassandraSession.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.datastax.driver.core.Session;
2929
import com.datastax.driver.core.Statement;
3030
import com.datastax.driver.core.TableMetadata;
31+
import com.datastax.driver.core.TableOptionsMetadata;
3132
import com.datastax.driver.core.TokenRange;
3233
import com.datastax.driver.core.VersionNumber;
3334
import com.datastax.driver.core.exceptions.NoHostAvailableException;
@@ -65,6 +66,7 @@
6566
import java.util.HashSet;
6667
import java.util.List;
6768
import java.util.Map;
69+
import java.util.Optional;
6870
import java.util.Set;
6971
import java.util.concurrent.ExecutionException;
7072
import java.util.stream.Stream;
@@ -216,10 +218,10 @@ public CassandraTable getTable(SchemaTableName schemaTableName)
216218
}
217219

218220
// check if there is a comment to establish column ordering
219-
String comment = tableMeta.getOptions().getComment();
221+
Optional<String> comment = Optional.ofNullable(tableMeta.getOptions()).map(TableOptionsMetadata::getComment);
220222
Set<String> hiddenColumns = ImmutableSet.of();
221-
if (comment != null && comment.startsWith(PRESTO_COMMENT_METADATA)) {
222-
String columnOrderingString = comment.substring(PRESTO_COMMENT_METADATA.length());
223+
if (comment.isPresent() && comment.get().startsWith(PRESTO_COMMENT_METADATA)) {
224+
String columnOrderingString = comment.get().substring(PRESTO_COMMENT_METADATA.length());
223225

224226
// column ordering
225227
List<ExtraColumnMetadata> extras = extraColumnMetadataCodec.fromJson(columnOrderingString);
@@ -361,7 +363,7 @@ private CassandraColumnHandle buildColumnHandle(AbstractTableMetadata tableMetad
361363
{
362364
CassandraType cassandraType = CassandraType.getCassandraType(columnMeta.getType().getName());
363365
List<CassandraType> typeArguments = null;
364-
if (cassandraType != null && cassandraType.getTypeArgumentSize() > 0) {
366+
if (cassandraType.getTypeArgumentSize() > 0) {
365367
List<DataType> typeArgs = columnMeta.getType().getTypeArguments();
366368
switch (cassandraType.getTypeArgumentSize()) {
367369
case 1:

presto-cassandra/src/test/java/com/facebook/presto/cassandra/CassandraServer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.cassandra;
1515

1616
import com.datastax.driver.core.Cluster;
17+
import com.datastax.driver.core.Metadata;
1718
import com.datastax.driver.core.ResultSet;
1819
import com.datastax.driver.core.Row;
1920
import com.facebook.airlift.json.JsonCodec;
@@ -58,6 +59,7 @@ public class CassandraServer
5859
private final GenericContainer<?> dockerContainer;
5960

6061
private final CassandraSession session;
62+
private final Metadata metadata;
6163

6264
public CassandraServer()
6365
throws Exception
@@ -82,6 +84,7 @@ public CassandraServer()
8284
JsonCodec.listJsonCodec(ExtraColumnMetadata.class),
8385
cluster,
8486
new Duration(1, MINUTES));
87+
this.metadata = cluster.getMetadata();
8588

8689
try {
8790
checkConnectivity(session);
@@ -117,6 +120,11 @@ public CassandraSession getSession()
117120
return requireNonNull(session, "cluster is null");
118121
}
119122

123+
public Metadata getMetadata()
124+
{
125+
return metadata;
126+
}
127+
120128
public String getHost()
121129
{
122130
return dockerContainer.getContainerIpAddress();

presto-cassandra/src/test/java/com/facebook/presto/cassandra/CassandraTestingUtils.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
*/
1414
package com.facebook.presto.cassandra;
1515

16+
import com.datastax.driver.core.DataType;
17+
import com.datastax.driver.core.Metadata;
18+
import com.datastax.driver.core.TupleType;
1619
import com.datastax.driver.core.querybuilder.Insert;
1720
import com.datastax.driver.core.querybuilder.QueryBuilder;
1821
import com.facebook.presto.spi.SchemaTableName;
@@ -42,12 +45,12 @@ public class CassandraTestingUtils
4245

4346
private CassandraTestingUtils() {}
4447

45-
public static void createTestTables(CassandraSession cassandraSession, String keyspace, Date date)
48+
public static void createTestTables(CassandraSession cassandraSession, Metadata metadata, String keyspace, Date date)
4649
{
4750
createKeyspace(cassandraSession, keyspace);
48-
createTableAllTypes(cassandraSession, new SchemaTableName(keyspace, TABLE_ALL_TYPES), date, 9);
49-
createTableAllTypes(cassandraSession, new SchemaTableName(keyspace, TABLE_ALL_TYPES_INSERT), date, 0);
50-
createTableAllTypesPartitionKey(cassandraSession, new SchemaTableName(keyspace, TABLE_ALL_TYPES_PARTITION_KEY), date);
51+
createTableAllTypes(cassandraSession, metadata, new SchemaTableName(keyspace, TABLE_ALL_TYPES), date, 9);
52+
createTableAllTypes(cassandraSession, metadata, new SchemaTableName(keyspace, TABLE_ALL_TYPES_INSERT), date, 0);
53+
createTableAllTypesPartitionKey(cassandraSession, metadata, new SchemaTableName(keyspace, TABLE_ALL_TYPES_PARTITION_KEY), date);
5154
createTableClusteringKeys(cassandraSession, new SchemaTableName(keyspace, TABLE_CLUSTERING_KEYS), 9);
5255
createTableClusteringKeys(cassandraSession, new SchemaTableName(keyspace, TABLE_CLUSTERING_KEYS_LARGE), 1000);
5356
createTableMultiPartitionClusteringKeys(cassandraSession, new SchemaTableName(keyspace, TABLE_MULTI_PARTITION_CLUSTERING_KEYS));
@@ -142,7 +145,7 @@ public static void insertIntoTableClusteringKeysInequality(CassandraSession sess
142145
assertEquals(session.execute("SELECT COUNT(*) FROM " + table).all().get(0).getLong(0), rowsCount);
143146
}
144147

145-
public static void createTableAllTypes(CassandraSession session, SchemaTableName table, Date date, int rowsCount)
148+
public static void createTableAllTypes(CassandraSession session, Metadata metadata, SchemaTableName table, Date date, int rowsCount)
146149
{
147150
session.execute("DROP TABLE IF EXISTS " + table);
148151
session.execute("CREATE TABLE " + table + " (" +
@@ -164,11 +167,12 @@ public static void createTableAllTypes(CassandraSession session, SchemaTableName
164167
" typelist list<text>, " +
165168
" typemap map<int, bigint>, " +
166169
" typeset set<boolean>, " +
170+
" typetuple tuple<bigint, varchar>, " +
167171
")");
168-
insertTestData(session, table, date, rowsCount);
172+
insertTestData(session, metadata, table, date, rowsCount);
169173
}
170174

171-
public static void createTableAllTypesPartitionKey(CassandraSession session, SchemaTableName table, Date date)
175+
public static void createTableAllTypesPartitionKey(CassandraSession session, Metadata metadata, SchemaTableName table, Date date)
172176
{
173177
session.execute("DROP TABLE IF EXISTS " + table);
174178

@@ -191,6 +195,7 @@ public static void createTableAllTypesPartitionKey(CassandraSession session, Sch
191195
" typelist frozen <list<text>>, " +
192196
" typemap frozen <map<int, bigint>>, " +
193197
" typeset frozen <set<boolean>>, " +
198+
" typetuple frozen <tuple<bigint, varchar>>, " +
194199
" PRIMARY KEY ((" +
195200
" key, " +
196201
" typeuuid, " +
@@ -213,15 +218,18 @@ public static void createTableAllTypesPartitionKey(CassandraSession session, Sch
213218
// TODO: NOT YET SUPPORTED AS A PARTITION KEY
214219
" typelist, " +
215220
" typemap, " +
216-
" typeset" +
221+
" typeset," +
222+
" typetuple" +
217223
" ))" +
218224
")");
219225

220-
insertTestData(session, table, date, 9);
226+
insertTestData(session, metadata, table, date, 9);
221227
}
222228

223-
private static void insertTestData(CassandraSession session, SchemaTableName table, Date date, int rowsCount)
229+
private static void insertTestData(CassandraSession session, Metadata metadata, SchemaTableName table, Date date, int rowsCount)
224230
{
231+
TupleType tupleType = metadata.newTupleType(DataType.bigint(), DataType.varchar());
232+
225233
for (int rowNumber = 1; rowNumber <= rowsCount; rowNumber++) {
226234
Insert insert = QueryBuilder.insertInto(table.getSchemaName(), table.getTableName())
227235
.value("key", "key " + rowNumber)
@@ -241,7 +249,8 @@ private static void insertTestData(CassandraSession session, SchemaTableName tab
241249
.value("typetimeuuid", UUID.fromString(String.format("d2177dd0-eaa2-11de-a572-001b779c76e%d", rowNumber)))
242250
.value("typelist", ImmutableList.of("list-value-1" + rowNumber, "list-value-2" + rowNumber))
243251
.value("typemap", ImmutableMap.of(rowNumber, rowNumber + 1L, rowNumber + 2, rowNumber + 3L))
244-
.value("typeset", ImmutableSet.of(false, true));
252+
.value("typeset", ImmutableSet.of(false, true))
253+
.value("typetuple", tupleType.newValue((long) rowNumber, "row=" + rowNumber));
245254

246255
session.execute(insert);
247256
}

presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void setup()
110110
this.server = new CassandraServer();
111111

112112
String keyspace = "test_connector";
113-
createTestTables(server.getSession(), keyspace, DATE);
113+
createTestTables(server.getSession(), server.getMetadata(), keyspace, DATE);
114114

115115
String connectorId = "cassandra-test";
116116
CassandraConnectorFactory connectorFactory = new CassandraConnectorFactory(connectorId);

presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraIntegrationSmokeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ protected QueryRunner createQueryRunner()
9696
CassandraServer server = new CassandraServer();
9797
this.server = server;
9898
this.session = server.getSession();
99-
createTestTables(session, KEYSPACE, DATE_TIME_LOCAL);
99+
createTestTables(session, server.getMetadata(), KEYSPACE, DATE_TIME_LOCAL);
100100
return createCassandraQueryRunner(server);
101101
}
102102

presto-docs/src/main/sphinx/connector/cassandra.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ SET<?> VARCHAR
220220
TEXT VARCHAR
221221
TIMESTAMP TIMESTAMP
222222
TIMEUUID VARCHAR
223+
TUPLE VARCHAR
223224
VARCHAR VARCHAR
224225
VARINT VARCHAR
225226
SMALLINT INTEGER
@@ -230,7 +231,7 @@ DATE DATE
230231
Any collection (LIST/MAP/SET) can be designated as FROZEN, and the value is
231232
mapped to VARCHAR. Additionally, blobs have the limitation that they cannot be empty.
232233

233-
Types not mentioned in the table above are not supported (e.g. tuple or UDT).
234+
Data types not listed in the table above, such as UDT, are not supported.
234235

235236
Partition keys can only be of the following types:
236237

0 commit comments

Comments
 (0)