
Parquet: avoid premature closing of FSDataInputStream object #3276

Closed
velctor wants to merge 2 commits into apache:main from velctor:avoid-stream-closed

Conversation

@velctor

@velctor velctor commented Oct 12, 2021

#90 adds a finalizer that closes open streams when they are garbage collected. But the org.apache.iceberg.io.SeekableInputStream object is discarded in ParquetIO#stream, and HadoopStreams$HadoopSeekableInputStream#finalize() is called, which causes the underlying FSDataInputStream to be closed early. This results in delete file reads failing with "Failed to open Parquet file ..." or "Stream closed".

To avoid this, add a member variable icebergHadoopStream to the ParquetIO$ParquetInputFile class and use it as the argument to ParquetIO#stream:

 public SeekableInputStream newStream() throws IOException {
-  return stream(file.newStream());
+  icebergHadoopStream = file.newStream();
+  return stream(icebergHadoopStream);
 }
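The intent of the change can be modeled outside of Iceberg: as long as a factory object holds a strong reference to the stream it handed out, the stream stays reachable and cannot be finalized behind the caller's back. A minimal self-contained sketch (class and field names are hypothetical, not the real Iceberg types):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.lang.ref.WeakReference;

// Models the PR's approach: the factory pins the stream it created in a
// field, so the stream is strongly reachable for the factory's lifetime.
public class PinnedStreamFactory {
    // Corresponds to the PR's new icebergHadoopStream member variable.
    InputStream lastCreated;

    public InputStream newStream() {
        InputStream stream = new ByteArrayInputStream(new byte[] {1, 2, 3});
        this.lastCreated = stream;
        return stream;
    }

    public static void main(String[] args) {
        PinnedStreamFactory factory = new PinnedStreamFactory();
        WeakReference<InputStream> ref = new WeakReference<>(factory.newStream());

        // Even if the caller drops its own reference, the factory's field
        // keeps the stream alive, so a GC cannot clear the weak reference.
        System.gc();
        System.out.println(ref.get() != null);           // true
        System.out.println(factory.lastCreated != null); // true
    }
}
```

Note that the reviewer's objection below still applies to this pattern: the factory now keeps its last-created stream alive for as long as the InputFile itself is referenced.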
Contributor

This doesn't look correct to me. The InputFile is a factory interface and is not responsible for keeping references to the streams that are created.

Author

@velctor velctor Oct 20, 2021

I know that, but the org.apache.iceberg.io.SeekableInputStream object becomes eligible for garbage collection after this.f = file.newStream(); in the org.apache.parquet.hadoop.ParquetFileReader class. It is difficult to modify the code in the Parquet library, so this may be an easier way to solve the problem while keeping #90. I want to know if you have any other ideas.
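The failure mode under discussion can be reproduced in miniature: a wrapper whose finalize() closes its delegate will, once the wrapper is discarded, close a stream that some other component still uses. The sketch below uses hypothetical class names (the real ones are HadoopStreams$HadoopSeekableInputStream wrapping FSDataInputStream), and it triggers the cleanup with an explicit close() rather than waiting for the GC, to keep the demo deterministic:

```java
import java.io.IOException;
import java.io.InputStream;

public class WrapperModel {

    /** Stand-in for the underlying stream; records whether it was closed. */
    static class TrackingStream extends InputStream {
        boolean closed = false;

        @Override
        public int read() throws IOException {
            if (closed) {
                throw new IOException("Stream is closed!");
            }
            return -1;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Stand-in for the Iceberg wrapper with a leak-detecting finalizer. */
    static class FinalizingWrapper extends InputStream {
        private final InputStream delegate;
        private boolean closed = false;

        FinalizingWrapper(InputStream delegate) {
            this.delegate = delegate;
        }

        @Override
        public int read() throws IOException {
            return delegate.read();
        }

        @Override
        public void close() throws IOException {
            closed = true;
            delegate.close();
        }

        @Override
        protected void finalize() throws Throwable {
            // If the wrapper leaks without an explicit close, close the
            // delegate, even though another component may still hold it.
            if (!closed) {
                close();
            }
            super.finalize();
        }
    }

    public static void main(String[] args) throws IOException {
        TrackingStream raw = new TrackingStream();
        FinalizingWrapper wrapper = new FinalizingWrapper(raw);

        // Simulate the Finalizer thread running after the wrapper is
        // discarded while the delegate is still in use elsewhere:
        wrapper.close();

        try {
            raw.read();
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints "Stream is closed!"
        }
    }
}
```

In the real stack traces above, the role of the explicit wrapper.close() is played by the Finalizer thread once the wrapper becomes unreachable, which would explain why the error appears intermittently and under GC pressure.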

@rdblue
Contributor

rdblue commented Oct 19, 2021

@velctor, can you post the problem that you're seeing? If you have a stream that was closed by the finalizer, how is anything still using it to hit the Stream closed error?

@velctor
Author

velctor commented Oct 20, 2021

@velctor, can you post the problem that you're seeing? If you have a stream that was closed by the finalizer, how is anything still using it to hit the Stream closed error?

The error Stream closed occurs when the created stream is used to read the delete file. The specific error is as follows:

Caused by: org.apache.iceberg.exceptions.RuntimeIOException: java.io.IOException: Stream is closed!
        at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:135)
        at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:112)
        at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.next(CloseableIterable.java:206)
        at org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113)
        at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:356)
        at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
        at org.apache.iceberg.deletes.Deletes.toEqualitySet(Deletes.java:83)
        at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:137)
        at org.apache.iceberg.data.DeleteFilter.applyEqDeletes(DeleteFilter.java:166)
        at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:112)
        at io.trino.plugin.iceberg.IcebergPageSource.getNextPage(IcebergPageSource.java:125)
        ... 12 more
Caused by: java.io.IOException: Stream is closed!
        at org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1453)
        at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:65)
        at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:65)
        at org.apache.parquet.hadoop.util.H2SeekableInputStream.seek(H2SeekableInputStream.java:60)
        at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1687)
        at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:925)
        at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:133)
        ... 22 more

@rdblue
Contributor

rdblue commented Oct 20, 2021

@velctor, why do you think that the stream was closed by the finalizer and not by a normal call to close?

@velctor
Author

velctor commented Oct 21, 2021

@velctor, why do you think that the stream was closed by the finalizer and not by a normal call to close?

To confirm that the stream closed by the Finalizer thread is the same stream as the one in the error log, I added some information to the logs and recompiled the relevant jar package. The resulting log is as follows:

2021-*-*T14:47:43.211+0800	DEBUG	20210927_064723_00001_ajsi3.1.0-13-103	org.apache.hadoop.fs.FileSystem	Bypassing cache to create filesystem ***00000-1-b5ccab76-0db2-4f2e-887f-f70b9833cb91-00227.parquet
2021-*-*T14:47:43.381+0800	DEBUG	Finalizer	org.apache.hadoop.hdfs.DFSClient	close DFSInputStream ***00000-1-b5ccab76-0db2-4f2e-887f-f70b9833cb91-00227.parquet  thread name:Finalizer  client:DFSClient_NONMAPREDUCE_373867554_103
2021-*-*T14:47:43.382+0800	DEBUG	20210927_064723_00001_ajsi3.1.0-13-103	org.apache.hadoop.hdfs.DFSClient	DFSInputStream has been closed already ***00000-1-b5ccab76-0db2-4f2e-887f-f70b9833cb91-00227.parquet  thread name:20210927_064723_00001_ajsi3.1.0-13-103  client:DFSClient_NONMAPREDUCE_373867554_103
org.apache.iceberg.exceptions.RuntimeIOException: Failed to open Parquet file: ***00000-1-b5ccab76-0db2-4f2e-887f-f70b9833cb91-00227.parquet, thread name: 20210927_064723_00001_ajsi3.1.0-13-103

From the client: DFSClient_NONMAPREDUCE_373867554_103 field in the newly added log, it can be seen that the file stream is indeed closed by the Finalizer thread.

In addition, when I applied the changes in this PR to iceberg, no related errors were reported after testing.

@velctor velctor requested a review from rdblue November 1, 2021 06:19
@rdblue
Contributor

rdblue commented Nov 2, 2021

@velctor, it looks like that is a different finalizer. If you were seeing something that was closed by the Iceberg finalizer, you'd get a stack trace for where the stream was created and it would be no longer referenced. I think in this case, your FileSystem is getting closed by the finalizer, which closes all streams associated with it. That's why you get the DFS classes in those logs (I think). Are you not caching the FS instance in Hadoop's FileSystem class?

@velctor
Author

velctor commented Nov 17, 2021

@velctor, it looks like that is a different finalizer. If you were seeing something that was closed by the Iceberg finalizer, you'd get a stack trace for where the stream was created and it would be no longer referenced. I think in this case, your FileSystem is getting closed by the finalizer, which closes all streams associated with it. That's why you get the DFS classes in those logs (I think). Are you not caching the FS instance in Hadoop's FileSystem class?

Oh, the stack trace is long, so I didn't include it before. The complete information is as follows:

2021-11-17T16:50:01.177+0800	DEBUG	20211117_084955_00003_gwkuf.1.2-11-160	org.apache.hadoop.fs.FileSystem	Bypassing cache to create filesystem hdfs://***:9000/user/hive/warehouse/SingleTable2/data/00000-1-b5ccab76-0db2-4f2e-887f-f70b9833cb91-00533.parquet
2021-11-17T16:50:01.304+0800    WARN    Finalizer       org.apache.iceberg.hadoop.HadoopStreams call finalize by:
        org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.finalize(HadoopStreams.java:125)
        java.base/java.lang.System$2.invokeFinalize(System.java:2125)
        java.base/java.lang.ref.Finalizer.runFinalizer(Finalizer.java:87)
        java.base/java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:171)
2021-11-17T16:50:01.304+0800    DEBUG   Finalizer       org.apache.hadoop.hdfs.DFSClient        close DFSInputStream /user/hive/warehouse/SingleTable2/data/00000-1-b5ccab76-0db2-4f2e-887f-f70b9833cb91-00533.parquet  thread name:Finalizer  client:DFSClient_NONMAPREDUCE_1873007395_160
2021-11-17T16:50:01.304+0800    DEBUG   Finalizer       org.apache.hadoop.hdfs.DFSClient        call DFSInputStream close by:
        org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:660)
        java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
        org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.close(HadoopStreams.java:92)
        org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.finalize(HadoopStreams.java:129)
        java.base/java.lang.System$2.invokeFinalize(System.java:2125)
        java.base/java.lang.ref.Finalizer.runFinalizer(Finalizer.java:87)
        java.base/java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:171)
2021-11-17T16:50:01.304+0800    WARN    Finalizer       org.apache.iceberg.hadoop.HadoopStreams Unclosed input stream created by:
        org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.<init>(HadoopStreams.java:81)
        org.apache.iceberg.hadoop.HadoopStreams.wrap(HadoopStreams.java:56)
        org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:175)
        io.trino.plugin.hive.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:23)
        io.trino.plugin.hive.HdfsEnvironment.doAs(HdfsEnvironment.java:98)
        io.trino.plugin.iceberg.HdfsInputFile.newStream(HdfsInputFile.java:60)
        org.apache.iceberg.parquet.ParquetIO$ParquetInputFile.newStream(ParquetIO.java:181)
        org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:773)
        org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
        org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:218)
        org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:74)
        org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
        org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
        org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.hasNext(CloseableIterable.java:173)
        org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:65)
        org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
        org.apache.iceberg.io.CloseableIterable$4$1.hasNext(CloseableIterable.java:108)
        org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.hasNext(CloseableIterable.java:161)
        org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:355)
        org.apache.iceberg.relocated.com.google.common.collect.Sets.newHashSet(Sets.java:275)
        org.apache.iceberg.relocated.com.google.common.collect.Sets.newHashSet(Sets.java:258)
        org.apache.iceberg.deletes.Deletes.toPositionSet(Deletes.java:101)
        org.apache.iceberg.deletes.Deletes.toPositionSet(Deletes.java:96)
        org.apache.iceberg.deletes.Deletes.toPositionSet(Deletes.java:88)
        io.trino.plugin.iceberg.TrinoDeleteFilter.getPositionSet(TrinoDeleteFilter.java:146)
        io.trino.plugin.iceberg.IcebergDeleteFilter.<init>(IcebergDeleteFilter.java:44)
        io.trino.plugin.iceberg.IcebergPageSource.<init>(IcebergPageSource.java:62)
        io.trino.plugin.iceberg.IcebergPageSourceProvider.createPageSource(IcebergPageSourceProvider.java:215)   
        io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider.createPageSource(ClassLoaderSafeConnectorPageSourceProvider.java:49)
        io.trino.split.PageSourceManager.createPageSource(PageSourceManager.java:64)
        io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:306)
        io.trino.operator.Driver.processInternal(Driver.java:387)
        io.trino.operator.Driver.lambda$processFor$9(Driver.java:291)
        io.trino.operator.Driver.tryWithLock(Driver.java:683)
        io.trino.operator.Driver.processFor(Driver.java:284)
        io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1076)
        io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
        io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
        io.trino.$gen.Trino_359____20211117_084839_2.run(Unknown Source)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:829)
2021-11-17T16:50:01.318+0800	DEBUG	20211117_084955_00003_gwkuf.1.2-11-160	org.apache.hadoop.hdfs.DFSClient	DFSInputStream has been closed already /user/hive/warehouse/SingleTable2/data/00000-1-b5ccab76-0db2-4f2e-887f-f70b9833cb91-00533.parquet  thread name:20211117_084955_00003_gwkuf.1.2-11-160  client:DFSClient_NONMAPREDUCE_1873007395_160
org.apache.iceberg.exceptions.RuntimeIOException: Failed to open Parquet file: hdfs://***:9000/user/hive/warehouse/SingleTable2/data/00000-1-b5ccab76-0db2-4f2e-887f-f70b9833cb91-00533.parquet, thread name: 20211117_084955_00003_gwkuf.1.2-11-160


I have added call stack logging to the finalize and close functions. Together with the stack trace showing where the stream was created, we can see that the stream is indeed closed by the Finalizer.

@rdblue
Contributor

rdblue commented Nov 18, 2021

Thanks for the logs! It's always helpful to have the full picture.

I think what's happening is that Trino is extracting the underlying FSDataInputStream and using it directly. The fix should be for whatever in Trino is doing that to keep a reference to the HadoopSeekableInputStream that Iceberg creates. I wasn't expecting to see io.trino.plugin in the stack trace. That tells me that Trino is injecting something non-Iceberg. Since Iceberg only accesses the FSDataInputStream from the objects that reference it, I thought it wasn't possible for this to be the Iceberg finalizer. But if something gets the underlying stream and discards the Iceberg wrapper, that could cause this problem.

@electrum, can you help us take a look at this problem? Looks like something is getting the underlying stream from Iceberg and keeping a reference past when the reference to the Iceberg stream is garbage collected.

@electrum
Contributor

We had this problem in Trino previously; @phd3 fixed it in trinodb/trino#8504.

@velctor It looks like you are using a patched version of the Trino Iceberg connector. Can you share the patch? I don't believe we see this issue today with the current version, so this might be related to the patch. Are you using a version of trinodb/trino#8534 from @jackye1995?

@electrum
Contributor

Longer explanation of the problem: trinodb/trino#5201 (comment)

@electrum
Contributor

Looking at this again with fresh eyes, the actual problem is that Trino's HdfsInputFile delegates to Iceberg's HadoopInputFile, which adds the wrapping. We should either subclass it and override the offending method, or simply re-implement the code rather than delegating (since it's fairly trivial).

@rdblue
Contributor

rdblue commented Nov 18, 2021

Thanks, @electrum!

@velctor
Author

velctor commented Nov 29, 2021

@rdblue @electrum Thanks very much! The version I'm using is 359; I will apply the fix from trinodb/trino#8504 and test.

@github-actions

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jul 28, 2024
@github-actions

github-actions bot commented Aug 5, 2024

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Aug 5, 2024
