@@ -225,6 +225,11 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
LOG.info("Committing " + instantTime + " action " + commitActionType);
// Create a Hoodie table which encapsulates the commits and files visible
HoodieTable table = createTable(config, hadoopConf);
return doCommit(instantTime, stats, extraMetadata, commitActionType, partitionToReplaceFileIds, table);
}

private boolean doCommit(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata, String commitActionType,
Map<String, List<String>> partitionToReplaceFileIds, HoodieTable table) {
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
extraMetadata, operationType, config.getWriteSchema(), commitActionType);
HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime);
@@ -268,6 +273,17 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
return true;
}

public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds, HoodieTable table) {
// Skip the empty commit if not allowed
if (!config.allowEmptyCommit() && stats.isEmpty()) {
return true;
}
LOG.info("Committing " + instantTime + " action " + commitActionType);
// Use the provided Hoodie table which encapsulates the commits and files visible
return doCommit(instantTime, stats, extraMetadata, commitActionType, partitionToReplaceFileIds, table);
}

protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
List<HoodieWriteStat> stats) throws IOException {
LOG.info("Committing " + instantTime + " action " + commitActionType);
@@ -53,6 +53,7 @@
import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
import static org.apache.hudi.index.HoodieIndex.IndexType.HBASE;
import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
import static org.apache.hudi.index.HoodieIndex.IndexType.RECORD_LEVEL;
import static org.apache.hudi.index.HoodieIndex.IndexType.SIMPLE;

/**
@@ -72,10 +73,10 @@ public class HoodieIndexConfig extends HoodieConfig {
// Builder#getDefaultIndexType has already set it according to engine type
.noDefaultValue()
.withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(),
SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name())
SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name(), RECORD_LEVEL.name())
.withDocumentation("Type of index to use. Default is SIMPLE on Spark engine, "
+ "and INMEMORY on Flink and Java engines. "
+ "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
+ "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET | RECORD_LEVEL]. "
+ "Bloom filters removes the dependency on a external system "
+ "and is stored in the footer of the Parquet Data Files");

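(Not part of this diff — an illustrative sketch of selecting the new RECORD_LEVEL index type through the existing builders; the base path is a placeholder and the calls are the standard HoodieWriteConfig/HoodieIndexConfig builder APIs.)

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_table") // placeholder base path
    .withIndexConfig(HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.RECORD_LEVEL)
        .build())
    .build();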
@@ -531,6 +531,17 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. "
+ "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation");

public static final ConfigProperty<Boolean> WRITE_SHOULD_UPDATE_PARTITION = ConfigProperty
.key("hoodie.write.should.update.partition")
.defaultValue(true)
.withDocumentation("In the merge into syntax, source may not contain a partition field. "
+ "So the partition field of the source needs to be filled with the partition of the target to avoid data partition change");

public static final ConfigProperty<String> TABLE_INDEX_TTL = ConfigProperty
.key("hoodie.table.index.ttl")
.defaultValue("-1d")
.withDocumentation("spark batch index ttl cycle.");

private ConsistencyGuardConfig consistencyGuardConfig;
private FileSystemRetryConfig fileSystemRetryConfig;

@@ -1697,6 +1708,10 @@ public int getColumnStatsIndexParallelism() {
return metadataConfig.getColumnStatsIndexParallelism();
}

public int getRecordLevelIndexParallelism() {
return metadataConfig.getRecordLevelIndexParallelism();
}

public int getBloomIndexKeysPerBucket() {
return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET);
}
@@ -2127,6 +2142,22 @@ public int getMetadataCleanerCommitsRetained() {
return getInt(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED);
}

/**
* Record level index metadata configs.
*/
public int getRecordLevelIndexShardCount() {
return metadataConfig.getRecordLevelIndexFileGroupCount();
}

public boolean shouldUpdatePartition() {
return getBoolean(WRITE_SHOULD_UPDATE_PARTITION);
}

public Integer tableIndexTTL() {
// The value is day-based (e.g. "-1d" or "7d"): strip the trailing "d" and parse the number of days.
String ttl = getString(TABLE_INDEX_TTL).toLowerCase().replaceAll("d","");
return Integer.parseInt(ttl);
}

/**
* Hoodie Client Lock Configs.
* @return
@@ -153,7 +153,7 @@ public void close() {
}

public enum IndexType {
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, FLINK_STATE
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, FLINK_STATE, RECORD_LEVEL
}

public enum BucketIndexEngineType {
@@ -70,6 +70,7 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.hadoop.SerializablePath;
import org.apache.hudi.keygen.BaseKeyGenerator;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
@@ -208,6 +209,9 @@ private void enablePartitions() {
if (metadataConfig.isColumnStatsIndexEnabled()) {
enablePartition(MetadataPartitionType.COLUMN_STATS, metadataConfig, metaClient, fsView, isBootstrapCompleted);
}
if (metadataConfig.isRecordLevelIndexEnabled()) {
enablePartition(MetadataPartitionType.RECORD_LEVEL_INDEX, metadataConfig, metaClient, fsView, isBootstrapCompleted);
}
}

/**
@@ -783,6 +787,7 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() {
dataWriteConfig.getMetadataBloomFilterIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getRecordLevelIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getColumnsEnabledForBloomFilterIndex());
}
@@ -1098,6 +1103,13 @@ private void initialCommit(String createInstantTime, List<MetadataPartitionType>
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
}

if (partitionTypes.contains(MetadataPartitionType.RECORD_LEVEL_INDEX) && totalDataFilesCount > 0) {
Option<BaseKeyGenerator> keyGeneratorOpt = getKeyGenerator(dataWriteConfig);
final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToRecordLevelIndex(
engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime, dataWriteConfig.getBasePath(), keyGeneratorOpt);
partitionToRecordsMap.put(MetadataPartitionType.RECORD_LEVEL_INDEX, recordsRDD);
}

LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");

commit(createInstantTime, partitionToRecordsMap, false);
@@ -1185,4 +1197,12 @@ Map<String, Long> getFileNameToSizeMap() {
return filenameToSizeMap;
}
}

public Option<BaseKeyGenerator> getKeyGenerator(HoodieWriteConfig dataWriteConfig) {
// No key generator by default; engine-specific writers can override this when the record level index needs one.
return Option.empty();
}

public void update(MetadataPartitionType metadataPartitionType, HoodieData<HoodieRecord> hoodieData) {
// No-op by default; concrete metadata writers are expected to override this to apply the given index records.
}
}
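(A possible Spark-side override of the getKeyGenerator hook added above — purely a sketch, not this PR's implementation; it reuses SparkHoodieIndexFactory.getKeyGeneratorForSimpleIndex, which this diff makes public further down, e.g. from SparkHoodieBackedTableMetadataWriter.)

@Override
public Option<BaseKeyGenerator> getKeyGenerator(HoodieWriteConfig dataWriteConfig) {
  // Reuse the key generator resolution that the simple index already performs.
  return SparkHoodieIndexFactory.getKeyGeneratorForSimpleIndex(dataWriteConfig);
}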
@@ -22,8 +22,10 @@
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;

import java.io.IOException;
@@ -102,4 +104,6 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
* @param partitions - list of {@link MetadataPartitionType} to drop
*/
void deletePartitions(String instantTime, List<MetadataPartitionType> partitions);

/**
 * Update the given metadata partition with the provided records.
 *
 * @param metadataPartitionType - the {@link MetadataPartitionType} to update
 * @param hoodieData - records to write into that partition
 */
void update(MetadataPartitionType metadataPartitionType, HoodieData<HoodieRecord> hoodieData);
}
@@ -128,6 +128,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
private transient FileSystemViewManager viewManager;
protected final transient HoodieEngineContext context;

private Option<HoodieTableMetadataWriter> hoodieTableMetadataWriterOption = Option.empty();

protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
this.config = config;
this.hadoopConfiguration = context.getHadoopConf();
@@ -986,4 +988,11 @@ public HoodieTableMetadata getMetadataTable() {
public Runnable getPreExecuteRunnable() {
return Functions.noop();
}

/**
 * Lazily instantiates and caches the metadata table writer, if one is available for this table.
 */
public Option<HoodieTableMetadataWriter> getMetadataWriterAndPresent(String triggeringInstantTimestamp) {
if (!hoodieTableMetadataWriterOption.isPresent()) {
hoodieTableMetadataWriterOption = getMetadataWriter(triggeringInstantTimestamp, Option.empty());
}
return hoodieTableMetadataWriterOption;
}
}
@@ -45,11 +45,13 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.index.recordlevel.SparkRecordLevelIndex;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -67,12 +69,15 @@
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;

@SuppressWarnings("checkstyle:LineLength")
public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
BaseHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
@@ -122,8 +127,25 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
if (HoodieIndex.IndexType.RECORD_LEVEL.equals(config.getIndexType())) {
if (writeStatuses.getStorageLevel() == StorageLevel.NONE()) {
writeStatuses.persist(StorageLevel.fromString(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE)));
}
}
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
HoodieTable table = createTable(config, hadoopConf);
if (HoodieIndex.IndexType.RECORD_LEVEL.equals(config.getIndexType())) {
final HoodieData<HoodieRecord> hoodieRecordHoodieData = ((SparkRecordLevelIndex) table.getIndex()).updateLocationToMetadata(writeStatuses, table);
table.getMetadataWriterAndPresent(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(MetadataPartitionType.RECORD_LEVEL_INDEX, hoodieRecordHoodieData));
}
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, table);
}

@Override
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
table.getMetadataWriterAndPresent(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
table.isTableServiceAction(actionType)));
}

@Override
@@ -32,6 +32,7 @@
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
import org.apache.hudi.index.recordlevel.SparkRecordLevelIndex;
import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
import org.apache.hudi.index.simple.HoodieSimpleIndex;
import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -74,6 +75,8 @@ public static HoodieIndex createIndex(HoodieWriteConfig config) {
default:
throw new HoodieIndexException("Unknown bucket index engine type: " + config.getBucketIndexEngineType());
}
case RECORD_LEVEL:
return new SparkRecordLevelIndex(config);
default:
throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType());
}
@@ -105,7 +108,7 @@ public static boolean isGlobalIndex(HoodieWriteConfig config) {
}
}

private static Option<BaseKeyGenerator> getKeyGeneratorForSimpleIndex(HoodieWriteConfig config) {
public static Option<BaseKeyGenerator> getKeyGeneratorForSimpleIndex(HoodieWriteConfig config) {
try {
return config.populateMetaFields() ? Option.empty()
: Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
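(For completeness — a short sketch, not from this diff, of the factory resolving the new type, assuming a write config with hoodie.index.type=RECORD_LEVEL such as the one sketched earlier.)

HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
// With the RECORD_LEVEL index type configured, the switch above returns a SparkRecordLevelIndex.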