Core implementation of backup into S3
mdedetrich committed Jul 27, 2021
1 parent 031f4bc commit f1fd7dd
Showing 11 changed files with 287 additions and 17 deletions.
Empty file removed backup-s3/.gitkeep
Empty file.
@@ -0,0 +1,27 @@
package aiven.io.guardian.kafka.backup.s3

import aiven.io.guardian.kafka.KafkaClientInterface
import aiven.io.guardian.kafka.backup.BackupClientInterface
import aiven.io.guardian.kafka.backup.configs.Backup
import aiven.io.guardian.kafka.s3.configs.{S3 => S3Config}
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{MultipartUploadResult, S3Headers}
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.Future

class BackupClient(s3Headers: S3Headers)(implicit
override val kafkaClientInterface: KafkaClientInterface,
override val backupConfig: Backup,
s3Config: S3Config
) extends BackupClientInterface {
override type BackupResult = MultipartUploadResult

override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] =
S3.multipartUploadWithHeaders(
s3Config.dataBucket,
key,
s3Headers = s3Headers
)
}
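For context, a minimal sketch (not part of this commit) of how BackupClient might be wired up. The implicit instances are placeholders here; in practice they would come from the application's configuration loading, and S3Headers() is the Alpakka S3 headers builder:

import aiven.io.guardian.kafka.KafkaClientInterface
import aiven.io.guardian.kafka.backup.configs.Backup
import aiven.io.guardian.kafka.s3.configs.{S3 => S3Config}
import akka.stream.alpakka.s3.S3Headers

// Hypothetical wiring: the concrete instances are elided with ??? for illustration.
implicit val kafkaClientInterface: KafkaClientInterface = ???
implicit val backupConfig: Backup                       = ???
implicit val s3Config: S3Config                         = ???

val backupClient = new BackupClient(S3Headers())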
3 changes: 2 additions & 1 deletion build.sbt
@@ -84,7 +84,8 @@ lazy val coreAws = project
libraryDependencies ++= Seq(
"com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test
"org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
"com.adobe.testing" % "s3mock" % "2.1.36" % Test
)
)
.dependsOn(core)
@@ -0,0 +1,202 @@
package aiven.io.guardian.kafka.backup

import aiven.io.guardian.kafka.KafkaClientInterface
import aiven.io.guardian.kafka.backup.configs.Backup
import aiven.io.guardian.kafka.codecs.Circe._
import aiven.io.guardian.kafka.models.ReducedConsumerRecord
import akka.Done
import akka.stream.FlowShape
import akka.stream.scaladsl._
import akka.util.ByteString
import io.circe.syntax._

import java.time._
import java.time.format.DateTimeFormatter
import java.time.temporal._
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

/** A marker used to indicate the current position of the backup stream
*/
sealed abstract class BackupStreamPosition

object BackupStreamPosition {

/** The backup stream has just started right now
*/
case object Start extends BackupStreamPosition

/** The backup stream is in the middle of a time period
*/
case object Middle extends BackupStreamPosition

/** The backup stream position has just hit a boundary for when a new period starts
*/
case object Boundary extends BackupStreamPosition
}

trait BackupClientInterface {
implicit val kafkaClientInterface: KafkaClientInterface
implicit val backupConfig: Backup

/** A type representing any kind of result when backing up data to a datasource
*/
type BackupResult

import BackupClientInterface._

/** How to back up a `ByteString` to a `DataSource`
* @param key The object key or filename for what is being backed up
* @return A Sink that also provides a `BackupResult`
*/
def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]]

/** A Flow that both backs up the `ByteString` data to a data source and then
* commits the Kafka `CursorContext` using `kafkaClientInterface.commitCursor`.
* @param key The object key or filename for what is being backed up
* @return The `CursorContext` which can be used for logging/debugging
*/
def backupAndCommitFlow(
key: String
): Flow[(ByteString, kafkaClientInterface.CursorContext), kafkaClientInterface.CursorContext, Future[Done]] = {
val sink = Flow.fromGraph(
GraphDSL.create(
backupToStorageSink(key),
kafkaClientInterface.commitCursor
)((_, cursorCommitted) => cursorCommitted)(implicit builder =>
(backupSink, commitCursor) => {
import GraphDSL.Implicits._

val b = builder.add(Concat[(ByteString, kafkaClientInterface.CursorContext)]())

b.out.map(_._1) ~> backupSink
b.out.map(_._2) ~> commitCursor

new FlowShape(b.in(0), b.out)
}
)
)
sink.map { case (_, context) => context }
}

/** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into
* a data source.
* @return The `CursorContext` which can be used for logging/debugging along with the `kafkaClientInterface.Control`
* which can be used to control the Stream
*/
protected def backup: Source[kafkaClientInterface.CursorContext, kafkaClientInterface.Control] = {
// TODO use https://awscli.amazonaws.com/v2/documentation/api/latest/reference/s3api/list-multipart-uploads.html
// and https://stackoverflow.com/questions/53764876/resume-s3-multipart-upload-partetag to find any in progress
// multipart upload to resume from a previous termination. Looks like we will have to do this manually since it's not in
// Alpakka yet
val withPeriods = kafkaClientInterface.getSource.map { reducedConsumerRecord =>
val period = calculateNumberOfPeriodsFromTimestamp(reducedConsumerRecord.toOffsetDateTime,
backupConfig.periodSlice,
reducedConsumerRecord
)
(reducedConsumerRecord, period)
}

val withBackupStreamPositions = withPeriods
.sliding(2)
.map { case Seq((beforeReducedConsumerRecord, beforeDivisions), (_, afterDivisions)) =>
val backupStreamPosition = splitAtBoundaryCondition(beforeDivisions, afterDivisions)

(beforeReducedConsumerRecord, backupStreamPosition)
}
.mapContext { case Seq(head, _) => head }

val split = withBackupStreamPositions.asSource.splitAfter { case ((_, backupStreamPosition), _) =>
backupStreamPosition == BackupStreamPosition.Boundary
}

split
.prefixAndTail(1)
.flatMapConcat { case (head, restOfReducedConsumerRecords) =>
head.headOption match {
case Some(((firstReducedConsumerRecord, _), firstContext)) =>
val key = calculateKey(firstReducedConsumerRecord.toOffsetDateTime)

val combined = Source.combine(
Source.single(
(
(firstReducedConsumerRecord, BackupStreamPosition.Start),
firstContext
)
),
restOfReducedConsumerRecords
)(Concat(_))

val transformed = combined.map { case ((record, position), context) =>
val transform = transformReducedConsumerRecords(record, position)
(transform, context)
}

transformed.via(backupAndCommitFlow(key))
case None =>
// TODO Is it possible to hit this branch? I assume if the Stream is started it's impossible for
// head to be empty
???
}
}
.mergeSubstreams
}
}

object BackupClientInterface {
def reducedConsumerRecordAsString(reducedConsumerRecord: ReducedConsumerRecord): String =
io.circe.Printer.noSpaces.print(reducedConsumerRecord.asJson)

def formatOffsetDateTime(offsetDateTime: OffsetDateTime): String =
offsetDateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)

/** Calculate an object storage key or filename from a given time
* @param offsetDateTime A given time
* @return A `String` that can be used either as some object key or a filename
*/
def calculateKey(offsetDateTime: OffsetDateTime) =
s"${BackupClientInterface.formatOffsetDateTime(offsetDateTime)}.json"

/** Calculates the current position within a 2-element sliding window of a Stream.
* @param dividedPeriodsBefore The number of divided periods in the first element of the slide. -1 is used as a
* sentinel value to indicate the start of the stream
* @param dividedPeriodsAfter The number of divided periods in the second element of the slide
* @return The position of the Stream
*/
def splitAtBoundaryCondition(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): BackupStreamPosition =
(dividedPeriodsBefore, dividedPeriodsAfter) match {
case (before, _) if before == -1 =>
BackupStreamPosition.Start
case (before, after) if after > before =>
BackupStreamPosition.Boundary
case _ =>
BackupStreamPosition.Middle
}
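// Illustrative only (not part of the commit): how the cases above resolve.
// splitAtBoundaryCondition(-1, 0) == BackupStreamPosition.Start    (-1 sentinel: the stream has just started)
// splitAtBoundaryCondition(3, 3)  == BackupStreamPosition.Middle   (still within the same period)
// splitAtBoundaryCondition(3, 4)  == BackupStreamPosition.Boundary (the period count has increased)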

/** Transforms a `ReducedConsumerRecord` into a `ByteString` so that it can be persisted into the data storage
* @param reducedConsumerRecord The ReducedConsumerRecord to persist
* @param backupStreamPosition The position of the record relative to the stream (so it knows if it's at the start,
* middle or end)
* @return a `ByteString` ready to be persisted
*/
def transformReducedConsumerRecords(reducedConsumerRecord: ReducedConsumerRecord,
backupStreamPosition: BackupStreamPosition
): ByteString = {
val string = backupStreamPosition match {
case BackupStreamPosition.Start =>
s"[${reducedConsumerRecordAsString(reducedConsumerRecord)},"
case BackupStreamPosition.Middle =>
s"${reducedConsumerRecordAsString(reducedConsumerRecord)},"
case BackupStreamPosition.Boundary =>
s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]"
}
ByteString(string)
}
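// Illustrative only (not part of the commit): for three consecutive records in one time slice,
// the Start record opens a JSON array, Middle records keep appending, and the Boundary record
// closes it, so concatenating the resulting ByteStrings yields a single valid JSON array:
//   "[{...record1...}," ++ "{...record2...}," ++ "{...record3...}]"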

protected def calculateNumberOfPeriodsFromTimestamp(initialTime: OffsetDateTime,
period: FiniteDuration,
reducedConsumerRecord: ReducedConsumerRecord
): Long =
// TODO handle overflow?
ChronoUnit.MICROS.between(reducedConsumerRecord.toOffsetDateTime, initialTime) / period.toMicros
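// Illustrative only (not part of the commit): with a periodSlice of 1 hour, a record whose
// timestamp is 90 minutes before initialTime lands in period 1 (90 min / 60 min, integer division).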
}
@@ -2,7 +2,6 @@ package aiven.io.guardian.kafka.backup.configs

import scala.concurrent.duration.FiniteDuration

/**
* @param periodSlice The time period for each given slice that stores all of the `ReducedConsumerRecord`
/** @param periodSlice The time period for each given slice that stores all of the `ReducedConsumerRecord`
*/
final case class Backup(periodSlice: FiniteDuration)
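As a hypothetical example (the value is illustrative, not part of the commit), one-hour slices would be configured as:

import scala.concurrent.duration._

val backupConfig = Backup(periodSlice = 1.hour)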
12 changes: 12 additions & 0 deletions core-s3/src/main/scala/aiven/io/guardian/kafka/s3/configs/S3.scala
@@ -0,0 +1,12 @@
package aiven.io.guardian.kafka.s3.configs

/** @param dataBucket The bucket where a Kafka Consumer directly streams data into as storage
* @param dataBucketPrefix Prefix for the data bucket (if any)
* @param compactionBucket The bucket where compaction results are stored
* @param compactionBucketPrefix Prefix for the compaction bucket (if any)
*/
final case class S3(dataBucket: String,
dataBucketPrefix: Option[String],
compactionBucket: String,
compactionBucketPrefix: Option[String]
)
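A hypothetical instantiation (bucket names are made up for illustration; real values are expected to come from configuration):

val s3Config = S3(
  dataBucket = "guardian-kafka-data",
  dataBucketPrefix = None,
  compactionBucket = "guardian-kafka-compaction",
  compactionBucketPrefix = None
)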
7 changes: 4 additions & 3 deletions core/README.md
@@ -9,11 +9,12 @@ to automatically commit cursors when successfully reading topics.

## Configuration

Specification can be found [here](/src/main/resources/reference.conf)
Specification (including environment variable overrides) can be found [here](/src/main/resources/reference.conf).

The primary `aiven.io.guardian.kafka.KafkaClient` is configured using [Alpakka Kafka][alpakka-kafka] [official
The primary `aiven.io.guardian.kafka.KafkaClient` is configured using [Alpakka Kafka][alpakka-kafka] [Consumer
configuration](https://doc.akka.io/docs/alpakka-kafka/current/consumer.html) which also contains the default values.
Ontop of this the module also allows you to set these settings using environment variables which can be found.
The committing of Kafka cursors also requires
[CommitterSettings configuration](https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#committer-sink).

There is also a generic `aiven.io.guardian.kafka.configs.KafkaCluster` configuration at `"kafka-cluster"` for anything not specific
to the kafka consumer, i.e. which topics to backup/compact/restore.
8 changes: 8 additions & 0 deletions core/src/main/resources/reference.conf
@@ -27,6 +27,14 @@ akka.kafka.consumer = {
}
}

akka.kafka.committer = {
max-batch = ${?AKKA_KAFKA_COMMITTER_MAX_BATCH}
max-interval = ${?AKKA_KAFKA_COMMITTER_MAX_INTERVAL}
parallelism = ${?AKKA_KAFKA_COMMITTER_PARALLELISM}
delivery = ${?AKKA_KAFKA_COMMITTER_DELIVERY}
when = ${?AKKA_KAFKA_COMMITTER_WHEN}
}

kafka-cluster = {
topics = []
topics = ${?KAFKA_CLUSTER_TOPICS}
22 changes: 15 additions & 7 deletions core/src/main/scala/aiven/io/guardian/kafka/KafkaClient.scala
@@ -2,14 +2,17 @@ package aiven.io.guardian.kafka

import aiven.io.guardian.kafka.configs.KafkaCluster
import aiven.io.guardian.kafka.models.ReducedConsumerRecord
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerMessage, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{Source, SourceWithContext}
import akka.kafka.ConsumerMessage.{Committable, CommittableOffset}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{Sink, SourceWithContext}
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.common.serialization.ByteArrayDeserializer

import java.util.Base64
import scala.concurrent.Future

/** A Kafka Client that uses Alpakka Kafka Consumer under the hood to create a stream of events from a Kafka cluster.
* To configure the Alpakka Kafka Consumer use the standard typesafe configuration i.e. akka.kafka.consumer (note
@@ -20,8 +23,8 @@ import java.util.Base64
class KafkaClient()(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
extends KafkaClientInterface
with StrictLogging {
override type Context = ConsumerMessage.CommittableOffset
override type Control = Consumer.Control
override type CursorContext = Committable
override type Control = Consumer.Control

if (kafkaClusterConfig.topics.isEmpty)
logger.warn("Kafka Cluster configuration has no topics set")
@@ -33,8 +36,7 @@ class KafkaClient()(implicit system: ActorSystem, kafkaClusterConfig: KafkaClust

/** @return A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
*/
override val getSource
: SourceWithContext[ReducedConsumerRecord, ConsumerMessage.CommittableOffset, Consumer.Control] =
override val getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] =
Consumer
.sourceWithOffsetContext(consumerSettings, subscriptions)
.map(consumerRecord =>
@@ -47,4 +49,10 @@ class KafkaClient()(implicit system: ActorSystem, kafkaClusterConfig: KafkaClust
consumerRecord.timestampType()
)
)

private[kafka] val committerSettings: CommitterSettings = CommitterSettings(system)

/** @return A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
*/
override val commitCursor: Sink[Committable, Future[Done]] = Committer.sink(committerSettings)
}
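A minimal sketch (not part of this commit) showing how getSource and commitCursor might be connected directly, assuming an implicit ActorSystem and KafkaCluster configuration are in scope and the same imports as the file above plus Keep; in the actual backup path, BackupClientInterface.backupAndCommitFlow sits between the two:

import akka.stream.scaladsl.Keep

val client = new KafkaClient()
val control: Consumer.Control =
  client.getSource
    .asSource                                     // expose (ReducedConsumerRecord, CommittableOffset) tuples
    .map { case (_, committable) => committable } // here the record itself is simply dropped
    .toMat(client.commitCursor)(Keep.left)
    .run()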
@@ -1,20 +1,27 @@
package aiven.io.guardian.kafka

import aiven.io.guardian.kafka.models.ReducedConsumerRecord
import akka.stream.scaladsl.SourceWithContext
import akka.Done
import akka.stream.scaladsl.{Sink, SourceWithContext}

import scala.concurrent.Future

trait KafkaClientInterface {

/** The type of the context to pass around. In context of a Kafka consumer, this typically holds offset data to be
* automatically committed
*/
type Context
type CursorContext

/** The type that represents how to control the given stream, i.e. if you want to shut it down or add metrics
*/
type Control

/** @return A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
*/
def getSource: SourceWithContext[ReducedConsumerRecord, Context, Control]
def getSource: SourceWithContext[ReducedConsumerRecord, CursorContext, Control]

/** @return A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
*/
def commitCursor: Sink[CursorContext, Future[Done]]
}
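For illustration only (not part of this commit), a hypothetical in-memory implementation of the trait, e.g. for tests, could look like the following, using the record timestamp as a stand-in cursor:

import aiven.io.guardian.kafka.models.ReducedConsumerRecord
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Sink, Source, SourceWithContext}

import scala.concurrent.Future

class InMemoryKafkaClient(records: List[ReducedConsumerRecord]) extends KafkaClientInterface {
  override type CursorContext = Long    // the record timestamp stands in for a Kafka offset
  override type Control       = NotUsed // nothing to control for a finite in-memory source

  override val getSource: SourceWithContext[ReducedConsumerRecord, Long, NotUsed] =
    SourceWithContext.fromTuples(Source(records.map(record => (record, record.timestamp))))

  // Committing is a no-op here since there is no real Kafka cluster behind this client.
  override val commitCursor: Sink[Long, Future[Done]] = Sink.ignore
}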
@@ -2,6 +2,8 @@ package aiven.io.guardian.kafka.models

import org.apache.kafka.common.record.TimestampType

import java.time.{Instant, OffsetDateTime, ZoneId}

/** A `ConsumerRecord` that only contains the necessary data for guardian
*
* @param topic The kafka topic (same as `ConsumerRecord` `topic`)
@@ -16,4 +18,7 @@ final case class ReducedConsumerRecord(topic: String,
value: String,
timestamp: Long,
timestampType: TimestampType
)
) {
def toOffsetDateTime: OffsetDateTime =
Instant.ofEpochMilli(this.timestamp).atZone(ZoneId.systemDefault()).toOffsetDateTime
}
