diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTimelineTimeZone.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTimelineTimeZone.java new file mode 100644 index 0000000000000..9b1c695d491ea --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTimelineTimeZone.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +/** + * Hoodie TimelineZone. 
+ */ +public enum HoodieTimelineTimeZone { + LOCAL("local"), + UTC("utc"); + + private final String timeZone; + + HoodieTimelineTimeZone(String timeZone) { + this.timeZone = timeZone; + } + + public String getTimeZone() { + return timeZone; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index e4b60e2ea3854..54724d5f37aba 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -27,7 +27,9 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieTimelineTimeZone; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; @@ -167,6 +169,11 @@ public class HoodieTableConfig extends HoodieConfig { .noDefaultValue() .withDocumentation("Key Generator class property for the hoodie table"); + public static final ConfigProperty TIMELINE_TIMEZONE = ConfigProperty + .key("hoodie.table.timeline.timezone") + .defaultValue(HoodieTimelineTimeZone.LOCAL) + .withDocumentation("User can set hoodie commit timeline timezone, such as utc, local and so on. local is default"); + public static final ConfigProperty URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING; public static final ConfigProperty HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE; @@ -315,6 +322,9 @@ public static void create(FileSystem fs, Path metadataFolder, Properties propert // Use the default bootstrap index class. 
hoodieConfig.setDefaultValue(BOOTSTRAP_INDEX_CLASS_NAME, getDefaultBootstrapIndexClass(properties));
     }
+    if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
+      HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE).toUpperCase()));
+    }
     hoodieConfig.getProps().store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index ec0271f6b98af..f44d28eca6351 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.fs.NoOpConsistencyGuard;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
@@ -639,6 +640,7 @@ public static class PropertyBuilder {
     private String keyGeneratorClassProp;
     private Boolean hiveStylePartitioningEnable;
     private Boolean urlEncodePartitioning;
+    private HoodieTimelineTimeZone commitTimeZone;
 
     private PropertyBuilder() {
 
@@ -737,6 +739,11 @@ public PropertyBuilder setUrlEncodePartitioning(Boolean urlEncodePartitioning) {
       return this;
     }
 
+    public PropertyBuilder setCommitTimezone(HoodieTimelineTimeZone timelineTimeZone) {
+      this.commitTimeZone = timelineTimeZone;
+      return this;
+    }
+
     public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
       return setTableType(metaClient.getTableType())
           .setTableName(metaClient.getTableConfig().getTableName())
@@ -873,6 +880,9 @@ public Properties build() {
       if (null != 
urlEncodePartitioning) { tableConfig.setValue(HoodieTableConfig.URL_ENCODE_PARTITIONING, Boolean.toString(urlEncodePartitioning)); } + if (null != commitTimeZone) { + tableConfig.setValue(HoodieTableConfig.TIMELINE_TIMEZONE, commitTimeZone.toString()); + } return tableConfig.getProps(); } @@ -886,5 +896,6 @@ public HoodieTableMetaClient initTable(Configuration configuration, String baseP throws IOException { return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build()); } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index 93f8e0b4b9433..ae4523e72bee3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -18,10 +18,12 @@ package org.apache.hudi.common.table.timeline; +import org.apache.hudi.common.model.HoodieTimelineTimeZone; import java.text.ParseException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.format.DateTimeParseException; @@ -56,6 +58,8 @@ public class HoodieInstantTimeGenerator { // when performing comparisons such as LESS_THAN_OR_EQUAL_TO private static final String DEFAULT_MILLIS_EXT = "999"; + private static HoodieTimelineTimeZone commitTimeZone = HoodieTimelineTimeZone.LOCAL; + /** * Returns next instant time that adds N milliseconds to the current time. 
* Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
@@ -66,8 +70,15 @@ public static String createNewInstantTime(long milliseconds) {
     return lastInstantTime.updateAndGet((oldVal) -> {
       String newCommitTime;
       do {
-        Date d = new Date(System.currentTimeMillis() + milliseconds);
-        newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
+        // Compare enum constants directly: an enum never equals its toString(), so the previous
+        // check left the UTC branch dead. Also apply the requested millisecond offset in UTC mode.
+        if (HoodieTimelineTimeZone.UTC.equals(commitTimeZone)) {
+          LocalDateTime now = Instant.ofEpochMilli(System.currentTimeMillis() + milliseconds).atZone(ZoneOffset.UTC).toLocalDateTime();
+          newCommitTime = now.format(MILLIS_INSTANT_TIME_FORMATTER);
+        } else {
+          Date d = new Date(System.currentTimeMillis() + milliseconds);
+          newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
+        }
       } while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
       return newCommitTime;
     });
@@ -131,4 +140,8 @@ public static String getInstantForDateString(String dateString) {
   private static TemporalAccessor convertDateToTemporalAccessor(Date d) {
     return d.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
   }
+
+  public static void setCommitTimeZone(HoodieTimelineTimeZone commitTimeZone) {
+    HoodieInstantTimeGenerator.commitTimeZone = commitTimeZone;
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 8ebc896fcb2bd..9e1fd43a27c29 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -19,18 +19,16 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
-
 import 
org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType} +import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, HoodieTimelineTimeZone, WriteOperationType} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils} @@ -44,16 +42,13 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.hudi.table.BulkInsertPartitioner - import org.apache.log4j.LogManager - import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} import org.apache.spark.{SPARK_VERSION, SparkContext} - import java.util.Properties import scala.collection.JavaConversions._ @@ -147,6 +142,7 @@ object HoodieSparkSqlWriter { .setKeyGeneratorClassProp(HoodieWriterUtils.getOriginKeyGenerator(parameters)) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) + .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) .initTable(sparkContext.hadoopConfiguration, path) tableConfig = tableMetaClient.getTableConfig } 
@@ -397,6 +393,7 @@ object HoodieSparkSqlWriter {
         .setKeyGeneratorClassProp(keyGenProp)
         .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
         .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
+        .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE).toUpperCase))
         .initTable(sparkContext.hadoopConfiguration, path)
   }