diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index aed6c6b7358b0..4e80009747e5b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -45,6 +45,9 @@ public class AvroDFSSource extends AvroSource {
   public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) throws IOException {
     super(props, sparkContext, sparkSession, schemaProvider);
+    // Register the provider's source schema under "avro.schema.input.key" in the Spark context's Hadoop conf.
+    // NOTE(review): presumably read by the Avro input format as the reader schema -- confirm against the read path.
+    sparkContext.hadoopConfiguration().set("avro.schema.input.key", schemaProvider.getSourceSchema().toString());
     this.pathSelector = DFSPathSelector
         .createSourceSelector(props, sparkContext.hadoopConfiguration());
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
new file mode 100644
index 0000000000000..37abaa56b1bbc
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeEach;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Basic tests for {@link AvroDFSSource}.
+ */
+public class TestAvroDFSSource extends AbstractDFSSourceTestBase {
+
+  /** Runs the shared setup, then points the test at an Avro-specific DFS root and file suffix. */
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    this.dfsRoot = dfsBasePath + "/avroFiles";
+    this.fileSuffix = ".avro";
+  }
+
+  /** Builds the {@link AvroDFSSource} under test with its DFS root property set to {@code dfsRoot}. */
+  @Override
+  protected Source prepareDFSSource() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    try {
+      return new AvroDFSSource(props, jsc, sparkSession, schemaProvider);
+    } catch (IOException e) {
+      // Surface the failure with its cause instead of handing the caller a null source.
+      throw new IllegalStateException("Failed to create AvroDFSSource rooted at " + dfsRoot, e);
+    }
+  }
+
+  /** Converts {@code records} to generic records and saves them as an Avro file at {@code path}. */
+  @Override
+  protected void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
+    Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index bb00d2fef7324..90a3f5af38021 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -48,6 +48,8 @@
 import com.fasterxml.jackson.dataformat.csv.CsvSchema;
 import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
@@ -78,6 +80,7 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -341,6 +344,30 @@ public static void saveORCToDFS(List<GenericRecord> records, Path targetFile, Ty
       }
     }
 
+    /**
+     * Writes {@code records} to {@code targetFile} as an Avro data file using
+     * {@code HoodieTestDataGenerator.AVRO_SCHEMA}.
+     */
+    public static void saveAvroToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
+      saveAvroToDFS(records, targetFile, HoodieTestDataGenerator.AVRO_SCHEMA);
+    }
+
+    /**
+     * Writes {@code records} to {@code targetFile} as an Avro data file with the given writer {@code schema}.
+     * The file system is looked up from {@code HoodieTestUtils.getDefaultHadoopConf()}.
+     */
+    public static void saveAvroToDFS(List<GenericRecord> records, Path targetFile, Schema schema) throws IOException {
+      FileSystem fs = targetFile.getFileSystem(HoodieTestUtils.getDefaultHadoopConf());
+      // Open both in try-with-resources so the stream is closed even if creating the writer fails.
+      try (OutputStream output = fs.create(targetFile);
+           DataFileWriter<GenericRecord> dataFileWriter =
+               new DataFileWriter<GenericRecord>(new GenericDatumWriter<>(schema)).create(schema, output)) {
+        for (GenericRecord record : records) {
+          dataFileWriter.append(record);
+        }
+      }
+    }
+
     public static TypedProperties setupSchemaOnDFS() throws IOException {
       return setupSchemaOnDFS("delta-streamer-config", "source.avsc");
     }