diff --git a/Jenkinsfile b/Jenkinsfile
index 79887f30..97ea51ed 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -4,5 +4,5 @@ sproutMultiModuleBuild {
   squads = ['listening']
   slackChannel = '#eng-listening-platform'
   runECRLogin = 'true'
-  mavenProperties = '-Dspark.version=3.1.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.6 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.1 -DskipTests'
+  mavenProperties = ''
 }
diff --git a/hbase-connectors-assembly/pom.xml b/hbase-connectors-assembly/pom.xml
index cefb2d33..af7356fb 100755
--- a/hbase-connectors-assembly/pom.xml
+++ b/hbase-connectors-assembly/pom.xml
@@ -24,7 +24,7 @@
     hbase-connectors
     org.apache.hbase.connectors
-    1.0.1.sprout-emr
+    ${revision}
     ../
   hbase-connectors-assembly
@@ -40,12 +40,12 @@
       org.apache.hbase.connectors.kafka
       hbase-kafka-proxy
-      1.0.1.sprout-emr
+      ${revision}
       org.apache.hbase.connectors.kafka
       hbase-kafka-model
-      1.0.1.sprout-emr
+      ${revision}
diff --git a/kafka/hbase-kafka-model/pom.xml b/kafka/hbase-kafka-model/pom.xml
index 3a385eb8..a9dc5fd6 100644
--- a/kafka/hbase-kafka-model/pom.xml
+++ b/kafka/hbase-kafka-model/pom.xml
@@ -24,7 +24,7 @@
     org.apache.hbase.connectors
     kafka
-    1.0.1.sprout-emr
+    ${revision}
     ../
   org.apache.hbase.connectors.kafka
diff --git a/kafka/hbase-kafka-proxy/pom.xml b/kafka/hbase-kafka-proxy/pom.xml
index 8decba8a..356838d7 100755
--- a/kafka/hbase-kafka-proxy/pom.xml
+++ b/kafka/hbase-kafka-proxy/pom.xml
@@ -24,7 +24,7 @@
     org.apache.hbase.connectors
     kafka
-    1.0.1.sprout-emr
+    ${revision}
     ../
   org.apache.hbase.connectors.kafka
diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
index 25e4796a..31baa44e 100755
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -69,6 +70,11 @@ private static final class CheckMutation {
     List<String> topics = new ArrayList<>();
   }
 
+  @Override
+  public RegionLocator getRegionLocator() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   public KafkaTableForBridge(TableName tableName,
       Configuration conf,
       TopicRoutingRules routingRules,
diff --git a/kafka/pom.xml b/kafka/pom.xml
index d5401051..a829e06c 100644
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -24,7 +24,7 @@
     org.apache.hbase.connectors
     hbase-connectors
-    1.0.1.sprout-emr
+    ${revision}
     ../
   kafka
@@ -48,7 +48,7 @@
       org.apache.hbase.connectors.kafka
       hbase-kafka-model
-      1.0.1.sprout-emr
+      ${revision}
diff --git a/pom.xml b/pom.xml
index 3e568a60..c69d3d64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
   org.apache.hbase.connectors
   hbase-connectors
-  1.0.1.sprout-emr
+  ${revision}
   Apache HBase Connectors
   pom
@@ -136,33 +136,33 @@
-    1.0.1.sprout-emr
+    1.0.1.sprout-emr-hbase24
     true
     yyyy-MM-dd'T'HH:mm
     ${maven.build.timestamp}
     1.8
     ${compileSource}
     3.5.0
-    2.2.6
+    2.4.8
     1.6.0
     0.5.0
     4.12
-    2.2.1
+    3.5.1
     2.8.5
     3.2.1
-    ${hadoop-two.version}
+    ${hadoop-three.version}
     1.7.25
     1.2.17
-    8.18
-    3.1.0
-    3.0.0-M4
-    3.0.0-M3
+    8.45.1
+    3.1.2
+    3.0.0-M5
+    3.0.0
     1.2
     0.14.0
     2.5.0
     0.5.0
-    2.5
+    2.11.0
     1.7.7
     3.6
+
+      apache-release
+
+
+
+          org.sonatype.plugins
+          nexus-staging-maven-plugin
+          1.6.8
+          true
+
+            https://repository.apache.org/
+            apache.releases.https
+
+
+
+
diff --git a/spark/README.md b/spark/README.md
index a3d823c5..0df5d769 100755
--- a/spark/README.md
+++ b/spark/README.md
@@ -18,19 +18,25 @@ limitations under the License.
 
 # Apache HBase™ Spark Connector
 
-## Scala and Spark Versions
+## Spark, Scala and Configurable Options
 
-To generate an artifact for a different [spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core) and/or [scala version](https://www.scala-lang.org/download/all.html), pass command-line options as follows (changing version numbers appropriately):
+To generate an artifact for a different [Spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core) and/or [Scala version](https://www.scala-lang.org/download/all.html),
+[Hadoop version](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core), or [HBase version](https://mvnrepository.com/artifact/org.apache.hbase/hbase), pass command-line options as follows (changing version numbers appropriately):
 
 ```
-$ mvn -Dspark.version=2.2.2 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
+$ mvn -Dspark.version=3.1.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.8 clean install
 ```
 
----
-To build the connector with Spark 3.0, compile it with scala 2.12.
-Additional configurations that you can customize are the Spark version, HBase version, and Hadoop version.
-Example:
+Note: to build the connector with Spark 2.x, compile it with `-Dscala.binary.version=2.11` and use the profile `-Dhadoop.profile=2.0`.
+
+## Configuration and Installation
+**Client-side** (Spark) configuration:
+- The HBase configuration file `hbase-site.xml` should be made available to Spark; it can be copied to `$SPARK_CONF_DIR` (default is `$SPARK_HOME/conf`).
+
+**Server-side** (HBase region servers) configuration:
+- The following jars need to be in the CLASSPATH of the HBase region servers:
+  - scala-library, hbase-spark, and hbase-spark-protocol-shaded.
+- The server-side configuration is needed for column filter pushdown.
+  - If you cannot perform the server-side configuration, consider using `.option("hbase.spark.pushdown.columnfilter", false)`.
+- The Scala library version must match the Scala version (2.11 or 2.12) used for compiling the connector.
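+
+For reference, a minimal read example (a sketch only; the table name `table1` and the column mapping are illustrative placeholders and must match your own schema):
+
+```scala
+// Assumes a running SparkSession (`spark`), the hbase-spark jar on the driver and executor
+// classpath, and hbase-site.xml visible to Spark as described above.
+val df = spark.read
+  .format("org.apache.hadoop.hbase.spark")
+  .option("hbase.columns.mapping", "id STRING :key, name STRING cf1:name")
+  .option("hbase.table", "table1")
+  // Without the server-side jars, disable column filter pushdown:
+  .option("hbase.spark.pushdown.columnfilter", false)
+  .load()
+df.show()
+```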
-```
-$ mvn -Dspark.version=3.0.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.4 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.0 -DskipTests clean package
-```
diff --git a/spark/hbase-spark-it/pom.xml b/spark/hbase-spark-it/pom.xml
index 20252852..976edba3 100644
--- a/spark/hbase-spark-it/pom.xml
+++ b/spark/hbase-spark-it/pom.xml
@@ -23,7 +23,7 @@
     org.apache.hbase.connectors
     spark
-    1.0.1.sprout-emr
+    ${revision}
     ../
   org.apache.hbase.connectors.spark
@@ -186,7 +186,7 @@
       org.apache.hbase.connectors.spark
       hbase-spark
-      1.0.1.sprout-emr
+      ${revision}
       org.apache.hbase
diff --git a/spark/hbase-spark-protocol-shaded/pom.xml b/spark/hbase-spark-protocol-shaded/pom.xml
index 3f1a2216..a429f30f 100644
--- a/spark/hbase-spark-protocol-shaded/pom.xml
+++ b/spark/hbase-spark-protocol-shaded/pom.xml
@@ -24,7 +24,7 @@
     org.apache.hbase.connectors
     spark
-    1.0.1.sprout-emr
+    ${revision}
     ../
diff --git a/spark/hbase-spark-protocol/pom.xml b/spark/hbase-spark-protocol/pom.xml
index bc08e3a7..374b8142 100644
--- a/spark/hbase-spark-protocol/pom.xml
+++ b/spark/hbase-spark-protocol/pom.xml
@@ -24,7 +24,7 @@
     org.apache.hbase.connectors
     spark
-    1.0.1.sprout-emr
+    ${revision}
     ../
diff --git a/spark/hbase-spark/pom.xml b/spark/hbase-spark/pom.xml
index 0aee9c41..b87bfaea 100644
--- a/spark/hbase-spark/pom.xml
+++ b/spark/hbase-spark/pom.xml
@@ -24,7 +24,7 @@
     org.apache.hbase.connectors
     spark
-    1.0.1.sprout-emr
+    ${revision}
     ../
@@ -286,14 +286,16 @@
       true
-
+
       hadoop-2.0
-
-
-        !hadoop.profile
+        hadoop.profile
+        2.0
@@ -357,20 +359,16 @@
-
+
       hadoop-3.0
-        hadoop.profile
-        3.0
+        !hadoop.profile
-        3.0
+        3.2.0
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index db5cda04..12465d9d 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.io.compress.Compression
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
-import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
+import org.apache.hadoop.hbase.regionserver.{HStoreFile, StoreFileWriter, StoreUtils, BloomType}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.broadcast.Broadcast
@@ -902,8 +902,8 @@ class HBaseContext(@transient val sc: SparkContext,
     tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
     val contextBuilder = new HFileContextBuilder()
       .withCompression(Algorithm.valueOf(familyOptions.compression))
-      .withChecksumType(HStore.getChecksumType(conf))
-      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+      .withChecksumType(StoreUtils.getChecksumType(conf))
+      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
       .withBlockSize(familyOptions.blockSize)
 
     if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
@@ -919,7 +919,7 @@ class HBaseContext(@transient val sc: SparkContext,
       new WriterLength(0,
         new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
           .withBloomType(BloomType.valueOf(familyOptions.bloomType))
-          .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
+          .withFileContext(hFileContext)
           .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
           .withFavoredNodes(favoredNodes).build())
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
index a2a68281..67328696 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
@@ -240,8 +240,8 @@ class NaiveEncoder extends BytesEncoder with Logging{
         val value = Bytes.toInt(filterBytes, offset2 + 1)
         compare(in.compareTo(value), ops)
       case LongEnc | TimestampEnc =>
-        val in = Bytes.toInt(input, offset1)
-        val value = Bytes.toInt(filterBytes, offset2 + 1)
+        val in = Bytes.toLong(input, offset1)
+        val value = Bytes.toLong(filterBytes, offset2 + 1)
         compare(in.compareTo(value), ops)
       case FloatEnc =>
         val in = Bytes.toFloat(input, offset1)
diff --git a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index 61ada1d9..8944b110 100644
--- a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -284,8 +284,8 @@ public void testBulkGet() throws IOException {
     final JavaRDD<String> stringJavaRDD =
         HBASE_CONTEXT.bulkGet(TableName.valueOf(tableName), 2, rdd,
-        new GetFunction(),
-        new ResultFunction());
+            new GetFunction(),
+            new ResultFunction());
 
     Assert.assertEquals(stringJavaRDD.count(), 5);
   }
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
index dc328f36..59435bb9 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -391,7 +391,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     for ( i <- 0 until f1FileList.length) {
       val reader = HFile.createReader(fs, f1FileList(i).getPath,
         new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
       assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
     }
 
@@ -401,7 +401,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     for ( i <- 0 until f2FileList.length) {
       val reader = HFile.createReader(fs, f2FileList(i).getPath,
         new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("none"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
       assert(reader.getDataBlockEncoding.name().equals("NONE"))
     }
 
@@ -870,7 +870,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     for ( i <- 0 until f1FileList.length) {
       val reader = HFile.createReader(fs, f1FileList(i).getPath,
         new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
       assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
     }
 
@@ -880,7 +880,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     for ( i <- 0 until f2FileList.length) {
       val reader = HFile.createReader(fs,
         f2FileList(i).getPath, new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("none"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
       assert(reader.getDataBlockEncoding.name().equals("NONE"))
     }
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
index 0424527b..e493c54a 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
@@ -279,6 +279,74 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
   }
 
+  test("Long Type") {
+    val greaterLogic = new GreaterThanLogicExpression("Col1", 0)
+    greaterLogic.setEncoder(encoder)
+    val greaterAndEqualLogic = new GreaterThanOrEqualLogicExpression("Col1", 0)
+    greaterAndEqualLogic.setEncoder(encoder)
+    val lessLogic = new LessThanLogicExpression("Col1", 0)
+    lessLogic.setEncoder(encoder)
+    val lessAndEqualLogic = new LessThanOrEqualLogicExpression("Col1", 0)
+    lessAndEqualLogic.setEncoder(encoder)
+    val equalLogic = new EqualLogicExpression("Col1", 0, false)
+    val notEqualLogic = new EqualLogicExpression("Col1", 0, true)
+
+    val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
+    columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10L)))
+    val valueFromQueryValueArray = new Array[Array[Byte]](1)
+
+    //great than
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 10L)
+    assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 20L)
+    assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    //great than and equal
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 5L)
+    assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
+      valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 10L)
+    assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
+      valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 20L)
+    assert(!greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
+      valueFromQueryValueArray))
+
+    //less than
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 10L)
+    assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 5L)
+    assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    //less than and equal
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 20L)
+    assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 20L)
+    assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(LongType, 10L)
+    assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    //equal too
+    valueFromQueryValueArray(0) = Bytes.toBytes(10L)
+    assert(equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(5L)
+    assert(!equalLogic.execute(columnToCurrentRowValueMap,
+      valueFromQueryValueArray))
+
+    //not equal too
+    valueFromQueryValueArray(0) = Bytes.toBytes(10L)
+    assert(!notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(5L)
+    assert(notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+  }
+
   test("String Type") {
     val leftLogic = new LessThanLogicExpression("Col1", 0)
     leftLogic.setEncoder(encoder)
diff --git a/spark/pom.xml b/spark/pom.xml
index e6192b9e..76d468eb 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -25,13 +25,13 @@
     org.apache.hbase.connectors
     hbase-connectors
-    1.0.1.sprout-emr
+    ${revision}
     ../
   spark
   pom
-  1.0.1.sprout-emr
+  ${revision}
   Apache HBase - Spark
   Spark Connectors for Apache HBase
@@ -45,7 +45,7 @@
     0.6.1
     2.2.1
-    2.9.10
+    2.12.5
     3.1.1
@@ -58,17 +58,17 @@
       org.apache.hbase.connectors.spark
       hbase-spark
-      1.0.1.sprout-emr
+      ${revision}
       org.apache.hbase.connectors.spark
       hbase-spark-protocol
-      1.0.1.sprout-emr
+      ${revision}
       org.apache.hbase.connectors.spark
       hbase-spark-protocol-shaded
-      1.0.1.sprout-emr
+      ${revision}
       org.apache.hbase.thirdparty