Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model;

/**
* Hoodie TimelineZone.
*/
public enum HoodieTimelineTimeZone {
LOCAL("local"),
UTC("utc");

private final String timeZone;

HoodieTimelineTimeZone(String timeZone) {
this.timeZone = timeZone;
}

public String getTimeZone() {
return timeZone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -167,6 +169,11 @@ public class HoodieTableConfig extends HoodieConfig {
.noDefaultValue()
.withDocumentation("Key Generator class property for the hoodie table");

public static final ConfigProperty<HoodieTimelineTimeZone> TIMELINE_TIMEZONE = ConfigProperty
.key("hoodie.table.timeline.timezone")
.defaultValue(HoodieTimelineTimeZone.LOCAL)
.withDocumentation("User can set hoodie commit timeline timezone, such as utc, local and so on. local is default");

public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;

Expand Down Expand Up @@ -315,6 +322,9 @@ public static void create(FileSystem fs, Path metadataFolder, Properties propert
// Use the default bootstrap index class.
hoodieConfig.setDefaultValue(BOOTSTRAP_INDEX_CLASS_NAME, getDefaultBootstrapIndexClass(properties));
}
if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
}
hoodieConfig.getProps().store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.fs.NoOpConsistencyGuard;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
Expand Down Expand Up @@ -639,6 +640,7 @@ public static class PropertyBuilder {
private String keyGeneratorClassProp;
private Boolean hiveStylePartitioningEnable;
private Boolean urlEncodePartitioning;
private HoodieTimelineTimeZone commitTimeZone;

private PropertyBuilder() {

Expand Down Expand Up @@ -737,6 +739,11 @@ public PropertyBuilder setUrlEncodePartitioning(Boolean urlEncodePartitioning) {
return this;
}

public PropertyBuilder setCommitTimezone(HoodieTimelineTimeZone timelineTimeZone) {
this.commitTimeZone = timelineTimeZone;
return this;
}

public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
return setTableType(metaClient.getTableType())
.setTableName(metaClient.getTableConfig().getTableName())
Expand Down Expand Up @@ -873,6 +880,9 @@ public Properties build() {
if (null != urlEncodePartitioning) {
tableConfig.setValue(HoodieTableConfig.URL_ENCODE_PARTITIONING, Boolean.toString(urlEncodePartitioning));
}
if (null != commitTimeZone) {
tableConfig.setValue(HoodieTableConfig.TIMELINE_TIMEZONE, commitTimeZone.toString());
}
return tableConfig.getProps();
}

Expand All @@ -886,5 +896,6 @@ public HoodieTableMetaClient initTable(Configuration configuration, String baseP
throws IOException {
return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

package org.apache.hudi.common.table.timeline;

import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import java.text.ParseException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
Expand Down Expand Up @@ -56,6 +58,8 @@ public class HoodieInstantTimeGenerator {
// when performing comparisons such as LESS_THAN_OR_EQUAL_TO
private static final String DEFAULT_MILLIS_EXT = "999";

private static HoodieTimelineTimeZone commitTimeZone = HoodieTimelineTimeZone.LOCAL;

/**
* Returns next instant time that adds N milliseconds to the current time.
* Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
Expand All @@ -66,8 +70,13 @@ public static String createNewInstantTime(long milliseconds) {
return lastInstantTime.updateAndGet((oldVal) -> {
String newCommitTime;
do {
Date d = new Date(System.currentTimeMillis() + milliseconds);
newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
if (commitTimeZone.equals(HoodieTimelineTimeZone.UTC.toString())) {
LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);
newCommitTime = now.format(MILLIS_INSTANT_TIME_FORMATTER);
} else {
Date d = new Date(System.currentTimeMillis() + milliseconds);
newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
}
} while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
return newCommitTime;
});
Expand Down Expand Up @@ -131,4 +140,8 @@ public static String getInstantForDateString(String dateString) {
private static TemporalAccessor convertDateToTemporalAccessor(Date d) {
return d.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
}

public static void setCommitTimeZone(HoodieTimelineTimeZone commitTimeZone) {
HoodieInstantTimeGenerator.commitTimeZone = commitTimeZone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@ package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, HoodieTimelineTimeZone, WriteOperationType}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils}
Expand All @@ -44,16 +42,13 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner

import org.apache.log4j.LogManager

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkContext}

import java.util.Properties

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -147,6 +142,7 @@ object HoodieSparkSqlWriter {
.setKeyGeneratorClassProp(HoodieWriterUtils.getOriginKeyGenerator(parameters))
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
}
Expand Down Expand Up @@ -397,6 +393,7 @@ object HoodieSparkSqlWriter {
.setKeyGeneratorClassProp(keyGenProp)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
}

Expand Down