Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@
*/
public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionStrategy {

SimpleDateFormat dateFormat = new SimpleDateFormat(DayBasedCompactionStrategy.DATE_PARTITION_FORMAT);
// NOTE: {@code SimpleDateFormat} is NOT thread-safe
// TODO replace w/ DateTimeFormatter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have a tracking ticket for this?

private final ThreadLocal<SimpleDateFormat> dateFormat =
ThreadLocal.withInitial(() -> new SimpleDateFormat(DayBasedCompactionStrategy.DATE_PARTITION_FORMAT));

@Override
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
// The earliest partition to compact - current day minus the target partitions limit
String earliestPartitionPathToCompact =
dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
dateFormat.get().format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
// Filter out all partitions greater than earliestPartitionPathToCompact

return operations.stream().collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet()
Expand All @@ -59,7 +62,7 @@ public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeCon
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
// The earliest partition to compact - current day minus the target partitions limit
String earliestPartitionPathToCompact =
dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
dateFormat.get().format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
// Get all partitions and sort them
return partitionPaths.stream().map(partition -> partition.replace("/", "-"))
.sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,18 @@ public class DayBasedCompactionStrategy extends CompactionStrategy {
// For now, use SimpleDateFormat as default partition format
protected static final String DATE_PARTITION_FORMAT = "yyyy/MM/dd";
// Sorts compaction in LastInFirstCompacted order

// NOTE: {@code SimpleDateFormat} is NOT thread-safe
// TODO replace w/ DateTimeFormatter
private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
ThreadLocal.withInitial(() -> new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH));

protected static Comparator<String> comparator = (String leftPartition, String rightPartition) -> {
try {
leftPartition = getPartitionPathWithoutPartitionKeys(leftPartition);
rightPartition = getPartitionPathWithoutPartitionKeys(rightPartition);
Date left = new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH).parse(leftPartition);
Date right = new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH).parse(rightPartition);
Date left = DATE_FORMAT.get().parse(leftPartition);
Date right = DATE_FORMAT.get().parse(rightPartition);
return left.after(right) ? -1 : right.after(left) ? 1 : 0;
} catch (ParseException e) {
throw new HoodieException("Invalid Partition Date Format", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ import java.text.SimpleDateFormat
import scala.collection.immutable.Map

object HoodieSqlUtils extends SparkAdapterSupport {
private val defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd")
// NOTE: {@code SimpleDateFormat} is NOT thread-safe
// TODO replace w/ DateTimeFormatter
private val defaultDateFormat =
ThreadLocal.withInitial(new java.util.function.Supplier[SimpleDateFormat] {
override def get() = new SimpleDateFormat("yyyy-MM-dd")
})

def isHoodieTable(table: CatalogTable): Boolean = {
table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
Expand Down Expand Up @@ -298,7 +303,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format
queryInstant
} else if (instantLength == 10) { // for yyyy-MM-dd
HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant))
HoodieActiveTimeline.formatDate(defaultDateFormat.get().parse(queryInstant))
} else {
throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant,"
+ s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'")
Expand Down