
[SUPPORT] Appending to files during UPSERT causes executors to die due to memory issues #7062

@HEPBO3AH

Description

Describe the problem you faced

During upsert runs, we sometimes have executors die due to memory issues. The failure usually cannot be resolved over the 4 task retries, and the entire job fails. Please see the executor stack trace below.

We tracked this issue down to the HoodieMergeHandle class. Please have a look at the To Reproduce section for more details.

To Reproduce

We had a hard time replicating the issue in a small example that we could share with the Hudi team.
The main condition for the issue to appear was appending data to existing small files during upsert.

For us, appending data, coupled with an AvgRecordSize estimate that was too small due to clustering, triggered the OOM issue quite often. After fixing the AvgRecordSize estimate the issue was still there, but it was not as prevalent.
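
For context, a minimal PySpark sketch of how the record size estimate can be pinned explicitly so that small-file bin-packing does not depend on a skewed post-clustering estimate. To our knowledge hoodie.copyonwrite.record.size.estimate is the standard Hudi key for this; the value below is illustrative, not our production number.

# Illustrative only: override the average record size (in bytes) used for
# small-file bin-packing; 1024 is a placeholder value.
AVG_RECORD_SIZE_OVERRIDE = {"hoodie.copyonwrite.record.size.estimate": "1024"}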

In our code, the conditions are the following (a sketch of the corresponding Spark datasource write options appears after this list):

PARQUET_SMALL_FILE_LIMIT = 67108864 //64MB
PARQUET_MAX_FILE_SIZE = 83886080 //80MB
INLINE_CLUSTERING = true
INLINE_CLUSTERING_MAX_COMMITS = 2
PLAN_STRATEGY_SMALL_FILE_LIMIT = 73400320 //70MB
PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = 209715200 //200MB
TABLE_TYPE = COW
INDEX_TYPE = BLOOM
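
For reference, a minimal PySpark sketch of how the settings above map to Hudi write options in our Glue job. The table name, DataFrame source, and paths are placeholders; the option keys are, to our knowledge, the standard Hudi keys for the constants listed above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical incoming upsert batch; in the real job this comes from upstream sources.
df = spark.read.parquet("s3://<bucket>/<incoming-batch>")

hudi_options = {
    "hoodie.table.name": "my_table",                                        # placeholder
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",                  # TABLE_TYPE = COW
    "hoodie.index.type": "BLOOM",                                           # INDEX_TYPE = BLOOM
    "hoodie.parquet.small.file.limit": "67108864",                          # 64MB
    "hoodie.parquet.max.file.size": "83886080",                             # 80MB
    "hoodie.clustering.inline": "true",                                     # INLINE_CLUSTERING
    "hoodie.clustering.inline.max.commits": "2",
    "hoodie.clustering.plan.strategy.small.file.limit": "73400320",         # 70MB
    "hoodie.clustering.plan.strategy.target.file.max.bytes": "209715200",   # 200MB
}

(df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3://<bucket>/<table-path>"))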

When the AvgRecordSize estimate was wrong, Hudi would pack many more records into a single file than it should. The executors that did not fail produced files around 250MB in size, well above the target of 80MB.

We tracked this issue down to the HoodieMergeHandle class.

Why do we suspect there is an issue with this class? Because when we change the settings to the following:

PARQUET_MAX_FILE_SIZE = 262144000 // 250MB
PARQUET_SMALL_FILE_LIMIT = 0 // don't append to small files

We noticed that the HoodieMergeHandle class is no longer used (because PARQUET_SMALL_FILE_LIMIT = 0) and the job passes successfully.
This indicates that the problem is not the amount of data processed by a single executor, but rather the code path that appends to existing files.
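
A minimal sketch of the workaround configuration; only these two options differ from the write options shown earlier.

# Workaround sketch: with the small-file limit set to 0, incoming data is no
# longer bin-packed into existing small files, so in our runs HoodieMergeHandle
# was not exercised; the max file size is raised so files can still grow.
workaround_options = {
    "hoodie.parquet.max.file.size": "262144000",   # 250MB
    "hoodie.parquet.small.file.limit": "0",        # don't append to small files
}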

Expected behavior

When appending to existing files, the job should succeed, similar to how it does when writing large files.

Environment Description

AWS Glue 3.0 running on G.2X workers

  • Hudi version : 0.11

  • Spark version : 3.1

  • Hive version : N/A

  • Hadoop version : N/A

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Stacktrace

2022-10-06 06:38:15,938 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] io.HoodieMergeHandle (HoodieMergeHandle.java:init(249)): Number of entries in MemoryBasedMap => 601116, Total size in bytes of MemoryBasedMap => 858994816, Number of entries in BitCaskDiskMap => 5477112, Size of file spilled to disk => 5073839525  
2022-10-06 06:38:15,939 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] io.HoodieMergeHandle (HoodieMergeHandle.java:init(161)): partitionPath:<pathtopartition>, fileId to be merged:20404289-5191-4634-b3bb-217d1f341940-0  

2022-10-06 06:38:15,963 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] io.HoodieMergeHandle (HoodieMergeHandle.java:init(177)): Merging new data into oldPath s3a://<oldparquetfile>, as newPath s3a://<newparquetfile>  

2022-10-06 06:38:16,000 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] marker.DirectWriteMarkers (DirectWriteMarkers.java:create(173)): Creating Marker Path=s3a://<pathtomarker>.parquet.marker.MERGE  

2022-10-06 06:38:16,209 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] marker.DirectWriteMarkers (DirectWriteMarkers.java:create(178)): [direct] Created marker file s3a://<pathtomarker>.parquet.marker.MERGE in 246 ms  

2022-10-06 06:38:16,419 INFO [producer-thread-1] queue.IteratorBasedQueueProducer (IteratorBasedQueueProducer.java:produce(44)): starting to buffer records  

2022-10-06 06:38:16,419 INFO [consumer-thread-1] queue.BoundedInMemoryExecutor (BoundedInMemoryExecutor.java:lambda$null$2(131)): starting consumer thread  

2022-10-06 06:38:16,460 INFO [producer-thread-1] s3a.S3AInputStream (S3AInputStream.java:seekInStream(304)): Switching to Random IO seek policy  

2022-10-06 06:38:34,399 INFO [producer-thread-1] queue.IteratorBasedQueueProducer (IteratorBasedQueueProducer.java:produce(48)): finished buffering records  

2022-10-06 06:38:34,402 INFO [consumer-thread-1] queue.BoundedInMemoryExecutor (BoundedInMemoryExecutor.java:lambda$null$2(135)): Queue Consumption is done; notifying producer threads
2022-10-06 06:39:14,505 ERROR [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] executor.Executor (Logging.scala:logError(94)): Exception in task 0.0 in stage 39.1 (TID 18599)
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :5
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieIOException: Unable to readFromDisk Hoodie Record from disk
	at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:214)
	at org.apache.hudi.common.util.collection.LazyFileIterable$LazyFileIterator.next(LazyFileIterable.java:102)
	at org.apache.hudi.common.util.collection.ExternalSpillableMap$IteratorWrapper.next(ExternalSpillableMap.java:332)
	at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:378)
	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
	... 28 more
Caused by: java.io.IOException: Cannot allocate memory
	at java.io.RandomAccessFile.readBytes(Native Method)
	at java.io.RandomAccessFile.read(RandomAccessFile.java:377)
	at org.apache.hudi.common.util.BufferedRandomAccessFile.fillBuffer(BufferedRandomAccessFile.java:174)
	at org.apache.hudi.common.util.BufferedRandomAccessFile.seek(BufferedRandomAccessFile.java:220)
	at org.apache.hudi.common.util.BufferedRandomAccessFile.loadNewBlockToBuffer(BufferedRandomAccessFile.java:268)
	at org.apache.hudi.common.util.BufferedRandomAccessFile.read(BufferedRandomAccessFile.java:311)
	at java.io.RandomAccessFile.readFully(RandomAccessFile.java:436)
	at org.apache.hudi.common.util.SpillableMapUtils.readInternal(SpillableMapUtils.java:70)
	at org.apache.hudi.common.util.SpillableMapUtils.readBytesFromDisk(SpillableMapUtils.java:50)
	at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:208)
	... 36 more
2022-10-06 06:40:20,294 ERROR [shuffle-server-6-5] server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(148)): Error sending result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1351148232721,chunkIndex=0],buffer=FileSegmentManagedBuffer[file=/tmp/blockmgr-505d9da3-7ed3-4a7c-9f23-3b0464c6d313/09/shuffle_11_18220_0.data,offset=487986,length=1570101]] to /172.34.92.1:41572; closing connection
java.io.IOException: Cannot allocate memory
	at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
	at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:428)
	at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:493)
	at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:605)
	at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:130)
	at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
	at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
	at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
	at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
	at org.apache.spark.network.server.ChunkFetchRequestHandler.respond(ChunkFetchRequestHandler.java:142)
	at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:116)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:750)

Metadata

Labels

area:writer (Write client and core write operations), priority:high (Significant impact; potential bugs)

Projects

Status

🚧 Needs Repro
