
[SUPPORT] missing records when HoodieDeltaStreamer runs in continuous mode #7757

@pushpavanthar

Description


Tips before filing an issue

  • Have you gone through our FAQs? Yes

  • Join the mailing list to engage in conversations and get faster support at [email protected].

  • If you have triaged this as a bug, then file an issue directly. Yes

Describe the problem you faced
I've been running HoodieDeltaStreamer in continuous mode for COW tables on EMR for some time. To validate the consistency of the resulting table, I run a query comparing the distinct primary keys created in each hour with the source table. I was surprised to find that the count of unique primary keys in the Hudi table is lower than in the source, i.e. some records are missing. This implies that with HoodieDeltaStreamer in continuous mode for COW tables, the data is not consistent. The issue reappears often across different streams. I have captured driver logs for investigation.
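Roughly, the validation amounts to the comparison sketched below (a spark-shell / Scala sketch; the Hudi base path and raw table name are the ones from the job configs, but the snippet is illustrative rather than the exact query I run, which is shown under "To Reproduce"):

import org.apache.spark.sql.functions._

// Hudi table, read from the DeltaStreamer target base path
val hudiDf = spark.read.format("hudi")
  .load("s3://lake-bucket/raw-data/workflow_manager_service/kyc")

// Raw Debezium output retained in S3, registered in the Glue catalog
val rawDf = spark.table("raw_cdc_db.raw_workflow_manager_service_public_kyc")

def hourlyKeys(df: org.apache.spark.sql.DataFrame, label: String) =
  df.withColumn("hour", date_format(to_timestamp(col("created_at")), "yyyyMMdd HH"))
    .groupBy("hour")
    .agg(countDistinct("id").as(label + "_count"))

// Hours where the distinct primary-key counts disagree
hourlyKeys(rawDf, "raw")
  .join(hourlyKeys(hudiDf, "hudi"), Seq("hour"), "full_outer")
  .filter(col("raw_count") =!= col("hudi_count") ||
          col("raw_count").isNull || col("hudi_count").isNull)
  .orderBy("hour")
  .show(100, false)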

To Reproduce

Steps to reproduce the behavior:

  1. Start an EMR cluster with the configurations below


[{"classification":"hive-env", "properties":{}, "configurations":[{"classification":"export", "properties":{"HADOOP_HEAPSIZE":"5120"}, "configurations":[]}]},{"classification":"presto-connector-hive", "properties":{"hive.parquet.use-column-names":"true", "hive.s3.max-client-retries":"30", "hive.s3select-pushdown.max-connections":"6000", "hive.metastore":"glue", "hive.s3select-pushdown.enabled":"true", "hive.s3-file-system-type":"PRESTO"}, "configurations":[]},{"classification":"yarn-site", "properties":{"yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs":"600", "yarn.nodemanager.localizer.cache.target-size-mb":"10120", "yarn.nodemanager.localizer.cache.cleanup.interval-ms":"1500000"}, "configurations":[]},{"classification":"capacity-scheduler", "properties":{"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"}, "configurations":[]},{"classification":"spark-defaults", "properties":{"spark.driver.memory":"4096M", "spark.history.fs.cleaner.maxAge":"1h", "spark.blacklist.decommissioning.timeout":"600s", "spark.port.maxRetries":"32", "spark.history.fs.cleaner.interval":"1h", "spark.history.fs.cleaner.enabled":"true"}, "configurations":[]},{"classification":"emrfs-site", "properties":{"fs.s3.maxRetries":"50"}, "configurations":[]},{"classification":"hive-site", "properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}, "configurations":[]},{"classification":"spark-hive-site", "properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}, "configurations":[]}]
  2. Submit a step to EMR to run DeltaStreamer as below
spark-submit --master yarn \
--jars /usr/lib/spark/external/lib/spark-avro.jar,s3://lake-bucket/jars/hudi-utilities-bundle_2.12-0.11.1.jar \
--files s3://artifact_bucket/config/hudi/log4j.properties \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--conf spark.executor.cores=3 \
--conf spark.driver.memory=4g \
--conf spark.driver.memoryOverhead=1250m \
--conf spark.executor.memoryOverhead=1250m \
--conf spark.executor.memory=27g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=3 \
--conf spark.scheduler.mode=FAIR \
--conf spark.task.maxFailures=5 \
--conf spark.rdd.compress=true \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.yarn.max.executor.failures=5 \
--conf spark.sql.catalogImplementation=hive \
--conf spark.driver.cores=3 \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties -XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=6 -XX:ParallelGCThreads=12 -XX:G1HeapRegionSize=33554432 -XX:G1HeapWastePercent=15 -XX:OnOutOfMemoryError='kill -9 %p'" \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties -XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=6 -XX:ParallelGCThreads=12 -XX:G1HeapRegionSize=33554432 -XX:G1HeapWastePercent=15 -XX:OnOutOfMemoryError='kill -9 %p'" \
--conf spark.app.name=cow_workflow_manager_service_kyc \
--conf spark.driver.userClassPathFirst=true \
--deploy-mode cluster s3://lake-bucket/jars/deltastreamer-addons-1.3.jar \
--enable-sync \
--hoodie-conf hoodie.deltastreamer.source.kafka.auto.reset.offsets=earliest \
--hoodie-conf hoodie.parquet.compression.codec=snappy \
--hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
--hoodie-conf hive.metastore.disallow.incompatible.col.type.changes=false \
--hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
--hoodie-conf auto.offset.reset=earliest \
--hoodie-conf hoodie.clean.automatic=true \
--hoodie-conf hoodie.clean.async=true \
--hoodie-conf hoodie.clean.max.commits=30 \
--hoodie-conf hoodie.table.services.enabled=true \
--hoodie-conf hoodie.metadata.enable=false \
--table-type COPY_ON_WRITE \
--source-class com.domain.sources.ConfluentAvroKafkaSource \
--schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
--props s3://artifact_bucket/config/kafka/kafka-source.properties \
--source-limit 1000000 \
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.in/subjects/workflow_manager_service.public.kyc-value/versions/latest \
--hoodie-conf hoodie.datasource.hive_sync.database=workflow_manager_service \
--hoodie-conf hoodie.datasource.hive_sync.table=kyc \
--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=workflow_manager_service.public.kyc \
--hoodie-conf group.id=cds-workflow_manager_service-kyc \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
--hoodie-conf hoodie.keep.max.commits=6000 \
--hoodie-conf hoodie.keep.min.commits=5000 \
--hoodie-conf hoodie.cleaner.commits.retained=4500 \
--hoodie-conf hoodie.cleaner.parallelism=500 \
--hoodie-conf hoodie.clean.allow.multiple=false \
--hoodie-conf hoodie.cleaner.incremental.mode=true \
--hoodie-conf hoodie.archive.async=true \
--hoodie-conf hoodie.archive.automatic=true \
--hoodie-conf hoodie.archive.merge.files.batch.size=60 \
--hoodie-conf hoodie.commits.archival.batch=30 \
--hoodie-conf hoodie.archive.delete.parallelism=500 \
--hoodie-conf hoodie.archive.merge.enable=true \
--hoodie-conf hoodie.clustering.inline=false \
--hoodie-conf hoodie.index.type=GLOBAL_BLOOM \
--hoodie-conf hoodie.write.markers.type=DIRECT \
--source-ordering-field __lsn \
--target-base-path s3://lake-bucket/raw-data/workflow_manager_service/kyc \
--target-table kyc \
--payload-class com.domain.payload.PostgresSoftDeleteDebeziumAvroPayload \
--hoodie-conf hoodie.bloom.index.update.partition.path=false \
--hoodie-conf hoodie.metrics.on=true \
--hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY \
--hoodie-conf hoodie.metrics.pushgateway.host=pushgateway.in \
--hoodie-conf hoodie.metrics.pushgateway.port=443 \
--hoodie-conf hoodie.metrics.pushgateway.delete.on.shutdown=false \
--hoodie-conf hoodie.metrics.pushgateway.job.name=hudi_continuous_workflow_manager_service_kyc_hudi \
--hoodie-conf hoodie.metrics.pushgateway.random.job.name.suffix=false \
--hoodie-conf hoodie.metrics.reporter.metricsname.prefix=hudi \
--hoodie-conf hoodie.datasource.write.partitionpath.field='' \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
--transformer-class com.domain.transform.DebeziumTransformer \
--hoodie-conf hoodie.deltastreamer.source.kafka.enable.commit.offset=true \
--sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
--continuous
  3. Keep it running for some time (maybe a few days) and keep comparing with the query below. This confirms that data is missing in the Hudi table when compared with the raw data (Debezium output) retained in S3; a sketch after the query shows how a flagged hour can be drilled down to the concrete missing keys.


with raw_table as (
  select 'workflow_manager_service_public_kyc' as table_name, date_format(from_iso8601_timestamp(created_at), '%Y%m%d %H') as created_at,
    count(distinct id) as raw_count
  FROM raw_cdc_db.raw_workflow_manager_service_public_kyc
  where date_format(from_iso8601_timestamp(created_at), '%Y%m%d') >= date_format((NOW() - INTERVAL '3' DAY), '%Y%m%d')
    and date_format(from_iso8601_timestamp(created_at), '%Y%m%d %H') <= date_format((NOW() - INTERVAL '1' HOUR), '%Y%m%d %H')
    and dt >= date_format((NOW() - INTERVAL '7' DAY), '%Y%m%d')
  group by 1, 2
),
hudi_table as (
  select 'workflow_manager_service_public_kyc' as table_name, date_format(from_iso8601_timestamp(created_at), '%Y%m%d %H') as p_date,
    count(1) as hudi_count
  from workflow_manager_service.kyc where date_format(from_iso8601_timestamp(created_at), '%Y%m%d') >= date_format((NOW() - INTERVAL '3' DAY), '%Y%m%d')
  and date_format(from_iso8601_timestamp(created_at), '%Y%m%d %H') <= date_format((NOW() - INTERVAL '1' HOUR), '%Y%m%d %H')
  group by 1, 2
),
union_table as (
  select coalesce(l.table_name, r.table_name) as table_name, 
    coalesce(l.created_at, r.p_date) as create_at_dt,
    l.raw_count,
    r.hudi_count
  from raw_table l
    FULL OUTER JOIN hudi_table r on (l.created_at = r.p_date and l.table_name = r.table_name)
)
select *
from union_table
where raw_count is NULL or hudi_count is NULL
  or raw_count != hudi_count
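
When an hour is flagged by the comparison, the concrete missing keys can be listed with an anti-join along these lines (spark-shell / Scala sketch; the hour value is illustrative):

import org.apache.spark.sql.functions._

val badHour = "20230126 08"  // an hour flagged by the comparison query (illustrative value)

// Distinct primary keys seen in the raw CDC data for that hour
val rawIds = spark.table("raw_cdc_db.raw_workflow_manager_service_public_kyc")
  .withColumn("hour", date_format(to_timestamp(col("created_at")), "yyyyMMdd HH"))
  .filter(col("hour") === badHour)
  .select("id").distinct()

// Distinct primary keys currently present in the Hudi table
val hudiIds = spark.read.format("hudi")
  .load("s3://lake-bucket/raw-data/workflow_manager_service/kyc")
  .select("id").distinct()

// Keys present in the raw data but absent from the Hudi table
rawIds.join(hudiIds, Seq("id"), "left_anti").show(100, false)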

Expected behavior
The Hudi table should have the exact number of records for each hour when compared with the raw events. Hudi having fewer records than the raw table implies that data is missing from Hudi. Can someone help me with where to start debugging this issue?
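
One possible starting point (a sketch only, under the assumption that this helps locate the gap): dump the Kafka checkpoint that DeltaStreamer records in each commit's metadata and check that the offset ranges are contiguous across commits. The internal class and method names below are taken from Hudi 0.11.x and may differ between releases:

import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import scala.collection.JavaConverters._

val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath("s3://lake-bucket/raw-data/workflow_manager_service/kyc")
  .build()

// Completed commits on the active timeline
val timeline = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()

timeline.getInstants.iterator().asScala.foreach { instant =>
  val meta = HoodieCommitMetadata.fromBytes(
    timeline.getInstantDetails(instant).get(), classOf[HoodieCommitMetadata])
  // DeltaStreamer stores its Kafka checkpoint under this extra-metadata key (assumed for 0.11.x)
  println(s"${instant.getTimestamp} -> ${meta.getExtraMetadata.get("deltastreamer.checkpoint.key")}")
}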

Environment Description

  • Hudi version : 0.11.1

  • Spark version : 3.1.1

  • Hive version : 3.1.2

  • Hadoop version : Amazon 3.2.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Additional context
Driver logs have been retained in case they can help with the analysis.
