@@ -49,8 +49,8 @@ public HoodieBaseBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteCon
   }

   @Override
-  public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
-    return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
+  public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
+    return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
   }

   class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<KeyLookupResult>> {
@@ -138,7 +138,7 @@ public static Dataset<Row> readRecordsForBaseFiles(SQLContext sqlContext, List<S
   }

   /**
-   * Get reads from paritions modified including any inflight commits.
+   * Get reads from partitions modified including any inflight commits.
    * Note that this only works for COW tables
    */
   public static Dataset<Row> getRecordsFromPendingCommits(SQLContext sqlContext,
@@ -53,8 +53,8 @@ public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig

   @Override
   public Iterator<List<KeyLookupResult>> call(Integer partition,
-      Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
-    return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
+      Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
+    return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
   }

   class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
@@ -98,7 +98,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   lazy val tableTypeName: String = tableType.name()

   /**
-   * Recored Field List(Primary Key List)
+   * Record Field List(Primary Key List)
    */
   lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields.orElse(Array.empty)

@@ -108,7 +108,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   lazy val preCombineKey: Option[String] = Option(tableConfig.getPreCombineField)

   /**
-   * Paritition Fields
+   * Partition Fields
   */
   lazy val partitionFields: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)

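For context on the two doc comments corrected above, a minimal sketch of how these lazy fields are read off a HoodieCatalogTable. The session-catalog lookup, the table name, and the direct constructor call are illustrative assumptions, not part of this change, and the import of HoodieCatalogTable itself is omitted since its package depends on the Hudi build:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

val spark: SparkSession = SparkSession.builder().appName("hoodie-catalog-table-sketch").getOrCreate()

// Resolve Spark's CatalogTable for an (assumed) existing table, then wrap it in Hudi's abstraction.
val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("hudi_tbl"))
val hoodieCatalogTable = new HoodieCatalogTable(spark, catalogTable)

// The lazy vals whose doc comments are fixed in this diff:
println(hoodieCatalogTable.primaryKeys.mkString(","))    // record key (primary key) fields
println(hoodieCatalogTable.preCombineKey.getOrElse(""))  // optional precombine field
println(hoodieCatalogTable.partitionFields.mkString(",")) // partition fields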
@@ -186,14 +186,14 @@ public void testBulkInsertPreCombine(boolean enablePreCombine) {
   }

   int metadataRecordKeyIndex = resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
-  int metadataParitionPathIndex = resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+  int metadataPartitionPathIndex = resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
   int metadataCommitTimeIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
   int metadataCommitSeqNoIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);
   int metadataFilenameIndex = resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD);

   result.toJavaRDD().foreach(entry -> {
     assertTrue(entry.get(metadataRecordKeyIndex).equals(entry.getAs("_row_key")));
-    assertTrue(entry.get(metadataParitionPathIndex).equals(entry.getAs("partition")));
+    assertTrue(entry.get(metadataPartitionPathIndex).equals(entry.getAs("partition")));
     assertTrue(entry.get(metadataCommitSeqNoIndex).equals(""));
     assertTrue(entry.get(metadataCommitTimeIndex).equals(""));
     assertTrue(entry.get(metadataFilenameIndex).equals(""));
@@ -245,12 +245,12 @@ class TestCOWDataSource extends HoodieClientTestBase {
     spark.sql(String.format("select count(*) from tmpTable")).show()

     // step4: Query the rows count from hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
-    val recordCountForParititon1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
-    assertEquals("6", recordCountForParititon1(0).get(0).toString)
+    val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
+    assertEquals("6", recordCountForPartition1(0).get(0).toString)

     // step5: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH
-    val recordCountForParititon2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
-    assertEquals("7", recordCountForParititon2(0).get(0).toString)
+    val recordCountForPartition2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
+    assertEquals("7", recordCountForPartition2(0).get(0).toString)

     // step6: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH using spark.collect and then filter mode
     val recordsForPartitionColumn = spark.sql(String.format("select partition from tmpTable")).collect()
@@ -292,12 +292,12 @@ class TestCOWDataSource extends HoodieClientTestBase {
     spark.sql(String.format("select count(*) from tmpTable")).show()

     // step3: Query the rows count from hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
-    val recordCountForParititon1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
-    assertEquals("0", recordCountForParititon1(0).get(0).toString)
+    val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
+    assertEquals("0", recordCountForPartition1(0).get(0).toString)

     // step4: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH
-    val recordCountForParititon2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
-    assertEquals("7", recordCountForParititon2(0).get(0).toString)
+    val recordCountForPartition2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
+    assertEquals("7", recordCountForPartition2(0).get(0).toString)

     // step5: Query the rows count from hoodie table
     val recordCount = spark.sql(String.format("select count(*) from tmpTable")).collect()
@@ -417,7 +417,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .mode(SaveMode.Overwrite)
   }

-  @Test def testSparkPartitonByWithCustomKeyGenerator(): Unit = {
+  @Test def testSparkPartitionByWithCustomKeyGenerator(): Unit = {
     // Without fieldType, the default is SIMPLE
     var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
     writer.partitionBy("current_ts")
@@ -465,7 +465,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     }
   }

-  @Test def testSparkPartitonByWithSimpleKeyGenerator() {
+  @Test def testSparkPartitionByWithSimpleKeyGenerator() {
     // Use the `driver` field as the partition key
     var writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName)
     writer.partitionBy("driver")
@@ -484,7 +484,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("default")).count() == 0)
   }

-  @Test def testSparkPartitonByWithComplexKeyGenerator() {
+  @Test def testSparkPartitionByWithComplexKeyGenerator() {
     // Use the `driver` field as the partition key
     var writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName)
     writer.partitionBy("driver")
@@ -503,7 +503,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= concat(col("driver"), lit("/"), col("rider"))).count() == 0)
   }

-  @Test def testSparkPartitonByWithTimestampBasedKeyGenerator() {
+  @Test def testSparkPartitionByWithTimestampBasedKeyGenerator() {
     val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName)
     writer.partitionBy("current_ts")
       .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
@@ -517,7 +517,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
   }

-  @Test def testSparkPartitonByWithGlobalDeleteKeyGenerator() {
+  @Test def testSparkPartitionByWithGlobalDeleteKeyGenerator() {
     val writer = getDataFrameWriter(classOf[GlobalDeleteKeyGenerator].getName)
     writer.partitionBy("driver")
       .mode(SaveMode.Overwrite)
@@ -528,7 +528,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
   }

-  @Test def testSparkPartitonByWithNonpartitionedKeyGenerator() {
+  @Test def testSparkPartitionByWithNonpartitionedKeyGenerator() {
     // Empty string column
     var writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName)
     writer.partitionBy("")
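The renamed tests above all follow the same pattern: write with a given key generator, read back, and compare _hoodie_partition_path against the partitionBy column(s). A standalone sketch of that pattern, assuming the standard Hudi datasource option keys and the Hudi Spark bundle on the classpath; the table name and base path are placeholders, the test helper getDataFrameWriter is replaced here by an explicit DataFrameWriter, and on older Hudi versions the read path may need glob patterns:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark: SparkSession = SparkSession.builder().appName("keygen-sketch").getOrCreate()

// A toy DataFrame with a record key, a precombine column, and a candidate partition column.
val df = spark.createDataFrame(Seq(("r1", 1L, "driver-001"), ("r2", 1L, "driver-002")))
  .toDF("_row_key", "ts", "driver")

// With SimpleKeyGenerator, the partitionBy() column is taken as the Hudi partition path field.
df.write.format("hudi")
  .option("hoodie.table.name", "keygen_sketch_tbl")
  .option("hoodie.datasource.write.recordkey.field", "_row_key")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
  .partitionBy("driver")
  .mode(SaveMode.Overwrite)
  .save("/tmp/keygen_sketch_tbl")

// Read back and inspect how the key generator materialized the partition path.
spark.read.format("hudi").load("/tmp/keygen_sketch_tbl")
  .select("_hoodie_partition_path", "driver")
  .show(false)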
@@ -22,7 +22,7 @@
 import java.util.List;

 /**
- * Extractor for Hive Style Partitioned tables, when the parition folders are key value pairs.
+ * Extractor for Hive Style Partitioned tables, when the partition folders are key value pairs.
  *
  * <p>This implementation extracts the partition value of yyyy-mm-dd from the path of type datestr=yyyy-mm-dd.
  */
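As the corrected comment describes, the extractor treats each Hive-style partition folder as a key=value pair and keeps only the value. A tiny illustrative sketch of that key=value split (not the actual extractor class, whose body is outside this diff; the sample date is a placeholder):

// Given a Hive-style partition folder name such as "datestr=2021-04-05",
// keep only the value part ("2021-04-05").
def hiveStylePartitionValue(folderName: String): String = {
  val idx = folderName.indexOf('=')
  if (idx >= 0) folderName.substring(idx + 1) else folderName
}

hiveStylePartitionValue("datestr=2021-04-05") // returns "2021-04-05"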
@@ -131,9 +131,9 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaS
     long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
     HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
     SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
-    List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP), currentDate);
+    List<String> prunedPartitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP), currentDate);

-    List<FileStatus> eligibleFiles = context.flatMap(prunedParitionPaths,
+    List<FileStatus> eligibleFiles = context.flatMap(prunedPartitionPaths,
         path -> {
           FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
           return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream();
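For intuition, the per-partition work being parallelized above amounts to: list the files under a pruned partition path and keep those modified after the last checkpoint. A hedged sketch against the plain Hadoop FileSystem API; the zero-length check is an assumption, since the real listEligibleFiles body is outside this diff:

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

def listEligibleFilesSketch(fs: FileSystem, partitionPath: Path, lastCheckpointTime: Long): Seq[FileStatus] = {
  // Keep regular, non-empty files whose modification time is newer than the checkpoint.
  fs.listStatus(partitionPath)
    .filter(status => status.isFile
      && status.getLen > 0 // assumption: skip zero-length files
      && status.getModificationTime > lastCheckpointTime)
    .toSeq
}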