Skip to content

Commit

Permalink
Make JSON utilities available within data-flare ecosystem
Browse files Browse the repository at this point in the history
  • Loading branch information
timgent committed Apr 30, 2021
1 parent 8e0f671 commit 1a089a4
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 170 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ lazy val root = (project in file("."))
elastic4sTestKit,
elastic4sCirceJson,
enumeratum,
enumeratumCirce,
cats,
spire,
scalacheck,
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ object Dependencies {
lazy val elastic4sTestKit = "com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % "test"
lazy val elastic4sCirceJson = "com.sksamuel.elastic4s" %% "elastic4s-json-circe" % elastic4sVersion
lazy val enumeratum = "com.beachape" %% "enumeratum" % "1.5.15"
lazy val enumeratumCirce = "com.beachape" %% "enumeratum-circe" % "1.5.15"
lazy val cats = "org.typelevel" %% "cats-core" % "2.0.0"
lazy val scalacheck = "org.scalatestplus" %% "scalatestplus-scalacheck" % "3.1.0.0-RC2" % Test
lazy val scalacheckToolboxDatetime = "com.47deg" %% "scalacheck-toolbox-datetime" % "0.3.5" % Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object ChecksSuiteResult {
*/
sealed trait CheckSuiteStatus extends EnumEntry

object CheckSuiteStatus extends Enum[CheckSuiteStatus] {
object CheckSuiteStatus extends Enum[CheckSuiteStatus] with CirceEnum[CheckSuiteStatus] {
val values = findValues
case object Success extends CheckSuiteStatus
case object Warning extends CheckSuiteStatus
Expand Down
151 changes: 147 additions & 4 deletions src/main/scala/com/github/timgent/dataflare/json/CustomEncodings.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,157 @@
package com.github.timgent.dataflare.json

import com.github.timgent.dataflare.checks.DatasourceDescription
import com.github.timgent.dataflare.checks.CheckDescription.{
DualMetricCheckDescription,
SimpleCheckDescription,
SingleMetricCheckDescription
}
import com.github.timgent.dataflare.checks.DatasourceDescription.{DualDsDescription, OtherDsDescription, SingleDsDescription}
import com.github.timgent.dataflare.checks._
import com.github.timgent.dataflare.checkssuite.{CheckSuiteStatus, ChecksSuiteResult}
import com.github.timgent.dataflare.metrics.SimpleMetricDescriptor
import com.github.timgent.dataflare.repository.CommonEncoders.{metricDescriptorDecoder, metricDescriptorEncoder}
import io.circe.Decoder.Result
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax.EncoderOps
import io.circe.{Decoder, Encoder, HCursor, Json}
import cats.syntax.either._

import java.time.Instant

private[dataflare] object CustomEncodings {
implicit val singleDsDescriptionEncoder: Encoder[SingleDsDescription] = new Encoder[SingleDsDescription] {
override def apply(a: SingleDsDescription): Json = ???
implicit val qcTypeEncoder: Encoder[QcType] = new Encoder[QcType] {
override def apply(a: QcType): Json = Json.fromString(a.toString)
}
implicit val checkStatusEncoder: Encoder[CheckStatus] = new Encoder[CheckStatus] {
override def apply(a: CheckStatus): Json = Json.fromString(a.toString)
}
implicit val datasourceDescriptionEncoder: Encoder[DatasourceDescription] = new Encoder[DatasourceDescription] {
override def apply(a: DatasourceDescription): Json = {
val fields = a match {
case SingleDsDescription(datasource) =>
Seq(
"type" -> Json.fromString("SingleDs"),
"datasource" -> Json.fromString(datasource)
)
case DatasourceDescription.DualDsDescription(datasourceA, datasourceB) =>
Seq(
"type" -> Json.fromString("DualDs"),
"datasourceA" -> Json.fromString(datasourceA),
"datasourceB" -> Json.fromString(datasourceB)
)
case DatasourceDescription.OtherDsDescription(datasource) =>
Seq(
"type" -> Json.fromString("OtherDs"),
"datasource" -> Json.fromString(datasource)
)
}
Json.obj(fields: _*)
}
}
implicit val datasourceDescriptionDecoder: Decoder[DatasourceDescription] = new Decoder[DatasourceDescription] {
override def apply(c: HCursor): Result[DatasourceDescription] =
for {
datasourceType <- c.downField("type").as[String]
datasourceDescription <- datasourceType match {
case "SingleDs" =>
for {
datasource <- c.downField("datasource").as[String]
} yield SingleDsDescription(datasource)
case "DualDs" =>
for {
datasourceA <- c.downField("datasourceA").as[String]
datasourceB <- c.downField("datasourceB").as[String]
} yield DualDsDescription(datasourceA, datasourceB)
case "OtherDs" =>
for {
datasource <- c.downField("datasource").as[String]
} yield OtherDsDescription(datasource)
}
} yield datasourceDescription
}
implicit val checkDescriptionEncoder: Encoder[CheckDescription] = new Encoder[CheckDescription] {
override def apply(a: CheckDescription): Json = {
val fields = a match {
case CheckDescription.SimpleCheckDescription(desc) =>
Seq(
"type" -> Json.fromString("SimpleCheckDescription"),
"desc" -> Json.fromString(desc)
)
case CheckDescription.DualMetricCheckDescription(desc, dsMetric, dsToCompareMetric, metricComparator) =>
Seq(
"type" -> Json.fromString("DualMetricCheckDescription"),
"desc" -> Json.fromString(desc),
"dsMetric" -> dsMetric.asJson,
"dsToCompareMetric" -> dsToCompareMetric.asJson,
"metricComparator" -> Json.fromString(metricComparator)
)
case CheckDescription.SingleMetricCheckDescription(desc, dsMetric) =>
Seq(
"type" -> Json.fromString("SingleMetricCheckDescription"),
"desc" -> Json.fromString(desc),
"dsMetric" -> dsMetric.asJson
)
}
Json.obj(fields: _*)
}
}

implicit val checkDescriptionDecoder: Decoder[CheckDescription] = new Decoder[CheckDescription] {
override def apply(c: HCursor): Result[CheckDescription] =
for {
descriptionType <- c.downField("type").as[String]
checkDescription <- descriptionType match {
case "SimpleCheckDescription" =>
for {
desc <- c.downField("desc").as[String]
} yield SimpleCheckDescription(desc)
case "DualMetricCheckDescription" =>
for {
desc <- c.downField("desc").as[String]
dsMetric <- c.downField("dsMetric").as[SimpleMetricDescriptor]
dsToCompareMetric <- c.downField("dsToCompareMetric").as[SimpleMetricDescriptor]
metricComparator <- c.downField("metricComparator").as[String]
} yield DualMetricCheckDescription(desc, dsMetric, dsToCompareMetric, metricComparator)
case "SingleMetricCheckDescription" =>
for {
desc <- c.downField("desc").as[String]
dsMetric <- c.downField("dsMetric").as[SimpleMetricDescriptor]
} yield SingleMetricCheckDescription(desc, dsMetric)
}
} yield checkDescription
}
implicit val checkResultEncoder: Encoder[CheckResult] = new Encoder[CheckResult] {
override def apply(a: CheckResult): Json = {
Json.obj(
"qcType" -> a.qcType.asJson,
"status" -> a.status.asJson,
"resultDescription" -> a.resultDescription.asJson,
"checkDescription" -> a.checkDescription.asJson,
"datasourceDescription" -> a.datasourceDescription.asJson
)
}
}

implicit val checkResultDecoder: Decoder[CheckResult] = new Decoder[CheckResult] {
override def apply(c: HCursor): Result[CheckResult] = {
for {
qcType <- c.downField("qcType").as[String].map(QcType.namesToValuesMap)
status <- c.downField("status").as[String].map(CheckStatus.namesToValuesMap)
resultDescription <- c.downField("resultDescription").as[String]
checkDescription <- c.downField("checkDescription").as[CheckDescription]
datasourceDescription = c.downField("datasourceDescription").as[DatasourceDescription].fold(_ => None, Some(_))
} yield CheckResult(qcType, status, resultDescription, checkDescription, datasourceDescription, Seq.empty)
}
}
implicit val checksSuiteResultEncoder: Encoder[ChecksSuiteResult] = deriveEncoder[ChecksSuiteResult]
implicit val checksSuiteResultDecoder: Decoder[ChecksSuiteResult] = new Decoder[ChecksSuiteResult] {
override def apply(c: HCursor): Result[ChecksSuiteResult] = {
for {
overallStatus <- c.downField("overallStatus").as[String].map(CheckSuiteStatus.namesToValuesMap)
checkSuiteDescription <- c.downField("checkSuiteDescription").as[String]
checkResults <- c.downField("checkResults").as[Seq[CheckResult]]
timestamp <- c.downField("timestamp").as[Instant]
checkTags <- c.downField("checkTags").as[Map[String, String]]
} yield ChecksSuiteResult(overallStatus, checkSuiteDescription, checkResults, timestamp, checkTags)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.circe.generic.semiauto.deriveDecoder
import io.circe.{Decoder, Encoder, Json}
import io.circe.syntax._

private[repository] object CommonEncoders {
private[dataflare] object CommonEncoders {
implicit val metricDescriptorEncoder: Encoder[SimpleMetricDescriptor] = new Encoder[SimpleMetricDescriptor] {
override def apply(a: SimpleMetricDescriptor): Json = {
val fields: List[(String, Json)] = List(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,11 @@
package com.github.timgent.dataflare.repository

import java.time.Instant

import com.github.timgent.dataflare.checks.{CheckDescription, CheckResult, CheckStatus, DatasourceDescription, QcType}
import com.github.timgent.dataflare.checkssuite.{CheckSuiteStatus, ChecksSuiteResult}
import com.github.timgent.dataflare.checkssuite.ChecksSuiteResult
import com.github.timgent.dataflare.json.CustomEncodings.{checksSuiteResultDecoder, checksSuiteResultEncoder}
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Index}
import io.circe.Decoder.Result
import io.circe.{Decoder, Encoder, HCursor, Json}
import cats.syntax.either._
import com.github.timgent.dataflare.FlareError
import com.github.timgent.dataflare.checks.CheckDescription.{
DualMetricCheckDescription,
SimpleCheckDescription,
SingleMetricCheckDescription
}
import com.github.timgent.dataflare.checks.DatasourceDescription.{DualDsDescription, OtherDsDescription, SingleDsDescription}
import com.github.timgent.dataflare.metrics.SimpleMetricDescriptor
import io.circe.syntax._

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -32,9 +18,6 @@ import scala.concurrent.{ExecutionContext, Future}
class ElasticSearchQcResultsRepository(client: ElasticClient, index: Index)(implicit
ec: ExecutionContext
) extends QcResultsRepository {
import ElasticSearchQcResultsRepository.checksSuiteResultEncoder
import ElasticSearchQcResultsRepository.checksSuiteResultDecoder

override def save(qcResults: List[ChecksSuiteResult]): Future[Unit] = {
client
.execute {
Expand All @@ -54,148 +37,6 @@ class ElasticSearchQcResultsRepository(client: ElasticClient, index: Index)(impl
}

object ElasticSearchQcResultsRepository {
import io.circe.generic.semiauto._
import CommonEncoders.{metricDescriptorDecoder, metricDescriptorEncoder}
private implicit val checkSuiteStatusEncoder: Encoder[CheckSuiteStatus] = new Encoder[CheckSuiteStatus] {
override def apply(a: CheckSuiteStatus): Json = Json.fromString(a.toString)
}
private implicit val qcTypeEncoder: Encoder[QcType] = new Encoder[QcType] {
override def apply(a: QcType): Json = Json.fromString(a.toString)
}
private implicit val checkStatusEncoder: Encoder[CheckStatus] = new Encoder[CheckStatus] {
override def apply(a: CheckStatus): Json = Json.fromString(a.toString)
}
private[repository] implicit val datasourceDescriptionEncoder: Encoder[DatasourceDescription] = new Encoder[DatasourceDescription] {
override def apply(a: DatasourceDescription): Json = {
val fields = a match {
case SingleDsDescription(datasource) =>
Seq(
"type" -> Json.fromString("SingleDs"),
"datasource" -> Json.fromString(datasource)
)
case DatasourceDescription.DualDsDescription(datasourceA, datasourceB) =>
Seq(
"type" -> Json.fromString("DualDs"),
"datasourceA" -> Json.fromString(datasourceA),
"datasourceB" -> Json.fromString(datasourceB)
)
case DatasourceDescription.OtherDsDescription(datasource) =>
Seq(
"type" -> Json.fromString("OtherDs"),
"datasource" -> Json.fromString(datasource)
)
}
Json.obj(fields: _*)
}
}
private implicit val datasourceDescriptionDecoder: Decoder[DatasourceDescription] = new Decoder[DatasourceDescription] {
override def apply(c: HCursor): Result[DatasourceDescription] =
for {
datasourceType <- c.downField("type").as[String]
datasourceDescription <- datasourceType match {
case "SingleDs" =>
for {
datasource <- c.downField("datasource").as[String]
} yield SingleDsDescription(datasource)
case "DualDs" =>
for {
datasourceA <- c.downField("datasourceA").as[String]
datasourceB <- c.downField("datasourceB").as[String]
} yield DualDsDescription(datasourceA, datasourceB)
case "OtherDs" =>
for {
datasource <- c.downField("datasource").as[String]
} yield OtherDsDescription(datasource)
}
} yield datasourceDescription
}
private implicit val checkDescriptionEncoder: Encoder[CheckDescription] = new Encoder[CheckDescription] {
override def apply(a: CheckDescription): Json = {
val fields = a match {
case CheckDescription.SimpleCheckDescription(desc) =>
Seq(
"type" -> Json.fromString("SimpleCheckDescription"),
"desc" -> Json.fromString(desc)
)
case CheckDescription.DualMetricCheckDescription(desc, dsMetric, dsToCompareMetric, metricComparator) =>
Seq(
"type" -> Json.fromString("DualMetricCheckDescription"),
"desc" -> Json.fromString(desc),
"dsMetric" -> dsMetric.asJson,
"dsToCompareMetric" -> dsToCompareMetric.asJson,
"metricComparator" -> Json.fromString(metricComparator)
)
case CheckDescription.SingleMetricCheckDescription(desc, dsMetric) =>
Seq(
"type" -> Json.fromString("SingleMetricCheckDescription"),
"desc" -> Json.fromString(desc),
"dsMetric" -> dsMetric.asJson
)
}
Json.obj(fields: _*)
}
}

private implicit val checkDescriptionDecoder: Decoder[CheckDescription] = new Decoder[CheckDescription] {
override def apply(c: HCursor): Result[CheckDescription] =
for {
descriptionType <- c.downField("type").as[String]
checkDescription <- descriptionType match {
case "SimpleCheckDescription" =>
for {
desc <- c.downField("desc").as[String]
} yield SimpleCheckDescription(desc)
case "DualMetricCheckDescription" =>
for {
desc <- c.downField("desc").as[String]
dsMetric <- c.downField("dsMetric").as[SimpleMetricDescriptor]
dsToCompareMetric <- c.downField("dsToCompareMetric").as[SimpleMetricDescriptor]
metricComparator <- c.downField("metricComparator").as[String]
} yield DualMetricCheckDescription(desc, dsMetric, dsToCompareMetric, metricComparator)
case "SingleMetricCheckDescription" =>
for {
desc <- c.downField("desc").as[String]
dsMetric <- c.downField("dsMetric").as[SimpleMetricDescriptor]
} yield SingleMetricCheckDescription(desc, dsMetric)
}
} yield checkDescription
}
private implicit val checkResultEncoder: Encoder[CheckResult] = new Encoder[CheckResult] {
override def apply(a: CheckResult): Json = {
Json.obj(
"qcType" -> a.qcType.asJson,
"status" -> a.status.asJson,
"resultDescription" -> a.resultDescription.asJson,
"checkDescription" -> a.checkDescription.asJson,
"datasourceDescription" -> a.datasourceDescription.asJson
)
}
}

private implicit val checkResultDecoder: Decoder[CheckResult] = new Decoder[CheckResult] {
override def apply(c: HCursor): Result[CheckResult] = {
for {
qcType <- c.downField("qcType").as[String].map(QcType.namesToValuesMap)
status <- c.downField("status").as[String].map(CheckStatus.namesToValuesMap)
resultDescription <- c.downField("resultDescription").as[String]
checkDescription <- c.downField("checkDescription").as[CheckDescription]
datasourceDescription = c.downField("datasourceDescription").as[DatasourceDescription].fold(_ => None, Some(_))
} yield CheckResult(qcType, status, resultDescription, checkDescription, datasourceDescription, Seq.empty)
}
}
private[repository] implicit val checksSuiteResultEncoder: Encoder[ChecksSuiteResult] = deriveEncoder[ChecksSuiteResult]
private[repository] implicit val checksSuiteResultDecoder: Decoder[ChecksSuiteResult] = new Decoder[ChecksSuiteResult] {
override def apply(c: HCursor): Result[ChecksSuiteResult] = {
for {
overallStatus <- c.downField("overallStatus").as[String].map(CheckSuiteStatus.namesToValuesMap)
checkSuiteDescription <- c.downField("checkSuiteDescription").as[String]
checkResults <- c.downField("checkResults").as[Seq[CheckResult]]
timestamp <- c.downField("timestamp").as[Instant]
checkTags <- c.downField("checkTags").as[Map[String, String]]
} yield ChecksSuiteResult(overallStatus, checkSuiteDescription, checkResults, timestamp, checkTags)
}
}

def apply(hosts: Seq[String], index: Index)(implicit
ec: ExecutionContext
): ElasticSearchQcResultsRepository = {
Expand Down
Loading

0 comments on commit 1a089a4

Please sign in to comment.