
Unable to run icebergevents mode in Append with Hive + MinIO #326

Closed
AbhishekGedela opened this issue May 22, 2024 · 10 comments · Fixed by #327

Comments

@AbhishekGedela

AbhishekGedela commented May 22, 2024

Hi @ismailsimsek

I am facing the error below when I try to run the server with debezium.sink.type=icebergevents.

2024-05-23 03:29:08,200 ERROR [org.apa.had.hiv.met.ObjectStore] (pool-13-thread-1) Error loading PartitionExpressionProxy: MetaException(message:org.apache.hadoop.hive.ql.optimizer.ppr.PartitionExpressionForMetastore class not found)
	at org.apache.hadoop.hive.metastore.utils.JavaUtils.getClass(JavaUtils.java:54)
	at org.apache.hadoop.hive.metastore.ObjectStore.createExpressionProxy(ObjectStore.java:538)
	at org.apache.hadoop.hive.metastore.ObjectStore.initializeHelper(ObjectStore.java:495)
	at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:421)
	at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:376)
	at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:79)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:140)
	at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:59)
	at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStoreForConf(HiveMetaStore.java:720)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:698)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:692)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:769)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:540)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:80)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:93)
	at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:8678)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:169)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:119)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:112)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:60)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:72)
	at org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:185)
	at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63)
	at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:34)
	at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
	at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
	at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:146)
	at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
	at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
	at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
	at io.debezium.server.iceberg.history.IcebergSchemaHistory.storageExists(IcebergSchemaHistory.java:205)
	at io.debezium.server.iceberg.history.IcebergSchemaHistory.lambda$start$0(IcebergSchemaHistory.java:115)
	at io.debezium.util.FunctionalReadWriteLock.write(FunctionalReadWriteLock.java:99)
	at io.debezium.server.iceberg.history.IcebergSchemaHistory.start(IcebergSchemaHistory.java:112)
	at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:51)
	at io.debezium.connector.binlog.BinlogDatabaseSchema.<init>(BinlogDatabaseSchema.java:79)
	at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:40)
	at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:99)
	at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:241)
	at io.debezium.embedded.async.AsyncEmbeddedEngine.lambda$startSourceTasks$2(AsyncEmbeddedEngine.java:400)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

2024-05-23 03:29:08,203 DEBUG [org.apa.had.hiv.met.ObjectStore] (pool-13-thread-1) Non-retriable exception during ObjectStore initialize.: java.lang.RuntimeException: Error loading PartitionExpressionProxy: org.apache.hadoop.hive.ql.optimizer.ppr.PartitionExpressionForMetastore class not found
	at org.apache.hadoop.hive.metastore.ObjectStore.createExpressionProxy(ObjectStore.java:542)
	at org.apache.hadoop.hive.metastore.ObjectStore.initializeHelper(ObjectStore.java:495)
	at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:421)
	at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:376)
	at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:79)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:140)
	at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:59)
	at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStoreForConf(HiveMetaStore.java:720)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:698)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:692)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:769)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:540)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:80)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:93)
	at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:8678)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:169)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:119)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:112)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:60)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:72)
	at org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:185)
	at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63)
	at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:34)
	at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
	at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
	at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:146)
	at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
	at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
	at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
	at io.debezium.server.iceberg.history.IcebergSchemaHistory.storageExists(IcebergSchemaHistory.java:205)
	at io.debezium.server.iceberg.history.IcebergSchemaHistory.lambda$start$0(IcebergSchemaHistory.java:115)
	at io.debezium.util.FunctionalReadWriteLock.write(FunctionalReadWriteLock.java:99)
	at io.debezium.server.iceberg.history.IcebergSchemaHistory.start(IcebergSchemaHistory.java:112)
	at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:51)
	at io.debezium.connector.binlog.BinlogDatabaseSchema.<init>(BinlogDatabaseSchema.java:79)
	at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:40)
	at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:99)
	at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:241)
	at io.debezium.embedded.async.AsyncEmbeddedEngine.lambda$startSourceTasks$2(AsyncEmbeddedEngine.java:400)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)	

I tried debugging by adding print statements in the IcebergSchemaHistory class and compared the behavior with debezium.sink.type=iceberg. I can see that the schema.history.internal.iceberg.* properties are not being set in icebergevents mode.

================iceberg conf=====================
HiveCatalog{name=default, uri=null}
=================================================
2024-05-23 03:40:03,520 INFO  [io.deb.ser.ice.his.IcebergSchemaHistory] (pool-13-thread-1) Starting IcebergSchemaHistory storage table:default.tpch_raw_debezium_database_history_storage_table
2024-05-23 03:40:03,537 DEBUG [org.apa.had.hiv.con.HiveConf] (pool-13-thread-1) Found metastore URI of null

I tried both the master and v0.4.0.Final versions.

Can you please help me with this?

Posting my application.properties file below:

# Use iceberg sink
debezium.sink.type=icebergevents
debezium.sink.iceberg.catalog-name=default

# Iceberg sink config
debezium.sink.iceberg.table-prefix=tpch_raw_
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.write.format.default=parquet

# Config with hive metastore catalogs
debezium.sink.iceberg.type=hive
debezium.sink.iceberg.uri=thrift://127.0.0.1:9083
debezium.sink.iceberg.clients=5
debezium.sink.iceberg.warehouse=s3a://datalake/
debezium.sink.iceberg.engine.hive.enabled=true
debezium.sink.iceberg.s3.endpoint=http://localhost:9000
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.access-key-id=
debezium.sink.iceberg.s3.secret-access-key=
debezium.sink.iceberg.s3.path-style-access=true

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=tpch_raw_debezium_offset_storage_table
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=tpch_raw_debezium_database_history_storage_table

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=6603
debezium.source.database.user=
debezium.source.database.password=
debezium.source.database.dbname=
debezium.table.include.list=
debezium.source.database.server.name=test
debezium.source.database.server.id=1234
debezium.source.topic.prefix=dbz_


quarkus.http.port=8088
quarkus.log.level=TRACE
quarkus.log.console.json=false
# set log level for libs
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=DEBUG
quarkus.log.category."org.apache.parquet".level=DEBUG
# Ignore messages below warning level from Jetty, because it's a bit verbose


debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.source.max.batch.size=2048
debezium.source.max.queue.size=16000
debezium.sink.batch.batch-size-wait.max-wait-ms=30000
debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
@AbhishekGedela
Author

AbhishekGedela commented May 23, 2024

Could it be because of @Named("icebergevents"), i.e. the properties should be debezium.sink.icebergevents.*?

It seems to be working once I interchanged @Named("iceberg") between IcebergEventsChangeConsumer and IcebergChangeConsumer.
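For context, Debezium Server selects the sink consumer by its CDI bean name, which is why swapping the @Named annotations changed the behavior. The snippet below is a hypothetical plain-Java sketch of that name-based lookup, not the real CDI wiring; the class names come from this thread.

```java
import java.util.Map;

// Hypothetical sketch of name-based sink selection, mimicking the
// @Named("iceberg") / @Named("icebergevents") CDI lookup described above.
public class SinkLookupSketch {

    interface ChangeConsumer {
        String describe();
    }

    // debezium.sink.type is matched against the bean name of each consumer.
    static final Map<String, ChangeConsumer> SINKS = Map.of(
            "iceberg", () -> "IcebergChangeConsumer",
            "icebergevents", () -> "IcebergEventsChangeConsumer");

    static ChangeConsumer resolve(String sinkType) {
        ChangeConsumer consumer = SINKS.get(sinkType);
        if (consumer == null) {
            throw new IllegalArgumentException("Unknown sink type: " + sinkType);
        }
        return consumer;
    }

    public static void main(String[] args) {
        // Swapping the names in the map is the equivalent of swapping the
        // @Named annotations, which is what made the setup appear to work.
        System.out.println(resolve("icebergevents").describe());
    }
}
```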

@ismailsimsek
Member

This seems like a Hive catalog related issue.
Could you add the hive-exec library to the libs and try? https://mvnrepository.com/artifact/org.apache.hive/hive-exec

@AbhishekGedela
Author

Thanks for responding!
I will try that and get back with the findings.
Is the hive-exec library dependency explicitly needed for the icebergevents sink type? Once I renamed icebergevents to iceberg, it seems to be working fine.

@ismailsimsek
Member

We have two consumers in this project: icebergevents and iceberg. icebergevents appends all the events to a partitioned table; the only difference is that icebergevents uses an hourly partitioned table by default. Other than that, I don't see any difference that could cause the error.

It looks like it is Hive related, happening when it tries to read the history table.

Could you instead try to use a file as the history storage:

debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=/path/to/storage/schemahistory.dat

example here

@ismailsimsek
Member

Is the hive-exec library dependency explicitly needed for the icebergevents sink type? Once I renamed icebergevents to iceberg, it seems to be working fine.

No, it's not explicitly needed for icebergevents; it should be the same for both consumers. Regarding:

Could it be because of @Named("icebergevents"), i.e. the properties should be debezium.sink.icebergevents.*?

No, because the property names are hardcoded and independent of the consumer name. I will check this; the offset storage might be doing something similar with the parameters.

@ismailsimsek
Member

ismailsimsek commented May 23, 2024

@AbhishekGedela it is related to the Java version; could you please make sure you are running it with Java 17, not above?

Edit: with Java 21 the backend is not receiving any config, but with Java 17 all the configuration is recognized (set) correctly in the application.

Edit: please ignore, still investigating...

@ismailsimsek
Member

Could it be because of @Named("icebergevents"), i.e. the properties should be debezium.sink.icebergevents.*?

No, because the property names are hardcoded and independent of the consumer name. I will check this; the offset storage might be doing something similar with the parameters.

It looks like it. When Debezium passes the config down to the SchemaHistory class, it is somehow filtering configs out using the consumer name.
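The filtering described above can be sketched as follows. This is a hypothetical illustration, not the actual Debezium code: it shows how a prefix filter keyed on the consumer name would drop every debezium.sink.iceberg.* setting when the consumer is named "icebergevents", leaving the Hive catalog with uri=null as seen in the log earlier in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: filtering sink configuration by consumer name.
public class ConfigFilterSketch {

    // Keep only keys under "debezium.sink.<consumerName>." and strip the prefix.
    static Map<String, String> filterByConsumerName(Map<String, String> all, String consumerName) {
        String prefix = "debezium.sink." + consumerName + ".";
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : all.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("debezium.sink.iceberg.uri", "thrift://127.0.0.1:9083");
        props.put("debezium.sink.iceberg.warehouse", "s3a://datalake/");

        // With the consumer named "iceberg", the catalog settings survive.
        System.out.println(filterByConsumerName(props, "iceberg"));
        // With the consumer named "icebergevents", everything is filtered out,
        // which would leave the Hive catalog unconfigured (uri=null).
        System.out.println(filterByConsumerName(props, "icebergevents"));
    }
}
```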

@AbhishekGedela
Author

Alright, thanks a lot for confirming!
I will raise a PR to add this to the docs. Please approve it.

@ismailsimsek
Member

Thank you @AbhishekGedela for reporting this. It is now fixed with the latest release; please feel free to open a new issue if it is still not working.

@AbhishekGedela
Author

Thanks for fixing this in such a short time!
