Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -106,7 +108,8 @@ public static String getPartitionPathFromRow(Row row, List<String> partitionPath
if (fieldPos == -1 || row.isNullAt(fieldPos)) {
val = HUDI_DEFAULT_PARTITION_PATH;
} else {
val = row.getAs(field).toString();
Object data = row.get(fieldPos);
val = convertToTimestampIfInstant(data).toString();
if (val.isEmpty()) {
val = HUDI_DEFAULT_PARTITION_PATH;
}
Expand All @@ -115,11 +118,12 @@ public static String getPartitionPathFromRow(Row row, List<String> partitionPath
val = field + "=" + val;
}
} else { // nested
Object nestedVal = getNestedFieldVal(row, partitionPathPositions.get(field));
if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
Object data = getNestedFieldVal(row, partitionPathPositions.get(field));
data = convertToTimestampIfInstant(data);
if (data.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || data.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
val = hiveStylePartitioning ? field + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH;
} else {
val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : nestedVal.toString();
val = hiveStylePartitioning ? field + "=" + data.toString() : data.toString();
}
}
return val;
Expand Down Expand Up @@ -266,4 +270,11 @@ public static List<Integer> getNestedFieldIndices(StructType structType, String
}
return positions;
}

/**
 * Normalizes a partition value for string rendering.
 * <p>
 * When {@code spark.sql.datetime.java8API.enabled} is on, Spark hands back
 * {@link java.time.Instant} for TimestampType columns; converting to
 * {@link java.sql.Timestamp} keeps the generated partition path identical to
 * the legacy (java8 API disabled) behavior. Any other value passes through.
 *
 * @param data raw field value extracted from the Row (may be any type)
 * @return an equivalent Timestamp when the input is an Instant, otherwise the input itself
 */
private static Object convertToTimestampIfInstant(Object data) {
  return (data instanceof Instant) ? Timestamp.from((Instant) data) : data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.hudi

import java.nio.ByteBuffer
import java.sql.{Date, Timestamp}
import java.time.Instant

import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
Expand Down Expand Up @@ -301,9 +302,17 @@ object AvroConversionHelper {
}.orNull
}
case TimestampType => (item: Any) =>
// Convert time to microseconds since spark-avro by default converts TimestampType to
// Avro Logical TimestampMicros
Option(item).map(_.asInstanceOf[Timestamp].getTime * 1000).orNull
if (item == null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can add a unit test?

null
} else {
val timestamp = item match {
case i: Instant => Timestamp.from(i)
case t: Timestamp => t
}
// Convert time to microseconds since spark-avro by default converts TimestampType to
// Avro Logical TimestampMicros
timestamp.getTime * 1000
}
case DateType => (item: Any) =>
Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
case ArrayType(elementType, _) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.keygen

import java.sql.Timestamp

import org.apache.spark.sql.Row

import org.apache.hudi.keygen.RowKeyGeneratorHelper._

import org.junit.jupiter.api.{Assertions, Test}

import scala.collection.JavaConverters._

class TestRowGeneratorHelper {

  /**
   * Exercises RowKeyGeneratorHelper.getPartitionPathFromRow across:
   * single and multiple plain partition columns, Timestamp- and Instant-typed
   * columns (Instant must render identically to Timestamp), and nested
   * partition columns — each with and without hive-style partitioning.
   */
  @Test
  def testGetPartitionPathFromRow(): Unit = {

    /** single plain partition */
    val row1 = Row.fromSeq(Seq(1, "z3", 10.0, "20220108"))
    val ptField1 = List("dt").asJava
    // Integer.valueOf over the deprecated `new Integer(...)` boxing constructor.
    val ptPos1 = Map("dt" -> List(Integer.valueOf(3)).asJava).asJava
    Assertions.assertEquals("20220108",
      getPartitionPathFromRow(row1, ptField1, false, ptPos1))
    Assertions.assertEquals("dt=20220108",
      getPartitionPathFromRow(row1, ptField1, true, ptPos1))

    /** multiple plain partitions */
    val row2 = Row.fromSeq(Seq(1, "z3", 10.0, "2022", "01", "08"))
    val ptField2 = List("year", "month", "day").asJava
    val ptPos2 = Map("year" -> List(Integer.valueOf(3)).asJava,
      "month" -> List(Integer.valueOf(4)).asJava,
      "day" -> List(Integer.valueOf(5)).asJava
    ).asJava
    Assertions.assertEquals("2022/01/08",
      getPartitionPathFromRow(row2, ptField2, false, ptPos2))
    Assertions.assertEquals("year=2022/month=01/day=08",
      getPartitionPathFromRow(row2, ptField2, true, ptPos2))

    /** multiple partitions which contains TimeStamp type or Instant type */
    val timestamp = Timestamp.valueOf("2020-01-08 10:00:00")
    val instant = timestamp.toInstant
    val ptField3 = List("event", "event_time").asJava
    val ptPos3 = Map("event" -> List(Integer.valueOf(3)).asJava,
      "event_time" -> List(Integer.valueOf(4)).asJava
    ).asJava

    // with timeStamp type
    val row2_ts = Row.fromSeq(Seq(1, "z3", 10.0, "click", timestamp))
    Assertions.assertEquals("click/2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row2_ts, ptField3, false, ptPos3))
    Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row2_ts, ptField3, true, ptPos3))

    // with instant type: must produce the same path as the Timestamp case above
    val row2_instant = Row.fromSeq(Seq(1, "z3", 10.0, "click", instant))
    Assertions.assertEquals("click/2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row2_instant, ptField3, false, ptPos3))
    Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row2_instant, ptField3, true, ptPos3))

    /** nested partition only (Instant inside a nested row) */
    val nestedRow4 = Row.fromSeq(Seq(instant, "ad"))
    val ptField4 = List("event_time").asJava
    // position (3, 0): field 3 of the outer row, field 0 of the nested row
    val ptPos4 = Map("event_time" -> List(Integer.valueOf(3), Integer.valueOf(0)).asJava).asJava
    val row4 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow4, "click"))
    Assertions.assertEquals("2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row4, ptField4, false, ptPos4))
    Assertions.assertEquals("event_time=2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row4, ptField4, true, ptPos4))

    /** mixed case with plain and nested partitions */
    val nestedRow5 = Row.fromSeq(Seq(timestamp, "ad"))
    val ptField5 = List("event", "event_time").asJava
    val ptPos5 = Map(
      "event_time" -> List(Integer.valueOf(3), Integer.valueOf(0)).asJava,
      "event" -> List(Integer.valueOf(4)).asJava
    ).asJava
    val row5 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow5, "click"))
    Assertions.assertEquals("click/2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row5, ptField5, false, ptPos5))
    Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row5, ptField5, true, ptPos5))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,33 @@ class TestCreateTable extends TestHoodieSqlBase {
}
}

test("Test Create Table As Select when 'spark.sql.datetime.java8API.enabled' enables") {
  // With spark.sql.datetime.java8API.enabled set, Spark represents
  // TimestampType values as java.time.Instant instead of java.sql.Timestamp;
  // CTAS into a partitioned hudi table must still work and round-trip the value.
  // Remember the caller's setting so we can restore it (the original code
  // hardcoded `false` in `finally`, clobbering any pre-existing session config).
  val originalJava8Api = spark.conf.getOption("spark.sql.datetime.java8API.enabled")
  try {
    spark.conf.set("spark.sql.datetime.java8API.enabled", value = true)

    val tableName = generateTableName
    spark.sql(
      s"""
         |create table $tableName
         |using hudi
         |partitioned by(dt)
         |options(type = 'cow', primaryKey = 'id')
         |as
         |select 1 as id, 'a1' as name, 10 as price, cast('2021-05-07 00:00:00' as timestamp) as dt
         |""".stripMargin
    )

    checkAnswer(s"select id, name, price, cast(dt as string) from $tableName")(
      Seq(1, "a1", 10, "2021-05-07 00:00:00")
    )
  } finally {
    // Restore the previous value rather than assuming the flag was disabled.
    originalJava8Api match {
      case Some(previous) => spark.conf.set("spark.sql.datetime.java8API.enabled", previous)
      case None => spark.conf.unset("spark.sql.datetime.java8API.enabled")
    }
  }
}

test("Test Create Table From Exist Hoodie Table") {
withTempDir { tmp =>
Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,38 @@ class TestInsertTable extends TestHoodieSqlBase {
spark.sql("set hoodie.sql.insert.mode = upsert")
}


test("Test Insert timestamp when 'spark.sql.datetime.java8API.enabled' enables") {
  // With spark.sql.datetime.java8API.enabled set, Spark represents
  // TimestampType values as java.time.Instant instead of java.sql.Timestamp;
  // inserting into a timestamp-partitioned hudi table must still round-trip.
  // Remember the caller's setting so we can restore it (the original code
  // hardcoded `false` in `finally`, clobbering any pre-existing session config).
  val originalJava8Api = spark.conf.getOption("spark.sql.datetime.java8API.enabled")
  try {
    spark.conf.set("spark.sql.datetime.java8API.enabled", value = true)

    val tableName = generateTableName
    spark.sql(
      s"""
         |create table $tableName (
         |  id int,
         |  name string,
         |  price double,
         |  dt timestamp
         |)
         |using hudi
         |partitioned by(dt)
         |options(type = 'cow', primaryKey = 'id')
         |""".stripMargin
    )

    spark.sql(s"insert into $tableName values (1, 'a1', 10, cast('2021-05-07 00:00:00' as timestamp))")
    checkAnswer(s"select id, name, price, cast(dt as string) from $tableName")(
      Seq(1, "a1", 10, "2021-05-07 00:00:00")
    )
  } finally {
    // Restore the previous value rather than assuming the flag was disabled.
    originalJava8Api match {
      case Some(previous) => spark.conf.set("spark.sql.datetime.java8API.enabled", previous)
      case None => spark.conf.unset("spark.sql.datetime.java8API.enabled")
    }
  }
}

test("Test bulk insert") {
spark.sql("set hoodie.sql.insert.mode = non-strict")
withTempDir { tmp =>
Expand Down