diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
index 25e4796a..31baa44e 100755
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -69,6 +70,11 @@ private static final class CheckMutation {
     List topics = new ArrayList<>();
   }
 
+  @Override
+  public RegionLocator getRegionLocator() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   public KafkaTableForBridge(TableName tableName,
       Configuration conf,
       TopicRoutingRules routingRules,
diff --git a/pom.xml b/pom.xml
index 4d704087..034bff2a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,20 +129,20 @@
     1.8
     ${compileSource}
     3.5.0
-    2.2.2
+    2.4.9
     1.6.0
     0.5.0
     4.12
-    2.2.1
+    4.0.1
     2.8.5
-    3.0.3
-    ${hadoop-two.version}
+    3.2.0
+    ${hadoop-three.version}
     1.7.25
     1.2.17
-    8.18
-    3.1.0
-    3.0.0-M4
-    3.0.0-M3
+    8.28
+    3.1.2
+    3.0.0-M5
+    3.0.0
     1.2
     0.14.0
@@ -153,7 +153,7 @@
     3.6
-    3.6.2.Final
+    3.10.6.Final
     1.6.1
     3.0.1-b08
     hbase-hadoop2-compat
diff --git a/spark/README.md b/spark/README.md
index a3d823c5..2dd90eb7 100755
--- a/spark/README.md
+++ b/spark/README.md
@@ -18,19 +18,25 @@ limitations under the License.
 
 # Apache HBase™ Spark Connector
 
-## Scala and Spark Versions
+## Spark, Scala and Configurable Options
 
-To generate an artifact for a different [spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core) and/or [scala version](https://www.scala-lang.org/download/all.html), pass command-line options as follows (changing version numbers appropriately):
+To generate an artifact for a different [Spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core) and/or [Scala version](https://www.scala-lang.org/download/all.html),
+[Hadoop version](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core), or [HBase version](https://mvnrepository.com/artifact/org.apache.hbase/hbase), pass command-line options as follows (changing version numbers appropriately):
 
 ```
-$ mvn -Dspark.version=2.2.2 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
+$ mvn -Dspark.version=3.1.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.8 clean install
 ```
 
----
-To build the connector with Spark 3.0, compile it with scala 2.12.
-Additional configurations that you can customize are the Spark version, HBase version, and Hadoop version.
-Example:
+Note: to build the connector with Spark 2.x, compile it with `-Dscala.binary.version=2.11` and use the profile `-Dhadoop.profile=2.0`.
+
+## Configuration and Installation
+**Client-side** (Spark) configuration:
+- The HBase configuration file `hbase-site.xml` should be made available to Spark; it can be copied to `$SPARK_CONF_DIR` (default is `$SPARK_HOME/conf`)
+
+**Server-side** (HBase region servers) configuration:
+- The following jars need to be in the CLASSPATH of the HBase region servers:
+  - scala-library, hbase-spark, and hbase-spark-protocol-shaded.
+- The server-side configuration is needed for column filter pushdown
+  - If you cannot perform the server-side configuration, consider using `.option("hbase.spark.pushdown.columnfilter", false)`
+- The Scala library version must match the Scala version (2.11 or 2.12) used for compiling the connector.
 
-```
-$ mvn -Dspark.version=3.0.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.4 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.0 -DskipTests clean package
-```
diff --git a/spark/hbase-spark/pom.xml b/spark/hbase-spark/pom.xml
index 379b5db5..26c5c70f 100644
--- a/spark/hbase-spark/pom.xml
+++ b/spark/hbase-spark/pom.xml
@@ -274,14 +274,16 @@
     true
-
+
     hadoop-2.0
-
-
-    !hadoop.profile
+    hadoop.profile
+    2.0
@@ -345,20 +347,16 @@
-
+
     hadoop-3.0
-    hadoop.profile
-    3.0
+    !hadoop.profile
-    3.0
+    ${hadoop-three.version}
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index db5cda04..ee860c42 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.io.compress.Compression
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
-import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
-import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.regionserver.{HStoreFile, StoreFileWriter, StoreUtils, BloomType}
+import org.apache.hadoop.hbase.util.{Bytes, ChecksumType}
 import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -900,10 +900,19 @@
     val tempConf = new Configuration(conf)
     tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
+
+    // HBASE-25249 introduced an incompatible change in the IA.Private HStore and StoreUtils
+    // so here, we directly use conf.get for CheckSumType and BytesPerCheckSum to make it
+    // compatible between hbase 2.3.x and 2.4.x
     val contextBuilder = new HFileContextBuilder()
       .withCompression(Algorithm.valueOf(familyOptions.compression))
-      .withChecksumType(HStore.getChecksumType(conf))
-      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+      // ChecksumType.nameToType is still an IA.Private Utils, but it's unlikely to be changed.
+      .withChecksumType(ChecksumType
+        .nameToType(conf.get(HConstants.CHECKSUM_TYPE_NAME,
+          ChecksumType.getDefaultChecksumType.getName)))
+      .withCellComparator(CellComparator.getInstance())
+      .withBytesPerCheckSum(conf.getInt(HConstants.BYTES_PER_CHECKSUM,
+        HFile.DEFAULT_BYTES_PER_CHECKSUM))
       .withBlockSize(familyOptions.blockSize)
 
     if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
@@ -919,7 +928,7 @@
       new WriterLength(0,
         new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
           .withBloomType(BloomType.valueOf(familyOptions.bloomType))
-          .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
+          .withFileContext(hFileContext)
           .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
           .withFavoredNodes(favoredNodes).build())
diff --git a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index 61ada1d9..793ed8ef 100644
--- a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -95,8 +95,7 @@ public static void setUpBeforeClass() throws Exception {
 
     LOG.info("starting minicluster");
 
-    TEST_UTIL.startMiniZKCluster();
-    TEST_UTIL.startMiniHBaseCluster(1, 1);
+    TEST_UTIL.startMiniCluster();
 
     LOG.info(" - minicluster started");
   }
@@ -104,8 +103,7 @@
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     LOG.info("shuting down minicluster");
-    TEST_UTIL.shutdownMiniHBaseCluster();
-    TEST_UTIL.shutdownMiniZKCluster();
+    TEST_UTIL.shutdownMiniCluster();
     LOG.info(" - minicluster shut down");
 
     TEST_UTIL.cleanupTestDir();
@@ -284,8 +282,8 @@ public void testBulkGet() throws IOException {
 
     final JavaRDD stringJavaRDD =
         HBASE_CONTEXT.bulkGet(TableName.valueOf(tableName), 2, rdd,
-        new GetFunction(),
-        new ResultFunction());
+            new GetFunction(),
+            new ResultFunction());
 
     Assert.assertEquals(stringJavaRDD.count(), 5);
   }
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
index dc328f36..59435bb9 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -391,7 +391,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     for ( i <- 0 until f1FileList.length) {
      val reader =
        HFile.createReader(fs, f1FileList(i).getPath, new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
       assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
     }
 
@@ -401,7 +401,7 @@
     for ( i <- 0 until f2FileList.length) {
      val reader =
        HFile.createReader(fs, f2FileList(i).getPath, new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("none"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
       assert(reader.getDataBlockEncoding.name().equals("NONE"))
     }
 
@@ -870,7 +870,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     for ( i <- 0 until f1FileList.length) {
      val reader =
        HFile.createReader(fs, f1FileList(i).getPath, new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
       assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
     }
 
@@ -880,7 +880,7 @@
     for ( i <- 0 until f2FileList.length) {
      val reader =
        HFile.createReader(fs, f2FileList(i).getPath, new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("none"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
       assert(reader.getDataBlockEncoding.name().equals("NONE"))
     }
 
diff --git a/spark/pom.xml b/spark/pom.xml
index c65670c6..050d35ad 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -44,13 +44,12 @@
     0.6.1
-    2.1.0
-    2.9.10
-    2.4.0
+    2.12.5
+    3.1.2
-    2.11.12
-    2.11
+    2.12.10
+    2.12
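
For reference, below is a minimal, hypothetical sketch of how a Spark job might use the `hbase.spark.pushdown.columnfilter` switch documented in the README change above. The table name, column mapping, and application name are made up for illustration; the `hbase.table`, `hbase.columns.mapping`, and `hbase.spark.use.hbasecontext` keys are the connector's usual read options and are not part of this change.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example: read an HBase table through the connector with the
// server-side column filter pushdown disabled, as suggested in the README
// when the hbase-spark jars cannot be installed on the region servers.
object PushdownFallbackExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hbase-spark-pushdown-fallback")
      .getOrCreate()

    val df = spark.read
      .format("org.apache.hadoop.hbase.spark")
      // Table name and column mapping are illustrative only.
      .option("hbase.table", "person")
      .option("hbase.columns.mapping",
        "rowKey STRING :key, firstName STRING personal:firstName, lastName STRING personal:lastName")
      // Let the data source create its own HBaseContext from the hbase-site.xml on the classpath.
      .option("hbase.spark.use.hbasecontext", false)
      // Evaluate column filters on the Spark side instead of via the connector's
      // custom filter class on the region servers.
      .option("hbase.spark.pushdown.columnfilter", false)
      .load()

    df.filter("firstName = 'Alice'").show()

    spark.stop()
  }
}
```

With pushdown disabled, the scan returns unfiltered cells and Spark applies the predicate itself, trading extra data transfer for not having to add the connector jars to the region servers' classpath.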