Issue #TG-360 - Fix the file merge issue in Spark insight cluster #51

Merged
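In short, this PR stops calling Hadoop's FileUtil.copyMerge, an API that was deprecated and later removed in Hadoop 3.x, and re-implements the merge inside HadoopFileUtil: every file under the source directory is appended to the destination file in file-name order. A test exercising the new copy/merge path is added to TestDatasetUtil. Per the title, the motivation is a file-merge failure on the Spark insight cluster (TG-360).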
HadoopFileUtil.scala
@@ -1,8 +1,10 @@
 package org.ekstep.analytics.framework.util
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.FileUtil
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.io.IOUtils
+
+import scala.util.Try
 
 class HadoopFileUtil {
 
@@ -27,16 +29,33 @@ class HadoopFileUtil {
     }
 
   }
 
   /**
    * Merge a hadoop source folder/file into another file
    */
   def copyMerge(srcPath: String, destPath: String, conf: Configuration, deleteSrc: Boolean) {
 
     val srcFilePath = new Path(srcPath);
     val destFilePath = new Path(destPath);
 
-    FileUtil.copyMerge(srcFilePath.getFileSystem(conf), srcFilePath, destFilePath.getFileSystem(conf), destFilePath, deleteSrc, conf, null)
+    copyMerge(srcFilePath.getFileSystem(conf), srcFilePath, destFilePath.getFileSystem(conf), destFilePath, deleteSrc, conf)
   }
 
+  def copyMerge(srcFS: FileSystem, srcDir: Path, dstFS: FileSystem, dstFile: Path,
+                deleteSource: Boolean, conf: Configuration): Boolean = {
+
+    if (srcFS.exists(srcDir) && srcFS.getFileStatus(srcDir).isDirectory) {
+      val outputFile = dstFS.create(dstFile)
+      Try {
+        srcFS.listStatus(srcDir).sortBy(_.getPath.getName)
+          .collect {
+            case status if status.isFile() =>
+              val inputFile = srcFS.open(status.getPath())
+              Try(IOUtils.copyBytes(inputFile, outputFile, conf, false))
+              inputFile.close()
+          }
+      }
+      outputFile.close()
+      if (deleteSource) srcFS.delete(srcDir, true) else true
+    } else false
+  }
 
 }
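For readers unfamiliar with the API, here is a minimal usage sketch of the new helper. The paths and the default Configuration are illustrative assumptions, not part of this PR:

    import org.apache.hadoop.conf.Configuration
    import org.ekstep.analytics.framework.util.HadoopFileUtil

    object CopyMergeExample {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()      // default (local) filesystem config
        val fileUtil = new HadoopFileUtil()
        // Merge the part files Spark wrote under one partition directory
        // into a single CSV; deleteSrc = false keeps the source parts.
        fileUtil.copyMerge(
          "/tmp/report/_tmp/env=env1",      // hypothetical source directory
          "/tmp/report/env1.csv",           // hypothetical merged output file
          conf,
          deleteSrc = false)
      }
    }

One design note: the new implementation sorts part files by name before copying, matching the order the removed FileUtil.copyMerge used, and wraps each per-file copy in Try, so a failing copy does not abort the whole merge. The method returns false only when the source is missing or not a directory, or when deleteSource is requested and the delete fails.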
TestDatasetUtil.scala
@@ -8,13 +8,16 @@ import org.joda.time.DateTime
 import java.util.Date
 import java.text.SimpleDateFormat
 
+import org.apache.hadoop.fs.Path
+
 import scala.collection.mutable.ListBuffer
 import org.joda.time.format.DateTimeFormat
 import org.ekstep.analytics.framework.Period._
 import org.apache.spark.sql.Encoders
 import org.ekstep.analytics.framework.util.DatasetUtil.extensions
 import org.apache.hadoop.fs.azure.AzureException
 import org.apache.hadoop.fs.s3.S3Exception
+import org.apache.spark.sql.functions.col
 
 class TestDatasetUtil extends BaseSpec {
 
@@ -58,4 +61,32 @@ class TestDatasetUtil extends BaseSpec {
 
     sparkSession.stop();
   }
+
+  "DatasetUtil" should "test the dataset copy functionality" in {
+
+    val fileUtil = new HadoopFileUtil();
+    val sparkSession = CommonUtil.getSparkSession(1, "TestDatasetUtil", None, None, None);
+    val rdd = sparkSession.sparkContext.parallelize(Seq(EnvSummary("env1", 22.1, 3), EnvSummary("env2", 20.1, 3), EnvSummary("env1", 32.1, 4)), 1);
+
+    val tempDir = "src/test/resources/test-report/_tmp"
+
+    val partitioningColumns = Option(Seq("env"));
+    val dims = partitioningColumns.getOrElse(Seq());
+    val options = Option(Map("header" -> "true"))
+    val df = sparkSession.createDataFrame(rdd);
+    val conf = sparkSession.sparkContext.hadoopConfiguration
+    val filePrefix = ""
+    val format = "csv"
+    val srcPath = new Path("src/test/resources/test-report/_tmp/env=env1")
+    val srcFs = srcPath.getFileSystem(conf)
+    fileUtil.delete(conf, tempDir)
+    val opts = options.getOrElse(Map());
+    df.coalesce(1).write.format(format).options(opts).partitionBy(dims: _*).save(filePrefix + tempDir);
+    fileUtil.copyMerge("src/test/resources/test-report/_tmp/env=env1", "src/test/resources/test-report/env2.csv", conf, false);
+    srcFs.delete(srcPath, true)
+    fileUtil.delete(conf, "src/test/resources/test-report", "src/test/resources/test-report2", "src/test/resources/test-report2.csv");
+    fileUtil.copyMerge("src/test/resources/test-report/_tmp/env=env1", "src/test/resources/test-report/env2.csv", conf, false);
+    sparkSession.stop();
+
+  }
 }
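As written, the test only exercises the code path; it would pass even if the merged file came out empty. A hypothetical assertion (not part of this PR) could be placed right after the first copyMerge call, before the cleanup, assuming BaseSpec mixes in ScalaTest matchers:

    // Read the merged CSV back: the env=env1 partition holds two rows,
    // and coalesce(1) means exactly one header line was merged in.
    val merged = sparkSession.read.option("header", "true")
      .csv("src/test/resources/test-report/env2.csv")
    merged.count() should be (2)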