@@ -20,33 +20,41 @@

import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

import com.esotericsoftware.minlog.Log;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
@@ -172,37 +180,48 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
    String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
    String s3Prefix = s3FS + "://";

-    // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
        .filter(filter)
        .select("s3.bucket.name", "s3.object.key")
        .distinct()
-        .collectAsList();
-    // Create S3 paths
-    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+        .mapPartitions((MapPartitionsFunction<Row, String>) fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          final Configuration configuration = serializableConfiguration.newCopy();
+          fileListIterator.forEachRemaining(row -> {
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            String decodeUrl = null;
+            try {
+              decodeUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration);
+                if (fs.exists(new Path(decodeUrl))) {
+                  cloudFilesPerPartition.add(decodeUrl);
+                }
+              } else {
+                cloudFilesPerPartition.add(decodeUrl);
+              }
+            } catch (IOException e) {
+              LOG.error(String.format("Error while checking path exists for %s ", decodeUrl), e);
+              throw new HoodieIOException(String.format("Error while checking path exists for %s ", decodeUrl), e);
+            } catch (Throwable e) {
+              LOG.warn("Failed to add cloud file ", e);
+              throw new HoodieException("Failed to add cloud file", e);
+            }
+          });
+          return cloudFilesPerPartition.iterator();
+        }, Encoders.STRING()).collectAsList();
+
    Option<Dataset<Row>> dataset = Option.empty();
    if (!cloudFiles.isEmpty()) {
      DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
      dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
    }
+    LOG.debug("Extracted distinct files " + cloudFiles.size()
+        + " and some samples " + cloudFiles.stream().limit(10).collect(Collectors.toList()));
    return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight());
  }
}
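For reference, below is a minimal, self-contained sketch of the listing pattern the new code relies on: wrap the Hadoop configuration in SerializableConfiguration so executors can rebuild it, URL-decode each S3 object key (S3 event notifications encode keys), and run the optional exists-check inside mapPartitions instead of serially on the driver after a collect. This is not the PR's code; the class name S3FileListingSketch, the sample s3:// paths, and the local[2] Spark session are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class S3FileListingSketch {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("s3-file-listing-sketch")
        .getOrCreate();

    // Hadoop's Configuration is not Serializable; wrap it so the mapPartitions lambda
    // can build a copy on each executor instead of referencing the driver-side object.
    SerializableConfiguration serializableConf =
        new SerializableConfiguration(spark.sparkContext().hadoopConfiguration());
    boolean checkExists = false; // flip to true to probe each path with FileSystem#exists

    // Hypothetical, URL-encoded S3 paths as they would be assembled from event metadata
    // (bucket name + object key); S3 event notifications URL-encode the object key.
    Dataset<String> encodedPaths = spark.createDataset(
        Arrays.asList(
            "s3://some-bucket/year%3D2022/part-0001.parquet",
            "s3://some-bucket/year%3D2022/part-0002.parquet"),
        Encoders.STRING());

    List<String> cloudFiles = encodedPaths
        .mapPartitions((MapPartitionsFunction<String, String>) iterator -> {
          List<String> perPartition = new ArrayList<>();
          // One Configuration copy per partition, rebuilt from the serializable wrapper.
          Configuration conf = serializableConf.newCopy();
          iterator.forEachRemaining(encoded -> {
            try {
              // Decode the key first so the FileSystem sees the real object path.
              String decoded = URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
              if (!checkExists || FSUtils.getFs(decoded, conf).exists(new Path(decoded))) {
                perPartition.add(decoded);
              }
            } catch (IOException e) {
              throw new RuntimeException("Failed to resolve " + encoded, e);
            }
          });
          return perPartition.iterator();
        }, Encoders.STRING())
        .collectAsList();

    cloudFiles.forEach(System.out::println);
    spark.stop();
  }
}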