diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 483e44830c7c1..181374336584d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -20,6 +20,7 @@
 import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -27,26 +28,33 @@
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 
 import com.esotericsoftware.minlog.Log;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
 import java.io.IOException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
@@ -172,37 +180,48 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
 
-    // Extract distinct file keys from s3 meta hoodie table
-    final List<Row> cloudMetaDf = source
+    // Create S3 paths
+    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+    List<String> cloudFiles = source
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .collectAsList();
-    // Create S3 paths
-    final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
-    List<String> cloudFiles = new ArrayList<>();
-    for (Row row : cloudMetaDf) {
-      // construct file path, row index 0 refers to bucket and 1 refers to key
-      String bucket = row.getString(0);
-      String filePath = s3Prefix + bucket + "/" + row.getString(1);
-      if (checkExists) {
-        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
-        try {
-          if (fs.exists(new Path(filePath))) {
-            cloudFiles.add(filePath);
-          }
-        } catch (IOException e) {
-          LOG.error(String.format("Error while checking path exists for %s ", filePath), e);
-        }
-      } else {
-        cloudFiles.add(filePath);
-      }
-    }
+        .mapPartitions((MapPartitionsFunction<Row, String>) fileListIterator -> {
+          List<String> cloudFilesPerPartition = new ArrayList<>();
+          final Configuration configuration = serializableConfiguration.newCopy();
+          fileListIterator.forEachRemaining(row -> {
+            String bucket = row.getString(0);
+            String filePath = s3Prefix + bucket + "/" + row.getString(1);
+            String decodeUrl = null;
+            try {
+              decodeUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
+              if (checkExists) {
+                FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration);
+                if (fs.exists(new Path(decodeUrl))) {
+                  cloudFilesPerPartition.add(decodeUrl);
+                }
+              } else {
+                cloudFilesPerPartition.add(decodeUrl);
+              }
+            } catch (IOException e) {
+              LOG.error(String.format("Error while checking path exists for %s ", decodeUrl), e);
+              throw new HoodieIOException(String.format("Error while checking path exists for %s ", decodeUrl), e);
+            } catch (Throwable e) {
+              LOG.warn("Failed to add cloud file ", e);
+              throw new HoodieException("Failed to add cloud file", e);
+            }
+          });
+          return cloudFilesPerPartition.iterator();
+        }, Encoders.STRING()).collectAsList();
+
     Option<Dataset<Row>> dataset = Option.empty();
     if (!cloudFiles.isEmpty()) {
       DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
       dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
     }
+    LOG.debug("Extracted distinct files " + cloudFiles.size()
+        + " and some samples " + cloudFiles.stream().limit(10).collect(Collectors.toList()));
     return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight());
   }
 }