@@ -28,6 +28,7 @@
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.ParquetSource;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -59,6 +60,8 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
switch (source.getSourceType()) {
case AVRO:
return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
case PARQUET:
return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
@@ -99,6 +102,18 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case PARQUET: {
InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
return new InputBatch<>(
Option
.ofNullable(
r.getBatch()
.map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
source.getSparkSession()))
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
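The new PARQUET case in fetchNewDataInRowFormat leans on AvroConversionUtils to bridge Avro records and Spark SQL rows. A minimal sketch of that conversion step in isolation, with an illustrative method name and the same call the branch above makes:

  // Sketch only: turn a batch of Avro GenericRecords into a Dataset<Row>,
  // the same conversion the PARQUET case of fetchNewDataInRowFormat performs.
  static Dataset<Row> avroToRows(JavaRDD<GenericRecord> avroRdd, Schema sourceSchema, SparkSession spark) {
    return AvroConversionUtils.createDataFrame(JavaRDD.toRDD(avroRdd), sourceSchema.toString(), spark);
  }
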
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

/**
* DFS Source that reads parquet data
*/
public class ParquetDFSSource extends ParquetSource {

private final DFSPathSelector pathSelector;

public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
this.pathSelector = new DFSPathSelector(props, this.sparkContext.hadoopConfiguration());
}

@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
return selectPathsWithMaxModificationTime.getLeft()
.map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
.orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
}

private JavaRDD<GenericRecord> fromFiles(String pathStr) {
JavaPairRDD<Void, GenericRecord> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class,
Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
return avroRDD.values();
}
}
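
A minimal usage sketch for the new source, mirroring the test added below; the method name and directory path are illustrative, and the Spark context, session and SchemaProvider are assumed to be set up elsewhere:

  // Sketch only: read one batch of Parquet files through ParquetDFSSource via the SourceFormatAdapter.
  public static InputBatch<JavaRDD<GenericRecord>> readOneParquetBatch(
      JavaSparkContext jsc, SparkSession spark, SchemaProvider schemaProvider, String parquetRoot) {
    TypedProperties props = new TypedProperties();
    // Same config key the test below uses to point the source at a DFS directory.
    props.setProperty("hoodie.deltastreamer.source.dfs.root", parquetRoot);
    ParquetSource source = new ParquetDFSSource(props, jsc, spark, schemaProvider);
    SourceFormatAdapter adapter = new SourceFormatAdapter(source);
    // No prior checkpoint and no size cap: fetch everything currently under parquetRoot.
    return adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
  }
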
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> {

public ParquetSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider, SourceType.PARQUET);
}
}
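
For illustration, a concrete subclass only has to implement fetchNewData and return the Avro batch together with the checkpoint string to record. A hypothetical in-memory variant (name and behaviour invented for this sketch) could look like:

  // Hypothetical sketch: a ParquetSource that always serves a fixed, pre-built RDD.
  // It only demonstrates the contract; a real source would select new data since lastCkptStr,
  // as ParquetDFSSource does with its DFSPathSelector.
  public class FixedRddParquetSource extends ParquetSource {

    private final JavaRDD<GenericRecord> records;

    public FixedRddParquetSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
        SchemaProvider schemaProvider, JavaRDD<GenericRecord> records) {
      super(props, sparkContext, sparkSession, schemaProvider);
      this.records = records;
    }

    @Override
    protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
      return new InputBatch<>(Option.of(records), lastCkptStr.orElse(""));
    }
  }
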
@@ -34,7 +34,7 @@ public abstract class Source<T> implements Serializable {
protected static volatile Logger log = LogManager.getLogger(Source.class);

public enum SourceType {
JSON, AVRO, ROW
JSON, AVRO, ROW, PARQUET
}

protected transient TypedProperties props;
@@ -23,23 +23,31 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.util.HiveTestService;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
@@ -178,13 +186,40 @@ public static void saveStringsToDFS(String[] lines, FileSystem fs, String target
os.close();
}

public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
.withSchema(HoodieTestDataGenerator.avroSchema).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
for (GenericRecord record : records) {
writer.write(record);
}
}
}

public static TypedProperties setupSchemaOnDFS() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
return props;
}

public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
try {
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;
}
}

public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords,
HoodieTestDataGenerator dataGenerator) {
List<GenericRecord> records = new ArrayList<GenericRecord>();
for (HoodieRecord hoodieRecord : hoodieRecords) {
records.add(toGenericRecord(hoodieRecord, dataGenerator));
}
return records;
}

public static String toJsonString(HoodieRecord hr) {
try {
return ((TestRawTripPayload) hr.getData()).getJsonData();
@@ -19,10 +19,15 @@
package org.apache.hudi.utilities.sources;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
@@ -41,7 +46,7 @@
import org.junit.Test;

/**
* Basic tests against all subclasses of {@link JsonDFSSource}
* Basic tests against all subclasses of {@link JsonDFSSource} and {@link ParquetDFSSource}
*/
public class TestDFSSource extends UtilitiesTestBase {

@@ -82,11 +87,17 @@ public void testJsonDFSSource() throws IOException {
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
dfsBasePath + "/jsonFiles/1.json");
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
// Test respecting sourceLimit
int sourceLimit = 10;
RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(dfsBasePath + "/jsonFiles/1.json"), true);
FileStatus file1Status = files.next();
assertTrue(file1Status.getLen() > sourceLimit);
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
// Test json -> Avro
InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1.getBatch().get().count());
// Test json -> Row format
InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1AsRows.getBatch().get().count());
// Test Avro -> Row format
Dataset<Row> fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
@@ -113,5 +124,69 @@ public void testJsonDFSSource() throws IOException {
InputBatch<JavaRDD<GenericRecord>> fetch4 =
jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());

// 5. Extract from the beginning
InputBatch<JavaRDD<GenericRecord>> fetch5 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(10100, fetch5.getBatch().get().count());
}

@Test
public void testParquetDFSSource() throws IOException {
dfs.mkdirs(new Path(dfsBasePath + "/parquetFiles"));
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();

TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/parquetFiles");
ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
SourceFormatAdapter parquetSource = new SourceFormatAdapter(parquetDFSSource);

// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
List<GenericRecord> batch1 = Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100), dataGenerator);
Path file1 = new Path(dfsBasePath + "/parquetFiles", "1.parquet");
Helpers.saveParquetToDFS(batch1, file1);
// Test respecting sourceLimit
int sourceLimit = 10;
RemoteIterator<LocatedFileStatus> files = dfs.listFiles(file1, true);
FileStatus file1Status = files.next();
assertTrue(file1Status.getLen() > sourceLimit);
assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
// Test parquet -> Avro
InputBatch<JavaRDD<GenericRecord>> fetch1 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1.getBatch().get().count());
// Test parquet -> Row
InputBatch<Dataset<Row>> fetch1AsRows = parquetSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1AsRows.getBatch().get().count());

// 2. Produce new data, extract new data
List<GenericRecord> batch2 = Helpers.toGenericRecords(dataGenerator.generateInserts("001", 10000), dataGenerator);
Path file2 = new Path(dfsBasePath + "/parquetFiles", "2.parquet");
Helpers.saveParquetToDFS(batch2, file2);
// Test parquet -> Avro
InputBatch<JavaRDD<GenericRecord>> fetch2 =
parquetSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch2.getBatch().get().count());
// Test parquet -> Row
InputBatch<Dataset<Row>> fetch2AsRows =
parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch2AsRows.getBatch().get().count());

// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<Dataset<Row>> fetch3AsRows =
parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch3AsRows.getBatch().get().count());
assertEquals(fetch2AsRows.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
fetch3AsRows.getBatch().get().registerTempTable("test_dfs_table");
Dataset<Row> rowDataset = new SQLContext(jsc.sc()).sql("select * from test_dfs_table");
assertEquals(10000, rowDataset.count());

// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch4 =
parquetSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());

// 5. Extract from the beginning
InputBatch<JavaRDD<GenericRecord>> fetch5 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(10100, fetch5.getBatch().get().count());
}
}