diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
new file mode 100644
index 0000000000000..5815317d9281b
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.UtilitiesTestBase;
+import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * An abstract test base for {@link Source} using DFS as the file system.
+ */
+public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
+
+  FilebasedSchemaProvider schemaProvider;
+  String dfsRoot;
+  String fileSuffix;
+  HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+
+  @BeforeClass
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass();
+  }
+
+  @AfterClass
+  public static void cleanupClass() throws Exception {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  /**
+   * Prepares the specific {@link Source} to test, by passing in necessary configurations.
+   *
+   * @return A {@link Source} using DFS as the file system.
+   */
+  abstract Source prepareDFSSource();
+
+  /**
+   * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS.
+   *
+   * @param records Test data.
+   * @param path    The path in {@link Path} of the file to write.
+   * @throws IOException
+   */
+  abstract void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException;
+
+  /**
+   * Generates a batch of test data and writes the data to a file.
+   *
+   * @param filename   The name of the file.
+   * @param commitTime The commit time.
+   * @param n          The number of records to generate.
+   * @return The file path.
+   * @throws IOException
+   */
+  Path generateOneFile(String filename, String commitTime, int n) throws IOException {
+    Path path = new Path(dfsRoot, filename + fileSuffix);
+    writeNewDataToFile(dataGenerator.generateInserts(commitTime, n), path);
+    return path;
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadingFromSource() throws IOException {
+    dfs.mkdirs(new Path(dfsRoot));
+    SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource());
+
+    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+    assertEquals(Option.empty(),
+        sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
+    // Test respecting sourceLimit
+    int sourceLimit = 10;
+    RemoteIterator<LocatedFileStatus> files = dfs.listFiles(generateOneFile("1", "000", 100), true);
+    FileStatus file1Status = files.next();
+    assertTrue(file1Status.getLen() > sourceLimit);
+    assertEquals(Option.empty(),
+        sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
+    // Test fetching Avro format
+    InputBatch<JavaRDD<GenericRecord>> fetch1 =
+        sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(100, fetch1.getBatch().get().count());
+    // Test fetching Row format
+    InputBatch<Dataset<Row>> fetch1AsRows =
+        sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(100, fetch1AsRows.getBatch().get().count());
+    // Test Avro to Row format
+    Dataset<Row> fetch1Rows = AvroConversionUtils
+        .createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
+            schemaProvider.getSourceSchema().toString(), sparkSession);
+    assertEquals(100, fetch1Rows.count());
+
+    // 2. Produce new data, extract new data
+    generateOneFile("2", "001", 10000);
+    // Test fetching Avro format
+    InputBatch<JavaRDD<GenericRecord>> fetch2 = sourceFormatAdapter.fetchNewDataInAvroFormat(
+        Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(10000, fetch2.getBatch().get().count());
+    // Test fetching Row format
+    InputBatch<Dataset<Row>> fetch2AsRows = sourceFormatAdapter.fetchNewDataInRowFormat(
+        Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(10000, fetch2AsRows.getBatch().get().count());
+
+    // 3. Extract with previous checkpoint => gives same data back (idempotent)
+    InputBatch<Dataset<Row>> fetch3AsRows = sourceFormatAdapter.fetchNewDataInRowFormat(
+        Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(10000, fetch3AsRows.getBatch().get().count());
+    assertEquals(fetch2AsRows.getCheckpointForNextBatch(),
+        fetch3AsRows.getCheckpointForNextBatch());
+    fetch3AsRows.getBatch().get().createOrReplaceTempView("test_dfs_table");
+    Dataset<Row> rowDataset = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate()
+        .sql("select * from test_dfs_table");
+    assertEquals(10000, rowDataset.count());
+
+    // 4. Extract with latest checkpoint => no new data returned
+    InputBatch<JavaRDD<GenericRecord>> fetch4 = sourceFormatAdapter.fetchNewDataInAvroFormat(
+        Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(Option.empty(), fetch4.getBatch());
+
+    // 5. Extract from the beginning
+    InputBatch<JavaRDD<GenericRecord>> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat(
+        Option.empty(), Long.MAX_VALUE);
+    assertEquals(10100, fetch5.getBatch().get().count());
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
deleted file mode 100644
index f7ac61f28b2a6..0000000000000
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.sources;
-
-import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.TypedProperties;
-import org.apache.hudi.utilities.UtilitiesTestBase;
-import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
-import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Basic tests against all subclasses of {@link JsonDFSSource} and {@link ParquetDFSSource}.
- */
-public class TestDFSSource extends UtilitiesTestBase {
-
-  private FilebasedSchemaProvider schemaProvider;
-
-  @BeforeClass
-  public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass();
-  }
-
-  @AfterClass
-  public static void cleanupClass() throws Exception {
-    UtilitiesTestBase.cleanupClass();
-  }
-
-  @Before
-  public void setup() throws Exception {
-    super.setup();
-    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
-  }
-
-  @After
-  public void teardown() throws Exception {
-    super.teardown();
-  }
-
-  @Test
-  public void testJsonDFSSource() throws IOException {
-    dfs.mkdirs(new Path(dfsBasePath + "/jsonFiles"));
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-
-    TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/jsonFiles");
-    JsonDFSSource jsonDFSSource = new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
-    SourceFormatAdapter jsonSource = new SourceFormatAdapter(jsonDFSSource);
-
-    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
-    assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
-    UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
-        dfsBasePath + "/jsonFiles/1.json");
-    // Test respecting sourceLimit
-    int sourceLimit = 10;
-    RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(dfsBasePath + "/jsonFiles/1.json"), true);
-    FileStatus file1Status = files.next();
-    assertTrue(file1Status.getLen() > sourceLimit);
-    assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
-    // Test json -> Avro
-    InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
-    assertEquals(100, fetch1.getBatch().get().count());
-    // Test json -> Row format
-    InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
-    assertEquals(100, fetch1AsRows.getBatch().get().count());
-    // Test Avro -> Row format
-    Dataset<Row> fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
-        schemaProvider.getSourceSchema().toString(), jsonDFSSource.getSparkSession());
-    assertEquals(100, fetch1Rows.count());
-
-    // 2. Produce new data, extract new data
-    UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 10000)), dfs,
-        dfsBasePath + "/jsonFiles/2.json");
-    InputBatch<Dataset<Row>> fetch2 =
-        jsonSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
-    assertEquals(10000, fetch2.getBatch().get().count());
-
-    // 3. Extract with previous checkpoint => gives same data back (idempotent)
-    InputBatch<Dataset<Row>> fetch3 =
-        jsonSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
-    assertEquals(10000, fetch3.getBatch().get().count());
-    assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
-    fetch3.getBatch().get().registerTempTable("test_dfs_table");
-    Dataset<Row> rowDataset = new SQLContext(jsc.sc()).sql("select * from test_dfs_table");
-    assertEquals(10000, rowDataset.count());
-
-    // 4. Extract with latest checkpoint => no new data returned
-    InputBatch<JavaRDD<GenericRecord>> fetch4 =
-        jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
-    assertEquals(Option.empty(), fetch4.getBatch());
-
-    // 5. Extract from the beginning
-    InputBatch<JavaRDD<GenericRecord>> fetch5 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
-    assertEquals(10100, fetch5.getBatch().get().count());
-  }
-
-  @Test
-  public void testParquetDFSSource() throws IOException {
-    dfs.mkdirs(new Path(dfsBasePath + "/parquetFiles"));
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-
-    TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/parquetFiles");
-    ParquetDFSSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
-    SourceFormatAdapter parquetSource = new SourceFormatAdapter(parquetDFSSource);
-
-    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
-    assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
-    List<GenericRecord> batch1 = Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100), dataGenerator);
-    Path file1 = new Path(dfsBasePath + "/parquetFiles", "1.parquet");
-    Helpers.saveParquetToDFS(batch1, file1);
-    // Test respecting sourceLimit
-    int sourceLimit = 10;
-    RemoteIterator<LocatedFileStatus> files = dfs.listFiles(file1, true);
-    FileStatus file1Status = files.next();
-    assertTrue(file1Status.getLen() > sourceLimit);
-    assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
-    // Test parquet -> Avro
-    InputBatch<JavaRDD<GenericRecord>> fetch1 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
-    assertEquals(100, fetch1.getBatch().get().count());
-    // Test parquet -> Row
-    InputBatch<Dataset<Row>> fetch1AsRows = parquetSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
-    assertEquals(100, fetch1AsRows.getBatch().get().count());
-
-    // 2. Produce new data, extract new data
-    List<GenericRecord> batch2 = Helpers.toGenericRecords(dataGenerator.generateInserts("001", 10000), dataGenerator);
-    Path file2 = new Path(dfsBasePath + "/parquetFiles", "2.parquet");
-    Helpers.saveParquetToDFS(batch2, file2);
-    // Test parquet -> Avro
-    InputBatch<JavaRDD<GenericRecord>> fetch2 =
-        parquetSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
-    assertEquals(10000, fetch2.getBatch().get().count());
-    // Test parquet -> Row
-    InputBatch<Dataset<Row>> fetch2AsRows =
-        parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
-    assertEquals(10000, fetch2AsRows.getBatch().get().count());
-
-    // 3. Extract with previous checkpoint => gives same data back (idempotent)
-    InputBatch<Dataset<Row>> fetch3AsRows =
-        parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
-    assertEquals(10000, fetch3AsRows.getBatch().get().count());
-    assertEquals(fetch2AsRows.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
-    fetch3AsRows.getBatch().get().registerTempTable("test_dfs_table");
-    Dataset<Row> rowDataset = new SQLContext(jsc.sc()).sql("select * from test_dfs_table");
-    assertEquals(10000, rowDataset.count());
-
-    // 4. Extract with latest checkpoint => no new data returned
-    InputBatch<JavaRDD<GenericRecord>> fetch4 =
-        parquetSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
-    assertEquals(Option.empty(), fetch4.getBatch());
-
-    // 5. Extract from the beginning
-    InputBatch<JavaRDD<GenericRecord>> fetch5 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
-    assertEquals(10100, fetch5.getBatch().get().count());
-  }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
new file mode 100644
index 0000000000000..3cdae7aa78940
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.UtilitiesTestBase;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Basic tests for {@link JsonDFSSource}.
+ */
+public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
+
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    this.dfsRoot = dfsBasePath + "/jsonFiles";
+    this.fileSuffix = ".json";
+  }
+
+  @Override
+  Source prepareDFSSource() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    return new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
+  }
+
+  @Override
+  void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
+    UtilitiesTestBase.Helpers.saveStringsToDFS(
+        Helpers.jsonifyRecords(records), dfs, path.toString());
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
new file mode 100644
index 0000000000000..6d8d7001a1f54
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.TypedProperties;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Basic tests for {@link ParquetDFSSource}.
+ */
+public class TestParquetDFSSource extends AbstractDFSSourceTestBase {
+
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    this.dfsRoot = dfsBasePath + "/parquetFiles";
+    this.fileSuffix = ".parquet";
+  }
+
+  @Override
+  Source prepareDFSSource() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
+  }
+
+  @Override
+  void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
+    Helpers.saveParquetToDFS(Helpers.toGenericRecords(records, dataGenerator), path);
+  }
+}
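Reviewer note (not part of the patch): with this refactoring, covering another DFS-backed source only requires a small subclass of AbstractDFSSourceTestBase, since testReadingFromSource() drives the whole checkpoint/sourceLimit scenario. Below is a minimal sketch of such a subclass; "CsvDFSSource" and "csvifyRecords" are hypothetical names used purely to illustrate the two extension points and do not exist in this change.

public class TestCsvDFSSource extends AbstractDFSSourceTestBase {

  @Before
  public void setup() throws Exception {
    super.setup();
    // Tell the base class where generateOneFile() should write batches and which suffix to use.
    this.dfsRoot = dfsBasePath + "/csvFiles";
    this.fileSuffix = ".csv";
  }

  @Override
  Source prepareDFSSource() {
    // Same constructor shape as JsonDFSSource/ParquetDFSSource above; CsvDFSSource is hypothetical.
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
    return new CsvDFSSource(props, jsc, sparkSession, schemaProvider);
  }

  @Override
  void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
    // csvifyRecords is a hypothetical serializer; the JSON and Parquet tests use
    // Helpers.jsonifyRecords and Helpers.saveParquetToDFS for this step.
    UtilitiesTestBase.Helpers.saveStringsToDFS(csvifyRecords(records), dfs, path.toString());
  }
}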