diff --git a/api/src/main/java/org/apache/iceberg/types/Conversions.java b/api/src/main/java/org/apache/iceberg/types/Conversions.java
index d6ef24449b82..51b043b229f2 100644
--- a/api/src/main/java/org/apache/iceberg/types/Conversions.java
+++ b/api/src/main/java/org/apache/iceberg/types/Conversions.java
@@ -28,8 +28,10 @@
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CharsetEncoder;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.util.UUIDUtil;
@@ -71,6 +73,14 @@ public static Object fromPartitionString(Type type, String asString) {
         return new BigDecimal(asString);
       case DATE:
         return Literal.of(asString).to(Types.DateType.get()).value();
+      case TIMESTAMP:
+        if (!asString.contains("T")) {
+          Instant instant = java.sql.Timestamp.valueOf(asString).toInstant();
+          return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+              + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
+        } else {
+          return Literal.of(asString).to(Types.TimestampType.withoutZone()).value();
+        }
       default:
         throw new UnsupportedOperationException(
             "Unsupported type for fromPartitionString: " + type);
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index e274ad857875..920e22eb6510 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
@@ -33,8 +34,11 @@
 import org.apache.avro.io.DatumWriter;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -381,6 +385,25 @@ public void addDataPartitionedByDateToPartitioned() {
         sql("SELECT id, name, dept, date FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void addDataPartitionedByTimestampToPartitioned() {
+    createTsPartitionedFileTable("parquet");
+
+    String createIceberg =
+        "CREATE TABLE %s (id Integer, name String, dept String, ts Timestamp) USING iceberg PARTITIONED BY (ts)";
+
+    sql(createIceberg, tableName);
+
+    Object result = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')",
+        catalogName, tableName, fileTableDir.getAbsolutePath());
+
+    Assert.assertEquals(4L, result);
+
+    assertEquals("Iceberg table contains correct data",
+        sql("SELECT id, name, dept, ts FROM %s ORDER BY id", sourceTableName),
+        sql("SELECT id, name, dept, ts FROM %s ORDER BY id", tableName));
+  }
+
   @Test
   public void addFilteredPartitionsToPartitioned() {
     createCompositePartitionedTable("parquet");
@@ -819,6 +842,27 @@ private static java.sql.Date toDate(String value) {
               RowFactory.create(4, "Will Doe", "facilities", toDate("2021-01-02"))),
           new StructType(dateStruct)).repartition(2);
 
+  private static final StructField[] timestampStruct = {
+      new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
+      new StructField("name", DataTypes.StringType, true, Metadata.empty()),
+      new StructField("dept", DataTypes.StringType, true, Metadata.empty()),
+      new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
+  };
+
+  private static Timestamp toTimestamp(String value) {
+    Literal<Long> timestamp = Literal.of(value).to(Types.TimestampType.withoutZone());
+    return Timestamp.valueOf(DateTimeUtil.timestampFromMicros(timestamp.value()));
+  }
+
+  private static final Dataset<Row> timestampDF =
+      spark.createDataFrame(
+          ImmutableList.of(
+              RowFactory.create(1, "John Doe", "hr", toTimestamp("2021-01-01T00:00:00.999999999")),
+              RowFactory.create(2, "Jane Doe", "hr", toTimestamp("2021-01-01T00:00:00.999999999")),
+              RowFactory.create(3, "Matt Doe", "hr", toTimestamp("2021-01-02T00:00:00.999999999")),
+              RowFactory.create(4, "Will Doe", "facilities", toTimestamp("2021-01-02T00:00:00.999999999"))),
+          new StructType(timestampStruct)).repartition(2);
+
   private void createUnpartitionedFileTable(String format) {
     String createParquet =
         "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING %s LOCATION '%s'";
@@ -900,4 +944,13 @@ private void createDatePartitionedFileTable(String format) {
 
     dateDF.write().insertInto(sourceTableName);
   }
+
+  private void createTsPartitionedFileTable(String format) {
+    String createParquet =
+        "CREATE TABLE %s (id Integer, name String, dept String, ts timestamp) USING %s PARTITIONED BY (ts) "
+            + "LOCATION '%s'";
+
+    sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath());
+    timestampDF.write().insertInto(sourceTableName);
+  }
 }