@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client

import java.io.{OutputStream, PrintStream}
import java.lang.{Iterable => JIterable}
+import java.lang.reflect.{Proxy => JdkProxy}
import java.lang.reflect.InvocationTargetException
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{HashMap => JHashMap, Locale, Map => JMap}
@@ -33,7 +34,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient, RetryingMetaStoreClient, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable}
@@ -44,6 +45,7 @@ import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.thrift.transport.TEndpointTransport

import org.apache.spark.{SparkConf, SparkException, SparkThrowable}
import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
@@ -65,7 +67,7 @@ import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.util.{CircularBuffer, Utils}
+import org.apache.spark.util.{CircularBuffer, SparkClassUtils, Utils}

/**
 * A class that wraps the HiveClient and converts its responses to externally visible classes.
@@ -1407,13 +1409,62 @@ private[hive] object HiveClientImpl extends Logging {
      case _ =>
        new HiveConf(conf, classOf[HiveConf])
    }
-    try {
+    val hive = try {
      Hive.getWithoutRegisterFns(hiveConf)
    } catch {
      // SPARK-37069: not all Hive versions have the above method (e.g., Hive 2.3.9 has it but
-      // 2.3.8 don't), therefore here we fallback when encountering the exception.
+      // 2.3.8 doesn't), therefore here we fall back when encountering the exception.
      case _: NoSuchMethodError =>
        Hive.get(hiveConf)
    }
+    configureMaxThriftMessageSize(hiveConf, hive.getMSC)
+    hive
  }

+  // SPARK-49489: a surgery for Hive 2.3.10 due to lack of HIVE-26633
+  private def configureMaxThriftMessageSize(
+      hiveConf: HiveConf, msClient: IMetaStoreClient): Unit = try {
+    msClient match {
+      // Hive uses a JDK dynamic proxy to enhance the MetaStoreClient with synchronization
+      // and retrying, so we unwrap it and access the real MetaStoreClient instance first
+      case proxy if JdkProxy.isProxyClass(proxy.getClass) =>
+        JdkProxy.getInvocationHandler(proxy) match {
+          case syncHandler if syncHandler.getClass.getName.endsWith("SynchronizedHandler") =>
+            val realMscField = SparkClassUtils.classForName(
+              "org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler")
+              .getDeclaredField("client")
+            realMscField.setAccessible(true)
+            val realMsc = realMscField.get(syncHandler).asInstanceOf[IMetaStoreClient]
+            configureMaxThriftMessageSize(hiveConf, realMsc)
---- Inline review thread ----

LuciferYang (Contributor):

Perhaps we can directly use syncHandler.getClass().getDeclaredField("client") here; there should be no practical difference in effect.

LuciferYang (Contributor), Feb 20, 2025:

Then perhaps we can refactor this a bit, maybe like:

JdkProxy.getInvocationHandler(proxy) match {
  case syncHandler if syncHandler.getClass.getName.endsWith("SynchronizedHandler") =>
    val realMsc = getFieldValue(syncHandler, "client").asInstanceOf[IMetaStoreClient]
    configureMaxThriftMessageSize(hiveConf, realMsc)

  case retryHandler: RetryingMetaStoreClient =>
    val realMsc = getFieldValue(retryHandler, "base").asInstanceOf[IMetaStoreClient]
    configureMaxThriftMessageSize(hiveConf, realMsc)

  case _ => // do nothing
}

private def getFieldValue(obj: AnyRef, fieldName: String): AnyRef = {
  val field = obj.getClass.getDeclaredField(fieldName)
  field.setAccessible(true)
  field.get(obj)
}

Hmm... perhaps the exception type caught in the catch block needs to be changed.

Member Author:

@LuciferYang thanks for the review, I simplified the reflection call by following your suggestions.

Regarding concerns about the reflection code itself, I tuned the change so that it only takes effect when the user configures hive.thrift.client.max.message.size explicitly, in case users compile Spark against their own modified Hive version.
"org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler")
.getDeclaredField("client")
realMscField.setAccessible(true)
val realMsc = realMscField.get(syncHandler).asInstanceOf[IMetaStoreClient]
configureMaxThriftMessageSize(hiveConf, realMsc)
case retryHandler: RetryingMetaStoreClient =>
val realMscField = classOf[RetryingMetaStoreClient].getDeclaredField("base")
realMscField.setAccessible(true)
val realMsc = realMscField.get(retryHandler).asInstanceOf[IMetaStoreClient]
configureMaxThriftMessageSize(hiveConf, realMsc)
case _ =>
}

case msc: HiveMetaStoreClient if !msc.isLocalMetaStore =>
msc.getTTransport match {
case t: TEndpointTransport =>
// The configuration is added in HIVE-26633 (4.0.0)
val maxThriftMessageSize = HiveConf.toSizeBytes(
hiveConf.get("hive.thrift.client.max.message.size", "1gb")).toInt
if (t.getConfiguration.getMaxMessageSize != maxThriftMessageSize) {
t.getConfiguration.setMaxMessageSize(maxThriftMessageSize)
val resetConsumedMessageSizeMethod = classOf[TEndpointTransport]
.getDeclaredMethod("resetConsumedMessageSize", classOf[Long])
resetConsumedMessageSizeMethod.setAccessible(true)
resetConsumedMessageSizeMethod.invoke(t, Long.box(-1L))
}
case _ =>
}
case _ => // do nothing
}
} catch {
// TEndpointTransport is added in THRIFT-5237 (0.14.0), for Hive versions that use older
// Thrift library (e.g. Hive 2.3.9 uses Thrift 0.9.3), which aren't affected by THRIFT-5237
// and don't need to apply HIVE-26633
case _: NoClassDefFoundError => // do nothing
}
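Both reflective branches in the patch follow the same unwrap-then-recurse pattern: detect the JDK proxy, obtain its InvocationHandler, and read the private field that holds the real client. Below is a minimal self-contained Scala sketch of that technique; the Client trait, SyncHandler class, and field name are invented for the demo, and only java.lang.reflect.Proxy is the real API.

import java.lang.reflect.{InvocationHandler, Method, Proxy => JdkProxy}

// Toy stand-in for IMetaStoreClient; trait, class, and names are hypothetical.
trait Client { def ping(): String }

// Mimics HiveMetaStoreClient$SynchronizedHandler: keeps the real instance in a
// private field named `client` and serializes every call through a lock.
class SyncHandler(private val client: Client) extends InvocationHandler {
  override def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
    val argv = if (args == null) Array.empty[AnyRef] else args
    this.synchronized { method.invoke(client, argv: _*) }
  }
}

object ProxyUnwrapDemo {
  def main(args: Array[String]): Unit = {
    val real: Client = new Client { override def ping(): String = "pong" }
    val proxied = JdkProxy.newProxyInstance(
      getClass.getClassLoader, Array[Class[_]](classOf[Client]), new SyncHandler(real))
      .asInstanceOf[Client]

    // The same steps as the patch: detect the proxy, fetch its invocation handler,
    // then read the handler's private field reflectively to reach the real client.
    if (JdkProxy.isProxyClass(proxied.getClass)) {
      val handler = JdkProxy.getInvocationHandler(proxied)
      val field = handler.getClass.getDeclaredField("client")
      field.setAccessible(true)
      val unwrapped = field.get(handler).asInstanceOf[Client]
      println(unwrapped.ping()) // "pong" -- straight from the real instance
    }
  }
}

In Hive itself, RetryingMetaStoreClient doubles as the invocation handler of the retrying proxy (its delegate lives in the base field), while the synchronized wrapper keeps its delegate in client, which is why the patch recurses through configureMaxThriftMessageSize after each unwrap.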
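On the transport side: THRIFT-5237 (libthrift 0.14.0) introduced TConfiguration, and every TEndpointTransport carries one whose maxMessageSize caps how many bytes the client will accept. Code that controls transport creation can set the limit up front rather than patching it afterwards, which is roughly what HIVE-26633 enables in Hive 4.0.0. Here is a sketch under the assumption of libthrift 0.14+, with an invented host name and the usual metastore port:

import org.apache.thrift.TConfiguration
import org.apache.thrift.transport.TSocket

object ThriftMessageSizeSketch {
  def main(args: Array[String]): Unit = {
    // 1 GiB, matching the "1gb" default the patch reads from
    // hive.thrift.client.max.message.size above.
    val oneGiB = 1024 * 1024 * 1024
    val thriftConf = new TConfiguration(
      oneGiB,                                 // maxMessageSize
      TConfiguration.DEFAULT_MAX_FRAME_SIZE,  // keep the frame-size default
      TConfiguration.DEFAULT_RECURSION_DEPTH) // keep the recursion-depth default

    // "metastore-host" is a placeholder; 9083 is the conventional metastore port.
    // Constructing the socket does not connect yet, so this runs offline.
    val transport = new TSocket(thriftConf, "metastore-host", 9083)
    assert(transport.getConfiguration.getMaxMessageSize == oneGiB)
  }
}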
}