diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java index 747e0b2f3c47d..09c19b1aabe85 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java @@ -39,14 +39,17 @@ */ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionStrategy { - SimpleDateFormat dateFormat = new SimpleDateFormat(DayBasedCompactionStrategy.DATE_PARTITION_FORMAT); + // NOTE: {@code SimpleDataFormat} is NOT thread-safe + // TODO replace w/ DateTimeFormatter + private final ThreadLocal dateFormat = + ThreadLocal.withInitial(() -> new SimpleDateFormat(DayBasedCompactionStrategy.DATE_PARTITION_FORMAT)); @Override public List orderAndFilter(HoodieWriteConfig writeConfig, List operations, List pendingCompactionPlans) { // The earliest partition to compact - current day minus the target partitions limit String earliestPartitionPathToCompact = - dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); + dateFormat.get().format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); // Filter out all partitions greater than earliestPartitionPathToCompact return operations.stream().collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet() @@ -59,7 +62,7 @@ public List orderAndFilter(HoodieWriteConfig writeCon public List filterPartitionPaths(HoodieWriteConfig writeConfig, List partitionPaths) { // The earliest partition to compact - current day minus the target partitions limit String earliestPartitionPathToCompact = - dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); + dateFormat.get().format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); // Get all partitions and sort them return partitionPaths.stream().map(partition -> partition.replace("/", "-")) .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/")) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java index 4a12bb8a08b72..94d74b50dc24a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java @@ -41,12 +41,18 @@ public class DayBasedCompactionStrategy extends CompactionStrategy { // For now, use SimpleDateFormat as default partition format protected static final String DATE_PARTITION_FORMAT = "yyyy/MM/dd"; // Sorts compaction in LastInFirstCompacted order + + // NOTE: {@code SimpleDataFormat} is NOT thread-safe + // TODO replace w/ DateTimeFormatter + private static final ThreadLocal DATE_FORMAT = + ThreadLocal.withInitial(() -> new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH)); + protected static Comparator comparator = (String leftPartition, String rightPartition) -> { try { leftPartition = getPartitionPathWithoutPartitionKeys(leftPartition); rightPartition = getPartitionPathWithoutPartitionKeys(rightPartition); - Date left = new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH).parse(leftPartition); - Date right = new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH).parse(rightPartition); + Date left = DATE_FORMAT.get().parse(leftPartition); + Date right = DATE_FORMAT.get().parse(rightPartition); return left.after(right) ? -1 : right.after(left) ? 1 : 0; } catch (ParseException e) { throw new HoodieException("Invalid Partition Date Format", e); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index aa4ea91daf24c..e1c3a5a8efe18 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -48,7 +48,12 @@ import java.text.SimpleDateFormat import scala.collection.immutable.Map object HoodieSqlUtils extends SparkAdapterSupport { - private val defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd") + // NOTE: {@code SimpleDataFormat} is NOT thread-safe + // TODO replace w/ DateTimeFormatter + private val defaultDateFormat = + ThreadLocal.withInitial(new java.util.function.Supplier[SimpleDateFormat] { + override def get() = new SimpleDateFormat("yyyy-MM-dd") + }) def isHoodieTable(table: CatalogTable): Boolean = { table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi" @@ -298,7 +303,7 @@ object HoodieSqlUtils extends SparkAdapterSupport { HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format queryInstant } else if (instantLength == 10) { // for yyyy-MM-dd - HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant)) + HoodieActiveTimeline.formatDate(defaultDateFormat.get().parse(queryInstant)) } else { throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant," + s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'")