@@ -116,6 +116,9 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
protected Option<BaseKeyGenerator> keyGeneratorOpt;
private HoodieBaseFile baseFileToMerge;

protected Option<String[]> partitionFields = Option.empty();
protected Object[] partitionValues = new Object[0];

public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
@@ -476,4 +479,20 @@ public IOType getIOType() {
public HoodieBaseFile baseFileForMerge() {
return baseFileToMerge;
}

public void setPartitionFields(Option<String[]> partitionFields) {
this.partitionFields = partitionFields;
}

public Option<String[]> getPartitionFields() {
return this.partitionFields;
}

public void setPartitionValues(Object[] partitionValues) {
this.partitionValues = partitionValues;
}

public Object[] getPartitionValues() {
return this.partitionValues;
}
}
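The new accessors let the write path hand bootstrap partition metadata to the merge handle before runMerge is invoked. A minimal sketch of that wiring, mirroring the Spark commit executor changes later in this diff (table and upsertHandle are assumed to be in scope):

// Only base files backed by a bootstrap source file need the partition columns re-attached.
if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
  Option<String[]> partitionFields = table.getMetaClient().getTableConfig().getPartitionFields();
  Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(
      partitionFields,
      upsertHandle.getPartitionPath(),
      table.getMetaClient().getTableConfig().getBootstrapBasePath().get(),
      upsertHandle.getWriterSchema(),
      table.getHadoopConf());
  upsertHandle.setPartitionFields(partitionFields);
  upsertHandle.setPartitionValues(partitionValues);
}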
@@ -18,7 +18,6 @@

package org.apache.hudi.table.action.commit;

- import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
@@ -119,14 +118,12 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
if (baseFile.getBootstrapBaseFile().isPresent()) {
Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
- bootstrapFileReader =
- HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath);
-
- recordIterator = new MergingIterator<>(
- baseFileRecordIterator,
- bootstrapFileReader.getRecordIterator(),
- (left, right) ->
- left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
baseFileReader,
HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath),
mergeHandle.getPartitionFields(),
mergeHandle.getPartitionValues());
recordIterator = bootstrapFileReader.getRecordIterator(mergeHandle.getWriterSchemaWithMetaFields());
recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
} else {
recordIterator = baseFileRecordIterator;
@@ -18,8 +18,6 @@

package org.apache.hudi.client.clustering.run.strategy;

- import org.apache.hudi.AvroConversionUtils;
- import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
@@ -54,7 +52,6 @@
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
- import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -72,8 +69,6 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
- import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil;
- import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.BaseRelation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +85,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals;
import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
@@ -379,16 +375,7 @@ private HoodieFileReader getBaseOrBootstrapFileReader(SerializableConfiguration
if (partitionFields.isPresent()) {
int startOfPartitionPath = bootstrapFilePath.indexOf(bootstrapBasePath) + bootstrapBasePath.length() + 1;
String partitionFilePath = bootstrapFilePath.substring(startOfPartitionPath, bootstrapFilePath.lastIndexOf("/"));
- CachingPath bootstrapCachingPath = new CachingPath(bootstrapBasePath);
- SparkParsePartitionUtil sparkParsePartitionUtil = SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil();
- partitionValues = HoodieSparkUtils.parsePartitionColumnValues(
- partitionFields.get(),
- partitionFilePath,
- bootstrapCachingPath,
- AvroConversionUtils.convertAvroSchemaToStructType(baseFileReader.getSchema()),
- hadoopConf.get().get("timeZone", SQLConf.get().sessionLocalTimeZone()),
- sparkParsePartitionUtil,
- hadoopConf.get().getBoolean("spark.sql.sources.validatePartitionColumns", true));
partitionValues = getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(), hadoopConf.get());
}
baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
baseFileReader,
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.utils;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.CachingPath;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil;
import org.apache.spark.sql.internal.SQLConf;

public class SparkPartitionUtils {

public static Object[] getPartitionFieldVals(Option<String[]> partitionFields,
String partitionPath,
String basePath,
Schema writerSchema,
Configuration hadoopConf) {
if (!partitionFields.isPresent()) {
return new Object[0];
}
SparkParsePartitionUtil sparkParsePartitionUtil = SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil();
return HoodieSparkUtils.parsePartitionColumnValues(
partitionFields.get(),
partitionPath,
new CachingPath(basePath),
AvroConversionUtils.convertAvroSchemaToStructType(writerSchema),
hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()),
sparkParsePartitionUtil,
hadoopConf.getBoolean("spark.sql.sources.validatePartitionColumns", true));
}
}
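For illustration, a hedged usage sketch of the new helper (metaClient, writerSchema, and hadoopConf are assumed to be in scope; the partition path literal is hypothetical):

// Resolve typed partition column values from a Hive-style relative partition path.
// Returns an empty array when the table has no partition fields configured.
Option<String[]> partitionFields = metaClient.getTableConfig().getPartitionFields();
Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(
    partitionFields,
    "datestr=2023-01-01",                                      // relative partition path (hypothetical)
    metaClient.getTableConfig().getBootstrapBasePath().get(),  // bootstrap source base path
    writerSchema,                                              // Avro schema containing the partition columns
    hadoopConf);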
@@ -30,6 +30,7 @@
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkPartitionUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
@@ -222,6 +223,14 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
Option<String[]> partitionFields = getMetaClient().getTableConfig().getPartitionFields();
Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(),
getMetaClient().getTableConfig().getBootstrapBasePath().get(),
upsertHandle.getWriterSchema(), getHadoopConf());
upsertHandle.setPartitionFields(partitionFields);
upsertHandle.setPartitionValues(partitionValues);
}
HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
}

@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkPartitionUtils;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
@@ -366,6 +367,15 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
Option<String[]> partitionFields = table.getMetaClient().getTableConfig().getPartitionFields();
Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(),
table.getMetaClient().getTableConfig().getBootstrapBasePath().get(),
upsertHandle.getWriterSchema(), table.getHadoopConf());
upsertHandle.setPartitionFields(partitionFields);
upsertHandle.setPartitionValues(partitionValues);
}

HoodieMergeHelper.newInstance().runMerge(table, upsertHandle);
}

@@ -21,7 +21,6 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieRecord;
- import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;

@@ -64,35 +63,21 @@ public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
ClosableIterator<HoodieRecord<T>> skeletonIterator = skeletonFileReader.getRecordIterator(readerSchema, requestedSchema);
ClosableIterator<HoodieRecord<T>> dataFileIterator = dataFileReader.getRecordIterator(HoodieAvroUtils.removeMetadataFields(readerSchema), requestedSchema);
- return new ClosableIterator<HoodieRecord<T>>() {
return new HoodieBootstrapRecordIterator<T>(skeletonIterator, dataFileIterator, readerSchema, partitionFields, partitionValues) {
@Override
- public void close() {
- skeletonIterator.close();
- dataFileIterator.close();
- }
-
- @Override
- public boolean hasNext() {
- return skeletonIterator.hasNext() && dataFileIterator.hasNext();
protected void setPartitionPathField(int position, Object fieldValue, T row) {
setPartitionField(position, fieldValue, row);
}
};
}

public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema schema) throws IOException {
ClosableIterator<HoodieRecord<T>> skeletonIterator = skeletonFileReader.getRecordIterator(schema);
ClosableIterator<HoodieRecord<T>> dataFileIterator = dataFileReader.getRecordIterator(dataFileReader.getSchema());
return new HoodieBootstrapRecordIterator<T>(skeletonIterator, dataFileIterator, schema, partitionFields, partitionValues) {
@Override
- public HoodieRecord<T> next() {
- HoodieRecord<T> dataRecord = dataFileIterator.next();
- HoodieRecord<T> skeletonRecord = skeletonIterator.next();
- HoodieRecord<T> ret = dataRecord.prependMetaFields(readerSchema, readerSchema,
- new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD))
- .setCommitSeqno(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
- .setRecordKey(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD))
- .setPartitionPath(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.PARTITION_PATH_METADATA_FIELD))
- .setFileName(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.FILENAME_METADATA_FIELD)), null);
- if (partitionFields.isPresent()) {
- for (int i = 0; i < partitionValues.length; i++) {
- int position = readerSchema.getField(partitionFields.get()[i]).pos();
- setPartitionField(position, partitionValues[i], ret.getData());
- }
- }
- return ret;
protected void setPartitionPathField(int position, Object fieldValue, T row) {
setPartitionField(position, fieldValue, row);
}
};
}
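A hedged sketch of how the merge path consumes the combined reader, mirroring the HoodieMergeHelper change earlier in this diff (bootstrapFileReader and mergeHandle are assumed to be in scope):

// Each record is the data-file row with the skeleton file's Hudi meta fields prepended and,
// when partition fields are present, the partition columns filled in from partitionValues.
ClosableIterator<HoodieRecord<T>> records =
    bootstrapFileReader.getRecordIterator(mergeHandle.getWriterSchemaWithMetaFields());
try {
  while (records.hasNext()) {
    HoodieRecord<T> merged = records.next();
    // hand the merged record to the writer
  }
} finally {
  records.close();
}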
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.storage;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;

import org.apache.avro.Schema;

import static org.apache.hudi.common.util.ValidationUtils.checkState;

public abstract class HoodieBootstrapRecordIterator<T> implements ClosableIterator<HoodieRecord<T>> {

protected ClosableIterator<HoodieRecord<T>> skeletonIterator;
protected ClosableIterator<HoodieRecord<T>> dataFileIterator;
private final Option<String[]> partitionFields;
private final Object[] partitionValues;

protected Schema schema;

public HoodieBootstrapRecordIterator(ClosableIterator<HoodieRecord<T>> skeletonIterator,
ClosableIterator<HoodieRecord<T>> dataFileIterator,
Schema schema,
Option<String[]> partitionFields,
Object[] partitionValues) {
this.skeletonIterator = skeletonIterator;
this.dataFileIterator = dataFileIterator;
this.schema = schema;
this.partitionFields = partitionFields;
this.partitionValues = partitionValues;
}

@Override
public void close() {

}

@Override
public boolean hasNext() {
checkState(skeletonIterator.hasNext() == dataFileIterator.hasNext());
return skeletonIterator.hasNext();
}

@Override
public HoodieRecord<T> next() {
HoodieRecord<T> dataRecord = dataFileIterator.next();
HoodieRecord<T> skeletonRecord = skeletonIterator.next();
HoodieRecord<T> ret = dataRecord.prependMetaFields(schema, schema,
new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(schema, HoodieRecord.COMMIT_TIME_METADATA_FIELD))
.setCommitSeqno(skeletonRecord.getRecordKey(schema, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
.setRecordKey(skeletonRecord.getRecordKey(schema, HoodieRecord.RECORD_KEY_METADATA_FIELD))
.setPartitionPath(skeletonRecord.getRecordKey(schema, HoodieRecord.PARTITION_PATH_METADATA_FIELD))
.setFileName(skeletonRecord.getRecordKey(schema, HoodieRecord.FILENAME_METADATA_FIELD)), null);
if (partitionFields.isPresent()) {
for (int i = 0; i < partitionValues.length; i++) {
int position = schema.getField(partitionFields.get()[i]).pos();
setPartitionPathField(position, partitionValues[i], ret.getData());
}
}
return ret;
}

protected abstract void setPartitionPathField(int position, Object fieldValue, T row);
}
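As a hedged illustration of the contract only, a hypothetical Avro-backed subclass that writes each partition value straight into the record at the resolved field position (the iterators, schema, and partition metadata are assumed to be in scope):

// Hypothetical example with T = org.apache.avro.generic.IndexedRecord.
HoodieBootstrapRecordIterator<IndexedRecord> iterator =
    new HoodieBootstrapRecordIterator<IndexedRecord>(skeletonIterator, dataFileIterator,
        readerSchema, partitionFields, partitionValues) {
      @Override
      protected void setPartitionPathField(int position, Object fieldValue, IndexedRecord row) {
        row.put(position, fieldValue);  // IndexedRecord#put(int, Object)
      }
    };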