@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -69,6 +70,11 @@ private static final class CheckMutation {
List<String> topics = new ArrayList<>();
}

@Override
public RegionLocator getRegionLocator() throws IOException {
throw new UnsupportedOperationException();
}

public KafkaTableForBridge(TableName tableName,
Configuration conf,
TopicRoutingRules routingRules,
18 changes: 9 additions & 9 deletions pom.xml
@@ -129,20 +129,20 @@
<compileSource>1.8</compileSource>
<java.min.version>${compileSource}</java.min.version>
<maven.min.version>3.5.0</maven.min.version>
<hbase.version>2.2.2</hbase.version>
<hbase.version>2.4.9</hbase.version>
<exec.maven.version>1.6.0</exec.maven.version>
<audience-annotations.version>0.5.0</audience-annotations.version>
<junit.version>4.12</junit.version>
<hbase-thirdparty.version>2.2.1</hbase-thirdparty.version>
<hbase-thirdparty.version>4.0.1</hbase-thirdparty.version>
<hadoop-two.version>2.8.5</hadoop-two.version>
<hadoop-three.version>3.0.3</hadoop-three.version>
<hadoop.version>${hadoop-two.version}</hadoop.version>
<hadoop-three.version>3.2.0</hadoop-three.version>
<hadoop.version>${hadoop-three.version}</hadoop.version>
<slf4j.version>1.7.25</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<checkstyle.version>8.18</checkstyle.version>
<maven.checkstyle.version>3.1.0</maven.checkstyle.version>
<surefire.version>3.0.0-M4</surefire.version>
<enforcer.version>3.0.0-M3</enforcer.version>
<checkstyle.version>8.28</checkstyle.version>
<maven.checkstyle.version>3.1.2</maven.checkstyle.version>
<surefire.version>3.0.0-M5</surefire.version>
<enforcer.version>3.0.0</enforcer.version>
<extra.enforcer.version>1.2</extra.enforcer.version>
<restrict-imports.enforcer.version>0.14.0</restrict-imports.enforcer.version>
<!--Internally we use a different version of protobuf. See hbase-protocol-shaded-->
@@ -153,7 +153,7 @@
<commons-lang3.version>3.6</commons-lang3.version>
<!--This property is for hadoops netty. HBase netty
comes in via hbase-thirdparty hbase-shaded-netty-->
<netty.hadoop.version>3.6.2.Final</netty.hadoop.version>
<netty.hadoop.version>3.10.6.Final</netty.hadoop.version>
<os.maven.version>1.6.1</os.maven.version>
<glassfish.el.version>3.0.1-b08</glassfish.el.version>
<compat.module>hbase-hadoop2-compat</compat.module>
26 changes: 16 additions & 10 deletions spark/README.md
@@ -18,19 +18,25 @@ limitations under the License.

# Apache HBase&trade; Spark Connector

## Scala and Spark Versions
## Spark, Scala and Configurable Options

To generate an artifact for a different [spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core) and/or [scala version](https://www.scala-lang.org/download/all.html), pass command-line options as follows (changing version numbers appropriately):
To generate an artifact for a different [Spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core), [Scala version](https://www.scala-lang.org/download/all.html),
[Hadoop version](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core), or [HBase version](https://mvnrepository.com/artifact/org.apache.hbase/hbase), pass command-line options as follows (changing version numbers appropriately):

```
$ mvn -Dspark.version=2.2.2 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
$ mvn -Dspark.version=3.1.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.8 clean install
```

---
To build the connector with Spark 3.0, compile it with scala 2.12.
Additional configurations that you can customize are the Spark version, HBase version, and Hadoop version.
Example:
Note: to build the connector with Spark 2.x, compile it with `-Dscala.binary.version=2.11` and use the profile `-Dhadoop.profile=2.0`.
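
For example, a Spark 2.x build might look like this (the Spark and Scala point versions below are illustrative, not a verified combination):

```
$ mvn -Dspark.version=2.4.8 -Dscala.version=2.11.12 -Dscala.binary.version=2.11 -Dhadoop.profile=2.0 clean install
```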

## Configuration and Installation
**Client-side** (Spark) configuration:
- The HBase configuration file `hbase-site.xml` should be made available to Spark; for example, it can be copied to `$SPARK_CONF_DIR` (default: `$SPARK_HOME/conf`).

**Server-side** (HBase region servers) configuration:
- The following jars need to be in the CLASSPATH of the HBase region servers:
  - scala-library, hbase-spark, and hbase-spark-protocol-shaded.
- The server-side configuration is needed for column filter pushdown.
  - If you cannot apply the server-side configuration, consider disabling the pushdown with `.option("hbase.spark.pushdown.columnfilter", false)` (see the sketch below).
- The Scala library version must match the Scala version (2.11 or 2.12) used for compiling the connector.
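
A minimal client-side sketch of disabling the pushdown, assuming the connector's `hbase.table` and `hbase.columns.mapping` data source options (the table name and column mapping below are hypothetical):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hbase-spark-read").getOrCreate()

// Read an HBase table through the connector; column filter pushdown is disabled
// here for the case where the server-side jars cannot be installed.
val df = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.table", "my_table")                                       // hypothetical table name
  .option("hbase.columns.mapping", "id STRING :key, name STRING cf:name")  // hypothetical mapping
  .option("hbase.spark.pushdown.columnfilter", false)
  .load()

// Filtering still works; it is simply evaluated on the Spark side instead of
// being pushed down to the region servers.
df.filter("name = 'alice'").show()
```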

```
$ mvn -Dspark.version=3.0.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.4 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.0 -DskipTests clean package
```
20 changes: 9 additions & 11 deletions spark/hbase-spark/pom.xml
@@ -274,14 +274,16 @@
<skipTests>true</skipTests>
</properties>
</profile>
<!-- profile against Hadoop 2.x: This is the default. -->
<!--
profile for building against Hadoop 2.x. Activate using:
mvn -Dhadoop.profile=2.0
-->
<profile>
<id>hadoop-2.0</id>
<activation>
<property>
<!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
<!--h2-->
<name>!hadoop.profile</name>
<name>hadoop.profile</name>
<value>2.0</value>
</property>
</activation>
<dependencies>
@@ -345,20 +347,16 @@
</dependency>
</dependencies>
</profile>
<!--
profile for building against Hadoop 3.0.x. Activate using:
mvn -Dhadoop.profile=3.0
-->
<!-- profile against Hadoop 3.x: This is the default. -->
<profile>
<id>hadoop-3.0</id>
<activation>
<property>
<name>hadoop.profile</name>
<value>3.0</value>
<name>!hadoop.profile</name>
</property>
</activation>
<properties>
<hadoop.version>3.0</hadoop.version>
<hadoop.version>${hadoop-three.version}</hadoop.version>
</properties>
<dependencies>
<dependency>
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.regionserver.{HStoreFile, StoreFileWriter, StoreUtils, BloomType}
import org.apache.hadoop.hbase.util.{Bytes, ChecksumType}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
@@ -900,10 +900,19 @@ class HBaseContext(@transient val sc: SparkContext,

val tempConf = new Configuration(conf)
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)

// HBASE-25249 introduced an incompatible change in the IA.Private HStore and StoreUtils,
// so we read the checksum type and bytes-per-checksum directly from the Configuration
// to stay compatible between HBase 2.3.x and 2.4.x.
val contextBuilder = new HFileContextBuilder()
.withCompression(Algorithm.valueOf(familyOptions.compression))
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
// ChecksumType.nameToType is still an IA.Private util, but it is unlikely to change.
.withChecksumType(ChecksumType
.nameToType(conf.get(HConstants.CHECKSUM_TYPE_NAME,
ChecksumType.getDefaultChecksumType.getName)))
.withCellComparator(CellComparator.getInstance())
.withBytesPerCheckSum(conf.getInt(HConstants.BYTES_PER_CHECKSUM,
HFile.DEFAULT_BYTES_PER_CHECKSUM))
.withBlockSize(familyOptions.blockSize)

if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
@@ -919,7 +928,7 @@
new WriterLength(0,
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
.withFileContext(hFileContext)
.withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
.withFavoredNodes(favoredNodes).build())

@@ -95,17 +95,15 @@ public static void setUpBeforeClass() throws Exception {

LOG.info("starting minicluster");

TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniHBaseCluster(1, 1);
TEST_UTIL.startMiniCluster();

LOG.info(" - minicluster started");
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
LOG.info("shuting down minicluster");
TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL.shutdownMiniZKCluster();
TEST_UTIL.shutdownMiniCluster();
LOG.info(" - minicluster shut down");
TEST_UTIL.cleanupTestDir();

@@ -284,8 +282,8 @@ public void testBulkGet() throws IOException {

final JavaRDD<String> stringJavaRDD =
HBASE_CONTEXT.bulkGet(TableName.valueOf(tableName), 2, rdd,
new GetFunction(),
new ResultFunction());
new GetFunction(),
new ResultFunction());

Assert.assertEquals(stringJavaRDD.count(), 5);
}
@@ -391,7 +391,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("gz"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}

@@ -401,7 +401,7 @@
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("none"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}

@@ -870,7 +870,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("gz"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}

@@ -880,7 +880,7 @@
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("none"))
assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}

9 changes: 4 additions & 5 deletions spark/pom.xml
@@ -44,13 +44,12 @@

<properties>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>
<hbase-thirdparty.version>2.1.0</hbase-thirdparty.version>
<jackson.version>2.9.10</jackson.version>
<spark.version>2.4.0</spark.version>
<jackson.version>2.12.5</jackson.version>
<spark.version>3.1.2</spark.version>
<!-- The following version is in sync with Spark's choice
Please take caution when this version is modified -->
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencyManagement>