
Implement restore #96

Merged (1 commit, Feb 11, 2022)
Conversation


@mdedetrich mdedetrich commented Feb 4, 2022

About this change - What it does

This PR implements the restore functionality for S3 persistence and adds unit/mock tests as well as a full round-trip end-to-end test that backs up data from an actual Kafka cluster into S3 and then restores that S3 backup into an actual Kafka cluster under a different set of topics.

Why this way

This PR has a lot of significant changes, so here is a breakdown:

  • The restore has its own config, Restore, which has the following optional settings:
    • fromWhen: A setting I think will be useful; it only restores messages whose timestamp (according to Kafka's own internal record timestamp) is after the fromWhen timestamp.
    • overrideTopics: A map that defines how to translate a topic in a backup to a topic in the destination Kafka cluster, i.e. going from backupTopic to newRestoredTopic would be `Map("backupTopic" -> "newRestoredTopic")`.
      • The RealS3RestoreSpec uses this config when restoring an S3 backup to avoid the hassle of having to create a new Kafka cluster (as a bonus it also tests this functionality).
  • A lot of code in RealS3BackupSpec was moved into common traits since it was common code that was also needed for RealS3RestoreSpec
  • In build.sbt the project was restructured a bit to introduce a coreRestore project. Also, the s3Restore project includes the s3Backup main classpath, but ONLY in test scope, which is what allows us to do a full end-to-end test.
  • The JDK was moved to version 11 due to what I originally thought was https://stackoverflow.com/questions/61490701/offsetdatetime-parse-different-behavior-in-java-8-and-java-11. Later on I realized it was a very rare corner case, which I found by running property tests for hours and which occurred a lot more frequently in CDP (not sure why). In any case, due to the ISO_OFFSET_DATE_TIME bug in JDK 8 it's a good idea to specify JDK 11 as the minimum version.
  • akka.http.client.stream-cancellation-delay was set to 1000 milliseconds because it was causing random issues in GitHub Actions due to the fast network speeds in data centers. See https://discuss.lightbend.com/t/about-nomoreelementsneeded-exception/8599/10 and NoMoreElementsNeeded Exception on Upgrade to Akka HTTP 10.1.12 with Akka 2.5.30 akka/akka-http#3201.
  • One thing to note is that in order to get the test to work we had to use very specific settings for the Producer in the Restore module, namely:

    ```scala
    def baseProducerConfig
        : Some[ProducerSettings[Array[Byte], Array[Byte]] => ProducerSettings[Array[Byte], Array[Byte]]] =
      Some(
        _.withBootstrapServers(
          container.bootstrapServers
        ).withProperties(
          Map(
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG             -> true.toString,
            ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString,
            ProducerConfig.BATCH_SIZE_CONFIG                     -> 0.toString
          )
        ).withParallelism(1)
      )
    ```

    While this config gives the producer exactly-once semantics for message delivery, it also has severe limitations, such as disabling batching/concurrent writes, which greatly reduces the throughput of the restore utility. From what I can tell the only way to fix this is by using Kafka transactions (see https://www.confluent.io/blog/transactions-apache-kafka/ and https://doc.akka.io/docs/alpakka-kafka/current/transactions.html). I tried to implement this PR using the transactional API, which led me down a rabbit hole; the tl;dr is that Alpakka's transactional API doesn't work in this scenario because Transactional.source only works if the source is a Kafka cluster rather than another source (even if you construct the ProducerRecord the exact same way). I will create an issue to investigate this further.
  • Due to https://discuss.lightbend.com/t/working-with-eof-for-a-source-flow/9481 I had to implement a workaround for restoring multiple keys that uses Utils.runSequentially to make sure the Futures run sequentially (Futures are strict by default in Scala).
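To make the Restore settings described above concrete, here is a rough sketch of what such a configuration could look like in HOCON. The key names and nesting are assumptions for illustration only, not the PR's actual config layout:

```hocon
# Hypothetical restore configuration (key names are illustrative)
restore {
  # Only restore messages whose Kafka record timestamp is at or after this point
  from-when = "2022-01-01T00:00:00Z"

  # Translate topics in the backup to different topic names in the
  # destination Kafka cluster
  override-topics {
    "backupTopic" = "newRestoredTopic"
  }
}
```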
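The workaround mentioned in the last bullet relies on forcing eager Futures to run one after another. The actual Utils.runSequentially in the PR may differ; this is a minimal sketch of the idea, keeping each step as a by-name thunk and only forcing it inside flatMap once the previous Future has completed:

```scala
import scala.concurrent.{ExecutionContext, Future}

object Utils {
  // Runs the given thunks strictly one after another. Because Scala
  // Futures are strict (they start executing as soon as they are
  // created), each step is wrapped in a () => Future[A] thunk and only
  // invoked inside flatMap, after the previous Future has completed.
  def runSequentially[A](
      thunks: List[() => Future[A]]
  )(implicit ec: ExecutionContext): Future[List[A]] =
    thunks.foldLeft(Future.successful(List.empty[A])) { (acc, thunk) =>
      acc.flatMap(results => thunk().map(results :+ _))
    }
}
```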

@mdedetrich mdedetrich marked this pull request as draft February 4, 2022 10:20
@mdedetrich mdedetrich force-pushed the implement-restore branch 15 times, most recently from 9973bb7 to 4eb678b Compare February 9, 2022 05:34
@mdedetrich mdedetrich force-pushed the implement-restore branch 4 times, most recently from 41ee717 to 106df32 Compare February 9, 2022 09:47
@mdedetrich
Contributor Author

Ignore the fact that Build and Test (ubuntu-latest, 2.13.6, [email protected]) is orange; this test is no longer run since we updated the OpenJDK version.

After this PR is merged an update to the branch status check settings will fix this up.

@mdedetrich mdedetrich force-pushed the implement-restore branch 8 times, most recently from 9b37743 to 0f4ba25 Compare February 9, 2022 23:43
@mdedetrich mdedetrich force-pushed the implement-restore branch 3 times, most recently from e7d1140 to 569d2c9 Compare February 10, 2022 01:02
@@ -73,7 +73,10 @@ class Entry(val initializedApp: AtomicReference[Option[App[_]]] = new AtomicRefe
block.withBootstrapServers(value.toList.mkString(","))

Some(block).validNel
case None if Options.checkConfigKeyIsDefined("kafka-client.bootstrap.servers") => None.validNel
case None
Contributor Author

This is a quick fix; there are multiple configurations: a global one which is shared via kafka-client.bootstrap.servers, and a specific one just for the consumer at akka.kafka.consumer.kafka-clients.bootstrap.servers.
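As a sketch, the two configuration locations mentioned above would look like this in HOCON (the broker addresses are placeholders):

```hocon
# Global setting shared by all Kafka clients
kafka-client {
  bootstrap.servers = "broker-1:9092,broker-2:9092"
}

# Consumer-specific setting
akka.kafka.consumer {
  kafka-clients {
    bootstrap.servers = "broker-1:9092,broker-2:9092"
  }
}
```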

@@ -105,3 +99,17 @@ class KafkaClient(
override def batchCursorContext(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch =
CommittableOffsetBatch(cursors.toSeq)
}

object KafkaClient {
Contributor Author

This was refactored out since it is used in a lot of places.

@mdedetrich mdedetrich marked this pull request as ready for review February 10, 2022 01:32
@@ -92,6 +93,7 @@ lazy val core = project
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
"org.mdedetrich" %% "scalacheck" % scalaCheckVersion % Test,
"com.rallyhealth" %% "scalacheck-ops_1-15" % scalaCheckOpsVersion % Test,
Contributor Author

This is an extension library for ScalaCheck that adds generators for various types. Specifically, we needed a generator for OffsetDateTime.
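For illustration, a hand-rolled generator along these lines could be written with plain ScalaCheck; this is not the scalacheck-ops API, just a sketch of what such a generator does under the hood:

```scala
import java.time.{Instant, OffsetDateTime, ZoneOffset}
import org.scalacheck.Gen

object Generators {
  // A minimal generator for OffsetDateTime: pick a random epoch-millis
  // value and a random UTC offset, then combine them. Libraries like
  // scalacheck-ops ship ready-made instances of this kind.
  val genOffsetDateTime: Gen[OffsetDateTime] =
    for {
      millis      <- Gen.choose(0L, 4102444800000L) // 1970 to ~2100
      offsetHours <- Gen.choose(-18, 18)            // valid ZoneOffset range
    } yield Instant
      .ofEpochMilli(millis)
      .atOffset(ZoneOffset.ofHours(offsetHours))
}
```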

@mdedetrich mdedetrich merged commit 99dde04 into main Feb 11, 2022
@mdedetrich mdedetrich deleted the implement-restore branch February 11, 2022 00:56