Skip to content

Commit

Permalink
Merge pull request #62 from RevathiKotla/release-3.3.5
Browse files Browse the repository at this point in the history
Issue #TG:620 - Fix the actor issue
  • Loading branch information
sowmya-dixit authored Oct 13, 2020
2 parents 60a25e5 + a512d07 commit 4570d0e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,11 @@ class FrameworkContext {
storageContainers.foreach(f => f._2.closeContext());
}
}
// Terminates the ActorSystem held on the DruidDataFetcher companion object.
// NOTE(review): that actor system is a JVM-wide shared singleton — once it is
// terminated, any subsequent Druid query in the same process will fail.
// Presumably this is the defect the surrounding commit fixes by moving to
// per-client actor systems (fc.getDruidClient().actorSystem) and deleting
// this method — TODO confirm against the merged diff.
def shutDownDruidActor() ={
DruidDataFetcher.system.terminate()
}

// Releases every external resource owned by this FrameworkContext:
// the Druid HTTP client, the Druid roll-up client, and storage-service
// containers. Intended to be called exactly once, at job shutdown.
def closeContext() = {
shutdownDruidClient();
// NOTE(review): "Clien" is misspelled, but the definition lives outside this
// view — renaming here alone would not compile; fix at the declaration site.
shutdownDruidRollUpClien();
shutdownStorageService();
// NOTE(review): terminates the global DruidDataFetcher actor system; after
// this call no further Druid queries can run in this JVM — confirm intended
// (the commit appears to remove this line).
shutDownDruidActor()
}

def loadData(spark: SparkSession, settings: scala.collection.Map[String, String], url: String, schema: StructType): DataFrame = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ object AkkaHttpUtil extends AkkaHttpClient {
}

object DruidDataFetcher {
implicit val system = ActorSystem("ExecuteQuery")
@throws(classOf[DataFetcherException])
def getDruidData(query: DruidQueryModel, queryAsStream: Boolean = false)(implicit sc: SparkContext, fc: FrameworkContext): RDD[String] = {
val request = getDruidQuery(query)
Expand Down Expand Up @@ -136,6 +135,11 @@ object DruidDataFetcher {


def executeQueryAsStream(model: DruidQueryModel, query: DruidNativeQuery)(implicit sc: SparkContext, fc: FrameworkContext): RDD[String] = {

implicit val system = if (query.dataSource.contains("rollup") || query.dataSource.contains("distinct"))
fc.getDruidRollUpClient().actorSystem
else
fc.getDruidClient().actorSystem
implicit val materializer = ActorMaterializer()

val response =
Expand All @@ -160,7 +164,7 @@ object DruidDataFetcher {

val druidQuery = getSQLDruidQuery(model)
fc.inputEventsCount = sc.longAccumulator("DruidDataCount")

implicit val system = fc.getDruidRollUpClient().actorSystem
implicit val materializer = ActorMaterializer()
implicit val ec: ExecutionContextExecutor = system.dispatcher
val url = String.format("%s://%s:%s%s%s", "http", AppConf.getConfig("druid.rollup.host"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
implicit val mockFc = mock[FrameworkContext];
implicit val druidConfig = mock[DruidConfig];
val mockDruidClient = mock[DruidClient]
(mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
(mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List(druidResponse))).anyNumberOfTimes()
(mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();

Expand All @@ -568,6 +569,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
implicit val mockFc = mock[FrameworkContext];
implicit val druidConfig = mock[DruidConfig];
val mockDruidClient = mock[DruidClient]
(mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
(mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List(druidResponse))).anyNumberOfTimes()
(mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();

Expand Down Expand Up @@ -595,6 +597,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
implicit val mockFc = mock[FrameworkContext];
implicit val druidConfig = mock[DruidConfig];
val mockDruidClient = mock[DruidClient]
(mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
(mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List(druidResponse,druidResponse1,druidResponse2))).anyNumberOfTimes()
(mockFc.getDruidRollUpClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();

Expand All @@ -618,6 +621,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
implicit val mockFc = mock[FrameworkContext]
implicit val druidConfig = mock[DruidConfig]
val mockDruidClient = mock[DruidClient]
(mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
(mockDruidClient.doQuery[DruidResponse](_:DruidNativeQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Future(druidResponse)).anyNumberOfTimes()
(mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();

Expand All @@ -639,12 +643,10 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
}
"""
val doc: Json = parse(json).getOrElse(Json.Null);
//val results = List(DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc));
//val druidResponse = DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc)

implicit val mockFc = mock[FrameworkContext];
implicit val druidConfig = mock[DruidConfig];
val mockDruidClient = mock[DruidClient]
(mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
(mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List())).anyNumberOfTimes()
(mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();

Expand All @@ -660,7 +662,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
DruidSQLDimension("dimensions_pdata_id",None))),None)


implicit val fc = new FrameworkContext
implicit val mockFc = mock[FrameworkContext];
implicit val druidConfig = mock[DruidConfig];


Expand All @@ -674,14 +676,14 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
"""{"dimensions_pdata_id":"", "state":10}
{"dimensions_pdata_id":null, "state":5}
|{"dimensions_pdata_id":"dev.portal", "state":5}""".stripMargin

val mockDruidClient = mock[DruidClient]
(mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
(mockFc.getDruidRollUpClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();
(mockAKkaUtil.sendRequest(_: HttpRequest)(_: ActorSystem))
.expects(request,DruidDataFetcher.system)
.expects(request,mockDruidClient.actorSystem)
.returns(Future.successful(HttpResponse(entity = HttpEntity(ByteString(stripString))))).anyNumberOfTimes();
val response = DruidDataFetcher.executeSQLQuery(sqlQuery, mockAKkaUtil)
response.count() should be (3)
fc.inputEventsCount.value should be (3)

}

"DruidDataFetcher" should "verify DruidOutput operations" in {
Expand Down

0 comments on commit 4570d0e

Please sign in to comment.