@@ -29,7 +29,6 @@
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
@@ -86,8 +85,6 @@
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function2;
import scala.Option;
import scala.Some;
@@ -106,14 +103,7 @@
*/
public class SparkTableUtil {

private static final Logger LOG = LoggerFactory.getLogger(SparkTableUtil.class);

private static final Joiner.MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");

private static final PathFilter HIDDEN_PATH_FILTER =
p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");

private static final String duplicateFileMessage = "Cannot complete import because data files " +
private static final String DUPLICATE_FILE_MESSAGE = "Cannot complete import because data files " +
"to be imported already exist within the target table: %s. " +
"This is disabled by default as Iceberg is not designed for mulitple references to the same file" +
" within the same table. If you are sure, you may set 'check_duplicate_files' to false to force the import.";
@@ -479,7 +469,7 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
Dataset<String> duplicates = importedFiles.join(existingFiles, joinCond)
.select("file_path").as(Encoders.STRING());
Preconditions.checkState(duplicates.isEmpty(),
String.format(duplicateFileMessage, Joiner.on(",").join((String[]) duplicates.take(10))));
String.format(DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
}

AppendFiles append = targetTable.newAppend();
@@ -535,7 +525,7 @@ public static void importSparkPartitions(SparkSession spark, List<SparkPartition
Dataset<String> duplicates = importedFiles.join(existingFiles, joinCond)
.select("file_path").as(Encoders.STRING());
Preconditions.checkState(duplicates.isEmpty(),
String.format(duplicateFileMessage, Joiner.on(",").join((String[]) duplicates.take(10))));
String.format(DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
}

List<ManifestFile> manifests = filesToImport