From 358bd650ca3770e31f2c9690cc238037357f8b2b Mon Sep 17 00:00:00 2001 From: Stephen Wu Date: Mon, 10 Jan 2022 09:21:52 -0800 Subject: [PATCH 1/2] HBASE-24356 Move hbase-connectors to hbase-thirdparty-4.0.1 - upgrade spark version to 2.4.8 - upgrade hbase version to 2.4.9 - upgrade hadoop2 version to 2.10.0 - upgrade hadoop3 version to 3.1.2 - upgrade checkstyle to 8.28 - update related client APIs after switching to 2.4.9 --- .../hadoop/hbase/kafka/KafkaTableForBridge.java | 6 ++++++ pom.xml | 12 ++++++------ .../hadoop/hbase/spark/HBaseContext.scala | 9 +++++---- .../hbase/spark/TestJavaHBaseContext.java | 8 ++++---- .../hadoop/hbase/spark/BulkLoadSuite.scala | 17 +++++++++++++---- .../hbase/spark/HBaseRDDFunctionsSuite.scala | 2 +- .../hbase/spark/PartitionFilterSuite.scala | 2 +- spark/pom.xml | 3 +-- 8 files changed, 37 insertions(+), 22 deletions(-) diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java index 25e4796a..eea034ce 100755 --- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java +++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; @@ -201,4 +202,9 @@ public TableDescriptor getDescriptor() throws IOException { return null; } + @Override + public RegionLocator getRegionLocator() { + return null; + } + } diff --git a/pom.xml b/pom.xml index 4d704087..91be3694 100644 --- a/pom.xml +++ b/pom.xml @@ -129,17 +129,17 @@ 1.8 ${compileSource} 3.5.0 - 2.2.2 + 2.4.9 1.6.0 0.5.0 4.12 - 2.2.1 - 2.8.5 - 3.0.3 + 4.0.1 + 2.10.0 + 3.1.2 ${hadoop-two.version} 1.7.25 1.2.17 - 8.18 + 8.28 3.1.0 3.0.0-M4 3.0.0-M3 @@ -153,7 +153,7 @@ 3.6 - 3.6.2.Final + 3.10.6.Final 1.6.1 3.0.1-b08 hbase-hadoop2-compat diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala index db5cda04..73bf3e3a 100644 --- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala +++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.io.compress.Compression import org.apache.hadoop.hbase.io.compress.Compression.Algorithm import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl} -import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType} +import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, StoreUtils, BloomType} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapred.JobConf import org.apache.spark.broadcast.Broadcast @@ -902,8 +902,9 @@ class HBaseContext(@transient val sc: SparkContext, tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f) val contextBuilder = new HFileContextBuilder() .withCompression(Algorithm.valueOf(familyOptions.compression)) - .withChecksumType(HStore.getChecksumType(conf)) - 
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) + .withChecksumType(StoreUtils.getChecksumType(conf)) + .withCellComparator(CellComparator.getInstance()) + .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) .withBlockSize(familyOptions.blockSize) if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { @@ -919,7 +920,7 @@ class HBaseContext(@transient val sc: SparkContext, new WriterLength(0, new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) .withBloomType(BloomType.valueOf(familyOptions.bloomType)) - .withComparator(CellComparator.getInstance()).withFileContext(hFileContext) + .withFileContext(hFileContext) .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", ""))) .withFavoredNodes(favoredNodes).build()) diff --git a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java index 61ada1d9..1e35b240 100644 --- a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java +++ b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java @@ -95,8 +95,8 @@ public static void setUpBeforeClass() throws Exception { LOG.info("starting minicluster"); - TEST_UTIL.startMiniZKCluster(); - TEST_UTIL.startMiniHBaseCluster(1, 1); + //TEST_UTIL.startMiniZKCluster(); + TEST_UTIL.startMiniCluster(); LOG.info(" - minicluster started"); } @@ -104,8 +104,8 @@ public static void setUpBeforeClass() throws Exception { @AfterClass public static void tearDownAfterClass() throws Exception { LOG.info("shuting down minicluster"); - TEST_UTIL.shutdownMiniHBaseCluster(); - TEST_UTIL.shutdownMiniZKCluster(); + //TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.shutdownMiniCluster(); LOG.info(" - minicluster shut down"); TEST_UTIL.cleanupTestDir(); diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala index dc328f36..8a61ed4c 100644 --- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala +++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala @@ -65,6 +65,15 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { sc.stop() } + override def beforeEach() { + try { + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + } catch { + case e: Exception => + logInfo(" - no table " + tableName + " found") + } + } + test("Wide Row Bulk Load: Test multi family and multi column tests " + "with all default HFile Configs.") { val config = TEST_UTIL.getConfiguration @@ -391,7 +400,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { for ( i <- 0 until f1FileList.length) { val reader = HFile.createReader(fs, f1FileList(i).getPath, new CacheConfig(config), true, config) - assert(reader.getCompressionAlgorithm.getName.equals("gz")) + assert(reader.getFileContext.getCompression.getName.equals("gz")) assert(reader.getDataBlockEncoding.name().equals("PREFIX")) } @@ -401,7 +410,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { for ( i <- 0 until f2FileList.length) { val reader = HFile.createReader(fs, f2FileList(i).getPath, new CacheConfig(config), true, config) - assert(reader.getCompressionAlgorithm.getName.equals("none")) + assert(reader.getFileContext.getCompression.getName.equals("none")) assert(reader.getDataBlockEncoding.name().equals("NONE")) } @@ 
-870,7 +879,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { for ( i <- 0 until f1FileList.length) { val reader = HFile.createReader(fs, f1FileList(i).getPath, new CacheConfig(config), true, config) - assert(reader.getCompressionAlgorithm.getName.equals("gz")) + assert(reader.getFileContext.getCompression.getName.equals("gz")) assert(reader.getDataBlockEncoding.name().equals("PREFIX")) } @@ -880,7 +889,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { for ( i <- 0 until f2FileList.length) { val reader = HFile.createReader(fs, f2FileList(i).getPath, new CacheConfig(config), true, config) - assert(reader.getCompressionAlgorithm.getName.equals("none")) + assert(reader.getFileContext.getCompression.getName.equals("none")) assert(reader.getDataBlockEncoding.name().equals("NONE")) } diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala index 9ea2c7fa..703910ea 100644 --- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala +++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala @@ -55,7 +55,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { TEST_UTIL.deleteTable(TableName.valueOf(tableName)) logInfo("shuting down minicluster") TEST_UTIL.shutdownMiniCluster() - + TEST_UTIL.cleanupTestDir sc.stop() } diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala index 49600847..d1de359f 100644 --- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala +++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala @@ -80,7 +80,7 @@ class PartitionFilterSuite extends FunSuite with override def afterAll() { logInfo("shutting down minicluster") TEST_UTIL.shutdownMiniCluster() - + TEST_UTIL.cleanupTestDir sc.stop() } diff --git a/spark/pom.xml b/spark/pom.xml index c65670c6..3d410b20 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -44,9 +44,8 @@ 0.6.1 - 2.1.0 2.9.10 - 2.4.0 + 2.4.8 2.11.12 From 3e9c8c9c4453c3e54ff6561cac94bcc4897b1006 Mon Sep 17 00:00:00 2001 From: Stephen Wu Date: Tue, 11 Jan 2022 10:24:00 -0800 Subject: [PATCH 2/2] make HBaseContext compatible between hbase-2.4 and hbase-2.3 --- .../hadoop/hbase/spark/HBaseContext.scala | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala index 73bf3e3a..bf45b0ad 100644 --- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala +++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -21,33 +21,34 @@ import java.net.InetSocketAddress import java.util import java.util.UUID import javax.management.openmbean.KeyAlreadyExistsException - -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience import org.apache.hadoop.hbase.fs.HFileSystem import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.io.compress.Compression import org.apache.hadoop.hbase.io.compress.Compression.Algorithm import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding -import 
org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl} -import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, StoreUtils, BloomType} -import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile, HFileContextBuilder, HFileWriterImpl} +import org.apache.hadoop.hbase.regionserver.{BloomType, HStoreFile, StoreFileWriter} +import org.apache.hadoop.hbase.util.{Bytes, ChecksumType} import org.apache.hadoop.mapred.JobConf import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ import org.apache.hadoop.hbase.client._ + import scala.reflect.ClassTag import org.apache.spark.{SerializableWritable, SparkContext} -import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil, -TableInputFormat, IdentityTableMapper} +import org.apache.hadoop.hbase.mapreduce.{IdentityTableMapper, TableInputFormat, TableMapReduceUtil} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.mapreduce.Job import org.apache.spark.streaming.dstream.DStream + import java.io._ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod -import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem} +import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, Path} + import scala.collection.mutable /** @@ -900,11 +901,18 @@ class HBaseContext(@transient val sc: SparkContext, val tempConf = new Configuration(conf) tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f) + // HBASE-25249 introduced an incompatible change in the IA.Private HStore and StoreUtils + // so here, we directly use conf.get for CheckSumType and BytesPerCheckSum to make it + // compatible between hbase 2.3.x and 2.4.x val contextBuilder = new HFileContextBuilder() .withCompression(Algorithm.valueOf(familyOptions.compression)) - .withChecksumType(StoreUtils.getChecksumType(conf)) + // ChecksumType.nameToType is still an IA.Private Utils, but it's unlikely to be changed. + .withChecksumType(ChecksumType + .nameToType(conf.get(HConstants.CHECKSUM_TYPE_NAME, + ChecksumType.getDefaultChecksumType.getName))) .withCellComparator(CellComparator.getInstance()) - .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) + .withBytesPerCheckSum(conf.getInt(HConstants.BYTES_PER_CHECKSUM, + HFile.DEFAULT_BYTES_PER_CHECKSUM)) .withBlockSize(familyOptions.blockSize) if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
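
For context on the second patch: below is a minimal, self-contained sketch of the version-agnostic HFileContext construction it switches to, reading the checksum settings straight from Configuration (via HConstants keys) instead of the IA.Private HStore/StoreUtils helpers that HBASE-25249 changed, so the same build works against both hbase 2.3.x and 2.4.x. This sketch is not part of the patch; the object and method names (HFileContextCompat, buildCompatContext) are illustrative only, while the individual builder and utility calls are the same ones the patch itself uses.

    // Illustrative sketch only (not part of the patch): build an HFileContext using
    // public Configuration keys rather than HStore/StoreUtils, which differ between
    // hbase 2.3.x and 2.4.x after HBASE-25249.
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.{CellComparator, HConstants}
    import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
    import org.apache.hadoop.hbase.io.hfile.{HFile, HFileContext, HFileContextBuilder}
    import org.apache.hadoop.hbase.util.ChecksumType

    object HFileContextCompat {
      // Hypothetical helper name; mirrors what HBaseContext does inline in the patch.
      def buildCompatContext(conf: Configuration, compression: String, blockSize: Int): HFileContext = {
        // Checksum type: fall back to the default algorithm name if the key is unset.
        val checksumType = ChecksumType.nameToType(
          conf.get(HConstants.CHECKSUM_TYPE_NAME, ChecksumType.getDefaultChecksumType.getName))
        // Bytes per checksum: fall back to the HFile default.
        val bytesPerChecksum =
          conf.getInt(HConstants.BYTES_PER_CHECKSUM, HFile.DEFAULT_BYTES_PER_CHECKSUM)
        new HFileContextBuilder()
          .withCompression(Algorithm.valueOf(compression))
          .withChecksumType(checksumType)
          .withBytesPerCheckSum(bytesPerChecksum)
          .withCellComparator(CellComparator.getInstance())
          .withBlockSize(blockSize)
          .build()
      }
    }

The trade-off is the one the patch comment calls out: ChecksumType.nameToType is itself IA.Private, but it has been stable across 2.3 and 2.4, whereas the HStore-to-StoreUtils move is exactly the incompatibility being avoided.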