Conversation

@hililiwei
Contributor

@hililiwei hililiwei commented Dec 21, 2021

close #3169

The length() method of the OrcFileAppender class is modified. If the file is closed, the value of file.toInputFile().getLength() is returned; if it is not closed, the length is estimated as the memory usage estimated by the treeWriter plus the position of the last stripe. Reflection is used to get the treeWriter (it is private).
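Roughly, the reflection part looks like the following sketch (illustrative only, not the exact code in the patch; it assumes java.lang.reflect.Field, org.apache.orc.Writer, and org.apache.orc.impl.writer.TreeWriter, and the actual names may differ):

// Sketch only: pull the private TreeWriter out of the ORC writer so that its
// estimateMemory() can be used while the file is still open.
private static TreeWriter treeWriter(Writer writer) {
  try {
    Field field = writer.getClass().getDeclaredField("treeWriter");
    field.setAccessible(true);
    return (TreeWriter) field.get(writer);
  } catch (NoSuchFieldException | IllegalAccessException e) {
    throw new IllegalStateException("Failed to access the ORC treeWriter via reflection", e);
  }
}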

@github-actions github-actions bot added the core label Dec 21, 2021
@hililiwei hililiwei marked this pull request as draft December 22, 2021 03:26
@hililiwei hililiwei force-pushed the 3169 branch 2 times, most recently from 7af31f0 to 8c93b98 on December 27, 2021 06:27
@hililiwei hililiwei marked this pull request as ready for review December 27, 2021 07:30
@hililiwei hililiwei requested a review from rdblue January 20, 2022 13:17
@rdblue
Contributor

rdblue commented Jan 24, 2022

@hililiwei, can you describe how you're estimating the size of data that is buffered in memory for ORC? I think a description to explain to reviewers would help.

@hililiwei
Contributor Author

hililiwei commented Jan 27, 2022

@hililiwei, can you describe how you're estimating the size of data that is buffered in memory for ORC? I think a description to explain to reviewers would help.

If a file is still being written, its size is estimated in three steps:

  1. Data that has already been written to stripes. This value is obtained by summing the offset and length of the writer's last stripe.
  2. Data that has been submitted to the writer but not yet written to a stripe. When the OrcFileAppender is created, the treeWriter is obtained through reflection, and its estimateMemory method is used to estimate how much memory it is using.
  3. Data that has not yet been submitted to the writer, i.e. the size of the buffer. The buffer's default maximum size is used here.

Adding these three values gives the estimated data size (see the sketch below).
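Putting the parts together, the estimate is roughly the following (an illustrative sketch assuming org.apache.orc.StripeInformation; DEFAULT_BATCH_BUFFER_BYTES is a placeholder for the buffer's default maximum size, not a real constant in the code):

List<StripeInformation> stripes = writer.getStripes();
long flushed = 0;  // step 1: data already written to stripes
if (!stripes.isEmpty()) {
  StripeInformation last = stripes.get(stripes.size() - 1);
  flushed = last.getOffset() + last.getLength();
}
long buffered = treeWriter.estimateMemory();  // step 2: submitted to the writer, not yet in a stripe
long pending = DEFAULT_BATCH_BUFFER_BYTES;    // step 3: not yet submitted to the writer (upper bound)
long estimatedLength = flushed + buffered + pending;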

@rdblue
Contributor

rdblue commented Feb 6, 2022

@hililiwei, I don't understand what #3 is. Why is this tracking the data that hasn't been submitted to the writer? It seems like all you're doing is adding a constant to the estimated size. For Parquet, we use the current file offset plus the size that is buffered in memory.

@hililiwei
Contributor Author

@hililiwei, I don't understand what #3 is. Why is this tracking the data that hasn't been submitted to the writer? It seems like all you're doing is adding a constant to the estimated size. For Parquet, we use the current file offset plus the size that is buffered in memory.

#3 mainly refers to the data in the VectorizedRowBatch:

public void add(D datum) {
  try {
    valueWriter.write(datum, batch);  // rows are buffered into the VectorizedRowBatch first
    if (batch.size == this.batchSize) {
      writer.addRowBatch(batch);      // only a full batch is handed to the ORC writer
      batch.reset();
    }
  } catch (IOException ioe) {
    throw new RuntimeIOException(ioe, "Problem writing to ORC file %s", file.location());
  }
}

The data is written to the batch first.

@coolderli
Contributor

Any update on this? We found that ORC can save more storage space than Parquet, so I'd like to try the ORC file format.

@hililiwei
Contributor Author

Any update on this? We found that ORC can save more storage space than Parquet, so I'd like to try the ORC file format.

ping @rdblue @liubo1022126

@liubo1022126

@coolderli yes, Parquet query performance is worse than ORC when querying with Trino.

@hililiwei does this PR have any remaining unfinished work? I want to merge this PR into my branch.

@hililiwei
Contributor Author

@coolderli yes, Parquet query performance is worse than ORC when querying with Trino.

@hililiwei does this PR have any remaining unfinished work? I want to merge this PR into my branch.

For now, there are no major changes planned. However, I'm still waiting for comments from @rdblue or anyone else, so I may revise it again. 😄

@hililiwei
Contributor Author

cc @rdblue @openinx, could you please review when you have some time?

@openinx
Member

openinx commented Mar 23, 2022

There are 3 failure cases in the Travis CI report:

org.apache.iceberg.flink.actions.TestRewriteDataFilesAction > testRewriteAvoidRepeateCompress[catalogName=testhive, baseNamespace=, format=ORC] FAILED
    java.lang.AssertionError: Action should add 1 data file expected:<1> but was:<2>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at org.apache.iceberg.flink.actions.TestRewriteDataFilesAction.testRewriteAvoidRepeateCompress(TestRewriteDataFilesAction.java:367)

org.apache.iceberg.flink.actions.TestRewriteDataFilesAction > testRewriteAvoidRepeateCompress[catalogName=testhadoop, baseNamespace=, format=ORC] FAILED
    java.lang.AssertionError: Action should add 1 data file expected:<1> but was:<2>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at org.apache.iceberg.flink.actions.TestRewriteDataFilesAction.testRewriteAvoidRepeateCompress(TestRewriteDataFilesAction.java:367)

org.apache.iceberg.flink.actions.TestRewriteDataFilesAction > testRewriteAvoidRepeateCompress[catalogName=testhadoop_basenamespace, baseNamespace=l0.l1, format=ORC] FAILED
    java.lang.AssertionError: Action should add 1 data file expected:<1> but was:<2>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at org.apache.iceberg.flink.actions.TestRewriteDataFilesAction.testRewriteAvoidRepeateCompress(TestRewriteDataFilesAction.java:367)

return 0;
}

switch (primitive.getCategory()) {
Member

I think we need to align on the approach to estimating the average width for each data type. The basic rule is: we need to read GenericOrcWriters to see how those data types are encoded into the ORC column vectors. That is the occupied in-memory byte size without any columnar compression.

Contributor Author

The corresponding relationship is as follows:

BOOLEAN    ->  LongColumnVector
BYTE       ->  LongColumnVector
SHORT      ->  LongColumnVector
INT        ->  LongColumnVector
LONG       ->  LongColumnVector
FLOAT      ->  DoubleColumnVector
DOUBLE     ->  DoubleColumnVector
DATE       ->  LongColumnVector
TIMESTAMP  ->  TimestampColumnVector
BINARY     ->  BytesColumnVector
STRING     ->  BytesColumnVector
DECIMAL    ->  Decimal18Writer or Decimal38Writer

The corresponding byte estimates are:

LongColumnVector                 ->  8 bytes
DoubleColumnVector               ->  8 bytes
TimestampColumnVector            ->  12 bytes
Decimal18Writer/Decimal38Writer  ->  (precision + 4) / 2 bytes
BytesColumnVector                ->  128 bytes

How about this?
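For illustration, the per-row average could be assembled like this (a sketch against Iceberg's Schema, Type, and Types classes; estimateAvgRowWidth and estimateFieldWidth are hypothetical helpers, not the visitor in the patch, and the widths are the assumed values from the table above):

// Sketch only: sum assumed per-type widths over the schema's top-level fields.
static int estimateAvgRowWidth(Schema schema) {
  int width = 0;
  for (Types.NestedField field : schema.columns()) {
    width += estimateFieldWidth(field.type());
  }
  return width;
}

static int estimateFieldWidth(Type type) {
  switch (type.typeId()) {
    case BOOLEAN:
    case INTEGER:
    case LONG:
    case DATE:
      return 8;   // backed by LongColumnVector
    case FLOAT:
    case DOUBLE:
      return 8;   // backed by DoubleColumnVector
    case TIMESTAMP:
      return 12;  // backed by TimestampColumnVector
    case DECIMAL:
      return (((Types.DecimalType) type).precision() + 4) / 2;
    case STRING:
    case BINARY:
      return 128; // backed by BytesColumnVector, rough average
    default:
      return 16;  // assumed fallback for other/nested types
  }
}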

Member

The estimated byte size of a decimal is just precision + 2. As I said in another comment, each digit occupies just one byte; in fact, a BigDecimal's unscaled value is a BigInteger, and the BigInteger just encodes each digit into a byte.
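For example, under that rule a decimal(18, 2) value would be estimated at 18 + 2 = 20 bytes, versus (18 + 4) / 2 = 11 bytes with the formula proposed above.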

Contributor

@kbendick kbendick left a comment

Thanks @hililiwei. Left some further comments.

Additionally, is it possible for these changes to be backported to earlier Spark versions in subsequent PRs to make reviewing easier? It's possible I missed some discussion on this, so let me know if so.

Comment on lines +75 to +77
this.avgRowByteSize =
    OrcSchemaVisitor.visitSchema(orcSchema, new EstimateOrcAvgWidthVisitor()).stream().reduce(Integer::sum)
        .orElse(0);
Contributor

The use of orElse(0) concerns me somewhat.

Looking at its usage, it seems as though using an avgRowByteSize of 0 would mean that the entirety of batch.size would be unaccounted for in the estimate in the length function.

return (long) (dataLength + (estimateMemory + (long) batch.size * avgRowByteSize) * 0.2);

Under what situations would we expect this to reasonably return 0? Is that possible / expected in some edge case, or more indicative of a bug?

Would it make sense to default to some non-zero value (even 1) so that the ongoing batch.size isn't entirely dropped?

At the very least, it seems like we should potentially log a debug message stating that 0 is being used. If users are investigating ORC files being written at sizes they find strange, having a log would be beneficial.

Contributor Author

I initially set it to 1, but as long as the schema has a field, it won't be 0. Setting it to 1 might mask some problems. When the value is 0, we can raise a WARN in the log.
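Something along these lines, for instance (a sketch using an SLF4J logger, which Iceberg modules already use; not the exact code in the patch):

if (avgRowByteSize == 0) {
  LOG.warn("Computed average row byte size is 0; rows buffered in the current batch "
      + "will not be counted in the length estimate for {}", file.location());
}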

@hililiwei
Contributor Author

hililiwei commented Mar 24, 2022

Thanks @hililiwei. Left some further comments.

Additionally, is it possible for these changes to be backported to earlier Spark versions in subsequent PRs to make reviewing easier? It's possible I missed some discussion on this, so let me know if so.

Reverted the changes for the older Flink and Spark versions.

@Override
protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
  // TODO: support ORC rolling writers
  if (fileFormat == FileFormat.ORC) {
Contributor

When ORC supports rolling writers, fileFormat will no longer be used anywhere.
Should we deprecate/remove it?

Contributor Author

@hililiwei hililiwei Mar 25, 2022

Since these methods involve multiple Spark/Flink versions, I suggest cleaning them up in a separate PR after this one is done.

@hililiwei hililiwei requested a review from openinx March 28, 2022 01:40
Member

@openinx openinx left a comment

Looks good to me now.

@openinx openinx merged commit 6d39f3c into apache:master Mar 28, 2022
@openinx
Member

openinx commented Mar 28, 2022

Got this merged now, thanks all for reviewing, and thanks @hililiwei for the contribution!

@hililiwei
Contributor Author

Thanks openinx and all for reviewing. 😃
I'm going to start porting this to the other supported Flink/Spark versions and do some cleanup.

@chenwyi2

chenwyi2 commented Sep 6, 2023

In OrcFileAppender, I found that writer.estimateMemory() is 0 and writer.getStripes() is empty. Why is that?
