Incorrect Results and SIGSEGV on Read with Iceberg + PySpark + Nessie #12178

Open
agrubb86 opened this issue Feb 5, 2025 · 2 comments
Labels
bug Something isn't working

agrubb86 commented Feb 5, 2025

Apache Iceberg version

1.7.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

To quickly summarize the issue: when writing to and then reading from an Iceberg table stored in Nessie using PySpark, the read query results are incorrect (the data in the Parquet files is correct), and certain types of queries cause SIGSEGVs in the Spark executors. I'm filing these as a single issue because I only encounter the SIGSEGVs when reading data incorrectly in certain types of queries. For queries that do trigger SIGSEGVs, the number encountered per query ranges from 0 to 4 (the job is killed after 4 executor failures).

Environment:

- aarch64
- Java 17.0.13+11
- Spark 3.5.4
- Iceberg 1.7.1
- Nessie 0.101.2
- EKS on AWS

Jars added to Spark:

- avatica-core-1.25.0.jar
- aws-java-sdk-bundle-1.12.262.jar
- hadoop-aws-3.3.4.jar
- hbase-shaded-mapreduce-2.4.15.jar
- hbase-spark-1.1.0-SNAPSHOT.jar
- hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar
- htrace-core4-4.2.0-incubating.jar
- httpclient5-5.4.1.jar
- httpcore5-5.3.1.jar
- httpcore5-h2-5.3.1.jar
- iceberg-aws-bundle-1.7.1.jar
- iceberg-spark-runtime-3.5_2.12-1.7.1.jar
- log4j-slf4j-impl-2.18.0.jar
- mysql-connector-j-9.1.0.jar
- nessie-spark-extensions-3.5_2.12-0.101.2.jar
- protobuf-java-3.21.9.jar
- slf4j-api-1.7.36.jar
- spark-hadoop-cloud_2.12-3.5.4.jar

(Potentially) Relevant Spark configs:

spark.hadoop.fs.s3a.impl                   org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.connection.ssl.enabled false
spark.hadoop.fs.s3a.fast.upload            true
spark.hive.metastore.uris                  thrift://XXX:9083
spark.kryo.registrationRequired            false
spark.kryo.unsafe                          false
spark.kryoserializer.buffer                1m
spark.kryoserializer.buffer.max            1g
spark.serializer                           org.apache.spark.serializer.KryoSerializer
spark.shuffle.sort.io.plugin.class         org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
spark.shuffle.useOldFetchProtocol          true
spark.sql.catalogImplementation            hive
spark.sql.catalog.nessie                   org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.nessie.auth_type         NONE
spark.sql.catalog.nessie.catalog-impl      org.apache.iceberg.nessie.NessieCatalog
spark.sql.catalog.nessie.ref               dev
spark.sql.catalog.nessie.uri               http://XXX:19120/api/v1
spark.sql.catalog.nessie.warehouse         s3a://XXX/
spark.sql.execution.arrow.pyspark.enabled  true
spark.sql.extensions                       org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions
spark.sql.sources.partitionOverwriteMode   dynamic
spark.sql.warehouse.dir                    s3a://XXX/
spark.submit.deployMode                    cluster

Source table definition:

CREATE TABLE source_table (
  id1 STRING,
  id2 STRING,
  pt_col1 STRING,
  cnt INT,
  dt STRING)
USING iceberg
PARTITIONED BY (dt, pt_col1)
TBLPROPERTIES (
  'format' = 'iceberg/parquet',
  'format-version' = '2',
  'gc.enabled' = 'false',
  'write.metadata.delete-after-commit.enabled' = 'false',
  'write.parquet.compression-codec' = 'zstd')

Example source data:

id1                | id2         | pt_col1 | cnt | dt
1a3bcbf1-f334-4408 | example.com | val1    | 1   | 2025-01-25

Sink table definition:

CREATE TABLE sink_table (
  id1 STRING,
  pt_col1 STRING,
  distinct_id2 INT,
  dt STRING)
USING iceberg
PARTITIONED BY (dt, pt_col1)
TBLPROPERTIES (
  'format' = 'iceberg/parquet',
  'format-version' = '2',
  'gc.enabled' = 'false',
  'write.metadata.delete-after-commit.enabled' = 'false',
  'write.parquet.compression-codec' = 'zstd')

Insert query:

INSERT OVERWRITE TABLE sink_table
    PARTITION (dt = '2025-01-26')
SELECT id1,
       pt_col1,
       COUNT(DISTINCT id2)
FROM source_table
WHERE dt >= '2024-10-29'
  AND dt <= '2025-01-26'
GROUP BY 1, 2

At this point, the data in the Parquet files is correct according to hadoop jar parquet-tools-1.11.0.jar dump, i.e. every id1 has distinct_id2 >= 1.

Data stats:

dt         | pt_col1 | count(1)
2025-01-25 | val2    | 3402
2025-01-25 | val1    | 1022
2025-01-24 | val1    | 519
2025-01-24 | val2    | 1777
2025-01-26 | val2    | 5158
2025-01-26 | val1    | 1524

Note that dt='2025-01-26', pt_col1='val2' is the only partition combination where incorrect data and SIGSEGVs are encountered, and with SELECT * the number of incorrect rows is num_total_rows - 5000. The list of id1's that are incorrect is the same each time the insert query is run.
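A quick sanity check on that arithmetic (a sketch only, using the counts from the stats table above):

```python
# The only affected partition is dt='2025-01-26', pt_col1='val2' (5158 rows).
# The reported formula for SELECT * is num_rows_incorrect = num_total_rows - 5000.
partition_counts = {
    ("2025-01-25", "val2"): 3402,
    ("2025-01-25", "val1"): 1022,
    ("2025-01-24", "val1"): 519,
    ("2025-01-24", "val2"): 1777,
    ("2025-01-26", "val2"): 5158,
    ("2025-01-26", "val1"): 1524,
}

affected = partition_counts[("2025-01-26", "val2")]
incorrect = affected - 5000
print(incorrect)  # -> 158

# Consistent with the grouped query over distinct_id2 = 1 returning
# 4999 rows instead of the expected 5157: the same 158 rows are affected.
assert 5157 - 4999 == incorrect
```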

The following queries return correct results without SIGSEGV:

SELECT [* / COUNT(*)] 
FROM sink_table 
WHERE distinct_id2 = 0

Returns: [0 rows / 0]

SELECT COUNT(*) 
FROM sink_table 
WHERE dt = '2025-01-26'
  AND pt_col1 = 'val2'

Returns: 5158 rows

The following query returns correct results with SIGSEGV:

SELECT id1, 
       count(*) 
FROM sink_table 
WHERE distinct_id2 = 2 
GROUP BY 1

Returns: 1 row

The following queries return incorrect results without SIGSEGV:

SELECT * 
FROM sink_table
WHERE dt = '2025-01-26' 
  AND pt_col1 = 'val2'

Returns: 158 rows with distinct_id2 = 0
Expected: 0 rows with distinct_id2 = 0

INSERT OVERWRITE TABLE sink_table_clone
SELECT *
FROM sink_table
WHERE dt = '2025-01-26'

Writes a parquet file with distinct_id2 = 0 as the value for 158 rows

The following queries return incorrect results with SIGSEGV:

SELECT * 
FROM sink_table 
WHERE dt = '2025-01-26'
  AND pt_col1 = 'val2'
ORDER BY distinct_id2 DESC

Returns: 158 rows with distinct_id2 = 0
Expected: 0 rows with distinct_id2 = 0

SELECT id1, 
       count(*) 
FROM sink_table 
WHERE distinct_id2 = 1 
GROUP BY 1

Returns: 4999 rows
Expected: 5157 rows

Segfaults:

25/02/04 10:50:03 INFO Executor: Running task 0.0 in stage 249.0 (TID 197)
25/02/04 10:50:03 INFO MapOutputTrackerWorker: Updating epoch to 89 and clearing cache
25/02/04 10:50:03 INFO TorrentBroadcast: Started reading broadcast variable 268 with 1 pieces (estimated total size 4.0 MiB)
25/02/04 10:50:03 INFO MemoryStore: Block broadcast_268_piece0 stored as bytes in memory (estimated size 21.3 KiB, free 5.9 GiB)
25/02/04 10:50:03 INFO TorrentBroadcast: Reading broadcast variable 268 took 4 ms
25/02/04 10:50:03 INFO MemoryStore: Block broadcast_268 stored as values in memory (estimated size 48.2 KiB, free 5.9 GiB)
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x0000ffff99f54ae0, pid=15, tid=282
#
# JRE version: OpenJDK Runtime Environment Temurin-17.0.13+11 (17.0.13+11) (build 17.0.13+11)
# Java VM: OpenJDK 64-Bit Server VM Temurin-17.0.13+11 (17.0.13+11, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-aarch64)
# Problematic frame:
# V  [libjvm.so+0xd44ae0]  ConcurrentHashTable<SymbolTableConfig, (MEMFLAGS)10>::Node* ConcurrentHashTable<SymbolTableConfig, (MEMFLAGS)10>::get_node<SymbolTableLookup>(ConcurrentHashTable<SymbolTableConfig, (MEMFLAGS)10>::Bucket const*, SymbolTableLookup&, bool*, unsigned long*) const [clone .isra.0]+0x50

This segfault is seen the overwhelming majority of the time. The other three were seen either once or twice each. The attached core dump is for this segfault; I wasn't able to retrieve the core dumps for the others due to the pod going away too quickly.
core_dump.txt

25/02/04 09:58:36 INFO Executor: Running task 0.0 in stage 201.0 (TID 159)
25/02/04 09:58:36 INFO TorrentBroadcast: Started reading broadcast variable 211 with 1 pieces (estimated total size 4.0 MiB)
25/02/04 09:58:36 INFO MemoryStore: Block broadcast_211_piece0 stored as bytes in memory (estimated size 12.2 KiB, free 5.9 GiB)
25/02/04 09:58:36 INFO TorrentBroadcast: Reading broadcast variable 211 took 4 ms
25/02/04 09:58:36 INFO MemoryStore: Block broadcast_211 stored as values in memory (estimated size 29.0 KiB, free 5.9 GiB)
25/02/04 09:58:36 INFO TransportClientFactory: Found inactive connection to spark-2f557594cfd0870d-driver-svc.spark-dev.svc/10.3.26.54:7078, creating a new one.
25/02/04 09:58:36 INFO TransportClientFactory: Successfully created connection to spark-2f557594cfd0870d-driver-svc.spark-dev.svc/10.3.26.54:7078 after 0 ms (0 ms spent in bootstraps)
25/02/04 09:58:36 INFO CodeGenerator: Code generated in 46.470428 ms
25/02/04 09:58:36 INFO SerializableTableWithSize: Releasing resources
25/02/04 09:58:36 INFO SerializableTableWithSize: Releasing resources
25/02/04 09:58:36 INFO CodeGenerator: Code generated in 15.156982 ms
25/02/04 09:58:36 INFO TorrentBroadcast: Started reading broadcast variable 210 with 1 pieces (estimated total size 4.0 MiB)
25/02/04 09:58:36 INFO MemoryStore: Block broadcast_210_piece0 stored as bytes in memory (estimated size 30.8 KiB, free 5.8 GiB)
25/02/04 09:58:36 INFO TorrentBroadcast: Reading broadcast variable 210 took 4 ms
25/02/04 09:58:36 INFO MemoryStore: Block broadcast_210 stored as values in memory (estimated size 32.0 KiB, free 5.8 GiB)
25/02/04 09:58:36 INFO S3AInputStream: Switching to Random IO seek policy
25/02/04 09:58:36 INFO CodecPool: Got brand-new decompressor [.zstd]
25/02/04 09:58:36 INFO S3AInputStream: Switching to Random IO seek policy
25/02/04 09:58:36 INFO CodecPool: Got brand-new decompressor [.zstd]
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x0000ffff9093553c, pid=17, tid=162
#
# JRE version: OpenJDK Runtime Environment Temurin-17.0.13+11 (17.0.13+11) (build 17.0.13+11)
# Java VM: OpenJDK 64-Bit Server VM Temurin-17.0.13+11 (17.0.13+11, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-aarch64)
# Problematic frame:
# j  org.apache.iceberg.shaded.io.netty.util.internal.InternalThreadLocalMap.slowGet()Lorg/apache/iceberg/shaded/io/netty/util/internal/InternalThreadLocalMap;+0
25/02/04 12:39:14 INFO Executor: Running task 0.0 in stage 286.3 (TID 233)
25/02/04 12:39:14 INFO MapOutputTrackerWorker: Updating epoch to 106 and clearing cache
25/02/04 12:39:14 INFO TorrentBroadcast: Started reading broadcast variable 333 with 1 pieces (estimated total size 4.0 MiB)
25/02/04 12:39:14 INFO MemoryStore: Block broadcast_333_piece0 stored as bytes in memory (estimated size 18.0 KiB, free 5.9 GiB)
25/02/04 12:39:14 INFO TorrentBroadcast: Reading broadcast variable 333 took 13 ms
25/02/04 12:39:14 INFO MemoryStore: Block broadcast_333 stored as values in memory (estimated size 39.2 KiB, free 5.9 GiB)
25/02/04 12:39:14 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 79, fetching them
25/02/04 12:39:14 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://[email protected]:7078)
25/02/04 12:39:14 INFO MapOutputTrackerWorker: Got the map output locations
25/02/04 12:39:14 INFO ShuffleBlockFetcherIterator: Getting 200 (218.1 KiB) non-empty blocks including 200 (218.1 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
25/02/04 12:39:14 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 13 ms
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x0000ffff7f7fbee0, pid=15, tid=59
#
# JRE version: OpenJDK Runtime Environment Temurin-17.0.13+11 (17.0.13+11) (build 17.0.13+11)
# Java VM: OpenJDK 64-Bit Server VM Temurin-17.0.13+11 (17.0.13+11, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-aarch64)
# Problematic frame:
# V  [libjvm.so+0x7cbee0]  AccessInternal::PostRuntimeDispatch<G1BarrierSet::AccessBarrier<282726ul, G1BarrierSet>, (AccessInternal::BarrierType)3, 282726ul>::oop_access_barrier(oopDesc*, long)+0x0
25/02/04 12:30:02 INFO Executor: Running task 0.0 in stage 286.0 (TID 224)
25/02/04 12:30:02 INFO MapOutputTrackerWorker: Updating epoch to 100 and clearing cache
25/02/04 12:30:02 INFO TorrentBroadcast: Started reading broadcast variable 327 with 1 pieces (estimated total size 4.0 MiB)
25/02/04 12:30:02 INFO MemoryStore: Block broadcast_327_piece0 stored as bytes in memory (estimated size 18.0 KiB, free 5.9 GiB)
25/02/04 12:30:02 INFO TorrentBroadcast: Reading broadcast variable 327 took 6 ms
25/02/04 12:30:02 INFO MemoryStore: Block broadcast_327 stored as values in memory (estimated size 39.2 KiB, free 5.9 GiB)
25/02/04 12:30:02 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 79, fetching them
25/02/04 12:30:02 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://[email protected]:7078)
25/02/04 12:30:02 INFO MapOutputTrackerWorker: Got the map output locations
25/02/04 12:30:02 INFO ShuffleBlockFetcherIterator: Getting 200 (218.1 KiB) non-empty blocks including 200 (218.1 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
25/02/04 12:30:02 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
25/02/04 12:30:02 INFO CodeGenerator: Code generated in 16.769295 ms
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x0000ffff9421465c, pid=15, tid=39
#
# JRE version: OpenJDK Runtime Environment Temurin-17.0.13+11 (17.0.13+11) (build 17.0.13+11)
# Java VM: OpenJDK 64-Bit Server VM Temurin-17.0.13+11 (17.0.13+11, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-aarch64)
# Problematic frame:
# V  [libjvm.so+0x6b465c]  void WeakProcessor::Task::work<G1STWIsAliveClosure, G1KeepAliveClosure>(unsigned int, G1STWIsAliveClosure*, G1KeepAliveClosure*)+0x1fc

Please let me know if I can be of further assistance.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time

pan3793 commented Feb 6, 2025

see #11731


agrubb86 commented Feb 6, 2025

Thank you very much @pan3793
