
Conversation

@yihua (Contributor) commented Feb 24, 2022

What is the purpose of the pull request

This PR fixes the log file reader for S3 with hadoop-aws 2.7.x.

HoodieLogFileReader uses the FSDataInputStream::seek API to scan a log file and load block content. On S3, the S3AInputStream::seek implementation is used. In hadoop-aws 2.7.x, S3AInputStream throws an EOFException when the seek position equals the file size, as shown in its source code below:

    if (contentLength > 0 && pos > contentLength-1) {
      throw new EOFException(
          FSExceptionMessages.CANNOT_SEEK_PAST_EOF
          + " " + pos);
    }

This causes an EOFException when scanning the last block in a file, since the seek position reaches the file size. The problem can be reproduced by running hudi-cli metadata list-partitions on a Hudi MOR table on S3 with log files in the metadata table.

The fix is to avoid seeking to the file size and to handle the corrupt-block check differently.
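A minimal sketch of the idea (with hypothetical names; the real change lives in HoodieLogFileReader's isBlockCorrupted()): instead of seeking to the claimed block end and relying on the seek to fail, compare the claimed end position against the known file length, so a last block that exactly fills the file never triggers a seek to the file size.

```java
public class BlockBoundsCheck {
    // Hypothetical sketch of the corruption check: a block whose claimed
    // end position runs past EOF is corrupt; no seek past EOF is needed.
    static boolean runsPastEof(long blockStartPos, long blockSizeFromHeader, long fileLength) {
        return blockStartPos + blockSizeFromHeader > fileLength;
    }

    public static void main(String[] args) {
        // Last block exactly fills the file: valid, even though a seek to
        // fileLength (500) would throw EOFException on hadoop-aws 2.7.x.
        System.out.println(runsPastEof(100, 400, 500)); // false
        // Header claims more bytes than the file holds: corrupt.
        System.out.println(runsPastEof(100, 432, 500)); // true
    }
}
```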

Brief change log

  • Changes the logic of isBlockCorrupted() in HoodieLogFileReader to adapt to different seek behavior

Verify this pull request

The PR is verified through the following test scenarios. Two MOR tables are generated, with the metadata table enabled and log files in the metadata table: one on S3 and one on the local filesystem. The metadata table's files partition:

   121 Jan 28 23:03 .files-0000_00000000000000.log.1_0-0-0
  5746 Jan 28 23:04 .files-0000_00000000000000.log.1_0-21-249
  7697 Jan 28 23:04 .files-0000_00000000000000.log.2_0-34-260
  7696 Jan 28 23:13 .files-0000_00000000000000.log.3_0-16-175
    93 Jan 28 23:04 .hoodie_partition_metadata

Environments:

  • Docker demo environment: spark 2.4.4, hadoop-aws 2.7.4, aws-java-sdk 1.7.4, openjdk version "1.8.0_212"
cd $SPARK_HOME
cd jars/
curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar --output aws-java-sdk-1.7.4.jar
chmod 644 aws-java-sdk-1.7.4.jar
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.4/hadoop-aws-2.7.4.jar -o hadoop-aws-2.7.4.jar 
chmod 644 hadoop-aws-2.7.4.jar
export CLIENT_JAR=/opt/spark/jars/hadoop-aws-2.7.4.jar:/opt/spark/jars/aws-java-sdk-1.7.4.jar
cd /var/hoodie/ws/
./hudi-cli/hudi-cli.sh 
  • Local environment: spark 3.2.0, hadoop-aws 3.3.1, aws-java-sdk-bundle 1.12.48, openjdk version "1.8.0_292"

Test scenarios:

  • Run hudi-cli metadata list-partitions on the MOR table on S3. The partitions are successfully listed.
  • Remove the tail byte from a log file, making the last log block corrupt. The partitions are still successfully listed, and the corrupted block is identified:
22/02/24 21:12:30 INFO log.HoodieLogFileReader: Found corrupted block in file HoodieLogFile{pathStr='s3a://hudi-testing/metadata_test_table_22/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-21-249', fileLen=-1} with block size(5732) running past EOF
  • Run hudi-cli metadata list-partitions on the MOR table on the local filesystem. The partitions are successfully listed.

Existing unit, functional, and integration tests cover the log file reader logic for HDFS.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@yihua force-pushed the HUDI-3341-fix-log-reader-for-s3 branch from 3e08c37 to d726c9c on February 25, 2022 00:29
@hudi-bot (Collaborator)

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure — re-run the last Azure build

@yihua yihua changed the title [WIP][HUDI-3341] Fix log file reader for S3 with hadoop-aws 2.7.x [HUDI-3341] Fix log file reader for S3 with hadoop-aws 2.7.x Feb 25, 2022
@yihua yihua requested a review from nsivabalan February 25, 2022 02:45
@yihua yihua assigned yihua and nsivabalan and unassigned yihua Feb 25, 2022
@zhangyue19921010 (Contributor) left a comment

LGTM, thanks a lot for this fix.
I also tested this patch on our dev environment. Taking hudi-cli metadata list-partitions as an example, without this patch it throws exceptions:

20068 [Spring Shell] INFO  org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader  - Scanning log file HoodieLogFile{pathStr='s3a://vir-dev-realtime/sos/yz/IT/cow_f_order_sa_delivered_hourly_yo/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0', fileLen=0}
20068 [Spring Shell] INFO  org.apache.hudi.common.table.log.HoodieLogFileReader  - Found corrupted block in file HoodieLogFile{pathStr='s3a://vir-dev-realtime/sos/yz/IT/cow_f_order_sa_delivered_hourly_yo/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0', fileLen=0} with block size(107) running past EOF
20069 [Spring Shell] INFO  org.apache.hudi.common.table.log.HoodieLogFileReader  - Log HoodieLogFile{pathStr='s3a://vir-dev-realtime/sos/yz/IT/cow_f_order_sa_delivered_hourly_yo/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0', fileLen=0} has a corrupted block at 14
20069 [Spring Shell] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader  - Got exception when reading log file
org.apache.hudi.exception.HoodieIOException: IOException when reading logblock from log file HoodieLogFile{pathStr='s3a://vir-dev-realtime/sos/yz/IT/cow_f_order_sa_delivered_hourly_yo/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0', fileLen=0}
	at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:409)
	at org.apache.hudi.common.table.log.HoodieLogFormatReader.next(HoodieLogFormatReader.java:120)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:210)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:178)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:103)
	at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:71)
	at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
	at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:246)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:346)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$openReadersIfNeeded$2(HoodieBackedTableMetadata.java:262)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:239)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:129)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:124)
	at org.apache.hudi.metadata.BaseTableMetadata.fetchAllPartitionPaths(BaseTableMetadata.java:154)
	at org.apache.hudi.metadata.BaseTableMetadata.getAllPartitionPaths(BaseTableMetadata.java:98)
	at org.apache.hudi.cli.commands.MetadataCommand.listPartitions(MetadataCommand.java:206)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
	at org.springframework.shell.core.SimpleExecutionStrategy.invoke(SimpleExecutionStrategy.java:68)
	at org.springframework.shell.core.SimpleExecutionStrategy.execute(SimpleExecutionStrategy.java:59)
	at org.springframework.shell.core.AbstractShell.executeCommand(AbstractShell.java:134)
	at org.springframework.shell.core.JLineShell.promptLoop(JLineShell.java:533)
	at org.springframework.shell.core.JLineShell.run(JLineShell.java:179)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Attempted read on closed stream.
	at org.apache.http.conn.EofSensorInputStream.isReadAllowed(EofSensorInputStream.java:109)
	at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at java.io.DataInputStream.readFully(DataInputStream.java:195)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.scanForNextAvailableBlockOffset(HoodieLogFileReader.java:338)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.createCorruptBlock(HoodieLogFileReader.java:279)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.readBlock(HoodieLogFileReader.java:204)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:407)
	... 27 more
20072 [Spring Shell] ERROR org.springframework.shell.core.SimpleExecutionStrategy  - Command failed org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve list of partition from metadata
20072 [Spring Shell] WARN  org.springframework.shell.core.JLineShellComponent.exceptions  - Failed to retrieve list of partition from metadata
org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve list of partition from metadata
	at org.apache.hudi.metadata.BaseTableMetadata.getAllPartitionPaths(BaseTableMetadata.java:100)
	at org.apache.hudi.cli.commands.MetadataCommand.listPartitions(MetadataCommand.java:206)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
	at org.springframework.shell.core.SimpleExecutionStrategy.invoke(SimpleExecutionStrategy.java:68)
	at org.springframework.shell.core.SimpleExecutionStrategy.execute(SimpleExecutionStrategy.java:59)
	at org.springframework.shell.core.AbstractShell.executeCommand(AbstractShell.java:134)
	at org.springframework.shell.core.JLineShell.promptLoop(JLineShell.java:533)
	at org.springframework.shell.core.JLineShell.run(JLineShell.java:179)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:332)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:178)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:103)
	at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:71)
	at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
	at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:246)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:346)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$openReadersIfNeeded$2(HoodieBackedTableMetadata.java:262)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:239)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:129)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:124)
	at org.apache.hudi.metadata.BaseTableMetadata.fetchAllPartitionPaths(BaseTableMetadata.java:154)
	at org.apache.hudi.metadata.BaseTableMetadata.getAllPartitionPaths(BaseTableMetadata.java:98)
	... 12 more
Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading logblock from log file HoodieLogFile{pathStr='s3a://vir-dev-realtime/sos/yz/IT/cow_f_order_sa_delivered_hourly_yo/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0', fileLen=0}
	at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:409)
	at org.apache.hudi.common.table.log.HoodieLogFormatReader.next(HoodieLogFormatReader.java:120)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:210)
	... 25 more
Caused by: java.io.IOException: Attempted read on closed stream.
	at org.apache.http.conn.EofSensorInputStream.isReadAllowed(EofSensorInputStream.java:109)
	at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at java.io.DataInputStream.readFully(DataInputStream.java:195)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.scanForNextAvailableBlockOffset(HoodieLogFileReader.java:338)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.createCorruptBlock(HoodieLogFileReader.java:279)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.readBlock(HoodieLogFileReader.java:204)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:407)
	... 27 more

After this patch, it works correctly:

22/02/25 05:46:04 INFO log.AbstractHoodieLogRecordReader: Number of remaining logblocks to merge 2
22/02/25 05:46:04 INFO log.AbstractHoodieLogRecordReader: Number of remaining logblocks to merge 1
22/02/25 05:46:04 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
22/02/25 05:46:04 INFO hfile.CacheConfig: Allocating LruBlockCache size=6.09 GB, blockSize=64 KB
22/02/25 05:46:04 INFO hfile.CacheConfig: Created cacheConfig: blockCache=LruBlockCache{blockCount=0, currentSize=6709144, freeSize=6534937192, maxSize=6541646336, heapSize=6709144, minSize=6214563840, minFactor=0.95, multiSize=3107281920, multiFactor=0.5, singleSize=1553640960, singleFactor=0.25}, cacheDataOnRead=true, cacheDataOnWrite=false, cacheIndexesOnWrite=false, cacheBloomsOnWrite=false, cacheEvictOnClose=false, cacheDataCompressed=false, prefetchOnOpen=false
22/02/25 05:46:04 INFO compress.CodecPool: Got brand-new decompressor [.gz]
22/02/25 05:46:04 INFO compress.CodecPool: Got brand-new decompressor [.gz]
22/02/25 05:46:04 INFO compress.CodecPool: Got brand-new decompressor [.gz]
22/02/25 05:46:04 INFO compress.CodecPool: Got brand-new decompressor [.gz]
22/02/25 05:46:04 INFO collection.ExternalSpillableMap: Estimated Payload size => 408
22/02/25 05:46:04 INFO log.AbstractHoodieLogRecordReader: Merging the final data blocks
22/02/25 05:46:04 INFO log.AbstractHoodieLogRecordReader: Number of remaining logblocks to merge 1
22/02/25 05:46:04 INFO hfile.CacheConfig: Created cacheConfig: blockCache=LruBlockCache{blockCount=0, currentSize=6709144, freeSize=6534937192, maxSize=6541646336, heapSize=6709144, minSize=6214563840, minFactor=0.95, multiSize=3107281920, multiFactor=0.5, singleSize=1553640960, singleFactor=0.25}, cacheDataOnRead=true, cacheDataOnWrite=false, cacheIndexesOnWrite=false, cacheBloomsOnWrite=false, cacheEvictOnClose=false, cacheDataCompressed=false, prefetchOnOpen=false
22/02/25 05:46:04 INFO compress.CodecPool: Got brand-new decompressor [.gz]
22/02/25 05:46:04 INFO compress.CodecPool: Got brand-new decompressor [.gz]
22/02/25 05:46:04 INFO compress.CodecPool: Got brand-new decompressor [.gz]
22/02/25 05:46:04 INFO compress.CodecPool: Got brand-new decompressor [.gz]
22/02/25 05:46:04 INFO log.HoodieMergedLogRecordScanner: Number of log files scanned => 3
22/02/25 05:46:04 INFO log.HoodieMergedLogRecordScanner: MaxMemoryInBytes allowed for compaction => 1073741824
22/02/25 05:46:04 INFO log.HoodieMergedLogRecordScanner: Number of entries in MemoryBasedMap in ExternalSpillableMap => 6
22/02/25 05:46:04 INFO log.HoodieMergedLogRecordScanner: Total size in bytes of MemoryBasedMap in ExternalSpillableMap => 2448
22/02/25 05:46:04 INFO log.HoodieMergedLogRecordScanner: Number of entries in BitCaskDiskMap in ExternalSpillableMap => 0
22/02/25 05:46:04 INFO log.HoodieMergedLogRecordScanner: Size of file spilled to disk => 0
22/02/25 05:46:04 INFO metadata.HoodieBackedTableMetadata: Opened 3 metadata log files (dataset instant=20220126110013865, metadata instant=20220126110013865) in 1670 ms
22/02/25 05:46:04 INFO metadata.BaseTableMetadata: Listed partitions from metadata: #partitions=5
22/02/25 05:46:04 INFO core.JLineShellComponent: ╔════════════╗
║ partition  ║
╠════════════╣
║ 2022012610 ║
╟────────────╢
║ 2022012606 ║
╟────────────╢
║ 2022012605 ║
╟────────────╢
║ 2022012604 ║
╟────────────╢
║ 2022012603 ║

@zhangyue19921010 (Contributor)

By the way, this was tested using:
Hudi: master
Spark: 2.4.4
JDK: 1.8.0_161

@nsivabalan (Contributor) left a comment

LGTM. Good find on the issue, and an elegant fix without making special-case changes for S3AInputStream alone.

@nsivabalan nsivabalan merged commit 05e395a into apache:master Feb 28, 2022
@nsivabalan nsivabalan added the priority:critical Production degraded; pipelines stalled label Feb 28, 2022
rkkalluri pushed a commit to rkkalluri/hudi that referenced this pull request Mar 6, 2022
vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022
stayrascal pushed a commit to stayrascal/hudi that referenced this pull request Apr 12, 2022
nsivabalan pushed a commit to nsivabalan/hudi that referenced this pull request May 2, 2023
nsivabalan added a commit to nsivabalan/hudi that referenced this pull request May 2, 2023