HoodieConversionUtils.scala
@@ -21,8 +21,7 @@ package org.apache.hudi
import org.apache.hudi.common.config.TypedProperties

import java.{util => ju}
-import scala.collection.JavaConverters
-import scala.jdk.CollectionConverters.dictionaryAsScalaMapConverter
+import scala.collection.JavaConverters._

object HoodieConversionUtils {

@@ -49,9 +48,7 @@
}

def fromProperties(props: TypedProperties): Map[String, String] = {
-props.asScala.map {
-  case (k, v) => (k.toString, v.toString)
-}.toMap
+props.asScala.toMap
}

}
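Note on the change above: fromProperties now relies on asScala from scala.collection.JavaConverters._, which exposes a java.util.Properties-style object as a mutable Map[String, String]; .toMap then copies it into an immutable map, so the per-entry toString mapping is no longer needed. A minimal, self-contained sketch of the same pattern, using plain java.util.Properties as a stand-in for TypedProperties:

import java.util.Properties
import scala.collection.JavaConverters._

object PropsToMapSketch {
  // Stand-in for HoodieConversionUtils.fromProperties: Properties already stores
  // String keys and values, so no per-entry toString mapping is required.
  def fromProperties(props: Properties): Map[String, String] =
    props.asScala.toMap

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("hoodie.table.name", "trips")
    props.setProperty("hoodie.datasource.write.operation", "upsert")

    val asMap = fromProperties(props)
    assert(asMap("hoodie.table.name") == "trips")
    println(asMap)
  }
}

This is the same conversion that HoodieSparkSqlWriter now reaches through HoodieConversionUtils.fromProperties in the next file.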
HoodieSparkSqlWriter.scala
@@ -148,7 +148,7 @@ object HoodieSparkSqlWriter {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
props: TypedProperties): Schema = {
-deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, props.toMap)
+deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, HoodieConversionUtils.fromProperties(props))
}

def cleanup(): Unit = {
S3EventsHoodieIncrSource.java
@@ -35,7 +35,6 @@
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;

-import org.apache.parquet.Strings;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -141,7 +140,7 @@ public S3EventsHoodieIncrSource(

// This is to ensure backward compatibility where we were using the
// config SOURCE_FILE_FORMAT for file format in previous versions.
-this.fileFormat = Strings.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
+this.fileFormat = StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
: getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
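The backward-compatibility comment above boils down to a precedence rule: prefer DATAFILE_FORMAT and fall back to the legacy SOURCE_FILE_FORMAT key when the former is unset or empty. A hedged sketch of that lookup over a plain key/value map (the key strings below are illustrative placeholders, not the actual Hudi config names):

object FileFormatFallbackSketch {
  // Illustrative key names only; the real code resolves DATAFILE_FORMAT and
  // SOURCE_FILE_FORMAT through getStringWithAltKeys on TypedProperties.
  val PreferredKey = "example.datafile.format"
  val LegacyKey = "example.source.file.format"

  def resolveFileFormat(conf: Map[String, String]): String = {
    val preferred = conf.getOrElse(PreferredKey, "")
    // Prefer the newer key; fall back to the legacy key for backward compatibility.
    if (preferred.isEmpty) conf.getOrElse(LegacyKey, "parquet") else preferred
  }

  def main(args: Array[String]): Unit = {
    println(resolveFileFormat(Map(LegacyKey -> "json")))       // json (legacy fallback)
    println(resolveFileFormat(Map(PreferredKey -> "parquet"))) // parquet (preferred key wins)
  }
}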

BaseErrorTableWriter.java
@@ -29,6 +29,8 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

+import java.io.Serializable;
+
/**
* The class which handles error events while processing write records. All the
* records which have a processing/write failure are triggered as error events to
@@ -38,7 +40,7 @@
*
* The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
*/
-public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements Serializable {

// The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
// is set to this column in case of an error
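Making the writer Serializable matters because Spark serializes objects referenced from task closures and ships them to executors; if anything in that object graph is not serializable, the job fails with a NotSerializableException. A self-contained sketch of that round-trip requirement, using a hypothetical stand-in class rather than the actual BaseErrorTableWriter API:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for an error-table writer: it holds only serializable state,
// so instances survive the serialize/deserialize cycle Spark performs for tasks.
class SketchErrorWriter(val tableName: String, val configs: Map[String, String]) extends Serializable {
  def describe(): String = s"error table for $tableName with ${configs.size} configs"
}

object SerializationRoundTripSketch {
  def roundTrip[T <: Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value) // would throw NotSerializableException if any field is not serializable
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val writer = new SketchErrorWriter("trips", Map("hoodie.errortable.enable" -> "true"))
    println(roundTrip(writer).describe()) // survives the round trip
  }
}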
TestSchemaRegistryProvider.java
@@ -46,11 +46,18 @@ class TestSchemaRegistryProvider {
private static final String REGISTRY_RESPONSE = "{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": \\\"example\\\", "
+ "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": "
+ "\\\"string\\\" }]}\"}";
private static final String RAW_SCHEMA = "{\"type\": \"record\", \"namespace\": \"example\", "
+ "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
+ "\"string\" }]}";
+private static final String CONVERTED_SCHEMA = "{\"type\": \"record\", \"namespace\": \"com.example.hoodie\", "
+    + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
+    + "\"string\" }]}";

private static Schema getExpectedSchema() {
return new Schema.Parser().parse(RAW_SCHEMA);
}

+private static Schema getExpectedConvertedSchema() {
+  return new Schema.Parser().parse(CONVERTED_SCHEMA);
+}

@@ -60,7 +67,6 @@ private static TypedProperties getProps() {
put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + BASIC_AUTH + "@localhost");
put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value");
put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost");
put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
put("hoodie.deltastreamer.source.kafka.topic", "foo");
}
};
@@ -97,21 +103,23 @@ public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws IOException
public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException {
TypedProperties props = getProps();
props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
SchemaRegistryProvider spyUnderTest = getUnderTest(props);
Schema actual = spyUnderTest.getSourceSchema();
assertNotNull(actual);
-assertEquals(getExpectedSchema(), actual);
+assertEquals(getExpectedConvertedSchema(), actual);
verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
}

@Test
public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException {
TypedProperties props = getProps();
props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
SchemaRegistryProvider spyUnderTest = getUnderTest(props);
Schema actual = spyUnderTest.getTargetSchema();
assertNotNull(actual);
-assertEquals(getExpectedSchema(), actual);
+assertEquals(getExpectedConvertedSchema(), actual);
verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
}
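The reworked assertions encode what the configured schema converter is expected to do: the registry still serves the example.FullName record, but the provider should now return a schema whose namespace has been rewritten to com.example.hoodie while the record name and fields stay the same. A hedged sketch of that relationship using only the Avro parser (DummySchemaConverter itself is a test helper whose implementation is not shown in this diff):

import org.apache.avro.Schema

object SchemaConverterExpectationSketch {
  val RawSchema =
    """{"type": "record", "namespace": "example", "name": "FullName",
      | "fields": [{ "name": "first", "type": "string" }]}""".stripMargin

  val ConvertedSchema =
    """{"type": "record", "namespace": "com.example.hoodie", "name": "FullName",
      | "fields": [{ "name": "first", "type": "string" }]}""".stripMargin

  def main(args: Array[String]): Unit = {
    val raw = new Schema.Parser().parse(RawSchema)
    val converted = new Schema.Parser().parse(ConvertedSchema)

    // Only the namespace differs; the record name and field layout are preserved.
    assert(raw.getName == converted.getName)
    assert(raw.getField("first").schema() == converted.getField("first").schema())
    assert(raw.getNamespace == "example")
    assert(converted.getNamespace == "com.example.hoodie")
    // Hence the two schemas are not equal, which is why the tests now assert
    // against getExpectedConvertedSchema() instead of getExpectedSchema().
    assert(raw != converted)
  }
}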
