
[SUPPORT] An exception occurs when using OCC and the second concurrent writer #3598

@Tandoy

Description


Describe the problem you faced
A HoodieDeltaStreamerException is thrown when using OCC with a second concurrent writer. After a few commits from each writer, the DeltaStreamer is likely to fail with the exception below when the datasource writer creates non-isolated inflight commits.

To Reproduce
Steps to reproduce the behavior:
1. Start a DeltaStreamer job against some table Foo:

```shell
spark-submit \
--master yarn \
--driver-memory 1G \
--num-executors 2 \
--executor-memory 1G \
--executor-cores 4 \
--deploy-mode cluster \
--conf spark.yarn.executor.memoryOverhead=512 \
--conf spark.yarn.driver.memoryOverhead=512 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /home/appuser/tangzhi/hudi-0.8/hudi-release-0.8.0/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0.jar` \
--props file:///opt/apps/hudi/hudi-utilities/src/test/resources/delta-streamer-config/kafka.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--target-base-path hdfs://dxbigdata101:8020/user/hudi/test/data/hudi_test_occ \
--op UPSERT \
--continuous \
--target-table hudi_test_occ \
--table-type COPY_ON_WRITE \
--source-ordering-field uid \
--source-limit 5000000
```
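For multi-writer OCC, both writers need matching lock configuration; on the DeltaStreamer side this would normally be carried in the `--props` file referenced above. A sketch of the relevant `kafka.properties` entries, assuming the same ZooKeeper lock settings as the datasource writer (the lock provider class and exact values are assumptions, since the properties file is not shown in this report):

```properties
# OCC mode and lazy cleaning of failed writes (required for multi-writer)
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
# ZooKeeper-based lock provider, matching the datasource writer's settings
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=dxbigdata103
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=occ
hoodie.write.lock.zookeeper.base_path=/hudi/occ
```

If the two writers point at different lock keys or base paths, they will not actually serialize against each other.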

2. In parallel, start writing to the same table Foo using the Spark datasource writer:

```scala
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_test_occ"
val basePath = "hdfs://dxbigdata101:8020/user/hudi/test/data/hudi_test_occ"
val inserts = Seq("""{"area":"hunan","uid":"1","itemid":"11","npgid":"43","evid":"addComment","os":"andriod","pgid":"30","appid":"gmall2019","mid":"mid_117","type":"event","ts":"2021-08-14 12:23:34"}""")

import spark.implicits._
val ds = spark.createDataset(inserts)
val df = spark.read.json(ds)

df.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option("hoodie.write.meta.key.prefixes", "deltastreamer.checkpoint.key")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.write.lock.zookeeper.url", "dxbigdata103")
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", "occ")
  .option("hoodie.write.lock.zookeeper.base_path", "/hudi/occ")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
  .option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING")
  .option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd HH:mm:ss")
  .option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd")
  .option("hoodie.deltastreamer.keygen.timebased.timezone", "GMT+8:00")
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "ts")
  .option(TABLE_NAME, tableName)
  .mode(Overwrite)
  .save(basePath)
```

Environment Description

Hudi version: 0.8
Spark version: 2.4.0.cloudera2
Hadoop version: 2.6.0-cdh5.13.3
Hive version: 1.1.0-cdh5.13.3
Storage (HDFS/S3/GCS..): HDFS
Running on Docker? (yes/no): no

Stacktrace

```
Caused by: org.apache.hudi.exception.HoodieException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20210903233550__commit__COMPLETED]}, Instants :[[20210903233550__commit__COMPLETED]], CommitMetadata={
  "partitionToWriteStats" : {
    "2021/08/14" : [ {
      "fileId" : "4cfbfc21-273d-4b79-b8c0-11269b3b3112-0",
      "path" : "2021/08/14/4cfbfc21-273d-4b79-b8c0-11269b3b3112-0_0-22-22_20210903233550.parquet",
      "prevCommit" : "null",
      "numWrites" : 1,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 1,
      "totalWriteBytes" : 436383,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "2021/08/14",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 436383,
      "minEventTime" : null,
      "maxEventTime" : null
    } ]
  },
```
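The exception indicates that the latest completed commit (from the datasource writer) carries no DeltaStreamer checkpoint in its metadata. One way to check this directly: Hudi stores each commit's metadata as a JSON file under `<base-path>/.hoodie/<instant>.commit`, with the DeltaStreamer checkpoint kept in its `extraMetadata` map under `deltastreamer.checkpoint.key`. A hedged sketch of the check, simulated here against a local sample file (against the real table you would `hdfs dfs -cat` the commit file instead):

```shell
# Simulated commit metadata; on a real table, fetch it with e.g.:
#   hdfs dfs -cat hdfs://dxbigdata101:8020/user/hudi/test/data/hudi_test_occ/.hoodie/<instant>.commit
cat > /tmp/sample.commit <<'EOF'
{"extraMetadata": {"deltastreamer.checkpoint.key": "topic,0:42"}}
EOF

# If the key is absent from the latest commit, the DeltaStreamer cannot
# resume and fails with "Unable to find previous checkpoint".
grep -q 'deltastreamer.checkpoint.key' /tmp/sample.commit \
  && echo "checkpoint present" \
  || echo "checkpoint missing"
```

Note also that the datasource writer above uses `SaveMode.Overwrite`, which recreates the table; a second concurrent writer would typically use `Append`, and an overwrite plausibly discards the commit history holding the checkpoint.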


Labels: area:writer (write client and core write operations), priority:high (significant impact; potential bugs)
