@@ -22,33 +22,36 @@ import java.util
import java.util.UUID
import javax.management.openmbean.KeyAlreadyExistsException

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience
import org.apache.hadoop.hbase.fs.HFileSystem
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.{Bytes, RegionSplitter}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.client._

import scala.reflect.ClassTag
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
TableInputFormat, IdentityTableMapper}
TableInputFormat, IdentityTableMapper, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.streaming.dstream.DStream
import java.io._

import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, Path}

import scala.collection.mutable

/**
@@ -463,6 +466,81 @@ class HBaseContext(@transient val sc: SparkContext,
(r: (ImmutableBytesWritable, Result)) => r)
}

/**
* This function will use the native HBase TableSnapshotInputFormat with the
* given scan object to generate a new RDD
*
* @param snapshotName the name of the snapshot to scan
* @param scans the HBase scan object to use to read data from HBase
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory,
* and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @return New RDD with results from scan
*/

Review comment: This is a very nice feature. Do we have any tests though?

def hbaseRDDForSnapshot(snapshotName: String, scans: Scan, restoreDir: String):
RDD[(ImmutableBytesWritable, Result)]
= hbaseRDDForSnapshot(snapshotName, scans, restoreDir, null, 1)
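As a usage illustration (not part of this patch), a minimal sketch of the simple overload; the SparkContext, snapshot name, restore directory, and column family below are assumed placeholders:

// Hypothetical sketch: read the snapshot "my_snapshot" as (rowkey, Result) pairs.
// "my_snapshot", "/tmp/snapshot_restore" and "cf" are placeholders, not from this patch.
val config = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, config)
val scan = new Scan().addFamily(Bytes.toBytes("cf"))
val rdd = hbaseContext.hbaseRDDForSnapshot("my_snapshot", scan, "/tmp/snapshot_restore")
// Extract readable row keys from the (ImmutableBytesWritable, Result) pairs
val rowKeys = rdd.map { case (key, _) => Bytes.toString(key.get()) }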

/**
* This function will use the native HBase TableSnapshotInputFormat with the
* given scan object to generate a new RDD
*
* @param snapshotName the name of the snapshot to scan
* @param scans the HBase scan object to use to read data from HBase
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory,
* and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @param splitAlgo SplitAlgorithm to be used when generating InputSplits
* @param numSplitsPerRegion how many input splits to generate per region
* @return New RDD with results from scan
*/
def hbaseRDDForSnapshot(snapshotName: String, scans: Scan, restoreDir: String,
splitAlgo: RegionSplitter.SplitAlgorithm, numSplitsPerRegion: Int):
RDD[(ImmutableBytesWritable, Result)] = {
hbaseRDDForSnapshot[(ImmutableBytesWritable, Result)](
snapshotName,
scans,
restoreDir,
splitAlgo,
numSplitsPerRegion,
(r: (ImmutableBytesWritable, Result)) => r)
}

/**
* This function will use the native HBase TableSnapshotInputFormat with the
* given scan object to generate a new RDD
*
* @param snapshotName the name of the snapshot to scan
* @param scans the HBase scan object to use to read data from HBase
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory,
* and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @param splitAlgo SplitAlgorithm to be used when generating InputSplits
* @param numSplitsPerRegion how many input splits to generate per region
* @param f function to convert a Result object from HBase into
* what the user wants in the final generated RDD
* @return new RDD with results from scan
*/
def hbaseRDDForSnapshot[U: ClassTag](snapshotName: String, scans: Scan, restoreDir: String,
splitAlgo: RegionSplitter.SplitAlgorithm, numSplitsPerRegion: Int,
f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
val job: Job = Job.getInstance(getConf(broadcastedConf))

TableMapReduceUtil.initCredentials(job)
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scans,
classOf[IdentityTableMapper], null, null, job, true, new Path(restoreDir), splitAlgo, numSplitsPerRegion)

val jconf = new JobConf(job.getConfiguration)
SparkHadoopUtil.get.addCredentials(jconf)
new NewHBaseRDD(sc,
classOf[TableSnapshotInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result],
job.getConfiguration,
this).map(f)
}
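A similar sketch for the split-aware overload (again with placeholder names); RegionSplitter.UniformSplit is one of the SplitAlgorithm implementations shipped with HBase:

// Hypothetical sketch: two input splits per region, converting each Result into a
// (rowkey, value) tuple. Snapshot, directory, family and qualifier names are placeholders.
val typedRdd = hbaseContext.hbaseRDDForSnapshot[(String, String)](
  "my_snapshot",
  new Scan().addFamily(Bytes.toBytes("cf")),
  "/tmp/snapshot_restore",
  new RegionSplitter.UniformSplit(),
  2,
  { case (key, result) =>
    // assumes the cell exists; a real job should null-check getValue
    (Bytes.toString(key.get()),
      Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"))))
  })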

/**
* underlying wrapper for all foreach functions in HBaseContext
*/