Skip to content

Commit

Permalink
Implement CLI for backup
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Jan 20, 2022
1 parent 58496c2 commit a58e5b6
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 26 deletions.
55 changes: 29 additions & 26 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ val diffxVersion = "0.7.0"
val testContainersVersion = "0.39.12"
val testContainersJavaVersion = "1.16.2"
val scalaCheckVersion = "1.15.5-1-SNAPSHOT"
val enumeratumVersion = "1.7.0"

val flagsFor12 = Seq(
"-Xlint:_",
Expand Down Expand Up @@ -62,10 +63,13 @@ val cliSettings = Seq(
scalacOptions ++= Seq(
"-opt-inline-from:**", // See https://www.lightbend.com/blog/scala-inliner-optimizer
"-opt:l:method"
),
) ++ flagsFor13,
publish / skip := true,
publishLocal / skip := true,
publishSigned / skip := true
publishSigned / skip := true,
scalaVersion := "2.13.6",
rpmVendor := "Aiven",
rpmLicense := Some("ASL 2.0")
)

val baseName = "guardian-for-apache-kafka"
Expand Down Expand Up @@ -97,6 +101,20 @@ lazy val core = project
)
)

lazy val coreCli = project
.in(file("core-cli"))
.settings(
cliSettings,
name := s"$baseName-core-cli",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.monovore" %% "decline" % declineVersion,
"com.beachape" %% "enumeratum" % enumeratumVersion
)
).dependsOn(core)

lazy val coreS3 = project
.in(file("core-s3"))
.settings(
Expand Down Expand Up @@ -155,15 +173,10 @@ lazy val cliBackup = project
.in(file("cli-backup"))
.settings(
cliSettings,
name := s"$baseName-cli-backup",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.monovore" %% "decline" % declineVersion
)
name := s"$baseName-cli-backup"
)
.dependsOn(backupS3, backupGCS)
.enablePlugins(SbtNativePackager)
.dependsOn(coreCli, backupS3, backupGCS)
.enablePlugins(JavaAppPackaging)

lazy val coreCompaction = project
.in(file("core-compaction"))
Expand Down Expand Up @@ -196,15 +209,10 @@ lazy val cliCompaction = project
.in(file("cli-compaction"))
.settings(
cliSettings,
name := s"$baseName-cli-compaction",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.monovore" %% "decline" % declineVersion
)
name := s"$baseName-cli-compaction"
)
.dependsOn(compactionS3, compactionGCS)
.enablePlugins(SbtNativePackager)
.dependsOn(coreCli, compactionS3, compactionGCS)
.enablePlugins(JavaAppPackaging)

lazy val restoreS3 = project
.in(file("restore-s3"))
Expand All @@ -226,15 +234,10 @@ lazy val cliRestore = project
.in(file("cli-restore"))
.settings(
cliSettings,
name := s"$baseName-cli-restore",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.monovore" %% "decline" % declineVersion
)
name := s"$baseName-cli-restore"
)
.dependsOn(restoreS3, restoreGCS)
.enablePlugins(SbtNativePackager)
.dependsOn(coreCli, restoreS3, restoreGCS)
.enablePlugins(JavaAppPackaging)

// This is currently causing problems, see https://github.com/djspiewak/sbt-github-actions/issues/74
ThisBuild / githubWorkflowUseSbtThinClient := false
Expand Down
22 changes: 22 additions & 0 deletions cli-backup/src/main/scala/App.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import io.aiven.guardian.kafka.backup.BackupClientInterface
import io.aiven.guardian.kafka.backup.KafkaClient
import io.aiven.guardian.kafka.backup.KafkaClientInterface

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

/** Common machinery for a CLI backup application: holds the Kafka client,
  * backup client and Akka plumbing, and exposes start/stop operations.
  *
  * @tparam T the concrete Kafka client implementation in use
  */
trait App[T <: KafkaClientInterface] {
  implicit val kafkaClient: T
  // Generalized from BackupClientInterface[KafkaClient] to BackupClientInterface[T]
  // so the trait's type parameter is actually honoured; backward compatible since
  // the existing S3App binds T = KafkaClient.
  implicit val backupClient: BackupClientInterface[T]
  implicit val actorSystem: ActorSystem
  implicit val executionContext: ExecutionContext

  /** Starts the backup stream and returns the consumer control handle. */
  def run(): Consumer.Control = backupClient.backup.run()

  /** Stops and then shuts down the running consumer.
    * Ideally we should be using drainAndShutdown however this isn't possible due to
    * https://github.com/aiven/guardian-for-apache-kafka/issues/80
    */
  def shutdown(control: Consumer.Control): Future[Done] =
    control.stop().flatMap(_ => control.shutdown())
}
8 changes: 8 additions & 0 deletions cli-backup/src/main/scala/BackupApp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import io.aiven.guardian.cli.AkkaSettings
import io.aiven.guardian.kafka.backup.KafkaClient
import io.aiven.guardian.kafka.backup.{Config => BackupConfig}
import io.aiven.guardian.kafka.{Config => KafkaConfig}

// Base mixin for backup CLI applications: combines backup configuration,
// Kafka cluster configuration and the shared Akka infrastructure, and
// provides the Kafka client instance.
trait BackupApp extends BackupConfig with KafkaConfig with AkkaSettings {
  // lazy presumably so the mixed-in configuration is initialized before the
  // client is constructed — TODO confirm against KafkaClient's requirements
  implicit lazy val kafkaClient: KafkaClient = new KafkaClient()
}
56 changes: 56 additions & 0 deletions cli-backup/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import cats.implicits._
import com.monovore.decline._
import io.aiven.guardian.cli.arguments.StorageOpt
import io.aiven.guardian.cli.options.Options._
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.s3.configs.S3
import pureconfig.error.ConfigReaderException

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

/** CLI entry point for Guardian backup. Parses --storage, --kafka-topics and
  * --s3-data-bucket, builds the matching application, starts the backup stream
  * and registers a JVM shutdown hook that drains it.
  */
object Main
    extends CommandApp(
      name = "guardian-backup",
      header = "Guardian cli Backup Tool",
      main = (storageOpt, topicsOpt, dataBucketOpt).mapN { (storage, topics, dataBucket) =>
        val app = storage match {
          case StorageOpt.S3 =>
            new S3App {
              import io.aiven.guardian.kafka.{Config => KafkaConfig}
              import io.aiven.guardian.kafka.s3.{Config => S3Config}

              // CLI argument takes precedence; otherwise fall back to the
              // environment/file-based configuration.
              override lazy val kafkaClusterConfig: KafkaCluster =
                topics match {
                  case Some(value) =>
                    KafkaCluster(value.toList.toSet)
                  case None if KafkaConfig.kafkaClusterConfig.topics.nonEmpty =>
                    KafkaConfig.kafkaClusterConfig
                  case _ =>
                    // Fixed wording of the original message ("define with either with the ...")
                    throw new Exception(
                      "Kafka Topics is empty, please define it with either the KAFKA_CLUSTER_TOPICS environment variable or the kafka-topics command line argument"
                    )
                }

              override lazy val s3Config: S3 =
                dataBucket match {
                  case Some(value) => S3(dataBucket = value)
                  case None =>
                    try S3Config.s3Config
                    catch {
                      // Only translate the specific "missing data-bucket" failure into a
                      // user-friendly message; any other config error propagates unchanged.
                      case e: ConfigReaderException[_] if e.getMessage().contains("Key not found: 'data-bucket'") =>
                        throw new Exception(
                          "S3 data bucket is absent, please define it with either the S3_CONFIG_DATA_BUCKET environment variable or the s3-data-bucket command line argument"
                        )
                    }
                }
            }
          case StorageOpt.GCS =>
            throw new Exception("GCS currently unsupported")
        }
        val control = app.run()
        // BUG FIX: the original wrote `new Thread { Await.result(...) }`, which runs
        // Await.result in the anonymous subclass's CONSTRUCTOR — i.e. it blocked for up
        // to 5 minutes at registration time and the hook thread itself did nothing.
        // The blocking drain must live in run() so it executes at JVM shutdown.
        Runtime.getRuntime.addShutdownHook(new Thread {
          override def run(): Unit =
            Await.result(app.shutdown(control), 5.minutes)
        })
      }
    )
9 changes: 9 additions & 0 deletions cli-backup/src/main/scala/S3App.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import akka.stream.alpakka.s3.S3Settings
import io.aiven.guardian.kafka.backup.KafkaClient
import io.aiven.guardian.kafka.backup.s3.BackupClient
import io.aiven.guardian.kafka.s3.{Config => S3Config}

// S3-backed backup application: plugs the S3 BackupClient into the generic
// App machinery, with KafkaClient as the Kafka client implementation.
trait S3App extends S3Config with BackupApp with App[KafkaClient] {
  // S3Settings() is constructed with no arguments — presumably it reads the
  // default alpakka configuration; TODO confirm. Overridable by subclasses.
  lazy val s3Settings: S3Settings = S3Settings()
  implicit lazy val backupClient: BackupClient[KafkaClient] = new BackupClient[KafkaClient](Some(s3Settings))
}
5 changes: 5 additions & 0 deletions core-cli/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Route all Akka logging through SLF4J so it ends up in Logback
# (see the accompanying logback.xml for appender configuration).
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  # Filter log events by level before they are handed to the SLF4J backend.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}
17 changes: 17 additions & 0 deletions core-cli/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<configuration>

  <!-- Console output: colourized level, logger name (no package prefix), message. -->
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>[%highlight(%-5level)] %logger{0} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- Wrap the console appender in an async appender so logging does not
       block the calling (stream-processing) threads. -->
  <appender name="ASYNCSTDOUT" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="STDOUT" />
  </appender>

  <root level="INFO">
    <appender-ref ref="ASYNCSTDOUT" />
  </root>

</configuration>
10 changes: 10 additions & 0 deletions core-cli/src/main/scala/io/aiven/guardian/cli/AkkaSettings.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.aiven.guardian.cli

import akka.actor.ActorSystem

import scala.concurrent.ExecutionContext

// Shared Akka infrastructure for CLI applications: a single ActorSystem
// (created with the default classpath configuration) and an ExecutionContext.
trait AkkaSettings {
  implicit val actorSystem: ActorSystem = ActorSystem()
  // Uses the global execution context — acceptable for a CLI entry point;
  // NOTE(review): library code should accept an ExecutionContext instead.
  implicit val executionContext: ExecutionContext = ExecutionContext.global
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.aiven.guardian.cli.arguments

import cats.data.ValidatedNel
import cats.implicits._
import com.monovore.decline.Argument
import enumeratum._

/** Storage backend selected on the command line; EnumEntry.Lowercase makes the
  * serialized names "gcs" / "s3".
  */
sealed trait StorageOpt extends EnumEntry with EnumEntry.Lowercase

object StorageOpt extends Enum[StorageOpt] {
  case object GCS extends StorageOpt
  case object S3  extends StorageOpt

  val values: IndexedSeq[StorageOpt] = findValues

  /** decline Argument instance so Opts.option[StorageOpt] can parse the flag. */
  implicit val storageArgument: Argument[StorageOpt] = new Argument[StorageOpt] {
    override def read(string: String): ValidatedNel[String, StorageOpt] =
      StorageOpt.withNameOption(string) match {
        case Some(storage) => storage.validNel
        case None          => "Invalid Storage Argument".invalidNel
      }

    override def defaultMetavar: String = "storage"
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.aiven.guardian.cli.options

import cats.data.NonEmptyList
import com.monovore.decline.Opts
import io.aiven.guardian.cli.arguments.StorageOpt

// Command-line options shared across the Guardian CLI tools.
trait Options {
  // --storage (required): which backend to persist Kafka topics to.
  val storageOpt: Opts[StorageOpt] =
    Opts.option[StorageOpt]("storage", help = "Which type of storage to persist kafka topics")

  // --s3-data-bucket (optional): None means "fall back to configuration".
  val dataBucketOpt: Opts[Option[String]] =
    Opts.option[String]("s3-data-bucket", help = "S3 Bucket for storage of main backup data").orNone

  // --kafka-topics (optional, repeatable): collected into a NonEmptyList when given.
  val topicsOpt: Opts[Option[NonEmptyList[String]]] =
    Opts.options[String]("kafka-topics", help = "Which kafka topics to backup").orNone
}

// Default instance, enabling `import io.aiven.guardian.cli.options.Options._`.
object Options extends Options

0 comments on commit a58e5b6

Please sign in to comment.