@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -201,4 +202,9 @@ public TableDescriptor getDescriptor() throws IOException {
return null;
}

@Override
public RegionLocator getRegionLocator() {
return null;
}

}
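The new override exists because the Table interface in HBase 2.4.x declares getRegionLocator(), so every test double implementing Table has to provide it. For context, a hedged Scala sketch of how the locator is normally obtained and used against a live cluster; the helper name and its use of start keys are illustrative, not part of this patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory

// Illustrative only: fetch the region start keys of a table, the kind of
// metadata a RegionLocator exposes. Assumes the Configuration already points
// at a reachable cluster.
def regionStartKeys(conf: Configuration, table: String): Array[Array[Byte]] = {
  val connection = ConnectionFactory.createConnection(conf)
  try {
    val locator = connection.getRegionLocator(TableName.valueOf(table))
    locator.getStartKeys
  } finally {
    connection.close()
  }
}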
12 changes: 6 additions & 6 deletions pom.xml
@@ -129,17 +129,17 @@
<compileSource>1.8</compileSource>
<java.min.version>${compileSource}</java.min.version>
<maven.min.version>3.5.0</maven.min.version>
<hbase.version>2.2.2</hbase.version>
<hbase.version>2.4.9</hbase.version>
<exec.maven.version>1.6.0</exec.maven.version>
<audience-annotations.version>0.5.0</audience-annotations.version>
<junit.version>4.12</junit.version>
<hbase-thirdparty.version>2.2.1</hbase-thirdparty.version>
<hadoop-two.version>2.8.5</hadoop-two.version>
<hadoop-three.version>3.0.3</hadoop-three.version>
<hbase-thirdparty.version>4.0.1</hbase-thirdparty.version>
<hadoop-two.version>2.10.0</hadoop-two.version>
<hadoop-three.version>3.1.2</hadoop-three.version>
<hadoop.version>${hadoop-two.version}</hadoop.version>
<slf4j.version>1.7.25</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<checkstyle.version>8.18</checkstyle.version>
<checkstyle.version>8.28</checkstyle.version>
<maven.checkstyle.version>3.1.0</maven.checkstyle.version>
<surefire.version>3.0.0-M4</surefire.version>
<enforcer.version>3.0.0-M3</enforcer.version>
@@ -153,7 +153,7 @@
<commons-lang3.version>3.6</commons-lang3.version>
<!--This property is for hadoops netty. HBase netty
comes in via hbase-thirdparty hbase-shaded-netty-->
<netty.hadoop.version>3.6.2.Final</netty.hadoop.version>
<netty.hadoop.version>3.10.6.Final</netty.hadoop.version>
<os.maven.version>1.6.1</os.maven.version>
<glassfish.el.version>3.0.1-b08</glassfish.el.version>
<compat.module>hbase-hadoop2-compat</compat.module>
@@ -21,33 +21,34 @@ import java.net.InetSocketAddress
import java.util
import java.util.UUID
import javax.management.openmbean.KeyAlreadyExistsException

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience
import org.apache.hadoop.hbase.fs.HFileSystem
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{BloomType, HStoreFile, StoreFileWriter}
import org.apache.hadoop.hbase.util.{Bytes, ChecksumType}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.client._

import scala.reflect.ClassTag
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
TableInputFormat, IdentityTableMapper}
import org.apache.hadoop.hbase.mapreduce.{IdentityTableMapper, TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.streaming.dstream.DStream

import java.io._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, Path}

import scala.collection.mutable

/**
@@ -900,10 +901,18 @@ class HBaseContext(@transient val sc: SparkContext,

val tempConf = new Configuration(conf)
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
// HBASE-25249 introduced an incompatible change in the IA.Private HStore and StoreUtils
// so here, we directly use conf.get for CheckSumType and BytesPerCheckSum to make it
// compatible between hbase 2.3.x and 2.4.x
val contextBuilder = new HFileContextBuilder()
.withCompression(Algorithm.valueOf(familyOptions.compression))
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
// ChecksumType.nameToType is still an IA.Private Utils, but it's unlikely to be changed.
.withChecksumType(ChecksumType
.nameToType(conf.get(HConstants.CHECKSUM_TYPE_NAME,
ChecksumType.getDefaultChecksumType.getName)))
.withCellComparator(CellComparator.getInstance())
.withBytesPerCheckSum(conf.getInt(HConstants.BYTES_PER_CHECKSUM,
HFile.DEFAULT_BYTES_PER_CHECKSUM))
.withBlockSize(familyOptions.blockSize)

if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
@@ -919,7 +928,7 @@
new WriterLength(0,
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
.withFileContext(hFileContext)
.withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
.withFavoredNodes(favoredNodes).build())

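The comment in the hunk above carries the reasoning for this change: HBASE-25249 reshuffled the IA.Private HStore/StoreUtils helpers, so the builder now reads the checksum settings straight from the Configuration, which keeps one code path working on both HBase 2.3.x and 2.4.x. A self-contained Scala sketch of that pattern, with the method name and blockSize parameter as illustrative assumptions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellComparator, HConstants}
import org.apache.hadoop.hbase.io.hfile.{HFile, HFileContext, HFileContextBuilder}
import org.apache.hadoop.hbase.util.ChecksumType

// Sketch of the compatibility trick used above: resolve checksum type and
// bytes-per-checksum from the Configuration rather than through HStore.
def buildFileContext(conf: Configuration, blockSize: Int): HFileContext = {
  val checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME,
    ChecksumType.getDefaultChecksumType.getName)
  new HFileContextBuilder()
    .withChecksumType(ChecksumType.nameToType(checksumName))
    .withBytesPerCheckSum(conf.getInt(HConstants.BYTES_PER_CHECKSUM,
      HFile.DEFAULT_BYTES_PER_CHECKSUM))
    .withCellComparator(CellComparator.getInstance())
    .withBlockSize(blockSize)
    .build()
}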
@@ -95,17 +95,17 @@ public static void setUpBeforeClass() throws Exception {

LOG.info("starting minicluster");

TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniHBaseCluster(1, 1);
//TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniCluster();

LOG.info(" - minicluster started");
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
LOG.info("shuting down minicluster");
TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL.shutdownMiniZKCluster();
//TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL.shutdownMiniCluster();
LOG.info(" - minicluster shut down");
TEST_UTIL.cleanupTestDir();

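The rewritten setup above relies on the all-in-one lifecycle: startMiniCluster() brings up ZooKeeper together with the master and a region server, so the explicit ZK calls are no longer needed, and shutdownMiniCluster() tears everything down again. A minimal Scala sketch of that lifecycle, assuming the hbase-testing-util dependency is on the test classpath:

import org.apache.hadoop.hbase.HBaseTestingUtility

// One call starts ZooKeeper, the master and a region server; one call stops them.
val testUtil = new HBaseTestingUtility()
testUtil.startMiniCluster()
try {
  // ... run tests against testUtil.getConfiguration ...
} finally {
  testUtil.shutdownMiniCluster()
  testUtil.cleanupTestDir()
}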
@@ -65,6 +65,15 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
sc.stop()
}

override def beforeEach() {
try {
TEST_UTIL.deleteTable(TableName.valueOf(tableName))
} catch {
case e: Exception =>
logInfo(" - no table " + tableName + " found")
}
}

test("Wide Row Bulk Load: Test multi family and multi column tests " +
"with all default HFile Configs.") {
val config = TEST_UTIL.getConfiguration
@@ -391,7 +400,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("gz"))
assert(reader.getFileContext.getCompression.getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}

@@ -401,7 +410,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("none"))
assert(reader.getFileContext.getCompression.getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}

@@ -870,7 +879,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("gz"))
assert(reader.getFileContext.getCompression.getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}

@@ -880,7 +889,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), true, config)
assert(reader.getCompressionAlgorithm.getName.equals("none"))
assert(reader.getFileContext.getCompression.getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}

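The assertions above switch from the reader's getCompressionAlgorithm() to getFileContext.getCompression, matching the HFile reader API in HBase 2.4.x. As a hedged standalone Scala sketch (the helper name is illustrative), the same check looks like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}

// Open an HFile and report its compression codec via the file context.
def compressionNameOf(fs: FileSystem, path: Path, conf: Configuration): String = {
  val reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf)
  try {
    reader.getFileContext.getCompression.getName
  } finally {
    reader.close()
  }
}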
@@ -55,7 +55,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
TEST_UTIL.deleteTable(TableName.valueOf(tableName))
logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster()

TEST_UTIL.cleanupTestDir
sc.stop()
}

@@ -80,7 +80,7 @@ class PartitionFilterSuite extends FunSuite with
override def afterAll() {
logInfo("shutting down minicluster")
TEST_UTIL.shutdownMiniCluster()

TEST_UTIL.cleanupTestDir
sc.stop()
}

3 changes: 1 addition & 2 deletions spark/pom.xml
@@ -44,9 +44,8 @@

<properties>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>
<hbase-thirdparty.version>2.1.0</hbase-thirdparty.version>
<jackson.version>2.9.10</jackson.version>
<spark.version>2.4.0</spark.version>
<spark.version>2.4.8</spark.version>
<!-- The following version is in sync with Spark's choice
Please take caution when this version is modified -->
<scala.version>2.11.12</scala.version>