27 changes: 14 additions & 13 deletions azure-pipelines.yml
@@ -24,9 +24,10 @@ pool:
variables:
MAVEN_CACHE_FOLDER: $(Pipeline.Workspace)/.m2/repository
MAVEN_OPTS: '-Dmaven.repo.local=$(MAVEN_CACHE_FOLDER) -Dcheckstyle.skip=true -Drat.skip=true -Djacoco.skip=true'
SPARK_VERSION: '2.4.4'
SPARK_VERSION: '3.2.1'
HADOOP_VERSION: '2.7'
SPARK_ARCHIVE: spark-$(SPARK_VERSION)-bin-hadoop$(HADOOP_VERSION)
SPARK_PROFILE: scala-2.12,spark3

stages:
- stage: test
@@ -48,7 +49,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'install'
options: -T 2.5C -DskipTests
options: -T 2.5C -DskipTests -P $(SPARK_PROFILE)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -57,7 +58,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Punit-tests -pl hudi-common,hudi-flink,hudi-client/hudi-spark-client
options: -P $(SPARK_PROFILE),unit-tests -pl hudi-common,hudi-flink,hudi-client/hudi-spark-client
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -66,7 +67,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl hudi-common,hudi-flink
options: -P $(SPARK_PROFILE),functional-tests -pl hudi-common,hudi-flink
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -87,7 +88,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'install'
options: -T 2.5C -DskipTests
options: -T 2.5C -DskipTests -P $(SPARK_PROFILE)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -96,7 +97,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl hudi-client/hudi-spark-client
options: -P $(SPARK_PROFILE),functional-tests -pl hudi-client/hudi-spark-client
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -117,7 +118,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'install'
options: -T 2.5C -DskipTests
options: -T 2.5C -DskipTests -P $(SPARK_PROFILE)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -126,7 +127,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Punit-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
options: -P $(SPARK_PROFILE),unit-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -135,7 +136,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
options: -P $(SPARK_PROFILE),functional-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -156,7 +157,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'install'
options: -T 2.5C -DskipTests
options: -T 2.5C -DskipTests -P $(SPARK_PROFILE)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -165,7 +166,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Punit-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
options: -P $(SPARK_PROFILE),unit-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -174,7 +175,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
options: -P $(SPARK_PROFILE),functional-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -194,5 +195,5 @@ stages:
tar -xvf $(Pipeline.Workspace)/$(SPARK_ARCHIVE).tgz -C $(Pipeline.Workspace)/
mkdir /tmp/spark-events/
- script: |
mvn $(MAVEN_OPTS) -Pintegration-tests verify
mvn $(MAVEN_OPTS) -P $(SPARK_PROFILE),integration-tests verify
displayName: IT
@@ -18,6 +18,7 @@

package org.apache.hudi.io.storage;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -49,6 +50,7 @@
import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;

@@ -242,7 +244,7 @@ public void testWriteReadWithEvolvedSchema() throws Exception {
assertEquals("key" + index, record.get("_row_key").toString());
assertEquals(Integer.toString(index), record.get("time").toString());
assertEquals(Integer.toString(index), record.get("number").toString());
assertNull(record.get("added_field"));
assertThrows(AvroRuntimeException.class, () -> record.get("added_field"));
index++;
}

@@ -253,8 +255,8 @@ public void testWriteReadWithEvolvedSchema() throws Exception {
GenericRecord record = iter.next();
assertEquals("key" + index, record.get("_row_key").toString());
assertEquals(Integer.toString(index), record.get("time").toString());
assertNull(record.get("number"));
assertNull(record.get("added_field"));
assertThrows(AvroRuntimeException.class, () -> record.get("number"));
assertThrows(AvroRuntimeException.class, () -> record.get("added_field"));
index++;
}
}
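Note on the assertNull -> assertThrows changes above: newer Avro (1.10 and later, presumably the line the Spark 3 / Scala 2.12 profile pulls in) makes GenericData.Record#get(String) throw AvroRuntimeException("Not a valid schema field: ...") for a name that is not in the schema, where older Avro simply returned null. The same behavior change is behind the HoodieAvroUtils and Debezium payload hunks further down. A minimal, self-contained Java sketch of the difference and of a lookup that behaves the same on both Avro lines (class name is illustrative; field names reuse the ones from this test):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class MissingFieldLookupSketch {
      public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("rec").fields()
            .optionalString("_row_key")
            .endRecord();
        GenericRecord record = new GenericData.Record(schema);
        record.put("_row_key", "key0");

        // record.get("added_field") returns null on Avro 1.8.x but throws
        // AvroRuntimeException("Not a valid schema field: added_field") on Avro 1.10.x.
        // Resolving the field through the schema first behaves the same on both lines:
        Schema.Field field = record.getSchema().getField("added_field");
        Object val = (field == null) ? null : record.get(field.pos());
        System.out.println(val); // prints: null
      }
    }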
@@ -51,21 +51,21 @@ void testConvertComplexTypes() {
final String expected = "message converted {\n"
+ " optional group f_array (LIST) {\n"
+ " repeated group list {\n"
+ " optional binary element (UTF8);\n"
+ " optional binary element (STRING);\n"
+ " }\n"
+ " }\n"
+ " optional group f_map (MAP) {\n"
+ " repeated group key_value {\n"
+ " optional int32 key;\n"
+ " optional binary value (UTF8);\n"
+ " optional binary value (STRING);\n"
+ " }\n"
+ " }\n"
+ " optional group f_row {\n"
+ " optional int32 f_row_f0;\n"
+ " optional binary f_row_f1 (UTF8);\n"
+ " optional binary f_row_f1 (STRING);\n"
+ " optional group f_row_f2 {\n"
+ " optional int32 f_row_f2_f0;\n"
+ " optional binary f_row_f2_f1 (UTF8);\n"
+ " optional binary f_row_f2_f1 (STRING);\n"
+ " }\n"
+ " }\n"
+ "}\n";
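Note on the expected-schema strings above: the newer parquet-mr picked up by this upgrade renders string columns with their logical type annotation, (STRING), where older releases rendered the legacy converted type, (UTF8); the physical type (optional binary) is unchanged. A small sketch of how that rendering can be reproduced (assumes the parquet-mr 1.11+ Types/LogicalTypeAnnotation API; class name is illustrative):

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    public class StringAnnotationSketch {
      public static void main(String[] args) {
        // Build "optional binary f_row_f1" annotated as a string column.
        MessageType schema = Types.buildMessage()
            .optional(PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.stringType())
            .named("f_row_f1")
            .named("converted");
        // Newer parquet-mr prints "... f_row_f1 (STRING);", older releases printed "(UTF8)".
        System.out.println(schema);
      }
    }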
34 changes: 34 additions & 0 deletions hudi-client/hudi-spark-client/pom.xml
@@ -48,10 +48,30 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -65,6 +85,14 @@
<artifactId>parquet-avro</artifactId>
</dependency>

<!-- org.codehaus.jackson test -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>${codehaus-jackson.version}</version>
<scope>test</scope>
</dependency>

<!-- Hoodie - Test -->
<dependency>
<groupId>org.apache.hudi</groupId>
@@ -173,6 +201,12 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -108,6 +108,10 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
@BeforeAll
public static void init() throws Exception {
// Initialize HbaseMiniCluster
System.setProperty("zookeeper.preAllocSize", "100");
System.setProperty("zookeeper.maxCnxns", "60");
System.setProperty("zookeeper.4lw.commands.whitelist", "*");

hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("zookeeper.znode.parent", "/hudi-hbase-test");

@@ -17,6 +17,13 @@

package org.apache.hudi.testutils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -64,14 +71,6 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -82,6 +81,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
@@ -97,8 +97,6 @@
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import scala.Tuple2;

import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -407,11 +405,15 @@ protected void initDFSMetaClient() throws IOException {
protected void cleanupDFS() throws IOException {
if (hdfsTestService != null) {
hdfsTestService.stop();
dfsCluster.shutdown();
hdfsTestService = null;
}

if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
dfs = null;
}

// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
// same JVM
FileSystem.closeAll();
@@ -474,14 +474,15 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
try {
for (; i < parts.length; i++) {
String part = parts[i];
Field field = valueNode.getSchema().getField(part);
Object val = valueNode.get(part);
if (val == null) {
if (field == null || val == null) {
break;
}

// return, if last part of name
if (i == parts.length - 1) {
Schema fieldSchema = valueNode.getSchema().getField(part).schema();
Schema fieldSchema = field.schema();
return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled);
} else {
// VC: Need a test here
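For context on the getNestedFieldVal hunk above: the method walks a dot-separated path (for example "a.b.c") through nested GenericRecords, and the new Schema.Field lookup lets a missing path part be detected up front rather than leaning on the by-name accessor that newer Avro turns into an exception. A simplified, self-contained sketch of that traversal (it omits the real method's try/catch, returnNullIfNotFound handling and logical-type conversion, and reads values by field position):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;

    public final class NestedFieldLookupSketch {
      private NestedFieldLookupSketch() {
      }

      // Simplified traversal in the spirit of HoodieAvroUtils#getNestedFieldVal.
      public static Object getNestedFieldVal(GenericRecord record, String fieldName) {
        String[] parts = fieldName.split("\\.");
        GenericRecord valueNode = record;
        for (int i = 0; i < parts.length; i++) {
          Schema.Field field = valueNode.getSchema().getField(parts[i]);
          Object val = (field == null) ? null : valueNode.get(field.pos());
          if (val == null) {
            return null; // path part missing from the schema, or explicitly null
          }
          if (i == parts.length - 1) {
            return val; // last part of the name
          }
          valueNode = (GenericRecord) val; // descend into the nested record
        }
        return null;
      }
    }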
@@ -18,15 +18,15 @@

package org.apache.hudi.common.model.debezium;

import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nullable;
import java.io.IOException;

/**
@@ -72,11 +72,21 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue

protected abstract boolean shouldPickCurrentRecord(IndexedRecord currentRecord, IndexedRecord insertRecord, Schema schema) throws IOException;

@Nullable
private static Object getFieldVal(GenericRecord record, String fieldName) {
Schema.Field recordField = record.getSchema().getField(fieldName);
if (recordField == null) {
return null;
}

return record.get(recordField.pos());
}

private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertRecord) {
boolean delete = false;
if (insertRecord instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertRecord;
Object value = record.get(DebeziumConstants.FLATTENED_OP_COL_NAME);
Object value = getFieldVal(record, DebeziumConstants.FLATTENED_OP_COL_NAME);
delete = value != null && value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP);
}

@@ -86,4 +96,4 @@ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertRecord)
private IndexedRecord getInsertRecord(Schema schema) throws IOException {
return super.getInsertValue(schema).get();
}
}
}
@@ -235,7 +235,7 @@ public void testGetNestedFieldVal() {
try {
HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false);
} catch (Exception e) {
assertEquals("fake_key(Part -fake_key) field not found in record. Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]",
assertEquals("Not a valid schema field: fake_key",
e.getMessage());
}

@@ -1868,7 +1868,7 @@ public void testDataBlockFormatAppendAndReadWithProjectedSchema(
new HashMap<HoodieLogBlockType, Integer>() {{
put(HoodieLogBlockType.AVRO_DATA_BLOCK, 0); // not supported
put(HoodieLogBlockType.HFILE_DATA_BLOCK, 0); // not supported
put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2605);
put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2593);
}};

List<IndexedRecord> recordsRead = getRecords(dataBlockRead);