Add File for Avro files throws PreconditionException #3273
Conversation
Checking the evaluators, it looks like we really do need a negative 1 here; see iceberg/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java, lines 90 to 99 in 7aef02b.
If we set it to 0, then files will always be ignored; if we set it to -1, then that is currently used as a signal that the file has no metric information and must be scanned. This has been broken ever since we switched from using a direct DataFile constructor, so I think we only have a few options.
I think approach 1 is the safest thing to do here for future-proofing behavior. Specifically, I think we should allow explicitly setting the rowCount to -1 if and only if the rest of the metrics are empty. This should preserve the behavior of allowing a file without metrics, but also ensure that we don't have a rowCount of -1 when the metrics are set.
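For illustration, here is a minimal paraphrase of the evaluator behavior described above (not the exact InclusiveMetricsEvaluator source; the method and parameter names are made up for this sketch):

    // Illustrative only: a recordCount of 0 lets the evaluator prune the file,
    // while a negative recordCount is treated as "no metrics, must scan".
    static boolean mightContainMatches(long recordCount, boolean hasMetrics) {
      if (recordCount == 0) {
        return false;  // empty file: rows cannot match, safe to skip
      }
      if (recordCount < 0 || !hasMetrics) {
        return true;   // no metric information: the file must be scanned
      }
      // otherwise the predicate is evaluated against lower/upper bounds
      return true;
    }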
Thanks for catching that. Done with approach 1, and also fixed checkstyle. Watching the tests.
Added the check mentioned in the last comment:
        .withMetrics(metrics)
        .withPartitionPath(partitionKey)
        .build();
I was saving this white-space for my retirement :nit:
Done
RussellSpitzer left a comment:
I think this is the right thing to do. @rdblue, can I ask you for a quick sanity check?
    }

    @Test
    public void addDataUnpartitionedAvroFile() throws Exception {
nit: rename the method; there seems to be another method with almost the same name.
Done
    );
    assertEquals("Iceberg table contains correct data",
        expected,
        sql("SELECT * FROM %s ORDER BY id", tableName));
Not related to this change: for a COUNT query, we currently do not rely on the record count metrics, since Spark does not push down the count expression.
When Spark supports count pushdown, using -1 as a literal record count will give incorrect results.
Can we also add a COUNT assertion to the test?
Also, since users can read the metrics from manifests and compute the count themselves, it might be a good idea to document the meaning of -1 for the record count metric.
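To make the concern concrete, here is a small sketch (not Iceberg code; the helper name is made up) of how a count computed from per-file record count metrics would be skewed by a -1 entry:

    // Illustrative only: summing per-file record counts from the manifests
    // gives a wrong total when any file stored -1 as its record count.
    static long countFromMetrics(long[] perFileRecordCounts) {
      long totalRows = 0;
      for (long recordCount : perFileRecordCounts) {
        totalRows += recordCount;  // a -1 entry silently subtracts a row
      }
      return totalRows;
    }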
        expected,
        sql("SELECT * FROM %s ORDER BY id", tableName));

    List<Object[]> expectedCount = Lists.newArrayList();
Lists.newArrayList(new Object[]{2L}) is not possible, as the compiler gets confused between an Object[] argument and varargs Object.
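A small illustration of the ambiguity being described (Lists is Guava's com.google.common.collect.Lists, as in the test):

    // Lists.newArrayList is declared as newArrayList(E... elements), so the
    // compiler cannot tell whether new Object[]{2L} is meant as the varargs
    // array itself (varargs of Object) or as a single Object[] element.
    // Building the list and adding the array explicitly is unambiguous:
    List<Object[]> expectedCount = Lists.newArrayList();
    expectedCount.add(new Object[]{2L});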
    Preconditions.checkArgument(recordCount >= 0 ||
        (recordCount == -1 && valueCounts == null && columnSizes == null && nanValueCounts == null &&
            lowerBounds == null && upperBounds == null),
        "Metrics cannot be set if record count is -1.");
Why is this change necessary? I don't think that we should allow writing a file without a record count.
Because our importAvroPartitions code expects to be able to do this, and our metrics evaluation code assumes the value can be -1 as well. We could forbid this, but that would require changing the Avro import code to fully scan Avro files before importing. As is, importAvro has been broken since we switched to the builder.
Got it. What about getting the correct count in the import code? We already open up Parquet files and read the footer. It wouldn't be too bad to skip through Avro files, like we do to find the count for a specific starting offset: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/avro/AvroIO.java#L149
I don't think there is a problem with that; we were just working with the past behavior, which was to include a row count of -1. I think writing new code to skip through Avro blocks and sum up row counts is fine too.
If we leave this (even temporarily), should we change the exception message to something more along the lines of "Metrics shouldn't be set when the record count is -1; -1 is only valid for files which haven't been read yet", or something similar?
I don't mind the -1 for now, as it's consistent with some current behavior, but I feel like users (or at least framework developers) should be made aware that this is not a normal circumstance and that it means the file hasn't been read.
Ideally, we remove the -1 as soon as possible, but I can't think of any valid scenarios presently where we have metrics but no record count.
OK I will give this a try. @kbendick I guess we won't have -1 anymore in this case?
I took @rdblue's suggestion and made an attempt to use the AvroIO method to get the row count, which internally just visits each block once. A potential follow-up could be making this (and even the Parquet/ORC footer reading) into distributed Spark jobs. Added a test.
Need to rebase following the Spark directory refactor.
build.gradle (outdated)
    }

    testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
Can you revert this change? It could cause conflicts.
Yeah, I just rebased and moved it to the appropriate build.gradle; hope it's OK.
… of long to differentiate unset vs set recordCount.
9a5cd9d to bd71a89
    @Test
    public void addAvroFile() throws Exception {
      // Spark Session Catalog cannot load metadata tables
This runs on the two other test parameters (Hive catalog, Hadoop catalog).
    DataWriter<Record> dataWriter = Avro.writeData(file)
        .schema(schema)
        .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
Does this need to be fully-qualified?
    sql(createIceberg, tableName);

    Object result = scalarSql("CALL %s.system.add_files('%s', '`avro`.`%s`')",
        catalogName, tableName, path);
This is actually dangerous and we probably want to disallow it. We should only import data files that do not have field IDs. Otherwise, the field IDs may not match and you could get strange behavior. I'd prefer if the test used an Avro file written without Iceberg support to ensure it doesn't have field IDs. Not a huge problem, but eventually I think we should catch that there were IDs in the imported file and fail if they don't match the table schema's IDs.
Got it, I was not familiar with this. I changed to using native Avro writers; does that do the trick?
Re: the field ID check, sounds good; we can probably look at that and also add general schema validation while importing the file.
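For context, a minimal sketch of writing such a file with the plain Avro API, so the schema carries no Iceberg field IDs (the file path and field names here are illustrative, not the test's actual values):

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class WritePlainAvro {
      public static void main(String[] args) throws IOException {
        // A plain Avro schema: no Iceberg field IDs are attached to the fields.
        Schema schema = SchemaBuilder.record("rec").fields()
            .requiredLong("id")
            .requiredString("data")
            .endRecord();

        try (DataFileWriter<GenericRecord> writer =
            new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
          writer.create(schema, new File("/tmp/unpartitioned.avro"));
          GenericRecord record = new GenericData.Record(schema);
          record.put("id", 1L);
          record.put("data", "a");
          writer.append(record);
        }
      }
    }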
    }

-   static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+   public static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
Rather than exposing this, could you add a helper method to the already public Avro class?
+1 on something like "public static long readRowCount(path)"
    long length = inFile.getLength();

    // Seeking to the end will count all the rows.
    long rowCount = AvroIO.findStartingRowPos(inFile::newStream, length);
The findStartingRowPos method handles EOFException, so I think you could implement this using AvroIO.findStartingRowPos(..., Long.MAX_VALUE). Then you wouldn't need a call to S3 to find the file length.
That's how I'd implement the util method:

    public class Avro {
      ...
      public long rowCount(InputFile file) {
        return AvroIO.findStartingRowPos(file::newStream, Long.MAX_VALUE);
      }
    }
Done, thanks for the code suggestion
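For illustration, a rough sketch of how the import side might use such a helper to record a real count instead of -1 (the builder calls below are assumptions based on the DataFiles API, and path, conf, spec, and partitionKey are assumed to be in scope; this is not the PR's exact code):

    // Hypothetical import-side usage: read the actual row count from the Avro
    // file and record it, instead of storing -1.
    InputFile inFile = HadoopInputFile.fromPath(path, conf);
    long rowCount = Avro.rowCount(inFile);  // helper sketched above
    DataFile dataFile = DataFiles.builder(spec)
        .withInputFile(inFile)
        .withFormat(FileFormat.AVRO)
        .withRecordCount(rowCount)
        .withPartitionPath(partitionKey)
        .build();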
@rdblue @RussellSpitzer addressed the comments, if you guys have time for another round.
Thanks, @szehon-ho! Nice work.
Thanks for the review!
#3263
As mentioned, this is one of the proposed solutions: set 0 rows for added Avro files, which fixes the immediate problem, as I am not sure of any quick way to get the row count of external Avro files. Was this the original intent?
I wonder if we should run a Spark job that reads the Avro files to compute the real row count, but it might be self-defeating, as add_files is supposed to have a performance benefit over an Iceberg insert.
Also adds a test case to demonstrate the problem.
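For completeness, a rough sketch of the Spark-job idea mentioned above (assumes a SparkSession named spark and the spark-avro data source on the classpath; it reads every row, which is exactly the cost add_files tries to avoid):

    // Hypothetical: count rows with a distributed Spark read of the Avro files.
    long rowCount = spark.read().format("avro").load(path).count();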