diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
index 62a315b85a06b..23efce8298426 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
@@ -21,8 +21,7 @@ package org.apache.hudi
 import org.apache.hudi.common.config.TypedProperties
 
 import java.{util => ju}
-import scala.collection.JavaConverters
-import scala.jdk.CollectionConverters.dictionaryAsScalaMapConverter
+import scala.collection.JavaConverters._
 
 object HoodieConversionUtils {
 
@@ -49,9 +48,7 @@ object HoodieConversionUtils {
   }
 
   def fromProperties(props: TypedProperties): Map[String, String] = {
-    props.asScala.map {
-      case (k, v) => (k.toString, v.toString)
-    }.toMap
+    props.asScala.toMap
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7f28e190e9501..b8dbb18287e7d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -148,7 +148,7 @@
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          props: TypedProperties): Schema = {
-    deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, props.toMap)
+    deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, HoodieConversionUtils.fromProperties(props))
   }
 
   def cleanup(): Unit = {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 61ed02da106f0..3af87d49489fb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -35,7 +35,6 @@
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 
-import org.apache.parquet.Strings;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -141,7 +140,7 @@ public S3EventsHoodieIncrSource(
 
     // This is to ensure backward compatibility where we were using the
     // config SOURCE_FILE_FORMAT for file format in previous versions.
-    this.fileFormat = Strings.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING)) ?
+    this.fileFormat = StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING)) ?
         getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true) : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
index e22942763a83e..77a858315185e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
@@ -29,6 +29,8 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.SparkSession;
 
+import java.io.Serializable;
+
 /**
  * The class which handles error events while processing write records. All the
  * records which have a processing/write failure are triggered as error events to
@@ -38,7 +40,7 @@
  *
  * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
  */
-public abstract class BaseErrorTableWriter {
+public abstract class BaseErrorTableWriter implements Serializable {
 
   // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
   // is set to this column in case of an error
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index 80f40033a3eb0..abbe983cbce6f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -46,11 +46,18 @@ class TestSchemaRegistryProvider {
   private static final String REGISTRY_RESPONSE = "{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": \\\"example\\\", "
       + "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": "
       + "\\\"string\\\" }]}\"}";
+  private static final String RAW_SCHEMA = "{\"type\": \"record\", \"namespace\": \"example\", "
+      + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
+      + "\"string\" }]}";
   private static final String CONVERTED_SCHEMA = "{\"type\": \"record\", \"namespace\": \"com.example.hoodie\", "
       + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
       + "\"string\" }]}";
 
   private static Schema getExpectedSchema() {
+    return new Schema.Parser().parse(RAW_SCHEMA);
+  }
+
+  private static Schema getExpectedConvertedSchema() {
     return new Schema.Parser().parse(CONVERTED_SCHEMA);
   }
 
@@ -60,7 +67,6 @@ private static TypedProperties getProps() {
       put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + BASIC_AUTH + "@localhost");
       put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value");
       put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost");
-      put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
       put("hoodie.deltastreamer.source.kafka.topic", "foo");
     }
   };
@@ -97,10 +103,11 @@ public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws IOException
   public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException {
     TypedProperties props = getProps();
     props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
+    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getSourceSchema();
     assertNotNull(actual);
-    assertEquals(getExpectedSchema(), actual);
+    assertEquals(getExpectedConvertedSchema(), actual);
     verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
   }
 
@@ -108,10 +115,11 @@ public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOExcept
   public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException {
     TypedProperties props = getProps();
     props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
+    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getTargetSchema();
     assertNotNull(actual);
-    assertEquals(getExpectedSchema(), actual);
+    assertEquals(getExpectedConvertedSchema(), actual);
     verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
   }
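
For context on the HoodieConversionUtils change: a minimal standalone sketch of what the consolidated fromProperties conversion does, assuming a Scala 2.12 build where TypedProperties extends java.util.Properties. The object name FromPropertiesSketch and the sample property keys are illustrative only, not part of the patch.

import org.apache.hudi.common.config.TypedProperties

import scala.collection.JavaConverters._

object FromPropertiesSketch {
  // With JavaConverters._ in scope, java.util.Properties converts via asScala
  // to a mutable.Map[String, String], so the old per-entry
  // (k.toString, v.toString) mapping is redundant; asScala.toMap suffices.
  def fromProperties(props: TypedProperties): Map[String, String] =
    props.asScala.toMap

  def main(args: Array[String]): Unit = {
    val props = new TypedProperties()
    props.setProperty("hoodie.table.name", "trips") // sample key, illustrative
    props.setProperty("hoodie.datasource.write.operation", "upsert")
    val asMap = fromProperties(props)
    assert(asMap("hoodie.table.name") == "trips")
    assert(asMap.size == 2)
  }
}

This is the same conversion that HoodieSparkSqlWriter.deduceWriterSchema now routes through (HoodieConversionUtils.fromProperties(props)) in place of the previous props.toMap call.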