diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
index 25e4796a..eea034ce 100755
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -201,4 +202,9 @@ public TableDescriptor getDescriptor() throws IOException {
return null;
}
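+ // The Table interface in newer HBase requires getRegionLocator(); like getDescriptor() above,
+ // this Kafka bridge has no real table to locate regions for, so the stub returns null.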
+ @Override
+ public RegionLocator getRegionLocator() {
+ return null;
+ }
+
}
diff --git a/pom.xml b/pom.xml
index 4d704087..91be3694 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,17 +129,17 @@
1.8
${compileSource}
3.5.0
- 2.2.2
+ 2.4.9
1.6.0
0.5.0
4.12
- 2.2.1
- 2.8.5
- 3.0.3
+ 4.0.1
+ 2.10.0
+ 3.1.2
${hadoop-two.version}
1.7.25
1.2.17
- 8.18
+ 8.28
3.1.0
3.0.0-M4
3.0.0-M3
@@ -153,7 +153,7 @@
3.6
- 3.6.2.Final
+ 3.10.6.Final
1.6.1
3.0.1-b08
hbase-hadoop2-compat
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index db5cda04..bf45b0ad 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -21,33 +21,34 @@ import java.net.InetSocketAddress
import java.util
import java.util.UUID
import javax.management.openmbean.KeyAlreadyExistsException
-
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience
import org.apache.hadoop.hbase.fs.HFileSystem
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
-import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
-import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile, HFileContextBuilder, HFileWriterImpl}
+import org.apache.hadoop.hbase.regionserver.{BloomType, HStoreFile, StoreFileWriter}
+import org.apache.hadoop.hbase.util.{Bytes, ChecksumType}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.client._
+
import scala.reflect.ClassTag
import org.apache.spark.{SerializableWritable, SparkContext}
-import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
-TableInputFormat, IdentityTableMapper}
+import org.apache.hadoop.hbase.mapreduce.{IdentityTableMapper, TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.streaming.dstream.DStream
+
import java.io._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, Path}
+
import scala.collection.mutable
/**
@@ -900,10 +901,18 @@ class HBaseContext(@transient val sc: SparkContext,
val tempConf = new Configuration(conf)
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
+ // HBASE-25249 made an incompatible change to the IA.Private HStore and StoreUtils classes,
+ // so we read the checksum type and bytes-per-checksum directly from the configuration here
+ // to stay compatible with both HBase 2.3.x and 2.4.x.
val contextBuilder = new HFileContextBuilder()
.withCompression(Algorithm.valueOf(familyOptions.compression))
- .withChecksumType(HStore.getChecksumType(conf))
- .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+ // ChecksumType.nameToType is still an IA.Private utility, but it is unlikely to change.
+ .withChecksumType(ChecksumType
+ .nameToType(conf.get(HConstants.CHECKSUM_TYPE_NAME,
+ ChecksumType.getDefaultChecksumType.getName)))
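+ // The cell comparator is now supplied through the HFileContext; newer HBase no longer
+ // accepts it on StoreFileWriter.Builder (see the removed withComparator call below).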
+ .withCellComparator(CellComparator.getInstance())
+ .withBytesPerCheckSum(conf.getInt(HConstants.BYTES_PER_CHECKSUM,
+ HFile.DEFAULT_BYTES_PER_CHECKSUM))
.withBlockSize(familyOptions.blockSize)
if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
@@ -919,7 +928,7 @@ class HBaseContext(@transient val sc: SparkContext,
new WriterLength(0,
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withBloomType(BloomType.valueOf(familyOptions.bloomType))
- .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
+ .withFileContext(hFileContext)
.withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
.withFavoredNodes(favoredNodes).build())
diff --git a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index 61ada1d9..1e35b240 100644
--- a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -95,8 +95,8 @@ public static void setUpBeforeClass() throws Exception {
LOG.info("starting minicluster");
- TEST_UTIL.startMiniZKCluster();
- TEST_UTIL.startMiniHBaseCluster(1, 1);
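+ // startMiniCluster() also brings up the mini ZooKeeper cluster, so a separate ZK start is not needed.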
+ //TEST_UTIL.startMiniZKCluster();
+ TEST_UTIL.startMiniCluster();
LOG.info(" - minicluster started");
}
@@ -104,8 +104,8 @@ public static void setUpBeforeClass() throws Exception {
@AfterClass
public static void tearDownAfterClass() throws Exception {
LOG.info("shuting down minicluster");
- TEST_UTIL.shutdownMiniHBaseCluster();
- TEST_UTIL.shutdownMiniZKCluster();
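+ // shutdownMiniCluster() also stops the mini ZooKeeper cluster.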
+ //TEST_UTIL.shutdownMiniHBaseCluster();
+ TEST_UTIL.shutdownMiniCluster();
LOG.info(" - minicluster shut down");
TEST_UTIL.cleanupTestDir();
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
index dc328f36..8a61ed4c 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -65,6 +65,15 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
sc.stop()
}
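+ // Drop the test table before each test so state left over from a previous test cannot
+ // leak into the next one; if the table does not exist yet, the failed delete is only logged.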
+ override def beforeEach() {
+ try {
+ TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+ } catch {
+ case _: Exception =>
+ logInfo(" - no table " + tableName + " found")
+ }
+ }
+
test("Wide Row Bulk Load: Test multi family and multi column tests " +
"with all default HFile Configs.") {
val config = TEST_UTIL.getConfiguration
@@ -391,7 +400,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
- assert(reader.getCompressionAlgorithm.getName.equals("gz"))
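+ // with newer HBase the compression algorithm is read from the reader's HFileContext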
+ assert(reader.getFileContext.getCompression.getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}
@@ -401,7 +410,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
- assert(reader.getCompressionAlgorithm.getName.equals("none"))
+ assert(reader.getFileContext.getCompression.getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}
@@ -870,7 +879,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
- assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+ assert(reader.getFileContext.getCompression.getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}
@@ -880,7 +889,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
- assert(reader.getCompressionAlgorithm.getName.equals("none"))
+ assert(reader.getFileContext.getCompression.getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala
index 9ea2c7fa..703910ea 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala
@@ -55,7 +55,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
TEST_UTIL.deleteTable(TableName.valueOf(tableName))
logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster()
-
+ TEST_UTIL.cleanupTestDir()
sc.stop()
}
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
index 49600847..d1de359f 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
@@ -80,7 +80,7 @@ class PartitionFilterSuite extends FunSuite with
override def afterAll() {
logInfo("shutting down minicluster")
TEST_UTIL.shutdownMiniCluster()
-
+ TEST_UTIL.cleanupTestDir()
sc.stop()
}
diff --git a/spark/pom.xml b/spark/pom.xml
index c65670c6..3d410b20 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -44,9 +44,8 @@
0.6.1
- 2.1.0
2.9.10
- 2.4.0
+ 2.4.8
2.11.12