api/src/main/java/org/apache/iceberg/types/Conversions.java (10 additions, 0 deletions)
@@ -28,8 +28,10 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.util.UUIDUtil;
@@ -71,6 +73,14 @@ public static Object fromPartitionString(Type type, String asString) {
return new BigDecimal(asString);
case DATE:
return Literal.of(asString).to(Types.DateType.get()).value();
case TIMESTAMP:
if (!asString.contains("T")) {
Instant instant = java.sql.Timestamp.valueOf(asString).toInstant();
Review comment (Contributor):
Is it possible to do this conversion without using java.sql.Timestamp?

return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) +
TimeUnit.NANOSECONDS.toMicros(instant.getNano());
} else {
return Literal.of(asString).to(Types.TimestampType.withoutZone()).value();
}
default:
throw new UnsupportedOperationException(
"Unsupported type for fromPartitionString: " + type);
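In answer to the reviewer's question above, a minimal sketch of the same conversion using only java.time (the class name and the UTC assumption are mine, not the PR's). Note the behavioral difference: java.sql.Timestamp.valueOf interprets the string in the JVM's default time zone, while this version pins UTC, so the two are not interchangeable for zone-sensitive data.

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

public class TimestampParseSketch {
  // Parses "2021-01-01 00:00:00.999999999" to microseconds from the epoch
  // without touching java.sql. ISO_LOCAL_DATE_TIME accepts the same string
  // once the space separator is swapped for 'T'.
  static long toMicros(String asString) {
    LocalDateTime dateTime = LocalDateTime.parse(asString.replace(' ', 'T'));
    Instant instant = dateTime.toInstant(ZoneOffset.UTC);
    return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
        + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
  }
}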
Second changed file (test for the add_files procedure; path not captured):
@@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -33,8 +34,11 @@
import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -381,6 +385,25 @@ public void addDataPartitionedByDateToPartitioned() {
sql("SELECT id, name, dept, date FROM %s ORDER BY id", tableName));
}

@Test
public void addDataPartitionedByTimestampToPartitioned() {
createTsPartitionedFileTable("parquet");

String createIceberg =
"CREATE TABLE %s (id Integer, name String, dept String, ts Timestamp) USING iceberg PARTITIONED BY (ts)";

sql(createIceberg, tableName);

Object result = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')",
catalogName, tableName, fileTableDir.getAbsolutePath());

Assert.assertEquals(4L, result);

assertEquals("Iceberg table contains correct data",
sql("SELECT id, name, dept, ts FROM %s ORDER BY id", sourceTableName),
sql("SELECT id, name, dept, ts FROM %s ORDER BY id", tableName));
}
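For context, a small sketch (literal value assumed) of the conversion this test exercises end to end. The add_files procedure reads partition values out of Hive-style path segments such as ts=2021-01-02 00%3A00%3A00 (colons escaped), so the string reaching Conversions.fromPartitionString is space-separated rather than ISO 'T'-separated, which is exactly the branch this PR adds.

import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

public class PartitionStringDemo {
  public static void main(String[] args) {
    // The value as it appears in the partition path, after unescaping.
    Object micros = Conversions.fromPartitionString(
        Types.TimestampType.withoutZone(), "2021-01-02 00:00:00");
    // With this PR, micros is a Long holding microseconds from the epoch.
    System.out.println(micros);
  }
}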

@Test
public void addFilteredPartitionsToPartitioned() {
createCompositePartitionedTable("parquet");
@@ -819,6 +842,27 @@ private static java.sql.Date toDate(String value) {
RowFactory.create(4, "Will Doe", "facilities", toDate("2021-01-02"))),
new StructType(dateStruct)).repartition(2);

private static final StructField[] timestampStruct = {
new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("name", DataTypes.StringType, true, Metadata.empty()),
new StructField("dept", DataTypes.StringType, true, Metadata.empty()),
new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
};

private static Timestamp toTimestamp(String value) {
Review comment (Contributor):
Okay, I see. I was wrong before. This is actually producing a Timestamp because that's what Spark's public row API uses. Instead, could you use Spark SQL and literal values to avoid doing this conversion manually? I think that will make for a much more reliable test.

Literal<Long> timestamp = Literal.of(value).to(Types.TimestampType.withoutZone());
return Timestamp.valueOf(DateTimeUtil.timestampFromMicros(timestamp.value()));
}
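A sketch of the reviewer's suggestion, assuming this class's existing sql() helper (used throughout the tests above); the method name is hypothetical. Spark SQL timestamp literals let Spark do the parsing itself, and since Spark timestamps carry microsecond precision, the nanosecond digits in the current test data are truncated either way.

// Hypothetical variant; sql() is the helper already used in this class.
private void insertTimestampRowsViaSql() {
  sql("INSERT INTO %s VALUES " +
      "(1, 'John Doe', 'hr', timestamp'2021-01-01 00:00:00.999999'), " +
      "(2, 'Jane Doe', 'hr', timestamp'2021-01-01 00:00:00.999999'), " +
      "(3, 'Matt Doe', 'hr', timestamp'2021-01-02 00:00:00.999999'), " +
      "(4, 'Will Doe', 'facilities', timestamp'2021-01-02 00:00:00.999999')",
      sourceTableName);
}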

private static final Dataset<Row> timestampDF =
spark.createDataFrame(
ImmutableList.of(
RowFactory.create(1, "John Doe", "hr", toTimestamp("2021-01-01T00:00:00.999999999")),
RowFactory.create(2, "Jane Doe", "hr", toTimestamp("2021-01-01T00:00:00.999999999")),
RowFactory.create(3, "Matt Doe", "hr", toTimestamp("2021-01-02T00:00:00.999999999")),
RowFactory.create(4, "Will Doe", "facilities", toTimestamp("2021-01-02T00:00:00.999999999"))),
new StructType(timestampStruct)).repartition(2);

private void createUnpartitionedFileTable(String format) {
String createParquet =
"CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING %s LOCATION '%s'";
@@ -900,4 +944,13 @@ private void createDatePartitionedFileTable(String format) {

dateDF.write().insertInto(sourceTableName);
}

private void createTsPartitionedFileTable(String format) {
String createParquet =
"CREATE TABLE %s (id Integer, name String, dept String, ts timestamp) USING %s PARTITIONED BY (ts) " +
"LOCATION '%s'";

sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath());
timestampDF.write().insertInto(sourceTableName);
}
}