Skip to content

Commit

Permalink
Implement CLI for backup
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Jan 25, 2022
1 parent 58496c2 commit ea0818c
Show file tree
Hide file tree
Showing 13 changed files with 341 additions and 26 deletions.
59 changes: 33 additions & 26 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ val diffxVersion = "0.7.0"
val testContainersVersion = "0.39.12"
val testContainersJavaVersion = "1.16.2"
val scalaCheckVersion = "1.15.5-1-SNAPSHOT"
val enumeratumVersion = "1.7.0"

val flagsFor12 = Seq(
"-Xlint:_",
Expand Down Expand Up @@ -62,10 +63,13 @@ val cliSettings = Seq(
scalacOptions ++= Seq(
"-opt-inline-from:**", // See https://www.lightbend.com/blog/scala-inliner-optimizer
"-opt:l:method"
),
) ++ flagsFor13,
publish / skip := true,
publishLocal / skip := true,
publishSigned / skip := true
publishSigned / skip := true,
scalaVersion := "2.13.6",
rpmVendor := "Aiven",
rpmLicense := Some("ASL 2.0")
)

val baseName = "guardian-for-apache-kafka"
Expand Down Expand Up @@ -97,6 +101,21 @@ lazy val core = project
)
)

lazy val coreCli = project
.in(file("core-cli"))
.settings(
cliSettings,
name := s"$baseName-core-cli",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.monovore" %% "decline" % declineVersion,
"com.beachape" %% "enumeratum" % enumeratumVersion
)
)
.dependsOn(core)

lazy val coreS3 = project
.in(file("core-s3"))
.settings(
Expand Down Expand Up @@ -155,15 +174,13 @@ lazy val cliBackup = project
.in(file("cli-backup"))
.settings(
cliSettings,
name := s"$baseName-cli-backup",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.monovore" %% "decline" % declineVersion
)
name := s"$baseName-cli-backup"
)
.dependsOn(coreCli % "compile->compile;test->test",
backupS3 % "compile->compile;test->test",
backupGCS % "compile->compile;test->test"
)
.dependsOn(backupS3, backupGCS)
.enablePlugins(SbtNativePackager)
.enablePlugins(JavaAppPackaging)

lazy val coreCompaction = project
.in(file("core-compaction"))
Expand Down Expand Up @@ -196,15 +213,10 @@ lazy val cliCompaction = project
.in(file("cli-compaction"))
.settings(
cliSettings,
name := s"$baseName-cli-compaction",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.monovore" %% "decline" % declineVersion
)
name := s"$baseName-cli-compaction"
)
.dependsOn(compactionS3, compactionGCS)
.enablePlugins(SbtNativePackager)
.dependsOn(coreCli, compactionS3, compactionGCS)
.enablePlugins(JavaAppPackaging)

lazy val restoreS3 = project
.in(file("restore-s3"))
Expand All @@ -226,15 +238,10 @@ lazy val cliRestore = project
.in(file("cli-restore"))
.settings(
cliSettings,
name := s"$baseName-cli-restore",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.monovore" %% "decline" % declineVersion
)
name := s"$baseName-cli-restore"
)
.dependsOn(restoreS3, restoreGCS)
.enablePlugins(SbtNativePackager)
.dependsOn(coreCli, restoreS3, restoreGCS)
.enablePlugins(JavaAppPackaging)

// This is currently causing problems, see https://github.com/djspiewak/sbt-github-actions/issues/74
ThisBuild / githubWorkflowUseSbtThinClient := false
Expand Down
Empty file removed cli-backup/.gitkeep
Empty file.
24 changes: 24 additions & 0 deletions cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/App.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.aiven.guardian.kafka.backup

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import io.aiven.guardian.kafka.backup.BackupClientInterface
import io.aiven.guardian.kafka.backup.KafkaClient
import io.aiven.guardian.kafka.backup.KafkaClientInterface

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

trait App[T <: KafkaClientInterface] {
implicit val kafkaClient: T
implicit val backupClient: BackupClientInterface[KafkaClient]
implicit val actorSystem: ActorSystem
implicit val executionContext: ExecutionContext

def run(): Consumer.Control = backupClient.backup.run()
def shutdown(control: Consumer.Control): Future[Done] =
// Ideally we should be using drainAndShutdown however this isn't possible due to
// https://github.com/aiven/guardian-for-apache-kafka/issues/80
control.stop().flatMap(_ => control.shutdown())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.aiven.guardian.kafka.backup

import io.aiven.guardian.cli.AkkaSettings
import io.aiven.guardian.kafka.backup.KafkaClient
import io.aiven.guardian.kafka.backup.{Config => BackupConfig}
import io.aiven.guardian.kafka.{Config => KafkaConfig}

trait BackupApp extends BackupConfig with KafkaConfig with AkkaSettings {
implicit lazy val kafkaClient: KafkaClient = new KafkaClient()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.aiven.guardian.kafka.backup

import cats.implicits._
import com.monovore.decline._
import io.aiven.guardian.cli.arguments.StorageOpt
import io.aiven.guardian.cli.options.Options
import io.aiven.guardian.kafka.backup.KafkaClient
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice
import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
import io.aiven.guardian.kafka.backup.configs.TimeConfiguration
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.s3.configs.S3

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicReference

class Entry(val initializedApp: AtomicReference[Option[App[_]]] = new AtomicReference[Option[App[_]]](None))
extends CommandApp(
name = "guardian-backup",
header = "Guardian cli Backup Tool",
main = {
val groupIdOpt: Opts[Option[String]] =
Opts.option[String]("kafka-group-id", help = "Kafka group id for the consumer").orNone

val periodFromFirstOpt =
Opts
.option[FiniteDuration]("period-from-first", help = "Duration for period-from-first configured backup")
.map(PeriodFromFirst.apply)

val chronoUnitSliceOpt =
Opts
.option[ChronoUnit]("chrono-unit-slice", help = "ChronoUnit for chrono-unit-slice configured backup")
.map(ChronoUnitSlice.apply)

val timeConfigurationOpt: Opts[Option[TimeConfiguration]] =
(periodFromFirstOpt orElse chronoUnitSliceOpt).orNone

val backupOpt =
(groupIdOpt, timeConfigurationOpt).tupled.mapValidated { case (maybeGroupId, maybeTimeConfiguration) =>
import io.aiven.guardian.kafka.backup.Config.backupConfig
(maybeGroupId, maybeTimeConfiguration) match {
case (Some(groupId), Some(timeConfiguration)) =>
Backup(groupId, timeConfiguration).validNel
case _ =>
Options
.optionalPureConfigValue(() => backupConfig)
.toValidNel("Backup config is a mandatory value that needs to be configured")
}
}

val s3Opt = Options.dataBucketOpt.mapValidated { maybeDataBucket =>
import io.aiven.guardian.kafka.s3.Config
maybeDataBucket match {
case Some(value) => S3(dataBucket = value).validNel
case _ =>
Options
.optionalPureConfigValue(() => Config.s3Config)
.toValidNel("S3 data bucket is a mandatory value that needs to be configured")
}
}

(Options.storageOpt, Options.kafkaClusterOpt, Options.kafkaConsumerSettingsOpt, s3Opt, backupOpt).mapN {
(storage, kafkaCluster, kafkaConsumerSettings, s3, backup) =>
val app = storage match {
case StorageOpt.S3 =>
new S3App {
override lazy val kafkaClusterConfig: KafkaCluster = kafkaCluster
override lazy val s3Config: S3 = s3
override lazy val backupConfig: Backup = backup
override lazy val kafkaClient: KafkaClient =
new KafkaClient(kafkaConsumerSettings)(actorSystem, kafkaClusterConfig, backupConfig)
}
}
initializedApp.set(Some(app))
val control = app.run()
Runtime.getRuntime.addShutdownHook(new Thread {
Await.result(app.shutdown(control), 5 minutes)
})
}
}
)

object Main extends Entry()
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.aiven.guardian.kafka.backup

import akka.stream.alpakka.s3.S3Settings
import io.aiven.guardian.kafka.backup.KafkaClient
import io.aiven.guardian.kafka.backup.s3.BackupClient
import io.aiven.guardian.kafka.s3.{Config => S3Config}

trait S3App extends S3Config with BackupApp with App[KafkaClient] {
lazy val s3Settings: S3Settings = S3Settings()
implicit lazy val backupClient: BackupClient[KafkaClient] = new BackupClient[KafkaClient](Some(s3Settings))
}
48 changes: 48 additions & 0 deletions cli-backup/src/test/scala/io/aiven/guardian/kafka/CliSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.aiven.guardian.kafka

import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice
import io.aiven.guardian.kafka.backup.configs.{Backup => BackupConfig}
import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig}
import org.scalatest.matchers.must.Matchers
import org.scalatest.propspec.AnyPropSpec

import scala.annotation.nowarn

import java.time.temporal.ChronoUnit

@nowarn("msg=method main in class CommandApp is deprecated")
class CliSpec extends AnyPropSpec with Matchers {

property("Command line args are properly passed into application") {
val groupId = "my-consumer-group"
val topic = "topic"
val bootstrapServer = "localhost:9092"
val dataBucket = "backup-bucket"

val args = List(
"--storage",
"s3",
"--kafka-topics",
topic,
"--kafka-bootstrap-servers",
bootstrapServer,
"--s3-data-bucket",
dataBucket,
"--kafka-group-id",
groupId,
"--chrono-unit-slice",
"hours"
)

backup.Main.main(args.toArray)
backup.Main.initializedApp.get() match {
case Some(s3App: backup.S3App) =>
s3App.backupConfig mustEqual BackupConfig(groupId, ChronoUnitSlice(ChronoUnit.HOURS))
s3App.kafkaClusterConfig mustEqual KafkaClusterConfig(Set(topic))
s3App.kafkaClient.consumerSettings.getProperty("bootstrap.servers") mustEqual bootstrapServer
s3App.s3Config.dataBucket mustEqual dataBucket
case _ =>
}
}

}
5 changes: 5 additions & 0 deletions core-cli/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}
17 changes: 17 additions & 0 deletions core-cli/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%highlight(%-5level)] %logger{0} - %msg%n</pattern>
</encoder>
</appender>

<appender name="ASYNCSTDOUT" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STDOUT" />
</appender>

<root level="INFO">
<appender-ref ref="ASYNCSTDOUT" />
</root>

</configuration>
10 changes: 10 additions & 0 deletions core-cli/src/main/scala/io/aiven/guardian/cli/AkkaSettings.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.aiven.guardian.cli

import akka.actor.ActorSystem

import scala.concurrent.ExecutionContext

trait AkkaSettings {
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val executionContext: ExecutionContext = ExecutionContext.global
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.aiven.guardian.cli.arguments

import cats.data.ValidatedNel
import cats.implicits._
import com.monovore.decline.Argument
import enumeratum._

sealed trait StorageOpt extends EnumEntry with EnumEntry.Lowercase

object StorageOpt extends Enum[StorageOpt] {
case object S3 extends StorageOpt

val values: IndexedSeq[StorageOpt] = findValues

implicit val storageArgument: Argument[StorageOpt] = new Argument[StorageOpt] {
override def read(string: String): ValidatedNel[String, StorageOpt] =
StorageOpt.withNameOption(string).toValidNel("Invalid Storage Argument")

override def defaultMetavar: String = "storage"
}

}
Loading

0 comments on commit ea0818c

Please sign in to comment.