@@ -27,6 +27,7 @@
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.index.bucket.BucketIdentifier;
import scala.Tuple2;

@@ -42,6 +43,9 @@
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;

import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE;
import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE_TABLE;

/**
* Packs incoming records to be inserted into buckets (1 bucket = 1 RDD partition).
*/
@@ -57,6 +61,7 @@ public class SparkBucketIndexPartitioner<T> extends
* The partition offset is a multiple of the bucket num.
*/
private final Map<String, Integer> partitionPathOffset;
private final boolean isOverwrite;

/**
* Partition path and file groups in it pair. Decide the file group an incoming update should go to.
@@ -84,6 +89,8 @@ public SparkBucketIndexPartitioner(WorkloadProfile profile,
i += numBuckets;
}
assignUpdates(profile);
WriteOperationType operationType = profile.getOperationType();
this.isOverwrite = INSERT_OVERWRITE.equals(operationType) || INSERT_OVERWRITE_TABLE.equals(operationType);
}

private void assignUpdates(WorkloadProfile profile) {
@@ -106,6 +113,10 @@ private void assignUpdates(WorkloadProfile profile) {
public BucketInfo getBucketInfo(int bucketNumber) {
String partitionPath = partitionPaths.get(bucketNumber / numBuckets);
String bucketId = BucketIdentifier.bucketIdStr(bucketNumber % numBuckets);
// Insert overwrite always generates a new bucket file id
if (isOverwrite) {
return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketId), partitionPath);
}
Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds
.getOrDefault(partitionPath, Collections.emptySet()).stream()
.filter(e -> e.startsWith(bucketId))
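The hunk above shows only the overwrite branch of getBucketInfo. As a standalone illustration (not part of this change; the bucket count and partition paths below are made up), the sketch spells out how a Spark partition number is decoded into a partition path plus bucket, and why an insert-overwrite operation always resolves to a brand-new file group prefix:

import org.apache.hudi.index.bucket.BucketIdentifier;

// Standalone sketch, not part of the PR: assumes numBuckets = 4 and two partition paths,
// mirroring the bucketNumber / numBuckets and bucketNumber % numBuckets arithmetic above.
public class BucketMappingSketch {
  public static void main(String[] args) {
    String[] partitionPaths = {"2021/01/05", "2021/01/06"};
    int numBuckets = 4;
    int bucketNumber = 6; // Spark partition id handed to getBucketInfo

    // The quotient selects the partition path, the remainder the bucket within it.
    String partitionPath = partitionPaths[bucketNumber / numBuckets]; // "2021/01/06"
    String bucketId = BucketIdentifier.bucketIdStr(bucketNumber % numBuckets); // zero-padded bucket id

    // With INSERT_OVERWRITE / INSERT_OVERWRITE_TABLE the partitioner never reuses an existing
    // file group: it always derives a fresh file id prefix for the bucket.
    String newFileGroupPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);
    System.out.println(partitionPath + " -> bucket " + bucketId + ", new file group prefix " + newFileGroupPrefix);
  }
}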
@@ -34,6 +34,7 @@

import org.apache.spark.Partitioner;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -85,4 +86,17 @@ protected List<String> getAllExistingFileIds(String partitionPath) {
// Because the new commit is not yet complete, it is safe to mark all existing file ids as old files.
return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}

@Override
protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
SparkHoodiePartitioner upsertPartitioner = (SparkHoodiePartitioner) partitioner;
BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
BucketType btype = binfo.bucketType;
switch (btype) {
case INSERT:
return handleInsert(binfo.fileIdPrefix, recordItr);
default:
throw new AssertionError("Expect INSERT bucketType for insert overwrite, please correct the logical of " + partitioner.getClass().getName());
}
}
}
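The guard in handleInsertPartition encodes a contract: for insert overwrite the partitioner must plan only INSERT buckets, and anything else is a planning bug rather than a condition to handle. A self-contained sketch of that contract (the enum, class and method names below are stand-ins, not Hudi's types):

// Stand-in types for illustration only; these are not Hudi's BucketType/BucketInfo.
public class InsertOverwriteDispatchSketch {
  enum BucketType { INSERT, UPDATE }

  static String dispatch(BucketType bucketType, String fileIdPrefix) {
    switch (bucketType) {
      case INSERT:
        // Mirrors handleInsert(binfo.fileIdPrefix, recordItr) above.
        return "handleInsert(" + fileIdPrefix + ")";
      default:
        // Insert overwrite never plans UPDATE buckets; reaching here means the
        // partitioner's plan is wrong, so fail loudly instead of guessing.
        throw new AssertionError("Expect INSERT bucket type for insert overwrite, got " + bucketType);
    }
  }

  public static void main(String[] args) {
    System.out.println(dispatch(BucketType.INSERT, "fg-00000002"));
  }
}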
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -41,6 +42,20 @@ public SparkInsertOverwritePartitioner(WorkloadProfile profile, HoodieEngineCont
super(profile, context, table, config);
}

@Override
public BucketInfo getBucketInfo(int bucketNumber) {
BucketInfo bucketInfo = super.getBucketInfo(bucketNumber);
switch (bucketInfo.bucketType) {
case INSERT:
return bucketInfo;
case UPDATE:
// Insert overwrite always generates a new bucket file id
return new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), bucketInfo.partitionPath);
default:
throw new AssertionError("Unexpected bucket type " + bucketInfo.bucketType + " for insert overwrite");
}
}

/**
* Returns a list of small files in the given partition path.
*/
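For the non-bucket SparkInsertOverwritePartitioner the same rule shows up as an UPDATE-to-INSERT rewrite: keep the target partition, but never append to the existing file group. The sketch below is standalone and uses a stand-in Bucket class; only FSUtils.createNewFileIdPfx() is a real Hudi call, taken from the hunk above:

import org.apache.hudi.common.fs.FSUtils;

// Standalone sketch; Bucket and BucketType here are stand-ins for Hudi's BucketInfo/BucketType.
public class OverwriteBucketRewriteSketch {
  enum BucketType { INSERT, UPDATE }

  static class Bucket {
    final BucketType type;
    final String fileIdPrefix;
    final String partitionPath;

    Bucket(BucketType type, String fileIdPrefix, String partitionPath) {
      this.type = type;
      this.fileIdPrefix = fileIdPrefix;
      this.partitionPath = partitionPath;
    }
  }

  static Bucket forInsertOverwrite(Bucket planned) {
    if (planned.type == BucketType.UPDATE) {
      // Insert overwrite must not append to the existing file group: keep the partition,
      // but write into a brand-new file group with a fresh file id prefix.
      return new Bucket(BucketType.INSERT, FSUtils.createNewFileIdPfx(), planned.partitionPath);
    }
    return planned;
  }

  public static void main(String[] args) {
    Bucket rewritten = forInsertOverwrite(new Bucket(BucketType.UPDATE, "existing-file-id", "2021/01/05"));
    System.out.println(rewritten.type + " into " + rewritten.partitionPath + " as " + rewritten.fileIdPrefix);
  }
}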
@@ -148,4 +148,33 @@ class TestMORDataSourceWithBucketIndex extends HoodieSparkClientTestBase {
assertEquals(100,
hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count())
}

@Test def testInsertOverwrite(): Unit = {
val partitionPaths = new Array[String](1)
partitionPaths.update(0, "2020/01/10")
val newDataGen = new HoodieTestDataGenerator(partitionPaths)
val records1 = recordsToStrings(newDataGen.generateInserts("001", 100)).toList
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
val records2 = recordsToStrings(newDataGen.generateInserts("002", 20)).toList
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "false")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
assertEquals(20, hudiSnapshotDF1.count())
}
}
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -30,7 +29,6 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
import org.scalatest.Inspectors.forAll

import java.io.File

@@ -1125,4 +1123,116 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
}

test("Test Insert Overwrite Into Bucket Index Table") {
withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
Seq("mor", "cow").foreach { tableType =>
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
// Create a partitioned table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
|tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.index.type = 'BUCKET',
| hoodie.bucket.index.num.buckets = '4'
|)
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)

spark.sql(
s"""insert into $tableName values
|(5, 'a', 35, 1000, '2021-01-05'),
|(1, 'a', 31, 1000, '2021-01-05'),
|(3, 'a', 33, 1000, '2021-01-05'),
|(4, 'b', 16, 1000, '2021-01-05'),
|(2, 'b', 18, 1000, '2021-01-05'),
|(6, 'b', 17, 1000, '2021-01-05'),
|(8, 'a', 21, 1000, '2021-01-05'),
|(9, 'a', 22, 1000, '2021-01-05'),
|(7, 'a', 23, 1000, '2021-01-05')
|""".stripMargin)

// Insert overwrite static partition
spark.sql(
s"""
| insert overwrite table $tableName partition(dt = '2021-01-05')
| select * from (select 13 , 'a2', 12, 1000) limit 10
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
Seq(13, "a2", 12.0, 1000, "2021-01-05")
)
})
}
}
}

test("Test Insert Overwrite Into Consistent Bucket Index Table") {
withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
// Create a partitioned table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
|tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.index.type = 'BUCKET',
| hoodie.index.bucket.engine = "CONSISTENT_HASHING",
| hoodie.bucket.index.num.buckets = '4'
|)
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)

spark.sql(
s"""insert into $tableName values
|(5, 'a', 35, 1000, '2021-01-05'),
|(1, 'a', 31, 1000, '2021-01-05'),
|(3, 'a', 33, 1000, '2021-01-05'),
|(4, 'b', 16, 1000, '2021-01-05'),
|(2, 'b', 18, 1000, '2021-01-05'),
|(6, 'b', 17, 1000, '2021-01-05'),
|(8, 'a', 21, 1000, '2021-01-05'),
|(9, 'a', 22, 1000, '2021-01-05'),
|(7, 'a', 23, 1000, '2021-01-05')
|""".stripMargin)

// Insert overwrite static partition
spark.sql(
s"""
| insert overwrite table $tableName partition(dt = '2021-01-05')
| select * from (select 13 , 'a2', 12, 1000) limit 10
""".stripMargin)

// Insert overwrite the same static partition a second time
spark.sql(
s"""
| insert overwrite table $tableName partition(dt = '2021-01-05')
| select * from (select 13 , 'a3', 12, 1000) limit 10
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
Seq(13, "a3", 12.0, 1000, "2021-01-05")
)
})
}
}
}