
Conversation

@jerryshao (Contributor) commented May 21, 2020

If we don't use a qualified path (file:/temp/test_db) to create or save into a (Hadoop) table, then the file_path that is queried back is not a qualified path. For example:

  private Dataset<Row> buildValidDataFileDF() {
    String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES);
    return spark.read().format("iceberg")
        .load(allDataFilesMetadataTable)
        .select("file_path");
  }

The result here could be:

+-----------------------------------------------------------------------------------+
|file_path                                                                          |
+-----------------------------------------------------------------------------------+
|tmp/iceberg_test2/data/00000-172-2805f207-2c0d-4717-acc2-fed60430afeb-00000.parquet|
|tmp/iceberg_test2/data/00001-173-bd5e807d-e96f-49de-b84e-2c254c0777bb-00000.parquet|
|tmp/iceberg_test2/data/00002-174-fb7f84f0-d2ed-4e53-b5b9-6ef2f7da8a73-00000.parquet|
+-----------------------------------------------------------------------------------+

But file.getPath().toString() in RemoveOrphanFilesAction#listDirRecursively returns a qualified path:

for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) {
  if (file.isDirectory()) {
    subDirs.add(file.getPath().toString());
  } else if (file.isFile() && predicate.test(file)) {
    matchingFiles.add(file.getPath().toString());
  }
}
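
For example, listing a relative local directory through Hadoop already returns scheme-qualified paths (a standalone sketch, assuming the local FileSystem and a hypothetical directory):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class ListQualifiedPaths {
    public static void main(String[] args) throws Exception {
      // A relative path is resolved against the FileSystem working directory,
      // so listStatus returns fully qualified paths such as
      // "file:/current/working/dir/tmp/iceberg_test2/data/<name>.parquet".
      Path dir = new Path("tmp/iceberg_test2/data");
      FileSystem fs = dir.getFileSystem(new Configuration());
      for (FileStatus file : fs.listStatus(dir)) {
        System.out.println(file.getPath().toString());
      }
    }
  }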

So the join condition equalTo may not correctly identify the orphan files: the qualified actual paths never equal the unqualified valid paths, so every actual file looks like an orphan and valid data files can be deleted mistakenly.

So here I propose changing the join condition to contains. Another solution would be to change relative paths to qualified ones everywhere.

Dataset<Row> actualFileDF = buildActualFileDF();

// Before: exact equality fails when only one side is qualified
Column joinCond = validFileDF.col("file_path").equalTo(actualFileDF.col("file_path"));
// After: a valid file matches if the qualified actual path contains it
Column joinCond = actualFileDF.col("file_path").contains(validFileDF.col("file_path"));
@rdblue (Contributor) May 21, 2020

Isn't this going to cause Spark to use a nested loop join (full join) because there is no way to partition the data for this expression?

To fix it, what about using just the file name as well? File names should be unique because we embed the write UUID, partition, and task ID. And if we add both checks, filename could be used to distribute the data without many collisions and contains could be used for final correctness.

Column nameEqual = filename(actualFileDF.col("file_path")).equalTo(filename(validFileDF.col("file_path")));
Column actualContains = actualFileDF.col("file_path").contains(validFileDF.col("file_path"));
Column joinCond = nameEqual.and(actualContains);
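
For context, this condition then drives an anti join to find files that exist on disk but are not referenced by the table (a minimal sketch, not the exact action code):

  import java.util.List;
  import org.apache.spark.sql.Encoders;

  // Actual files with no matching valid file under joinCond are the orphan
  // candidates; "leftanti" keeps only the left-side (actual) rows.
  List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
      .as(Encoders.STRING())
      .collectAsList();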

FYI @aokolnychyi.

@jerryshao (Contributor, Author)

OK, I see. If the file name is unique, then I think it would be fine to change it this way. Let me update the code.

Contributor

Even if it isn't unique, we don't expect many duplicates because writers will be operating in parallel. And the worst case is all files have the same name and all get joined in a task -- which is pretty much the same as using a nested loop join.

@rdblue merged commit b3932e1 into apache:master May 22, 2020

@rdblue (Contributor) commented May 22, 2020

Looks great! Thanks for fixing this, @jerryshao!

FYI @aokolnychyi.

@aokolnychyi (Contributor) commented May 22, 2020

Yeah, I've seen this problem but didn't get time to fix it. Thanks, @jerryshao.

I believe the problem is not about having a qualified path; the problem is about not having a scheme in the table's location. LocationProvider uses the table location as the basis to generate a location for each new file, so those locations will not have a scheme if the root table location does not have one.
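
For illustration, a minimal sketch of the difference (here `table` stands for a hypothetical loaded org.apache.iceberg.Table whose location is the unqualified tmp/iceberg_test2, and the file name is made up):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // The default location provider concatenates onto the table location, so a
  // new data file path inherits the missing scheme, e.g.
  // "tmp/iceberg_test2/data/example.parquet".
  String newFile = table.locationProvider().newDataLocation("example.parquet");

  // Qualifying the same path through Hadoop fills in the scheme and working
  // directory, e.g. "file:/.../tmp/iceberg_test2/data/example.parquet".
  Path path = new Path(newFile);
  FileSystem fs = path.getFileSystem(new Configuration());
  Path qualified = path.makeQualified(fs.getUri(), fs.getWorkingDirectory());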

That's why I am not sure how this UDF will help us:

  private static final UserDefinedFunction filename = functions.udf((String path) -> {
    int lastIndex = path.lastIndexOf(File.separator);
    if (lastIndex == -1) {
      return path;
    } else {
      return path.substring(lastIndex + 1);
    }
  }, DataTypes.StringType);
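
For reference, applying such a UDF to the path columns would look roughly like this (a hypothetical usage sketch; the DataFrame and column names are assumed):

  // Derive a file_name column on both sides; an equality join on file_name
  // can then be hash-partitioned, unlike a bare contains() predicate.
  Dataset<Row> actualNamed =
      actualFileDF.withColumn("file_name", filename.apply(actualFileDF.col("file_path")));
  Dataset<Row> validNamed =
      validFileDF.withColumn("file_name", filename.apply(validFileDF.col("file_path")));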

Also, switching to contains means using BroadcastNestedLoopJoin. This action is already very expensive for large tables. I worry it wouldn't complete at all now.
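
One way to confirm which strategy Spark picks is to inspect the physical plan (a minimal sketch, using the join condition discussed above):

  // With a bare contains() condition the plan shows BroadcastNestedLoopJoin;
  // with the added file-name equality Spark can use a hash or sort-merge join.
  actualFileDF.join(validFileDF, joinCond, "leftanti").explain();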

@aokolnychyi (Contributor)

Actually, I take it back. I missed the point that the UDF we added fetches only the file name, so having an equality predicate will avoid the nested loop join.

@jerryshao (Contributor, Author)

Yes, the key problem is that we don't always convert the path to a qualified one; it depends on the path the user provides. If we could unify every path to a qualified one, this problem would be fixed. But when I checked the code, there seemed to be various places that would need to change, so I chose this simple fix instead.
