Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Api integration #135

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ lazy val root = (project in file("."))
scalacheck,
scalacheckToolboxDatetime,
scalacheckToolboxMagic,
scalacheckToolboxCombinators
scalacheckToolboxCombinators,
sttp,
asyncSttp
),
fork in Test := true,
parallelExecution in Test := false,
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ object Dependencies {
// Generators for tricky "magic" values from the 47deg scalacheck toolbox; test scope only.
lazy val scalacheckToolboxMagic = "com.47deg" %% "scalacheck-toolbox-magic" % "0.3.5" % Test
// Generator combinators from the 47deg scalacheck toolbox; test scope only.
lazy val scalacheckToolboxCombinators = "com.47deg" %% "scalacheck-toolbox-combinators" % "0.3.5" % Test
// Typelevel numeric abstractions (spire).
lazy val spire = "org.typelevel" %% "spire" % "0.14.1"
// sttp HTTP client core — used by DfApiQcResultsRepository to call the QC results API.
lazy val sttp = "com.softwaremill.sttp.client3" %% "core" % "3.3.4"
// Future-based async-http-client backend for sttp; version must stay in lockstep with sttp core.
lazy val asyncSttp = "com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % "3.3.4"
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.timgent.dataflare.checkssuite

import java.time.Instant

import cats.implicits._
import com.github.timgent.dataflare.FlareError.MetricCalculationError
import com.github.timgent.dataflare.checks.ArbDualDsCheck.DatasetPair
Expand All @@ -10,6 +9,7 @@ import com.github.timgent.dataflare.checks.QCCheck.{DualDsQCCheck, SingleDsCheck
import com.github.timgent.dataflare.checks._
import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricCheck}
import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue, MetricsCalculator}
import com.github.timgent.dataflare.repository.QcResultsRepoErr.QcResultsRepoException
import com.github.timgent.dataflare.repository.{MetricsPersister, NullMetricsPersister, NullQcResultsRepository, QcResultsRepository}
import org.apache.spark.sql.Dataset

Expand Down Expand Up @@ -37,6 +37,10 @@ case class DescribedDsPair(ds: DescribedDs, dsToCompare: DescribedDs) {
private[dataflare] def rawDatasetPair = DatasetPair(ds.ds, dsToCompare.ds)
}

/**
 * An error produced while running a ChecksSuite via the V2 API
 * (runV2 / runBlockingV2), surfaced to callers as a value in an Either.
 */
trait ChecksSuiteErr {
// Escape hatch used by the deprecated run/runBlocking overloads: rethrows the
// underlying failure as an exception rather than returning it as a value.
def throwErr: Nothing
}

/**
* Main entry point which contains the suite of checks you want to perform
* @param checkSuiteDescription - description of the check suite
Expand Down Expand Up @@ -91,17 +95,46 @@ case class ChecksSuite(
* @param ec - execution context
* @return
*/
def runBlocking(timestamp: Instant, timeout: Duration = 1 minute)(implicit ec: ExecutionContext) =
/**
 * Runs every check in the ChecksSuite, blocking the calling thread until the
 * computations complete or the timeout elapses.
 *
 * @param timestamp - time the checks are being run
 * @param timeout - maximum time to wait before Await gives up (throws on timeout)
 * @param ec - execution context
 * @return the combined results of all checks in the suite
 */
@deprecated("Will be replaced by runBlockingV2 which surfaces errors", "July 2021")
def runBlocking(timestamp: Instant, timeout: Duration = 1 minute)(implicit ec: ExecutionContext): ChecksSuiteResult = {
  // Start the asynchronous run first, then block on its completion.
  val pendingRun = run(timestamp)
  Await.result(pendingRun, timeout)
}

/**
 * Runs every check in the ChecksSuite, blocking the calling thread until the
 * computations complete or the timeout elapses.
 *
 * @param timestamp - time the checks are being run
 * @param timeout - maximum time to wait before Await gives up (throws on timeout)
 * @param ec - execution context
 * @return either a ChecksSuiteErr describing the failure, or the ChecksSuiteResult
 */
def runBlockingV2(timestamp: Instant, timeout: Duration = 1 minute)(implicit
    ec: ExecutionContext
): Either[ChecksSuiteErr, ChecksSuiteResult] = {
  // Kick off the asynchronous run, then wait for its Either-wrapped outcome.
  val pendingRun = runV2(timestamp)
  Await.result(pendingRun, timeout)
}

/**
 * Runs every check in the ChecksSuite asynchronously, returning a Future.
 *
 * Errors surfaced by runV2 are rethrown as exceptions (failing the Future)
 * via ChecksSuiteErr.throwErr, which is why this overload is deprecated in
 * favour of runV2's error-as-value return type.
 *
 * @param timestamp - time the checks are being run
 * @param ec - execution context
 * @return a Future of the combined check results
 */
@deprecated("Will be replaced by runV2 which surfaces errors in the return type", "July 2021")
def run(timestamp: Instant)(implicit ec: ExecutionContext): Future[ChecksSuiteResult] =
  runV2(timestamp).map(_.fold(err => err.throwErr, identity))

/**
* Run all checks in the ChecksSuite asynchronously, returning a Future with either an
 * error or the ChecksSuiteResult
*
* @param timestamp - time the checks are being run
* @param ec - execution context
* @return
*/
def runV2(timestamp: Instant)(implicit ec: ExecutionContext): Future[Either[ChecksSuiteErr, ChecksSuiteResult]] = {
val metricBasedCheckResultsFut: Future[Seq[CheckResult]] = runMetricBasedChecks(timestamp)
val singleDatasetCheckResults: Seq[CheckResult] = for {
(dds, checks) <- arbSingleDsChecks.toSeq
Expand All @@ -127,9 +160,9 @@ case class ChecksSuite(
timestamp = timestamp,
tags
)
_ <- qcResultsRepository.save(checkSuiteResult)
maybeSavedCheckSuiteResult <- qcResultsRepository.saveV2(checkSuiteResult)
} yield {
checkSuiteResult
maybeSavedCheckSuiteResult
}
}

Expand Down
29 changes: 18 additions & 11 deletions src/main/scala/com/github/timgent/dataflare/examples/Example.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package com.github.timgent.dataflare.examples

import java.time.{LocalDateTime, ZoneOffset}

import cats.implicits._
import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricCheck}
import com.github.timgent.dataflare.checks.{ArbSingleDsCheck, CheckStatus, RawCheckResult}
import com.github.timgent.dataflare.checkssuite._
import com.github.timgent.dataflare.examples.Day1Checks.qcResults
import com.github.timgent.dataflare.examples.ExampleHelpers.{Customer, Order, _}
import com.github.timgent.dataflare.metrics.MetricDescriptor.{CountDistinctValuesMetric, SizeMetric}
import com.github.timgent.dataflare.metrics.{ComplianceFn, MetricComparator, MetricFilter}
import com.github.timgent.dataflare.repository.{ElasticSearchMetricsPersister, ElasticSearchQcResultsRepository}
import com.github.timgent.dataflare.metrics.{ComplianceFn, MetricComparator}
import com.github.timgent.dataflare.repository.{DfApiQcResultsRepository, ElasticSearchMetricsPersister, ElasticSearchQcResultsRepository}
import com.github.timgent.dataflare.thresholds.AbsoluteThreshold
import com.github.timgent.dataflare.utils.DateTimeUtils.InstantExtension
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import sttp.client3.UriContext

import java.time.{LocalDateTime, ZoneOffset}
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.exit

object ExampleHelpers {
val sparkConf = new SparkConf().setAppName("SparkDataQualityExample").setMaster("local")
Expand Down Expand Up @@ -131,6 +131,7 @@ object Helpers {
ElasticSearchQcResultsRepository(List("http://127.0.0.1:9200"), "orders_qc_results")
val esMetricsPersister =
ElasticSearchMetricsPersister(List("http://127.0.0.1:9200"), "order_metrics")
val apiQcResultsRepository = new DfApiQcResultsRepository(uri"http://127.0.0.1:8080")

def getCheckSuite(
orderDs: DescribedDs,
Expand Down Expand Up @@ -185,7 +186,7 @@ object Helpers {
singleDsChecks = singleDsChecks |+| Map(customerDs -> List(expectedCustomerColumnsCheck)),
dualDsChecks = dualDsMetricChecks,
metricsPersister = esMetricsPersister,
qcResultsRepository = qcResultsRepository
qcResultsRepository = apiQcResultsRepository
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make the example so it's easy to switch between the 2?

)

checksSuite
Expand All @@ -199,12 +200,14 @@ object Day1Checks extends App {
val checksSuite = Helpers.getCheckSuite(orderDs, customerDs, customersWithOrdersDs)

val allQcResultsFuture = checksSuite.run(monday)
val qcResults = Await.result(allQcResultsFuture, 10 seconds)
val allQcResults = Await.result(allQcResultsFuture, 10 seconds)

if (qcResults.overallStatus == CheckSuiteStatus.Success)
if (allQcResults.overallStatus == CheckSuiteStatus.Success)
println("All checks completed successfully!!")
else
println("Checks failed :(")

exit(0)
}

object Day2Checks extends App {
Expand All @@ -216,10 +219,12 @@ object Day2Checks extends App {
val allQcResultsFuture = checksSuite.run(tuesday)
val allQcResults = Await.result(allQcResultsFuture, 10 seconds)

if (qcResults.overallStatus == CheckSuiteStatus.Success)
if (allQcResults.overallStatus == CheckSuiteStatus.Success)
println("All checks completed successfully!!")
else
println("Checks failed :(")

exit(0)
}

object Day3Checks extends App {
Expand All @@ -231,8 +236,10 @@ object Day3Checks extends App {
val allQcResultsFuture = checksSuite.run(wednesday)
val allQcResults = Await.result(allQcResultsFuture, 10 seconds)

if (qcResults.overallStatus == CheckSuiteStatus.Success)
if (allQcResults.overallStatus == CheckSuiteStatus.Success)
println("All checks completed successfully!!")
else
println("Checks failed :(")

exit(0)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.github.timgent.dataflare.repository

import cats.implicits._
import com.github.timgent.dataflare.checkssuite.ChecksSuiteResult
import com.github.timgent.dataflare.json.CustomEncodings.{checksSuiteResultDecoder, checksSuiteResultEncoder}
import com.github.timgent.dataflare.repository.QcResultsRepoErr.{LoadQcResultErr, SaveQcResultErr}
import io.circe.parser._
import io.circe.syntax._
import sttp.client3._
import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend
import sttp.model.Uri

import scala.concurrent.{ExecutionContext, Future}
/**
 * QcResultsRepository backed by the data-flare HTTP API.
 *
 * @param host base URI of the API (results are POSTed to / fetched from host/qcresults)
 * @param ec execution context used to map over the asynchronous HTTP responses
 */
class DfApiQcResultsRepository(host: Uri)(implicit val ec: ExecutionContext) extends QcResultsRepository {

  // Shared Future-based HTTP backend, created lazily on first use.
  // NOTE(review): nothing in this class closes the backend — consider exposing a
  // shutdown hook so the underlying async-http-client threads can be released.
  private lazy val backend = AsyncHttpClientFutureBackend()

  /**
   * Save Quality Check results to the API, issuing one POST per result.
   *
   * @param qcResults A list of results
   * @return a Future containing, for each input result, either a SaveQcResultErr
   *         (when the API responded unsuccessfully) or the result that was saved
   */
  override def saveV2(qcResults: List[ChecksSuiteResult]): Future[List[Either[QcResultsRepoErr, ChecksSuiteResult]]] = {
    qcResults.traverse(qcResult =>
      basicRequest
        .contentType("application/json")
        .body(qcResult.asJson.noSpaces)
        .post(host.addPath("qcresults"))
        .send(backend)
        .map { res =>
          // sttp surfaces non-2xx responses as a Left body: translate that into a
          // domain error; otherwise echo back the result that was saved.
          res.body.fold[Either[QcResultsRepoErr, ChecksSuiteResult]](
            err => Left(SaveQcResultErr(err)),
            _ => Right(qcResult)
          )
        }
    )
  }

  /**
   * Load all check results stored in the repository.
   *
   * @return a Future with either a LoadQcResultErr (unsuccessful response, malformed
   *         JSON, or JSON that fails to decode) or the full list of results
   */
  override def loadAll: Future[Either[LoadQcResultErr, List[ChecksSuiteResult]]] =
    basicRequest
      .contentType("application/json")
      .get(host.addPath("qcresults"))
      .send(backend)
      .map { response =>
        for {
          bodyStr <- response.body.leftMap(err => LoadQcResultErr("Received an unsuccessful response from the API: " + err, None))
          bodyJson <-
            parse(bodyStr).leftMap(err => LoadQcResultErr("Response json was not valid JSON: " + err.message, Some(err.underlying)))
          deserializedBody <-
            bodyJson
              .as[List[ChecksSuiteResult]]
              .leftMap(err => LoadQcResultErr("Response JSON could not be deserialized: " + err.message, None))
        } yield deserializedBody
      }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package com.github.timgent.dataflare.repository

import com.github.timgent.dataflare.checkssuite.ChecksSuiteResult
import com.github.timgent.dataflare.json.CustomEncodings.{checksSuiteResultDecoder, checksSuiteResultEncoder}
import com.github.timgent.dataflare.repository.QcResultsRepoErr.{LoadQcResultErr, SaveQcResultErr}
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Index}
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Index, RequestFailure, RequestSuccess}

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -15,24 +16,25 @@ import scala.concurrent.{ExecutionContext, Future}
* @param index - the name of the index to save QC results to
* @param ec - the execution context
*/
class ElasticSearchQcResultsRepository(client: ElasticClient, index: Index)(implicit
ec: ExecutionContext
) extends QcResultsRepository {
override def save(qcResults: List[ChecksSuiteResult]): Future[Unit] = {
/**
 * QcResultsRepository backed by an ElasticSearch index.
 *
 * @param client the elastic4s client used to talk to the cluster
 * @param index the name of the index to save QC results to
 * @param ec the execution context
 */
class ElasticSearchQcResultsRepository(client: ElasticClient, index: Index)(implicit val ec: ExecutionContext) extends QcResultsRepository {

  /**
   * Save Quality Check results to ElasticSearch using a single bulk request.
   *
   * @param qcResults A list of results
   * @return a Future with one Either per input result: all Rights when the bulk
   *         request succeeds, or one SaveQcResultErr per input when it fails
   */
  override def saveV2(qcResults: List[ChecksSuiteResult]): Future[List[Either[QcResultsRepoErr, ChecksSuiteResult]]] = {
    client
      .execute {
        bulk(
          qcResults.map(indexInto(index).doc(_))
        )
      }
      .map {
        case RequestSuccess(_, _, _, _) => qcResults.map(Right(_))
        // The whole bulk request failed, so report a failure for every result we
        // attempted to save — this keeps the output list aligned with the input,
        // matching the per-result contract of DfApiQcResultsRepository.saveV2.
        case RequestFailure(_, _, _, error) => qcResults.map(_ => Left(SaveQcResultErr(error.reason)))
      }
  }

  /**
   * Load all check results in the index via a match-all query.
   *
   * NOTE(review): this always yields Right; a failed ElasticSearch call fails the
   * Future instead of producing a LoadQcResultErr — confirm that is intended.
   *
   * @return a Future with the (always Right) list of all stored results
   */
  override def loadAll: Future[Either[LoadQcResultErr, List[ChecksSuiteResult]]] = {
    val resp = client.execute {
      search(index) query matchAllQuery
    }
    resp.map(response => Right(response.result.hits.hits.map(_.to[ChecksSuiteResult]).toList))
  }
}

Expand Down
Loading