From 2feb08c60ebff5f383f33ebeadb20f40b2b39271 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E6=89=BF=E7=A5=A5?= Date: Wed, 14 Sep 2022 17:31:08 +0800 Subject: [PATCH 1/4] add LogFileLengthBasedCompactionStrategy --- .../hudi/config/HoodieCompactionConfig.java | 75 ++++-- .../apache/hudi/config/HoodieWriteConfig.java | 247 +++++++----------- .../LogFileLengthBasedCompactionStrategy.java | 49 ++++ .../TestHoodieCompactionStrategy.java | 54 +++- 4 files changed, 246 insertions(+), 179 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileLengthBasedCompactionStrategy.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index d1d0e6726173..be94078e457d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -106,6 +106,12 @@ public class HoodieCompactionConfig extends HoodieConfig { .withDocumentation("Only if the log file size is greater than the threshold in bytes," + " the file group will be compacted."); + public static final ConfigProperty COMPACTION_LOG_FILE_LENGTH_THRESHOLD = ConfigProperty + .key("hoodie.compaction.logfile.length.threshold") + .defaultValue(0L) + .withDocumentation("Only if the log file length is greater than the threshold," + + " the file group will be compacted."); + public static final ConfigProperty COMPACTION_STRATEGY = ConfigProperty .key("hoodie.compaction.strategy") .defaultValue(LogFileSizeBasedCompactionStrategy.class.getName()) @@ -164,16 +170,24 @@ public class HoodieCompactionConfig extends HoodieConfig { + " This is critical in computing the insert parallelism and bin-packing inserts into small files."); - /** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */ + /** + * @deprecated Use {@link #INLINE_COMPACT} and its methods instead + */ @Deprecated public static final String INLINE_COMPACT_PROP = INLINE_COMPACT.key(); - /** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead */ + /** + * @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead + */ @Deprecated public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = INLINE_COMPACT_NUM_DELTA_COMMITS.key(); - /** @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its methods instead */ + /** + * @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its methods instead + */ @Deprecated public static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = INLINE_COMPACT_TIME_DELTA_SECONDS.key(); - /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead */ + /** + * @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead + */ @Deprecated public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP = INLINE_COMPACT_TRIGGER_STRATEGY.key(); /** @@ -241,41 +255,61 @@ public class HoodieCompactionConfig extends HoodieConfig { */ @Deprecated public static final String COMPACTION_STRATEGY_PROP = COMPACTION_STRATEGY.key(); - /** @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead */ + /** + * @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead + */ @Deprecated public static final String DEFAULT_COMPACTION_STRATEGY = 
COMPACTION_STRATEGY.defaultValue(); - /** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead */ + /** + * @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead + */ @Deprecated public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = COMPACTION_LAZY_BLOCK_READ_ENABLE.key(); - /** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead */ + /** + * @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead + */ @Deprecated public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue(); - /** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead */ + /** + * @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead + */ @Deprecated public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = COMPACTION_REVERSE_LOG_READ_ENABLE.key(); - /** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead */ + /** + * @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead + */ @Deprecated public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue(); + /** + * @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead + */ + @Deprecated + public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.key(); + /** + * @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead + */ + @Deprecated + public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.defaultValue(); /** * @deprecated Use {@link #INLINE_COMPACT} and its methods instead */ @Deprecated private static final String DEFAULT_INLINE_COMPACT = INLINE_COMPACT.defaultValue(); - /** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead */ + /** + * @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead + */ @Deprecated private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue(); - /** @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its methods instead */ + /** + * @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its methods instead + */ @Deprecated private static final String DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS = INLINE_COMPACT_TIME_DELTA_SECONDS.defaultValue(); - /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead */ + /** + * @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead + */ @Deprecated private static final String DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY = INLINE_COMPACT_TRIGGER_STRATEGY.defaultValue(); - /** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead */ - @Deprecated - public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.key(); - /** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead */ - @Deprecated - public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.defaultValue(); private HoodieCompactionConfig() { super(); @@ -381,6 +415,11 @@ public Builder 
withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold return this; } + public Builder withLogFileLengthThresholdBasedCompaction(int logFileLengthThreshold) { + compactionConfig.setValue(COMPACTION_LOG_FILE_LENGTH_THRESHOLD, String.valueOf(logFileLengthThreshold)); + return this; + } + public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) { compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 90a468368f1a..637fe757ddbc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -101,61 +101,49 @@ + "higher level frameworks (e.g Spark datasources, Flink sink) and utilities (e.g DeltaStreamer).") public class HoodieWriteConfig extends HoodieConfig { - private static final Logger LOG = LogManager.getLogger(HoodieWriteConfig.class); - private static final long serialVersionUID = 0L; - // This is a constant as is should never be changed via config (will invalidate previous commits) // It is here so that both the client and deltastreamer use the same reference public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; - public static final ConfigProperty TBL_NAME = ConfigProperty .key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY) .noDefaultValue() .withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs."); - public static final ConfigProperty PRECOMBINE_FIELD_NAME = ConfigProperty .key("hoodie.datasource.write.precombine.field") .defaultValue("ts") .withDocumentation("Field used in preCombining before actual write. When two records have the same key value, " + "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)"); - public static final ConfigProperty WRITE_PAYLOAD_CLASS_NAME = ConfigProperty .key("hoodie.datasource.write.payload.class") .defaultValue(OverwriteWithLatestAvroPayload.class.getName()) .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. " + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective"); - public static final ConfigProperty KEYGENERATOR_CLASS_NAME = ConfigProperty .key("hoodie.datasource.write.keygenerator.class") .noDefaultValue() .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` " + "extract a key out of incoming records."); - public static final ConfigProperty KEYGENERATOR_TYPE = ConfigProperty .key("hoodie.datasource.write.keygenerator.type") .defaultValue(KeyGeneratorType.SIMPLE.name()) .withDocumentation("Easily configure one the built-in key generators, instead of specifying the key generator class." + "Currently supports SIMPLE, COMPLEX, TIMESTAMP, CUSTOM, NON_PARTITION, GLOBAL_DELETE"); - public static final ConfigProperty ROLLBACK_USING_MARKERS_ENABLE = ConfigProperty .key("hoodie.rollback.using.markers") .defaultValue("true") .withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated " + "during the writes. 
Turned on by default."); - public static final ConfigProperty TIMELINE_LAYOUT_VERSION_NUM = ConfigProperty .key("hoodie.timeline.layout.version") .defaultValue(Integer.toString(TimelineLayoutVersion.VERSION_1)) .sinceVersion("0.5.1") .withDocumentation("Controls the layout of the timeline. Version 0 relied on renames, Version 1 (default) models " + "the timeline as an immutable log relying only on atomic writes for object storage."); - public static final ConfigProperty BASE_FILE_FORMAT = ConfigProperty .key("hoodie.table.base.file.format") .defaultValue(HoodieFileFormat.PARQUET) .withAlternatives("hoodie.table.ro.file.format") .withDocumentation("Base file format to store all the base file data."); - public static final ConfigProperty BASE_PATH = ConfigProperty .key("hoodie.base.path") .noDefaultValue() @@ -163,119 +151,123 @@ public class HoodieWriteConfig extends HoodieConfig { + "Always prefix it explicitly with the storage scheme (e.g hdfs://, s3:// etc). " + "Hudi stores all the main meta-data about commits, savepoints, cleaning audit logs " + "etc in .hoodie directory under this base path directory."); - public static final ConfigProperty AVRO_SCHEMA_STRING = ConfigProperty .key("hoodie.avro.schema") .noDefaultValue() .withDocumentation("Schema string representing the current write schema of the table. Hudi passes this to " + "implementations of HoodieRecordPayload to convert incoming records to avro. This is also used as the write schema " + "evolving records during an update."); - public static final ConfigProperty INTERNAL_SCHEMA_STRING = ConfigProperty .key("hoodie.internal.schema") .noDefaultValue() .withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to " + "implementations of evolution of schema"); - public static final ConfigProperty ENABLE_INTERNAL_SCHEMA_CACHE = ConfigProperty .key("hoodie.schema.cache.enable") .defaultValue(false) .withDocumentation("cache query internalSchemas in driver/executor side"); - public static final ConfigProperty AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty .key("hoodie.avro.schema.validate") .defaultValue("false") .withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility."); - public static final ConfigProperty INSERT_PARALLELISM_VALUE = ConfigProperty .key("hoodie.insert.shuffle.parallelism") .defaultValue("200") .withDocumentation("Parallelism for inserting records into the table. Inserts can shuffle data before writing to tune file sizes and optimize the storage layout."); - public static final ConfigProperty BULKINSERT_PARALLELISM_VALUE = ConfigProperty .key("hoodie.bulkinsert.shuffle.parallelism") .defaultValue("200") .withDocumentation("For large initial imports using bulk_insert operation, controls the parallelism to use for sort modes or custom partitioning done" + "before writing records to the table."); - public static final ConfigProperty BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty - .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns") - .noDefaultValue() - .withDocumentation("Columns to sort the data by when use org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as user defined partitioner during bulk_insert. 
" - + "For example 'column1,column2'"); - + .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns") + .noDefaultValue() + .withDocumentation("Columns to sort the data by when use org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as user defined partitioner during bulk_insert. " + + "For example 'column1,column2'"); public static final ConfigProperty BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME = ConfigProperty .key("hoodie.bulkinsert.user.defined.partitioner.class") .noDefaultValue() .withDocumentation("If specified, this class will be used to re-partition records before they are bulk inserted. This can be used to sort, pack, cluster data" + " optimally for common query patterns. For now we support a build-in user defined bulkinsert partitioner org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner" + " which can does sorting based on specified column values set by " + BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS.key()); - + public static final ConfigProperty BULK_INSERT_SORT_MODE = ConfigProperty + .key("hoodie.bulkinsert.sort.mode") + .defaultValue(BulkInsertSortMode.NONE.toString()) + .withDocumentation("Sorting modes to use for sorting records for bulk insert. This is use when user " + + BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME.key() + "is not configured. Available values are - " + + "GLOBAL_SORT: this ensures best file sizes, with lowest memory overhead at cost of sorting. " + + "PARTITION_SORT: Strikes a balance by only sorting within a partition, still keeping the memory overhead of writing " + + "lowest and best effort file sizing. " + + "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads"); + /** + * @deprecated Use {@link #BULK_INSERT_SORT_MODE} and its methods instead + */ + @Deprecated + public static final String BULKINSERT_SORT_MODE = BULK_INSERT_SORT_MODE.key(); + /** + * @deprecated Use {@link #BULK_INSERT_SORT_MODE} and its methods instead + */ + @Deprecated + public static final String DEFAULT_BULKINSERT_SORT_MODE = BULK_INSERT_SORT_MODE.defaultValue(); + /** + * @deprecated Use {@link #BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME} and its methods instead + */ + @Deprecated + public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME.key(); public static final ConfigProperty UPSERT_PARALLELISM_VALUE = ConfigProperty .key("hoodie.upsert.shuffle.parallelism") .defaultValue("200") .withDocumentation("Parallelism to use for upsert operation on the table. Upserts can shuffle data to perform index lookups, file sizing, bin packing records optimally" + "into file groups."); - public static final ConfigProperty DELETE_PARALLELISM_VALUE = ConfigProperty .key("hoodie.delete.shuffle.parallelism") .defaultValue("200") .withDocumentation("Parallelism used for “delete” operation. Delete operations also performs shuffles, similar to upsert operation."); - public static final ConfigProperty ROLLBACK_PARALLELISM_VALUE = ConfigProperty .key("hoodie.rollback.parallelism") .defaultValue("100") .withDocumentation("Parallelism for rollback of commits. 
Rollbacks perform delete of files or logging delete blocks to file groups on storage in parallel."); - public static final ConfigProperty WRITE_BUFFER_LIMIT_BYTES_VALUE = ConfigProperty .key("hoodie.write.buffer.limit.bytes") .defaultValue(String.valueOf(4 * 1024 * 1024)) .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes."); - public static final ConfigProperty COMBINE_BEFORE_INSERT = ConfigProperty .key("hoodie.combine.before.insert") .defaultValue("false") .withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before" + " writing to storage."); - public static final ConfigProperty COMBINE_BEFORE_UPSERT = ConfigProperty .key("hoodie.combine.before.upsert") .defaultValue("true") .withDocumentation("When upserted records share same key, controls whether they should be first combined (i.e de-duplicated) before" + " writing to storage. This should be turned off only if you are absolutely certain that there are no duplicates incoming, " + " otherwise it can lead to duplicate keys and violate the uniqueness guarantees."); - public static final ConfigProperty COMBINE_BEFORE_DELETE = ConfigProperty .key("hoodie.combine.before.delete") .defaultValue("true") .withDocumentation("During delete operations, controls whether we should combine deletes (and potentially also upserts) before " + " writing to storage."); - public static final ConfigProperty WRITE_STATUS_STORAGE_LEVEL_VALUE = ConfigProperty .key("hoodie.write.status.storage.level") .defaultValue("MEMORY_AND_DISK_SER") .withDocumentation("Write status objects hold metadata about a write (stats, errors), that is not yet committed to storage. " + "This controls the how that information is cached for inspection by clients. We rarely expect this to be changed."); - public static final ConfigProperty AUTO_COMMIT_ENABLE = ConfigProperty .key("hoodie.auto.commit") .defaultValue("true") .withDocumentation("Controls whether a write operation should auto commit. This can be turned off to perform inspection" + " of the uncommitted write before deciding to commit."); - public static final ConfigProperty WRITE_STATUS_CLASS_NAME = ConfigProperty .key("hoodie.writestatus.class") .defaultValue(WriteStatus.class.getName()) .withDocumentation("Subclass of " + WriteStatus.class.getName() + " to be used to collect information about a write. Can be " + "overridden to collection additional metrics/statistics about the data if needed."); - public static final ConfigProperty FINALIZE_WRITE_PARALLELISM_VALUE = ConfigProperty .key("hoodie.finalize.write.parallelism") .defaultValue("200") .withDocumentation("Parallelism for the write finalization internal operation, which involves removing any partially written " + "files from lake storage, before committing the write. Reduce this value, if the high number of tasks incur delays for smaller tables " + "or low latency writes."); - public static final ConfigProperty MARKERS_TYPE = ConfigProperty .key("hoodie.write.markers.type") .defaultValue(MarkerType.TIMELINE_SERVER_BASED.toString()) @@ -289,120 +281,91 @@ public class HoodieWriteConfig extends HoodieConfig { + "timeline server is disabled, DIRECT markers are used as fallback even if this " + "is configure. 
For Spark structured streaming, this configuration does not " + "take effect, i.e., DIRECT markers are always used for Spark structured streaming."); - public static final ConfigProperty MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS = ConfigProperty .key("hoodie.markers.timeline_server_based.batch.num_threads") .defaultValue(20) .sinceVersion("0.9.0") .withDocumentation("Number of threads to use for batch processing marker " + "creation requests at the timeline server"); - public static final ConfigProperty MARKERS_TIMELINE_SERVER_BASED_BATCH_INTERVAL_MS = ConfigProperty .key("hoodie.markers.timeline_server_based.batch.interval_ms") .defaultValue(50L) .sinceVersion("0.9.0") .withDocumentation("The batch interval in milliseconds for marker creation batch processing"); - public static final ConfigProperty MARKERS_DELETE_PARALLELISM_VALUE = ConfigProperty .key("hoodie.markers.delete.parallelism") .defaultValue("100") .withDocumentation("Determines the parallelism for deleting marker files, which are used to track all files (valid or invalid/partial) written during " + "a write operation. Increase this value if delays are observed, with large batch writes."); - - public static final ConfigProperty BULK_INSERT_SORT_MODE = ConfigProperty - .key("hoodie.bulkinsert.sort.mode") - .defaultValue(BulkInsertSortMode.NONE.toString()) - .withDocumentation("Sorting modes to use for sorting records for bulk insert. This is use when user " - + BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME.key() + "is not configured. Available values are - " - + "GLOBAL_SORT: this ensures best file sizes, with lowest memory overhead at cost of sorting. " - + "PARTITION_SORT: Strikes a balance by only sorting within a partition, still keeping the memory overhead of writing " - + "lowest and best effort file sizing. " - + "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads"); - public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_ENABLE = ConfigProperty .key("hoodie.embed.timeline.server") .defaultValue("true") .withDocumentation("When true, spins up an instance of the timeline server (meta server that serves cached file listings, statistics)," + "running on each writer's driver process, accepting requests during the write from executors."); - public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty .key("hoodie.embed.timeline.server.reuse.enabled") .defaultValue("false") .withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles)" + "to avoid startup costs. This should rarely be changed."); - public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_PORT_NUM = ConfigProperty .key("hoodie.embed.timeline.server.port") .defaultValue("0") .withDocumentation("Port at which the timeline server listens for requests. When running embedded in each writer, it picks " + "a free port and communicates to all the executors. This should rarely be changed."); - public static final ConfigProperty EMBEDDED_TIMELINE_NUM_SERVER_THREADS = ConfigProperty .key("hoodie.embed.timeline.server.threads") .defaultValue("-1") .withDocumentation("Number of threads to serve requests in the timeline server. 
By default, auto configured based on the number of underlying cores."); - public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_COMPRESS_ENABLE = ConfigProperty .key("hoodie.embed.timeline.server.gzip") .defaultValue("true") .withDocumentation("Controls whether gzip compression is used, for large responses from the timeline server, to improve latency."); - public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_USE_ASYNC_ENABLE = ConfigProperty .key("hoodie.embed.timeline.server.async") .defaultValue("false") .withDocumentation("Controls whether or not, the requests to the timeline server are processed in asynchronous fashion, " + "potentially improving throughput."); - public static final ConfigProperty FAIL_ON_TIMELINE_ARCHIVING_ENABLE = ConfigProperty .key("hoodie.fail.on.timeline.archiving") .defaultValue("true") .withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. " + "Controls whether or not, the write should be failed as well, if such archiving fails."); - public static final ConfigProperty INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty .key("hoodie.consistency.check.initial_interval_ms") .defaultValue(2000L) .withDocumentation("Initial time between successive attempts to ensure written data's metadata is consistent on storage. Grows with exponential" + " backoff after the initial value."); - public static final ConfigProperty MAX_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty .key("hoodie.consistency.check.max_interval_ms") .defaultValue(300000L) .withDocumentation("Max time to wait between successive attempts at performing consistency checks"); - public static final ConfigProperty MAX_CONSISTENCY_CHECKS = ConfigProperty .key("hoodie.consistency.check.max_checks") .defaultValue(7) .withDocumentation("Maximum number of checks, for consistency of written data."); - public static final ConfigProperty MERGE_DATA_VALIDATION_CHECK_ENABLE = ConfigProperty .key("hoodie.merge.data.validation.enabled") .defaultValue("false") .withDocumentation("When enabled, data validation checks are performed during merges to ensure expected " + "number of records after merge operation."); - public static final ConfigProperty MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE = ConfigProperty .key("hoodie.merge.allow.duplicate.on.inserts") .defaultValue("false") .withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)." + " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained."); - public static final ConfigProperty MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT = ConfigProperty .key("hoodie.merge.small.file.group.candidates.limit") .defaultValue(1) .withDocumentation("Limits number of file groups, whose base file satisfies small-file limit, to consider for appending records during upsert operation. " + "Only applicable to MOR tables"); - public static final ConfigProperty CLIENT_HEARTBEAT_INTERVAL_IN_MS = ConfigProperty .key("hoodie.client.heartbeat.interval_in_ms") .defaultValue(60 * 1000) .withDocumentation("Writers perform heartbeats to indicate liveness. 
Controls how often (in ms), such heartbeats are registered to lake storage."); - public static final ConfigProperty CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES = ConfigProperty .key("hoodie.client.heartbeat.tolerable.misses") .defaultValue(2) .withDocumentation("Number of heartbeat misses, before a writer is deemed not alive and all pending writes are aborted."); - public static final ConfigProperty WRITE_CONCURRENCY_MODE = ConfigProperty .key("hoodie.write.concurrency.mode") .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name()) @@ -410,7 +373,6 @@ public class HoodieWriteConfig extends HoodieConfig { + "SINGLE_WRITER: Only one active writer to the table. Maximizes throughput" + "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeed " + "if a conflict (writes affect the same file group) is detected."); - /** * Currently the use this to specify the write schema. */ @@ -420,7 +382,6 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("The specified write schema. In most case, we do not need set this parameter," + " but for the case the write schema is not equal to the specified table schema, we can" + " specify the write schema by this parameter. Used by MergeIntoHoodieTableCommand"); - /** * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow * multiple write operations (upsert/buk-insert/...) to be executed within a single commit. @@ -435,71 +396,59 @@ public class HoodieWriteConfig extends HoodieConfig { .key("_.hoodie.allow.multi.write.on.same.instant") .defaultValue("false") .withDocumentation(""); - public static final ConfigProperty AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE = ConfigProperty .key(AVRO_SCHEMA_STRING.key() + ".external.transformation") .defaultValue("false") .withAlternatives(AVRO_SCHEMA_STRING.key() + ".externalTransformation") .withDocumentation("When enabled, records in older schema are rewritten into newer schema during upsert,delete and background" + " compaction,clustering operations."); - + /** + * @deprecated Use {@link #AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE} and its methods instead + */ + @Deprecated + public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(); + /** + * @deprecated Use {@link #AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE} and its methods instead + */ + @Deprecated + public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.defaultValue(); public static final ConfigProperty ALLOW_EMPTY_COMMIT = ConfigProperty .key("hoodie.allow.empty.commit") .defaultValue(true) .withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. " + "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data"); - public static final ConfigProperty ALLOW_OPERATION_METADATA_FIELD = ConfigProperty .key("hoodie.allow.operation.metadata.field") .defaultValue(false) .sinceVersion("0.9.0") .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. 
" + "Once enabled, all the changes of a record are persisted to the delta log directly without merge"); - public static final ConfigProperty FILEID_PREFIX_PROVIDER_CLASS = ConfigProperty .key("hoodie.fileid.prefix.provider.class") .defaultValue(RandomFileIdPrefixProvider.class.getName()) .sinceVersion("0.10.0") .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`"); - public static final ConfigProperty TABLE_SERVICES_ENABLED = ConfigProperty .key("hoodie.table.services.enabled") .defaultValue(true) .sinceVersion("0.11.0") .withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc."); - public static final ConfigProperty RELEASE_RESOURCE_ENABLE = ConfigProperty .key("hoodie.release.resource.on.completion.enable") .defaultValue(true) .sinceVersion("0.11.0") .withDocumentation("Control to enable release all persist rdds when the spark job finish."); - public static final ConfigProperty AUTO_ADJUST_LOCK_CONFIGS = ConfigProperty .key("hoodie.auto.adjust.lock.configs") .defaultValue(false) .sinceVersion("0.11.0") .withDocumentation("Auto adjust lock configurations when metadata table is enabled and for async table services."); - public static final ConfigProperty SKIP_DEFAULT_PARTITION_VALIDATION = ConfigProperty .key("hoodie.skip.default.partition.validation") .defaultValue(false) .sinceVersion("0.12.0") .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. " + "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation"); - - private ConsistencyGuardConfig consistencyGuardConfig; - private FileSystemRetryConfig fileSystemRetryConfig; - - // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled - // We keep track of original config and rewritten config - private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig; - private FileSystemViewStorageConfig viewStorageConfig; - private HoodiePayloadConfig hoodiePayloadConfig; - private HoodieMetadataConfig metadataConfig; - private HoodieMetastoreConfig metastoreConfig; - private HoodieCommonConfig commonConfig; - private EngineType engineType; - /** * @deprecated Use {@link #TBL_NAME} and its methods instead */ @@ -580,11 +529,6 @@ public class HoodieWriteConfig extends HoodieConfig { */ @Deprecated public static final String BULKINSERT_PARALLELISM = BULKINSERT_PARALLELISM_VALUE.key(); - /** - * @deprecated Use {@link #BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME} and its methods instead - */ - @Deprecated - public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME.key(); @Deprecated public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl"; /** @@ -697,16 +641,6 @@ public class HoodieWriteConfig extends HoodieConfig { */ @Deprecated public static final String DEFAULT_MARKERS_DELETE_PARALLELISM = MARKERS_DELETE_PARALLELISM_VALUE.defaultValue(); - /** - * @deprecated Use {@link #BULK_INSERT_SORT_MODE} and its methods instead - */ - @Deprecated - public static final String BULKINSERT_SORT_MODE = BULK_INSERT_SORT_MODE.key(); - /** - * @deprecated Use {@link #BULK_INSERT_SORT_MODE} and its methods instead - */ - @Deprecated - public static final String DEFAULT_BULKINSERT_SORT_MODE = BULK_INSERT_SORT_MODE.defaultValue(); /** * @deprecated Use {@link 
#EMBEDDED_TIMELINE_SERVER_ENABLE} and its methods instead */ @@ -772,51 +706,16 @@ public class HoodieWriteConfig extends HoodieConfig { */ @Deprecated public static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP = INITIAL_CONSISTENCY_CHECK_INTERVAL_MS.key(); - /** - * @deprecated Use {@link #INITIAL_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead - */ - @Deprecated - public static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = INITIAL_CONSISTENCY_CHECK_INTERVAL_MS.defaultValue(); /** * @deprecated Use {@link #MAX_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead */ @Deprecated public static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = MAX_CONSISTENCY_CHECK_INTERVAL_MS.key(); - /** - * @deprecated Use {@link #MAX_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead - */ - @Deprecated - public static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = MAX_CONSISTENCY_CHECK_INTERVAL_MS.defaultValue(); /** * @deprecated Use {@link #MAX_CONSISTENCY_CHECKS} and its methods instead */ @Deprecated public static final String MAX_CONSISTENCY_CHECKS_PROP = MAX_CONSISTENCY_CHECKS.key(); - /** - * @deprecated Use {@link #MAX_CONSISTENCY_CHECKS} and its methods instead - */ - @Deprecated - public static int DEFAULT_MAX_CONSISTENCY_CHECKS = MAX_CONSISTENCY_CHECKS.defaultValue(); - /** - * @deprecated Use {@link #MERGE_DATA_VALIDATION_CHECK_ENABLE} and its methods instead - */ - @Deprecated - private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = MERGE_DATA_VALIDATION_CHECK_ENABLE.key(); - /** - * @deprecated Use {@link #MERGE_DATA_VALIDATION_CHECK_ENABLE} and its methods instead - */ - @Deprecated - private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = MERGE_DATA_VALIDATION_CHECK_ENABLE.defaultValue(); - /** - * @deprecated Use {@link #MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE} and its methods instead - */ - @Deprecated - private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(); - /** - * @deprecated Use {@link #MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE} and its methods instead - */ - @Deprecated - private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.defaultValue(); /** * @deprecated Use {@link #CLIENT_HEARTBEAT_INTERVAL_IN_MS} and its methods instead */ @@ -857,16 +756,54 @@ public class HoodieWriteConfig extends HoodieConfig { */ @Deprecated public static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE.defaultValue(); + private static final Logger LOG = LogManager.getLogger(HoodieWriteConfig.class); + private static final long serialVersionUID = 0L; /** - * @deprecated Use {@link #AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE} and its methods instead + * @deprecated Use {@link #MERGE_DATA_VALIDATION_CHECK_ENABLE} and its methods instead */ @Deprecated - public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(); + private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = MERGE_DATA_VALIDATION_CHECK_ENABLE.key(); /** - * @deprecated Use {@link #AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE} and its methods instead + * @deprecated Use {@link #MERGE_DATA_VALIDATION_CHECK_ENABLE} and its methods instead */ @Deprecated - public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.defaultValue(); + private static final String 
DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = MERGE_DATA_VALIDATION_CHECK_ENABLE.defaultValue(); + /** + * @deprecated Use {@link #MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE} and its methods instead + */ + @Deprecated + private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(); + /** + * @deprecated Use {@link #MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE} and its methods instead + */ + @Deprecated + private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.defaultValue(); + /** + * @deprecated Use {@link #INITIAL_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead + */ + @Deprecated + public static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = INITIAL_CONSISTENCY_CHECK_INTERVAL_MS.defaultValue(); + /** + * @deprecated Use {@link #MAX_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead + */ + @Deprecated + public static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = MAX_CONSISTENCY_CHECK_INTERVAL_MS.defaultValue(); + /** + * @deprecated Use {@link #MAX_CONSISTENCY_CHECKS} and its methods instead + */ + @Deprecated + public static int DEFAULT_MAX_CONSISTENCY_CHECKS = MAX_CONSISTENCY_CHECKS.defaultValue(); + // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled + // We keep track of original config and rewritten config + private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig; + private ConsistencyGuardConfig consistencyGuardConfig; + private FileSystemRetryConfig fileSystemRetryConfig; + private FileSystemViewStorageConfig viewStorageConfig; + private HoodiePayloadConfig hoodiePayloadConfig; + private HoodieMetadataConfig metadataConfig; + private HoodieMetastoreConfig metastoreConfig; + private HoodieCommonConfig commonConfig; + private EngineType engineType; /** * Use Spark engine by default. @@ -919,14 +856,14 @@ public boolean getInternalSchemaCacheEnable() { return getBoolean(ENABLE_INTERNAL_SCHEMA_CACHE); } - public void setInternalSchemaString(String internalSchemaString) { - setValue(INTERNAL_SCHEMA_STRING, internalSchemaString); - } - public void setInternalSchemaCacheEnable(boolean enable) { setValue(ENABLE_INTERNAL_SCHEMA_CACHE, String.valueOf(enable)); } + public void setInternalSchemaString(String internalSchemaString) { + setValue(INTERNAL_SCHEMA_STRING, internalSchemaString); + } + public boolean getSchemaEvolutionEnable() { return getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE); } @@ -937,9 +874,10 @@ public void setSchemaEvolutionEnable(boolean enable) { /** * Get the write schema for written records. - * + *

* If the WRITE_SCHEMA has specified, we use the WRITE_SCHEMA. Or else we use the AVRO_SCHEMA as the write schema. + * @return */ public String getWriteSchema() { @@ -1275,6 +1213,10 @@ public Long getCompactionLogFileSizeThreshold() { return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD); } + public Long getCompactionLogFileLengthThreshold() { + return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_LENGTH_THRESHOLD); + } + public Boolean getCompactionLazyBlockReadEnabled() { return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE); } @@ -1912,14 +1854,14 @@ public ConsistencyGuardConfig getConsistencyGuardConfig() { return consistencyGuardConfig; } - public FileSystemRetryConfig getFileSystemRetryConfig() { - return fileSystemRetryConfig; - } - public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) { this.consistencyGuardConfig = consistencyGuardConfig; } + public FileSystemRetryConfig getFileSystemRetryConfig() { + return fileSystemRetryConfig; + } + public FileSystemViewStorageConfig getViewStorageConfig() { return viewStorageConfig; } @@ -2044,6 +1986,7 @@ public int getMetadataCleanerCommitsRetained() { /** * Hoodie Client Lock Configs. + * * @return */ public boolean isAutoAdjustLockConfigs() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileLengthBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileLengthBasedCompactionStrategy.java new file mode 100644 index 000000000000..6f5cb96ffc39 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileLengthBasedCompactionStrategy.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.compact.strategy; + +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * LogFileLengthBasedCompactionStrategy orders the compactions by the number of log files in each file group (most log files first), + * keeps only the file groups whose log file count is greater than or equal to the configured threshold, and limits the compactions within a configured IO bound. 
*/ +public class LogFileLengthBasedCompactionStrategy extends BoundedIOCompactionStrategy + implements Comparator { + + @Override + public List orderAndFilter(HoodieWriteConfig writeConfig, List operations, List pendingCompactionPlans) { + Long lengthThreshold = writeConfig.getCompactionLogFileLengthThreshold(); + List filteredOperations = operations.stream() + .filter(e -> e.getDeltaFilePaths().size() >= lengthThreshold) + .sorted(this).collect(Collectors.toList()); + return super.orderAndFilter(writeConfig, filteredOperations, pendingCompactionPlans); + } + + @Override + public int compare(HoodieCompactionOperation hco1, HoodieCompactionOperation hco2) { + return hco2.getDeltaFilePaths().size() - hco1.getDeltaFilePaths().size(); + } +}
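Note for reviewers: a minimal sketch of how the new strategy could be wired up from a writer config once this patch lands. The builder calls below (withCompactionStrategy, withTargetIOPerCompactionInMB, withLogFileLengthThresholdBasedCompaction) come from this change and the existing compaction config API, mirroring the test setup; the table path and the concrete threshold/IO values are illustrative assumptions, not part of the patch.

// Sketch: compact only file groups with at least 3 delta log files, ordered by log-file count,
// while keeping each compaction run within a ~500 MB IO budget (inherited from BoundedIOCompactionStrategy).
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie_table") // illustrative base path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withCompactionStrategy(new LogFileLengthBasedCompactionStrategy())
        .withTargetIOPerCompactionInMB(500)
        .withLogFileLengthThresholdBasedCompaction(3)
        .build())
    .build();

The same selection can also be driven purely through properties, using the existing hoodie.compaction.strategy key pointed at org.apache.hudi.table.action.compact.strategy.LogFileLengthBasedCompactionStrategy together with the new hoodie.compaction.logfile.length.threshold key introduced here.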
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index 0c7190092e73..4d664b5ef9e5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -49,8 +49,8 @@ public class TestHoodieCompactionStrategy { private static final long MB = 1024 * 1024L; - private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"}; private static final Random RANDOM = new Random(); + private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"}; @Test public void testUnBounded() { @@ -76,7 +76,7 @@ public void testBoundedIOSimple() { sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); BoundedIOCompactionStrategy strategy = new BoundedIOCompactionStrategy(); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( - HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) + HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) .build(); List operations = createCompactionOperations(writeConfig, sizesMap); List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); @@ -99,8 +99,8 @@ public void testLogFileSizeCompactionSimple() { sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy(); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( - HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205) - .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build()) + HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205) + .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build()) .build(); List operations = createCompactionOperations(writeConfig, sizesMap); List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); @@ -123,7 +123,7 @@ public void testDayBasedCompactionSimple() { sizesMap.put(100 * MB, Collections.singletonList(MB)); sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); - Map keyToPartitionMap = Collections.unmodifiableMap(new HashMap() { + Map keyToPartitionMap = Collections.unmodifiableMap(new HashMap() { { put(120 * MB, partitionPaths[2]); put(110 * MB, partitionPaths[2]); @@ -169,7 +169,7 @@ public void testBoundedPartitionAwareCompactionSimple() { String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1)); String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5)); - Map keyToPartitionMap = Collections.unmodifiableMap(new HashMap() { + Map keyToPartitionMap = Collections.unmodifiableMap(new HashMap() { { put(120 * MB, currentDay); put(110 * MB, currentDayMinus1); @@ -218,7 +218,7 @@ public void testUnboundedPartitionAwareCompactionSimple() { String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1)); String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5)); - Map keyToPartitionMap = Collections.unmodifiableMap(new HashMap() { + Map keyToPartitionMap = Collections.unmodifiableMap(new HashMap() { { put(120 * MB, currentDay); put(110 * MB, currentDayMinus1); @@ -243,8 +243,44 @@ public void testUnboundedPartitionAwareCompactionSimple() { "BoundedPartitionAwareCompactionStrategy should have resulted in 1 compaction"); } + @Test + public void testLogFileLengthBasedCompactionStrategy() { + Map> sizesMap = new HashMap<>(); + sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB)); + sizesMap.put(110 * MB, new ArrayList<>()); + sizesMap.put(100 * MB, Collections.singletonList(2048 * MB)); + sizesMap.put(90 * MB, Arrays.asList(512 * MB, 512 * MB)); + LogFileLengthBasedCompactionStrategy strategy = new LogFileLengthBasedCompactionStrategy(); + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( + HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1024) + .withLogFileLengthThresholdBasedCompaction(2).build()) + .build(); + List operations = createCompactionOperations(writeConfig, sizesMap); + List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); + + assertTrue(returned.size() < operations.size(), + "LogFileLengthBasedCompactionStrategy should have resulted in fewer compactions"); + assertEquals(2, returned.size(), "LogFileLengthBasedCompactionStrategy should have resulted in 2 compactions"); + + // Total number of delta log files across the selected compactions + Integer allFileLength = returned.stream().map(s -> s.getDeltaFilePaths().size()) + .reduce(Integer::sum).orElse(0); + + assertEquals(5, allFileLength); + assertEquals(3, returned.get(0).getDeltaFilePaths().size()); + assertEquals(2, returned.get(1).getDeltaFilePaths().size()); + // Total size of all the log files + Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB)) + .map(Double::longValue).reduce(Long::sum).orElse(0L); + // TOTAL_IO_MB: (120 + 90) * 2 + 512 + 512 + 60 + 10 + 80 + assertEquals(1594, (long) returnedSize, + "Should choose the first 2 compactions which should result in a total IO of 1594 MB"); + + + } + private List createCompactionOperations(HoodieWriteConfig config, - Map> sizesMap) { + Map> sizesMap) { Map keyToPartitionMap = sizesMap.keySet().stream() .map(e -> Pair.of(e, partitionPaths[RANDOM.nextInt(partitionPaths.length - 1)])) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); @@ -252,7 +288,7 @@ private List createCompactionOperations(HoodieWriteCo } private List createCompactionOperations(HoodieWriteConfig config, - Map> sizesMap, Map keyToPartitionMap) { + Map> sizesMap, Map keyToPartitionMap) { List operations = 
new ArrayList<>(sizesMap.size()); sizesMap.forEach((k, v) -> { From 462f77736f855dc277cc62e0778fb4c1fa04f09a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E6=89=BF=E7=A5=A5?= Date: Wed, 14 Sep 2022 17:47:26 +0800 Subject: [PATCH 2/4] fix big change --- .../apache/hudi/config/HoodieWriteConfig.java | 235 +++++++++++------- 1 file changed, 148 insertions(+), 87 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 637fe757ddbc..ff3de4695eb9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -101,49 +101,61 @@ + "higher level frameworks (e.g Spark datasources, Flink sink) and utilities (e.g DeltaStreamer).") public class HoodieWriteConfig extends HoodieConfig { + private static final Logger LOG = LogManager.getLogger(HoodieWriteConfig.class); + private static final long serialVersionUID = 0L; + // This is a constant as is should never be changed via config (will invalidate previous commits) // It is here so that both the client and deltastreamer use the same reference public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; + public static final ConfigProperty TBL_NAME = ConfigProperty .key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY) .noDefaultValue() .withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs."); + public static final ConfigProperty PRECOMBINE_FIELD_NAME = ConfigProperty .key("hoodie.datasource.write.precombine.field") .defaultValue("ts") .withDocumentation("Field used in preCombining before actual write. When two records have the same key value, " + "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)"); + public static final ConfigProperty WRITE_PAYLOAD_CLASS_NAME = ConfigProperty .key("hoodie.datasource.write.payload.class") .defaultValue(OverwriteWithLatestAvroPayload.class.getName()) .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. " + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective"); + public static final ConfigProperty KEYGENERATOR_CLASS_NAME = ConfigProperty .key("hoodie.datasource.write.keygenerator.class") .noDefaultValue() .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` " + "extract a key out of incoming records."); + public static final ConfigProperty KEYGENERATOR_TYPE = ConfigProperty .key("hoodie.datasource.write.keygenerator.type") .defaultValue(KeyGeneratorType.SIMPLE.name()) .withDocumentation("Easily configure one the built-in key generators, instead of specifying the key generator class." + "Currently supports SIMPLE, COMPLEX, TIMESTAMP, CUSTOM, NON_PARTITION, GLOBAL_DELETE"); + public static final ConfigProperty ROLLBACK_USING_MARKERS_ENABLE = ConfigProperty .key("hoodie.rollback.using.markers") .defaultValue("true") .withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated " + "during the writes. 
Turned on by default."); + public static final ConfigProperty TIMELINE_LAYOUT_VERSION_NUM = ConfigProperty .key("hoodie.timeline.layout.version") .defaultValue(Integer.toString(TimelineLayoutVersion.VERSION_1)) .sinceVersion("0.5.1") .withDocumentation("Controls the layout of the timeline. Version 0 relied on renames, Version 1 (default) models " + "the timeline as an immutable log relying only on atomic writes for object storage."); + public static final ConfigProperty BASE_FILE_FORMAT = ConfigProperty .key("hoodie.table.base.file.format") .defaultValue(HoodieFileFormat.PARQUET) .withAlternatives("hoodie.table.ro.file.format") .withDocumentation("Base file format to store all the base file data."); + public static final ConfigProperty BASE_PATH = ConfigProperty .key("hoodie.base.path") .noDefaultValue() @@ -151,123 +163,119 @@ public class HoodieWriteConfig extends HoodieConfig { + "Always prefix it explicitly with the storage scheme (e.g hdfs://, s3:// etc). " + "Hudi stores all the main meta-data about commits, savepoints, cleaning audit logs " + "etc in .hoodie directory under this base path directory."); + public static final ConfigProperty AVRO_SCHEMA_STRING = ConfigProperty .key("hoodie.avro.schema") .noDefaultValue() .withDocumentation("Schema string representing the current write schema of the table. Hudi passes this to " + "implementations of HoodieRecordPayload to convert incoming records to avro. This is also used as the write schema " + "evolving records during an update."); + public static final ConfigProperty INTERNAL_SCHEMA_STRING = ConfigProperty .key("hoodie.internal.schema") .noDefaultValue() .withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to " + "implementations of evolution of schema"); + public static final ConfigProperty ENABLE_INTERNAL_SCHEMA_CACHE = ConfigProperty .key("hoodie.schema.cache.enable") .defaultValue(false) .withDocumentation("cache query internalSchemas in driver/executor side"); + public static final ConfigProperty AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty .key("hoodie.avro.schema.validate") .defaultValue("false") .withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility."); + public static final ConfigProperty INSERT_PARALLELISM_VALUE = ConfigProperty .key("hoodie.insert.shuffle.parallelism") .defaultValue("200") .withDocumentation("Parallelism for inserting records into the table. Inserts can shuffle data before writing to tune file sizes and optimize the storage layout."); + public static final ConfigProperty BULKINSERT_PARALLELISM_VALUE = ConfigProperty .key("hoodie.bulkinsert.shuffle.parallelism") .defaultValue("200") .withDocumentation("For large initial imports using bulk_insert operation, controls the parallelism to use for sort modes or custom partitioning done" + "before writing records to the table."); + public static final ConfigProperty BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns") .noDefaultValue() .withDocumentation("Columns to sort the data by when use org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as user defined partitioner during bulk_insert. 
" + "For example 'column1,column2'"); + public static final ConfigProperty BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME = ConfigProperty .key("hoodie.bulkinsert.user.defined.partitioner.class") .noDefaultValue() .withDocumentation("If specified, this class will be used to re-partition records before they are bulk inserted. This can be used to sort, pack, cluster data" + " optimally for common query patterns. For now we support a build-in user defined bulkinsert partitioner org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner" + " which can does sorting based on specified column values set by " + BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS.key()); - public static final ConfigProperty BULK_INSERT_SORT_MODE = ConfigProperty - .key("hoodie.bulkinsert.sort.mode") - .defaultValue(BulkInsertSortMode.NONE.toString()) - .withDocumentation("Sorting modes to use for sorting records for bulk insert. This is use when user " - + BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME.key() + "is not configured. Available values are - " - + "GLOBAL_SORT: this ensures best file sizes, with lowest memory overhead at cost of sorting. " - + "PARTITION_SORT: Strikes a balance by only sorting within a partition, still keeping the memory overhead of writing " - + "lowest and best effort file sizing. " - + "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads"); - /** - * @deprecated Use {@link #BULK_INSERT_SORT_MODE} and its methods instead - */ - @Deprecated - public static final String BULKINSERT_SORT_MODE = BULK_INSERT_SORT_MODE.key(); - /** - * @deprecated Use {@link #BULK_INSERT_SORT_MODE} and its methods instead - */ - @Deprecated - public static final String DEFAULT_BULKINSERT_SORT_MODE = BULK_INSERT_SORT_MODE.defaultValue(); - /** - * @deprecated Use {@link #BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME} and its methods instead - */ - @Deprecated - public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME.key(); + public static final ConfigProperty UPSERT_PARALLELISM_VALUE = ConfigProperty .key("hoodie.upsert.shuffle.parallelism") .defaultValue("200") .withDocumentation("Parallelism to use for upsert operation on the table. Upserts can shuffle data to perform index lookups, file sizing, bin packing records optimally" + "into file groups."); + public static final ConfigProperty DELETE_PARALLELISM_VALUE = ConfigProperty .key("hoodie.delete.shuffle.parallelism") .defaultValue("200") .withDocumentation("Parallelism used for “delete” operation. Delete operations also performs shuffles, similar to upsert operation."); + public static final ConfigProperty ROLLBACK_PARALLELISM_VALUE = ConfigProperty .key("hoodie.rollback.parallelism") .defaultValue("100") .withDocumentation("Parallelism for rollback of commits. 
Rollbacks perform delete of files or logging delete blocks to file groups on storage in parallel."); + public static final ConfigProperty WRITE_BUFFER_LIMIT_BYTES_VALUE = ConfigProperty .key("hoodie.write.buffer.limit.bytes") .defaultValue(String.valueOf(4 * 1024 * 1024)) .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes."); + public static final ConfigProperty COMBINE_BEFORE_INSERT = ConfigProperty .key("hoodie.combine.before.insert") .defaultValue("false") .withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before" + " writing to storage."); + public static final ConfigProperty COMBINE_BEFORE_UPSERT = ConfigProperty .key("hoodie.combine.before.upsert") .defaultValue("true") .withDocumentation("When upserted records share same key, controls whether they should be first combined (i.e de-duplicated) before" + " writing to storage. This should be turned off only if you are absolutely certain that there are no duplicates incoming, " + " otherwise it can lead to duplicate keys and violate the uniqueness guarantees."); + public static final ConfigProperty COMBINE_BEFORE_DELETE = ConfigProperty .key("hoodie.combine.before.delete") .defaultValue("true") .withDocumentation("During delete operations, controls whether we should combine deletes (and potentially also upserts) before " + " writing to storage."); + public static final ConfigProperty WRITE_STATUS_STORAGE_LEVEL_VALUE = ConfigProperty .key("hoodie.write.status.storage.level") .defaultValue("MEMORY_AND_DISK_SER") .withDocumentation("Write status objects hold metadata about a write (stats, errors), that is not yet committed to storage. " + "This controls the how that information is cached for inspection by clients. We rarely expect this to be changed."); + public static final ConfigProperty AUTO_COMMIT_ENABLE = ConfigProperty .key("hoodie.auto.commit") .defaultValue("true") .withDocumentation("Controls whether a write operation should auto commit. This can be turned off to perform inspection" + " of the uncommitted write before deciding to commit."); + public static final ConfigProperty WRITE_STATUS_CLASS_NAME = ConfigProperty .key("hoodie.writestatus.class") .defaultValue(WriteStatus.class.getName()) .withDocumentation("Subclass of " + WriteStatus.class.getName() + " to be used to collect information about a write. Can be " + "overridden to collection additional metrics/statistics about the data if needed."); + public static final ConfigProperty FINALIZE_WRITE_PARALLELISM_VALUE = ConfigProperty .key("hoodie.finalize.write.parallelism") .defaultValue("200") .withDocumentation("Parallelism for the write finalization internal operation, which involves removing any partially written " + "files from lake storage, before committing the write. Reduce this value, if the high number of tasks incur delays for smaller tables " + "or low latency writes."); + public static final ConfigProperty MARKERS_TYPE = ConfigProperty .key("hoodie.write.markers.type") .defaultValue(MarkerType.TIMELINE_SERVER_BASED.toString()) @@ -281,91 +289,120 @@ public class HoodieWriteConfig extends HoodieConfig { + "timeline server is disabled, DIRECT markers are used as fallback even if this " + "is configure. 
For Spark structured streaming, this configuration does not " + "take effect, i.e., DIRECT markers are always used for Spark structured streaming."); + public static final ConfigProperty MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS = ConfigProperty .key("hoodie.markers.timeline_server_based.batch.num_threads") .defaultValue(20) .sinceVersion("0.9.0") .withDocumentation("Number of threads to use for batch processing marker " + "creation requests at the timeline server"); + public static final ConfigProperty MARKERS_TIMELINE_SERVER_BASED_BATCH_INTERVAL_MS = ConfigProperty .key("hoodie.markers.timeline_server_based.batch.interval_ms") .defaultValue(50L) .sinceVersion("0.9.0") .withDocumentation("The batch interval in milliseconds for marker creation batch processing"); + public static final ConfigProperty MARKERS_DELETE_PARALLELISM_VALUE = ConfigProperty .key("hoodie.markers.delete.parallelism") .defaultValue("100") .withDocumentation("Determines the parallelism for deleting marker files, which are used to track all files (valid or invalid/partial) written during " + "a write operation. Increase this value if delays are observed, with large batch writes."); + + public static final ConfigProperty BULK_INSERT_SORT_MODE = ConfigProperty + .key("hoodie.bulkinsert.sort.mode") + .defaultValue(BulkInsertSortMode.NONE.toString()) + .withDocumentation("Sorting modes to use for sorting records for bulk insert. This is use when user " + + BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME.key() + "is not configured. Available values are - " + + "GLOBAL_SORT: this ensures best file sizes, with lowest memory overhead at cost of sorting. " + + "PARTITION_SORT: Strikes a balance by only sorting within a partition, still keeping the memory overhead of writing " + + "lowest and best effort file sizing. " + + "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads"); + public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_ENABLE = ConfigProperty .key("hoodie.embed.timeline.server") .defaultValue("true") .withDocumentation("When true, spins up an instance of the timeline server (meta server that serves cached file listings, statistics)," + "running on each writer's driver process, accepting requests during the write from executors."); + public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty .key("hoodie.embed.timeline.server.reuse.enabled") .defaultValue("false") .withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles)" + "to avoid startup costs. This should rarely be changed."); + public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_PORT_NUM = ConfigProperty .key("hoodie.embed.timeline.server.port") .defaultValue("0") .withDocumentation("Port at which the timeline server listens for requests. When running embedded in each writer, it picks " + "a free port and communicates to all the executors. This should rarely be changed."); + public static final ConfigProperty EMBEDDED_TIMELINE_NUM_SERVER_THREADS = ConfigProperty .key("hoodie.embed.timeline.server.threads") .defaultValue("-1") .withDocumentation("Number of threads to serve requests in the timeline server. 
By default, auto configured based on the number of underlying cores."); + public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_COMPRESS_ENABLE = ConfigProperty .key("hoodie.embed.timeline.server.gzip") .defaultValue("true") .withDocumentation("Controls whether gzip compression is used, for large responses from the timeline server, to improve latency."); + public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_USE_ASYNC_ENABLE = ConfigProperty .key("hoodie.embed.timeline.server.async") .defaultValue("false") .withDocumentation("Controls whether or not, the requests to the timeline server are processed in asynchronous fashion, " + "potentially improving throughput."); + public static final ConfigProperty FAIL_ON_TIMELINE_ARCHIVING_ENABLE = ConfigProperty .key("hoodie.fail.on.timeline.archiving") .defaultValue("true") .withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. " + "Controls whether or not, the write should be failed as well, if such archiving fails."); + public static final ConfigProperty INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty .key("hoodie.consistency.check.initial_interval_ms") .defaultValue(2000L) .withDocumentation("Initial time between successive attempts to ensure written data's metadata is consistent on storage. Grows with exponential" + " backoff after the initial value."); + public static final ConfigProperty MAX_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty .key("hoodie.consistency.check.max_interval_ms") .defaultValue(300000L) .withDocumentation("Max time to wait between successive attempts at performing consistency checks"); + public static final ConfigProperty MAX_CONSISTENCY_CHECKS = ConfigProperty .key("hoodie.consistency.check.max_checks") .defaultValue(7) .withDocumentation("Maximum number of checks, for consistency of written data."); + public static final ConfigProperty MERGE_DATA_VALIDATION_CHECK_ENABLE = ConfigProperty .key("hoodie.merge.data.validation.enabled") .defaultValue("false") .withDocumentation("When enabled, data validation checks are performed during merges to ensure expected " + "number of records after merge operation."); + public static final ConfigProperty MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE = ConfigProperty .key("hoodie.merge.allow.duplicate.on.inserts") .defaultValue("false") .withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)." + " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained."); + public static final ConfigProperty MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT = ConfigProperty .key("hoodie.merge.small.file.group.candidates.limit") .defaultValue(1) .withDocumentation("Limits number of file groups, whose base file satisfies small-file limit, to consider for appending records during upsert operation. " + "Only applicable to MOR tables"); + public static final ConfigProperty CLIENT_HEARTBEAT_INTERVAL_IN_MS = ConfigProperty .key("hoodie.client.heartbeat.interval_in_ms") .defaultValue(60 * 1000) .withDocumentation("Writers perform heartbeats to indicate liveness. 
Controls how often (in ms), such heartbeats are registered to lake storage."); + public static final ConfigProperty CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES = ConfigProperty .key("hoodie.client.heartbeat.tolerable.misses") .defaultValue(2) .withDocumentation("Number of heartbeat misses, before a writer is deemed not alive and all pending writes are aborted."); + public static final ConfigProperty WRITE_CONCURRENCY_MODE = ConfigProperty .key("hoodie.write.concurrency.mode") .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name()) @@ -373,6 +410,7 @@ public class HoodieWriteConfig extends HoodieConfig { + "SINGLE_WRITER: Only one active writer to the table. Maximizes throughput" + "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeed " + "if a conflict (writes affect the same file group) is detected."); + /** * Currently the use this to specify the write schema. */ @@ -382,6 +420,7 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("The specified write schema. In most case, we do not need set this parameter," + " but for the case the write schema is not equal to the specified table schema, we can" + " specify the write schema by this parameter. Used by MergeIntoHoodieTableCommand"); + /** * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow * multiple write operations (upsert/buk-insert/...) to be executed within a single commit. @@ -396,59 +435,71 @@ public class HoodieWriteConfig extends HoodieConfig { .key("_.hoodie.allow.multi.write.on.same.instant") .defaultValue("false") .withDocumentation(""); + public static final ConfigProperty AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE = ConfigProperty .key(AVRO_SCHEMA_STRING.key() + ".external.transformation") .defaultValue("false") .withAlternatives(AVRO_SCHEMA_STRING.key() + ".externalTransformation") .withDocumentation("When enabled, records in older schema are rewritten into newer schema during upsert,delete and background" + " compaction,clustering operations."); - /** - * @deprecated Use {@link #AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE} and its methods instead - */ - @Deprecated - public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(); - /** - * @deprecated Use {@link #AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE} and its methods instead - */ - @Deprecated - public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.defaultValue(); + public static final ConfigProperty ALLOW_EMPTY_COMMIT = ConfigProperty .key("hoodie.allow.empty.commit") .defaultValue(true) .withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. " + "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data"); + public static final ConfigProperty ALLOW_OPERATION_METADATA_FIELD = ConfigProperty .key("hoodie.allow.operation.metadata.field") .defaultValue(false) .sinceVersion("0.9.0") .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. 
" + "Once enabled, all the changes of a record are persisted to the delta log directly without merge"); + public static final ConfigProperty FILEID_PREFIX_PROVIDER_CLASS = ConfigProperty .key("hoodie.fileid.prefix.provider.class") .defaultValue(RandomFileIdPrefixProvider.class.getName()) .sinceVersion("0.10.0") .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`"); + public static final ConfigProperty TABLE_SERVICES_ENABLED = ConfigProperty .key("hoodie.table.services.enabled") .defaultValue(true) .sinceVersion("0.11.0") .withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc."); + public static final ConfigProperty RELEASE_RESOURCE_ENABLE = ConfigProperty .key("hoodie.release.resource.on.completion.enable") .defaultValue(true) .sinceVersion("0.11.0") .withDocumentation("Control to enable release all persist rdds when the spark job finish."); + public static final ConfigProperty AUTO_ADJUST_LOCK_CONFIGS = ConfigProperty .key("hoodie.auto.adjust.lock.configs") .defaultValue(false) .sinceVersion("0.11.0") .withDocumentation("Auto adjust lock configurations when metadata table is enabled and for async table services."); + public static final ConfigProperty SKIP_DEFAULT_PARTITION_VALIDATION = ConfigProperty .key("hoodie.skip.default.partition.validation") .defaultValue(false) .sinceVersion("0.12.0") .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. " + "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation"); + + private ConsistencyGuardConfig consistencyGuardConfig; + private FileSystemRetryConfig fileSystemRetryConfig; + + // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled + // We keep track of original config and rewritten config + private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig; + private FileSystemViewStorageConfig viewStorageConfig; + private HoodiePayloadConfig hoodiePayloadConfig; + private HoodieMetadataConfig metadataConfig; + private HoodieMetastoreConfig metastoreConfig; + private HoodieCommonConfig commonConfig; + private EngineType engineType; + /** * @deprecated Use {@link #TBL_NAME} and its methods instead */ @@ -529,6 +580,11 @@ public class HoodieWriteConfig extends HoodieConfig { */ @Deprecated public static final String BULKINSERT_PARALLELISM = BULKINSERT_PARALLELISM_VALUE.key(); + /** + * @deprecated Use {@link #BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME} and its methods instead + */ + @Deprecated + public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME.key(); @Deprecated public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl"; /** @@ -641,6 +697,16 @@ public class HoodieWriteConfig extends HoodieConfig { */ @Deprecated public static final String DEFAULT_MARKERS_DELETE_PARALLELISM = MARKERS_DELETE_PARALLELISM_VALUE.defaultValue(); + /** + * @deprecated Use {@link #BULK_INSERT_SORT_MODE} and its methods instead + */ + @Deprecated + public static final String BULKINSERT_SORT_MODE = BULK_INSERT_SORT_MODE.key(); + /** + * @deprecated Use {@link #BULK_INSERT_SORT_MODE} and its methods instead + */ + @Deprecated + public static final String DEFAULT_BULKINSERT_SORT_MODE = BULK_INSERT_SORT_MODE.defaultValue(); /** * @deprecated Use {@link 
#EMBEDDED_TIMELINE_SERVER_ENABLE} and its methods instead */ @@ -706,16 +772,51 @@ public class HoodieWriteConfig extends HoodieConfig { */ @Deprecated public static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP = INITIAL_CONSISTENCY_CHECK_INTERVAL_MS.key(); + /** + * @deprecated Use {@link #INITIAL_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead + */ + @Deprecated + public static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = INITIAL_CONSISTENCY_CHECK_INTERVAL_MS.defaultValue(); /** * @deprecated Use {@link #MAX_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead */ @Deprecated public static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = MAX_CONSISTENCY_CHECK_INTERVAL_MS.key(); + /** + * @deprecated Use {@link #MAX_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead + */ + @Deprecated + public static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = MAX_CONSISTENCY_CHECK_INTERVAL_MS.defaultValue(); /** * @deprecated Use {@link #MAX_CONSISTENCY_CHECKS} and its methods instead */ @Deprecated public static final String MAX_CONSISTENCY_CHECKS_PROP = MAX_CONSISTENCY_CHECKS.key(); + /** + * @deprecated Use {@link #MAX_CONSISTENCY_CHECKS} and its methods instead + */ + @Deprecated + public static int DEFAULT_MAX_CONSISTENCY_CHECKS = MAX_CONSISTENCY_CHECKS.defaultValue(); + /** + * @deprecated Use {@link #MERGE_DATA_VALIDATION_CHECK_ENABLE} and its methods instead + */ + @Deprecated + private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = MERGE_DATA_VALIDATION_CHECK_ENABLE.key(); + /** + * @deprecated Use {@link #MERGE_DATA_VALIDATION_CHECK_ENABLE} and its methods instead + */ + @Deprecated + private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = MERGE_DATA_VALIDATION_CHECK_ENABLE.defaultValue(); + /** + * @deprecated Use {@link #MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE} and its methods instead + */ + @Deprecated + private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(); + /** + * @deprecated Use {@link #MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE} and its methods instead + */ + @Deprecated + private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.defaultValue(); /** * @deprecated Use {@link #CLIENT_HEARTBEAT_INTERVAL_IN_MS} and its methods instead */ @@ -756,54 +857,16 @@ public class HoodieWriteConfig extends HoodieConfig { */ @Deprecated public static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE.defaultValue(); - private static final Logger LOG = LogManager.getLogger(HoodieWriteConfig.class); - private static final long serialVersionUID = 0L; /** - * @deprecated Use {@link #MERGE_DATA_VALIDATION_CHECK_ENABLE} and its methods instead - */ - @Deprecated - private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = MERGE_DATA_VALIDATION_CHECK_ENABLE.key(); - /** - * @deprecated Use {@link #MERGE_DATA_VALIDATION_CHECK_ENABLE} and its methods instead - */ - @Deprecated - private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = MERGE_DATA_VALIDATION_CHECK_ENABLE.defaultValue(); - /** - * @deprecated Use {@link #MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE} and its methods instead - */ - @Deprecated - private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(); - /** - * @deprecated Use {@link #MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE} and its methods instead - */ - @Deprecated - private static final String 
DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.defaultValue(); - /** - * @deprecated Use {@link #INITIAL_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead - */ - @Deprecated - public static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = INITIAL_CONSISTENCY_CHECK_INTERVAL_MS.defaultValue(); - /** - * @deprecated Use {@link #MAX_CONSISTENCY_CHECK_INTERVAL_MS} and its methods instead + * @deprecated Use {@link #AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE} and its methods instead */ @Deprecated - public static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = MAX_CONSISTENCY_CHECK_INTERVAL_MS.defaultValue(); + public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(); /** - * @deprecated Use {@link #MAX_CONSISTENCY_CHECKS} and its methods instead + * @deprecated Use {@link #AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE} and its methods instead */ @Deprecated - public static int DEFAULT_MAX_CONSISTENCY_CHECKS = MAX_CONSISTENCY_CHECKS.defaultValue(); - // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled - // We keep track of original config and rewritten config - private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig; - private ConsistencyGuardConfig consistencyGuardConfig; - private FileSystemRetryConfig fileSystemRetryConfig; - private FileSystemViewStorageConfig viewStorageConfig; - private HoodiePayloadConfig hoodiePayloadConfig; - private HoodieMetadataConfig metadataConfig; - private HoodieMetastoreConfig metastoreConfig; - private HoodieCommonConfig commonConfig; - private EngineType engineType; + public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.defaultValue(); /** * Use Spark engine by default. @@ -856,14 +919,14 @@ public boolean getInternalSchemaCacheEnable() { return getBoolean(ENABLE_INTERNAL_SCHEMA_CACHE); } - public void setInternalSchemaCacheEnable(boolean enable) { - setValue(ENABLE_INTERNAL_SCHEMA_CACHE, String.valueOf(enable)); - } - public void setInternalSchemaString(String internalSchemaString) { setValue(INTERNAL_SCHEMA_STRING, internalSchemaString); } + public void setInternalSchemaCacheEnable(boolean enable) { + setValue(ENABLE_INTERNAL_SCHEMA_CACHE, String.valueOf(enable)); + } + public boolean getSchemaEvolutionEnable() { return getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE); } @@ -874,10 +937,9 @@ public void setSchemaEvolutionEnable(boolean enable) { /** * Get the write schema for written records. - *

+ * * If the WRITE_SCHEMA has specified, we use the WRITE_SCHEMA. * Or else we use the AVRO_SCHEMA as the write schema. - * * @return */ public String getWriteSchema() { @@ -1854,14 +1916,14 @@ public ConsistencyGuardConfig getConsistencyGuardConfig() { return consistencyGuardConfig; } - public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) { - this.consistencyGuardConfig = consistencyGuardConfig; - } - public FileSystemRetryConfig getFileSystemRetryConfig() { return fileSystemRetryConfig; } + public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) { + this.consistencyGuardConfig = consistencyGuardConfig; + } + public FileSystemViewStorageConfig getViewStorageConfig() { return viewStorageConfig; } @@ -1986,7 +2048,6 @@ public int getMetadataCleanerCommitsRetained() { /** * Hoodie Client Lock Configs. - * * @return */ public boolean isAutoAdjustLockConfigs() { From 9d0a67996db91ad90808ddb83aa281ad04e91a76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E6=89=BF=E7=A5=A5?= Date: Fri, 16 Sep 2022 11:46:07 +0800 Subject: [PATCH 3/4] rename class&conf --- .../hudi/config/HoodieCompactionConfig.java | 74 ++++++------------- .../apache/hudi/config/HoodieWriteConfig.java | 4 +- ...=> LogFileNumBasedCompactionStrategy.java} | 8 +- .../TestHoodieCompactionStrategy.java | 4 +- 4 files changed, 31 insertions(+), 59 deletions(-) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/{LogFileLengthBasedCompactionStrategy.java => LogFileNumBasedCompactionStrategy.java} (87%) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index be94078e457d..acada47a0f3b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -106,10 +106,10 @@ public class HoodieCompactionConfig extends HoodieConfig { .withDocumentation("Only if the log file size is greater than the threshold in bytes," + " the file group will be compacted."); - public static final ConfigProperty COMPACTION_LOG_FILE_LENGTH_THRESHOLD = ConfigProperty - .key("hoodie.compaction.logfile.length.threshold") + public static final ConfigProperty COMPACTION_LOG_FILE_NUM_THRESHOLD = ConfigProperty + .key("hoodie.compaction.logfile.num.threshold") .defaultValue(0L) - .withDocumentation("Only if the log file length is greater than the threshold," + .withDocumentation("Only if the log file num is greater than the threshold," + " the file group will be compacted."); public static final ConfigProperty COMPACTION_STRATEGY = ConfigProperty @@ -170,24 +170,16 @@ public class HoodieCompactionConfig extends HoodieConfig { + " This is critical in computing the insert parallelism and bin-packing inserts into small files."); - /** - * @deprecated Use {@link #INLINE_COMPACT} and its methods instead - */ + /** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */ @Deprecated public static final String INLINE_COMPACT_PROP = INLINE_COMPACT.key(); - /** - * @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead - */ + /** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead */ @Deprecated public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = INLINE_COMPACT_NUM_DELTA_COMMITS.key(); - 
/** - * @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its methods instead - */ + /** @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its methods instead */ @Deprecated public static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = INLINE_COMPACT_TIME_DELTA_SECONDS.key(); - /** - * @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead - */ + /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead */ @Deprecated public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP = INLINE_COMPACT_TRIGGER_STRATEGY.key(); /** @@ -255,61 +247,41 @@ public class HoodieCompactionConfig extends HoodieConfig { */ @Deprecated public static final String COMPACTION_STRATEGY_PROP = COMPACTION_STRATEGY.key(); - /** - * @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead - */ + /** @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead */ @Deprecated public static final String DEFAULT_COMPACTION_STRATEGY = COMPACTION_STRATEGY.defaultValue(); - /** - * @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead - */ + /** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead */ @Deprecated public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = COMPACTION_LAZY_BLOCK_READ_ENABLE.key(); - /** - * @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead - */ + /** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead */ @Deprecated public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue(); - /** - * @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead - */ + /** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead */ @Deprecated public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = COMPACTION_REVERSE_LOG_READ_ENABLE.key(); - /** - * @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead - */ + /** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead */ @Deprecated public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue(); - /** - * @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead - */ - @Deprecated - public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.key(); - /** - * @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead - */ - @Deprecated - public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.defaultValue(); /** * @deprecated Use {@link #INLINE_COMPACT} and its methods instead */ @Deprecated private static final String DEFAULT_INLINE_COMPACT = INLINE_COMPACT.defaultValue(); - /** - * @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead - */ + /** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead */ @Deprecated private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue(); - /** - * @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its methods instead - */ + /** @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its methods instead */ @Deprecated private static 
final String DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS = INLINE_COMPACT_TIME_DELTA_SECONDS.defaultValue(); - /** - * @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead - */ + /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead */ @Deprecated private static final String DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY = INLINE_COMPACT_TRIGGER_STRATEGY.defaultValue(); + /** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead */ + @Deprecated + public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.key(); + /** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead */ + @Deprecated + public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.defaultValue(); private HoodieCompactionConfig() { super(); @@ -415,8 +387,8 @@ public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold return this; } - public Builder withLogFileLengthThresholdBasedCompaction(int logFileLengthThreshold) { - compactionConfig.setValue(COMPACTION_LOG_FILE_LENGTH_THRESHOLD, String.valueOf(logFileLengthThreshold)); + public Builder withLogFileNumThresholdBasedCompaction(int logFileNumThreshold) { + compactionConfig.setValue(COMPACTION_LOG_FILE_NUM_THRESHOLD, String.valueOf(logFileNumThreshold)); return this; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index ff3de4695eb9..78c8d677523a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1275,8 +1275,8 @@ public Long getCompactionLogFileSizeThreshold() { return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD); } - public Long getCompactionLogFileLengthThreshold() { - return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_LENGTH_THRESHOLD); + public Long getCompactionLogFileNumThreshold() { + return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_NUM_THRESHOLD); } public Boolean getCompactionLazyBlockReadEnabled() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileLengthBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java similarity index 87% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileLengthBasedCompactionStrategy.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java index 6f5cb96ffc39..6f79b684d0a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileLengthBasedCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java @@ -27,17 +27,17 @@ import java.util.stream.Collectors; /** - * LogFileLengthBasedCompactionStrategy orders the compactions based on the total log files length, + * LogFileNumBasedCompactionStrategy orders the compactions based on the total number of log files, * filters the file group 
which log files length is greater than the threshold and limits the compactions within a configured IO bound. */ -public class LogFileLengthBasedCompactionStrategy extends BoundedIOCompactionStrategy +public class LogFileNumBasedCompactionStrategy extends BoundedIOCompactionStrategy implements Comparator { @Override public List orderAndFilter(HoodieWriteConfig writeConfig, List operations, List pendingCompactionPlans) { - Long lengthThreshold = writeConfig.getCompactionLogFileLengthThreshold(); + Long numThreshold = writeConfig.getCompactionLogFileNumThreshold(); List filterOperator = operations.stream() - .filter(e -> e.getDeltaFilePaths().size() >= lengthThreshold) + .filter(e -> e.getDeltaFilePaths().size() >= numThreshold) .sorted(this).collect(Collectors.toList()); return super.orderAndFilter(writeConfig, filterOperator, pendingCompactionPlans); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index 4d664b5ef9e5..968d43ef389b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -250,10 +250,10 @@ public void testLogFileLengthBasedCompactionStrategy() { sizesMap.put(110 * MB, new ArrayList<>()); sizesMap.put(100 * MB, Collections.singletonList(2048 * MB)); sizesMap.put(90 * MB, Arrays.asList(512 * MB, 512 * MB)); - LogFileLengthBasedCompactionStrategy strategy = new LogFileLengthBasedCompactionStrategy(); + LogFileNumBasedCompactionStrategy strategy = new LogFileNumBasedCompactionStrategy(); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1024) - .withLogFileLengthThresholdBasedCompaction(2).build()) + .withLogFileNumThresholdBasedCompaction(2).build()) .build(); List operations = createCompactionOperations(writeConfig, sizesMap); List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); From 13d66a602715f9ecaca1f23d9474efcfc79512d8 Mon Sep 17 00:00:00 2001 From: xiaosuda Date: Sat, 17 Sep 2022 17:38:17 +0800 Subject: [PATCH 4/4] change method name --- .../java/org/apache/hudi/config/HoodieCompactionConfig.java | 2 +- .../action/compact/strategy/TestHoodieCompactionStrategy.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index acada47a0f3b..3b61d4972729 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -387,7 +387,7 @@ public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold return this; } - public Builder withLogFileNumThresholdBasedCompaction(int logFileNumThreshold) { + public Builder withCompactionLogFileNumThreshold(int logFileNumThreshold) { compactionConfig.setValue(COMPACTION_LOG_FILE_NUM_THRESHOLD, String.valueOf(logFileNumThreshold)); return this; } diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index 968d43ef389b..319d6ea031eb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -253,7 +253,7 @@ public void testLogFileLengthBasedCompactionStrategy() { LogFileNumBasedCompactionStrategy strategy = new LogFileNumBasedCompactionStrategy(); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1024) - .withLogFileNumThresholdBasedCompaction(2).build()) + .withCompactionLogFileNumThreshold(2).build()) .build(); List operations = createCompactionOperations(writeConfig, sizesMap); List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
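
Reviewer note: a minimal usage sketch of the renamed strategy, the renamed builder method and the renamed getter from this patch series. The table path, the threshold of 3, the IO bound and the example class name are illustrative values chosen here, not part of the patch; the builder calls mirror the updated test case above.

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy;

public class LogFileNumCompactionExample {
  public static void main(String[] args) {
    // Only file groups carrying at least 3 log files become compaction candidates; candidates are
    // then ordered by log file count and the plan is limited by the configured IO bound.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCompactionStrategy(new LogFileNumBasedCompactionStrategy())
            .withCompactionLogFileNumThreshold(3)
            .withTargetIOPerCompactionInMB(1024)
            .build())
        .build();
    // The renamed getter exposes the configured threshold to the strategy's orderAndFilter().
    System.out.println("log file num threshold: " + writeConfig.getCompactionLogFileNumThreshold());
  }
}

With the default threshold of 0L, no file group is filtered out, so only the ordering and the IO bound shape the compaction plan.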