From 5b23934909d692e35034b9f39893050e80a96db3 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Wed, 9 Jun 2021 15:44:02 +0530 Subject: [PATCH] Issue #TG-955 feat: Handle throttling of data exhaust requests - framework changes --- .../framework/util/HadoopFileUtil.scala | 20 +++++++++++++++++++ .../framework/util/TestDatasetUtil.scala | 10 ++++++++++ 2 files changed, 30 insertions(+) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/HadoopFileUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/HadoopFileUtil.scala index 29c64f54..85fde818 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/HadoopFileUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/HadoopFileUtil.scala @@ -66,4 +66,24 @@ class HadoopFileUtil { if (deleteSource) srcFS.delete(srcDir, true) else true } else false } + + /** + * Get a hadoop source folder/file size in bytes + */ + def size(file: String, conf: Configuration) : Long = { + + val path = new Path(file); + path.getFileSystem(conf).getContentSummary(path).getLength + } + + /** + * Get size in bytes for multiple files.
+ */ + def size(conf: Configuration, files: String*) : Seq[(String, Long)] = { + + for(file <- files) yield { + val path = new Path(file); + (file, path.getFileSystem(conf).getContentSummary(path).getLength) + } + } } \ No newline at end of file diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala index b537596d..39ee327d 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala @@ -29,6 +29,16 @@ class TestDatasetUtil extends BaseSpec with Matchers with MockFactory { rdd3.head should be ("env1,22.1,3") rdd3.last should be ("env1,32.1,4") + // get file size + val size = fileUtil.size("src/test/resources/test-report2.csv", sparkSession.sparkContext.hadoopConfiguration) + size should be (36) + + // get multiple files size + val filesWithSize = fileUtil.size(sparkSession.sparkContext.hadoopConfiguration, "src/test/resources/test-report/env1.csv", "src/test/resources/test-report/env2.csv", "src/test/resources/test-report2.csv") + filesWithSize.size should be (3) + filesWithSize.head._1 should be ("src/test/resources/test-report/env1.csv") + filesWithSize.head._2 should be (31) + val rdd1 = sparkSession.sparkContext.parallelize(Seq(DruidSummary("2020-01-11","env1", 22.1, 3), DruidSummary("2020-01-11","env2", 20.1, 3)), 1) val df1 = sparkSession.createDataFrame(rdd1);