Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import static io.prestosql.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.prestosql.spi.type.DateType.DATE;
import static io.prestosql.spi.type.DoubleType.DOUBLE;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.RealType.REAL;
import static io.prestosql.spi.type.SmallintType.SMALLINT;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.spi.type.Varchars.isVarcharType;
Expand Down Expand Up @@ -157,8 +158,8 @@ else if (REAL.equals(type)) {
else if (DATE.equals(type)) {
values.add(toCassandraDate.apply(type.getLong(block, position)));
}
else if (TIMESTAMP.equals(type)) {
values.add(new Timestamp(type.getLong(block, position)));
else if (TIMESTAMP_WITH_TIME_ZONE.equals(type)) {
values.add(new Timestamp(unpackMillisUtc(type.getLong(block, position))));
}
else if (isVarcharType(type)) {
values.add(type.getSlice(block, position).toStringUtf8());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import io.airlift.slice.Slice;
import io.prestosql.spi.connector.RecordCursor;
import io.prestosql.spi.predicate.NullableValue;
import io.prestosql.spi.type.TimeZoneKey;
import io.prestosql.spi.type.Type;

import java.util.List;

import static io.airlift.slice.Slices.utf8Slice;
import static io.prestosql.plugin.cassandra.CassandraType.TIMESTAMP;
import static io.prestosql.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static java.lang.Float.floatToRawIntBits;

public class CassandraRecordCursor
Expand Down Expand Up @@ -103,7 +106,7 @@ public long getLong(int i)
case COUNTER:
return currentRow.getLong(i);
case TIMESTAMP:
return currentRow.getTimestamp(i).getTime();
return packDateTimeWithZone(currentRow.getTimestamp(i).getTime(), TimeZoneKey.UTC_KEY);
case DATE:
return currentRow.getDate(i).getDaysSinceEpoch();
case FLOAT:
Expand All @@ -121,6 +124,9 @@ private CassandraType getCassandraType(int i)
@Override
public Slice getSlice(int i)
{
if (getCassandraType(i) == TIMESTAMP) {
throw new IllegalArgumentException("Timestamp column can not be accessed with getSlice");
}
NullableValue value = cassandraTypes.get(i).getColumnValue(currentRow, i);
if (value.getValue() instanceof Slice) {
return (Slice) value.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.RealType;
import io.prestosql.spi.type.SmallintType;
import io.prestosql.spi.type.TimestampType;
import io.prestosql.spi.type.TimeZoneKey;
import io.prestosql.spi.type.TimestampWithTimeZoneType;
import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarbinaryType;
Expand All @@ -49,6 +50,8 @@
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.prestosql.plugin.cassandra.util.CassandraCqlUtils.quoteStringLiteral;
import static io.prestosql.plugin.cassandra.util.CassandraCqlUtils.quoteStringLiteralForJson;
import static io.prestosql.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.prestosql.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
import static io.prestosql.spi.type.VarcharType.createVarcharType;
import static io.prestosql.spi.type.Varchars.isVarcharType;
Expand All @@ -70,7 +73,7 @@ public enum CassandraType
DECIMAL(DoubleType.DOUBLE),

DATE(DateType.DATE),
TIMESTAMP(TimestampType.TIMESTAMP),
TIMESTAMP(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE),

ASCII(createUnboundedVarcharType()),
TEXT(createUnboundedVarcharType()),
Expand Down Expand Up @@ -197,7 +200,7 @@ public NullableValue getColumnValue(Row row, int position)
case TIMEUUID:
return NullableValue.of(prestoType, utf8Slice(row.getUUID(position).toString()));
case TIMESTAMP:
return NullableValue.of(prestoType, row.getTimestamp(position).getTime());
return NullableValue.of(prestoType, packDateTimeWithZone(row.getTimestamp(position).getTime(), TimeZoneKey.UTC_KEY));
case DATE:
return NullableValue.of(prestoType, (long) row.getDate(position).getDaysSinceEpoch());
case INET:
Expand Down Expand Up @@ -315,6 +318,10 @@ public String getColumnValueForCql(Row row, int position)
// TODO unify with getColumnValueForCql
public String toCqlLiteral(Object prestoNativeValue)
{
if (this == TIMESTAMP) {
return String.valueOf(unpackMillisUtc((Long) prestoNativeValue));
}

String value;
if (prestoNativeValue instanceof Slice) {
value = ((Slice) prestoNativeValue).toStringUtf8();
Expand Down Expand Up @@ -494,7 +501,7 @@ public static CassandraType toCassandraType(Type type, ProtocolVersion protocolV
if (type.equals(VarbinaryType.VARBINARY)) {
return BLOB;
}
if (type.equals(TimestampType.TIMESTAMP)) {
if (type.equals(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE)) {
return TIMESTAMP;
}
throw new IllegalArgumentException("unsupported type: " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@
import static io.prestosql.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import static io.prestosql.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.prestosql.spi.type.DoubleType.DOUBLE;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.RealType.REAL;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static io.prestosql.spi.type.TimeZoneKey.UTC_KEY;
import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.spi.type.Varchars.isVarcharType;
import static java.lang.String.format;
Expand Down Expand Up @@ -190,7 +192,7 @@ public void testGetRecords()

assertEquals(cursor.getSlice(columnIndex.get("typeuuid")).toStringUtf8(), format("00000000-0000-0000-0000-%012d", rowId));

assertEquals(cursor.getSlice(columnIndex.get("typetimestamp")).toStringUtf8(), Long.valueOf(DATE.getTime()).toString());
assertEquals(cursor.getLong(columnIndex.get("typetimestamp")), packDateTimeWithZone(DATE.getTime(), UTC_KEY));

long newCompletedBytes = cursor.getCompletedBytes();
assertTrue(newCompletedBytes >= completedBytes);
Expand All @@ -216,7 +218,7 @@ else if (INTEGER.equals(type)) {
else if (BIGINT.equals(type)) {
cursor.getLong(columnIndex);
}
else if (TIMESTAMP.equals(type)) {
else if (TIMESTAMP_WITH_TIME_ZONE.equals(type)) {
cursor.getLong(columnIndex);
}
else if (DOUBLE.equals(type)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected Optional<DataMappingTestSetup> filterDataMappingSmokeTestData(DataMapp
{
String typeName = dataMappingTestSetup.getPrestoTypeName();
if (typeName.equals("time")
|| typeName.equals("timestamp(3) with time zone")
|| typeName.equals("timestamp")
|| typeName.equals("decimal(5,3)")
|| typeName.equals("decimal(15,3)")
|| typeName.equals("char(3)")) {
Expand All @@ -167,4 +167,10 @@ protected Optional<DataMappingTestSetup> filterDataMappingSmokeTestData(DataMapp
}
return Optional.of(dataMappingTestSetup);
}

@Override
protected String dataMappingTableName(String prestoTypeName)
{
return "presto_tmp_" + System.nanoTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;

import static com.datastax.driver.core.utils.Bytes.toRawHexString;
Expand All @@ -48,7 +49,7 @@
import static io.prestosql.spi.type.DoubleType.DOUBLE;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.RealType.REAL;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
Expand All @@ -74,9 +75,7 @@ public class TestCassandraIntegrationSmokeTest
private static final String KEYSPACE = "smoke_test";
private static final Session SESSION = createCassandraSession(KEYSPACE);

private static final Timestamp DATE_TIME_LOCAL = Timestamp.valueOf(LocalDateTime.of(1970, 1, 1, 3, 4, 5, 0));
// TODO should match DATE_TIME_LOCAL after https://github.com/prestosql/presto/issues/37
private static final LocalDateTime TIMESTAMP_LOCAL = LocalDateTime.of(1969, 12, 31, 23, 4, 5);
private static final ZonedDateTime TIMESTAMP_VALUE = ZonedDateTime.of(1970, 1, 1, 3, 4, 5, 0, ZoneId.of("UTC"));

private CassandraServer server;
private CassandraSession session;
Expand All @@ -87,7 +86,7 @@ protected QueryRunner createQueryRunner()
{
server = new CassandraServer();
session = server.getSession();
createTestTables(session, KEYSPACE, DATE_TIME_LOCAL);
createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant()));
return createCassandraQueryRunner(server, CUSTOMER, NATION, ORDERS, REGION);
}

Expand Down Expand Up @@ -143,7 +142,7 @@ public void testPartitionKeyPredicate()
" AND typeinteger = 7" +
" AND typelong = 1007" +
" AND typebytes = from_hex('" + toRawHexString(ByteBuffer.wrap(Ints.toByteArray(7))) + "')" +
" AND typetimestamp = TIMESTAMP '1969-12-31 23:04:05'" +
" AND typetimestamp = TIMESTAMP '1970-01-01 03:04:05Z'" +
" AND typeansi = 'ansi 7'" +
" AND typeboolean = false" +
" AND typedecimal = 128.0" +
Expand Down Expand Up @@ -301,17 +300,17 @@ public void testClusteringKeyPushdownInequality()
assertEquals(execute(sql).getRowCount(), 4);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two=2";
assertEquals(execute(sql).getRowCount(), 1);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two=2 AND clust_three = timestamp '1969-12-31 23:04:05.020'";
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two=2 AND clust_three = timestamp '1970-01-01 03:04:05.020Z'";
assertEquals(execute(sql).getRowCount(), 1);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two=2 AND clust_three = timestamp '1969-12-31 23:04:05.010'";
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two=2 AND clust_three = timestamp '1970-01-01 03:04:05.010Z'";
assertEquals(execute(sql).getRowCount(), 0);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two IN (1,2)";
assertEquals(execute(sql).getRowCount(), 2);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two > 1 AND clust_two < 3";
assertEquals(execute(sql).getRowCount(), 1);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two=2 AND clust_three >= timestamp '1969-12-31 23:04:05.010' AND clust_three <= timestamp '1969-12-31 23:04:05.020'";
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two=2 AND clust_three >= timestamp '1970-01-01 03:04:05.010Z' AND clust_three <= timestamp '1970-01-01 03:04:05.020Z'";
assertEquals(execute(sql).getRowCount(), 1);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two IN (1,2) AND clust_three >= timestamp '1969-12-31 23:04:05.010' AND clust_three <= timestamp '1969-12-31 23:04:05.020'";
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two IN (1,2) AND clust_three >= timestamp '1970-01-01 03:04:05.010Z' AND clust_three <= timestamp '1970-01-01 03:04:05.020Z'";
assertEquals(execute(sql).getRowCount(), 2);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two IN (1,2,3) AND clust_two < 2";
assertEquals(execute(sql).getRowCount(), 1);
Expand Down Expand Up @@ -562,7 +561,7 @@ public void testInsert()
"1, " +
"1000, " +
"null, " +
"timestamp '1970-01-01 08:34:05.0', " +
"timestamp '1970-01-01 08:34:05.0Z', " +
"'ansi1', " +
"true, " +
"null, " +
Expand All @@ -586,7 +585,7 @@ public void testInsert()
1,
1000L,
null,
LocalDateTime.of(1970, 1, 1, 8, 34, 5),
ZonedDateTime.of(1970, 1, 1, 8, 34, 5, 0, ZoneId.of("UTC")),
"ansi1",
true,
null,
Expand Down Expand Up @@ -666,7 +665,7 @@ private void assertSelect(String tableName, boolean createdByPresto)
INTEGER,
BIGINT,
VARBINARY,
TIMESTAMP,
TIMESTAMP_WITH_TIME_ZONE,
createUnboundedVarcharType(),
BOOLEAN,
DOUBLE,
Expand All @@ -691,7 +690,7 @@ private void assertSelect(String tableName, boolean createdByPresto)
rowNumber,
rowNumber + 1000L,
ByteBuffer.wrap(Ints.toByteArray(rowNumber)),
TIMESTAMP_LOCAL,
TIMESTAMP_VALUE,
"ansi " + rowNumber,
rowNumber % 2 == 0,
Math.pow(2, rowNumber),
Expand Down
2 changes: 1 addition & 1 deletion presto-docs/src/main/sphinx/connector/cassandra.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ MAP<?, ?> VARCHAR
SET<?> VARCHAR
SMALLINT SMALLINT
TEXT VARCHAR
TIMESTAMP TIMESTAMP
TIMESTAMP TIMESTAMP(3) WITH TIME ZONE
TIMEUUID VARCHAR
TINYINT TINYINT
VARCHAR VARCHAR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.UUID;

Expand All @@ -52,24 +53,52 @@ private DataTypesTableDefinition() {}
RelationalDataSource dataSource = () -> {
try {
return ImmutableList.of(
ImmutableList.of("\0", Long.MIN_VALUE, Bytes.fromHexString("0x00"), false,
BigDecimal.ZERO, Double.MIN_VALUE, LocalDate.fromYearMonthDay(1970, 1, 2), Float.MIN_VALUE, ImmutableSet.of(0),
Inet4Address.getByName("0.0.0.0"), Integer.MIN_VALUE, ImmutableList.of(0),
ImmutableMap.of("a", 0, "\0", Integer.MIN_VALUE), ImmutableSet.of(0), Short.MIN_VALUE,
"\0", Byte.MIN_VALUE, Timestamp.valueOf(LocalDateTime.of(1970, 1, 1, 0, 0)),
ImmutableList.of(
"\0",
Long.MIN_VALUE,
Bytes.fromHexString("0x00"),
false,
BigDecimal.ZERO,
Double.MIN_VALUE,
LocalDate.fromYearMonthDay(1970, 1, 2),
Float.MIN_VALUE,
ImmutableSet.of(0),
Inet4Address.getByName("0.0.0.0"),
Integer.MIN_VALUE,
ImmutableList.of(0),
ImmutableMap.of("a", 0, "\0", Integer.MIN_VALUE),
ImmutableSet.of(0),
Short.MIN_VALUE,
"\0",
Byte.MIN_VALUE,
Timestamp.from(OffsetDateTime.of(1970, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant()),
UUID.fromString("d2177dd0-eaa2-11de-a572-001b779c76e3"),
UUID.fromString("01234567-0123-0123-0123-0123456789ab"),
"\0", BigInteger.valueOf(Long.MIN_VALUE)),
ImmutableList.of("the quick brown fox jumped over the lazy dog", Long.MAX_VALUE,
Bytes.fromHexString("0x3031323334"), true,
"\0",
BigInteger.valueOf(Long.MIN_VALUE)),
ImmutableList.of(
"the quick brown fox jumped over the lazy dog",
Long.MAX_VALUE,
Bytes.fromHexString("0x3031323334"),
true,
BigDecimal.valueOf(parseDouble("99999999999999999999999999999999999999")),
Double.MAX_VALUE, LocalDate.fromYearMonthDay(9999, 12, 31), Float.MAX_VALUE, ImmutableSet.of(4, 5, 6, 7),
Inet4Address.getByName("255.255.255.255"), Integer.MAX_VALUE,
ImmutableList.of(4, 5, 6), ImmutableMap.of("a", 1, "b", 2), ImmutableSet.of(4, 5, 6), Short.MAX_VALUE,
"this is a text value", Byte.MAX_VALUE, Timestamp.valueOf(LocalDateTime.of(9999, 12, 31, 23, 59, 59)),
Double.MAX_VALUE,
LocalDate.fromYearMonthDay(9999, 12, 31),
Float.MAX_VALUE,
ImmutableSet.of(4, 5, 6, 7),
Inet4Address.getByName("255.255.255.255"),
Integer.MAX_VALUE,
ImmutableList.of(4, 5, 6),
ImmutableMap.of("a", 1, "b", 2),
ImmutableSet.of(4, 5, 6),
Short.MAX_VALUE,
"this is a text value",
Byte.MAX_VALUE,
Timestamp.from(OffsetDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC).toInstant()),
UUID.fromString("d2177dd0-eaa2-11de-a572-001b779c76e3"),
UUID.fromString("01234567-0123-0123-0123-0123456789ab"),
"abc", BigInteger.valueOf(Long.MAX_VALUE)),
"abc",
BigInteger.valueOf(Long.MAX_VALUE)),
Arrays.asList(new Object[] {"def", null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null, null})
).iterator();
Expand Down
Loading