Add s3 backup #8

Merged: 5 commits from add-s3-backup into main, Aug 9, 2021

Conversation

@mdedetrich (Contributor) commented Jul 22, 2021

This is an in-progress PR for backing up into Amazon S3. Some important notes:

  1. You cannot rename an object in S3 (apart from copying it to another key, which is inappropriate for the large files we will be creating), see https://stackoverflow.com/a/26820022. This means the only way to identify whether a file is still being uploaded is to use specific S3 SDKs that let you list all multipart uploads in progress (unfortunately this is not part of the Alpakka API; see the sketch after this list). The good news is that the API allows you to resume a previously terminated multipart upload, so we don't have to re-upload everything; we can just resume.
  2. Figuring out how to deal with committing ReducedConsumerRecord cursors is quite complicated. We have a cursor for each ReducedConsumerRecord, but when we feed the Flow into the S3.multipartUploadWithHeaders Sink we lose the concept of single ReducedConsumerRecord elements with a Context (at that point S3.multipartUploadWithHeaders is only dealing with a stream of bytes with no beginning or end).
  3. When dividing the time elapsed since the start of the stream by the period, it is possible to overflow. This can be alleviated by using a different ChronoUnit (currently we use MICROS). A lazy fix is to use BigInt rather than Long (BigInt can grow until you run out of memory). There may be a smarter way of handling this, such as resetting the counter when we approach Long.MaxValue, but doing so would make the logic less simple.
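A minimal sketch of point 1 (not the PR's code; names and setup are assumed), listing in-progress multipart uploads with the plain AWS SDK v2:

import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest

import scala.jdk.CollectionConverters._

object InProgressUploads {
  // Returns (key, uploadId) for every multipart upload that was started but
  // never completed or aborted, i.e. objects that are still "being uploaded".
  def apply(s3: S3Client, bucket: String): List[(String, String)] = {
    val request = ListMultipartUploadsRequest.builder().bucket(bucket).build()
    s3.listMultipartUploads(request).uploads().asScala.toList.map(u => (u.key(), u.uploadId()))
  }
}

The returned uploadId is also what the SDK needs in order to resume or abort a dangling upload.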

@mdedetrich marked this pull request as draft July 22, 2021 15:59
@mdedetrich force-pushed the add-s3-backup branch 14 times, most recently from 2b3cdb5 to f1fd7dd on July 27, 2021 10:55
@jlprat (Contributor) left a comment

Added all the feedback I could think of.
Overall it seems good, but obviously without tests it's complicated to judge.

Comment on lines +21 to +37
sealed abstract class BackupStreamPosition

object BackupStreamPosition {

  /** The backup stream has just started right now
    */
  case object Start extends BackupStreamPosition

  /** The backup stream is in the middle of a time period
    */
  case object Middle extends BackupStreamPosition

  /** The backup stream position has just hit a boundary for when a new period starts
    */
  case object Boundary extends BackupStreamPosition
}
@jlprat (Contributor):

If those are meant to be public, I would put them in their own file.
I personally prefer having one class per file; it's easier to navigate and locate things.

@mdedetrich (Contributor, Author):

I may make this private because I don't think it needs to be public, but I note your point (I will put it in a different file if it ends up being public)
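
A hypothetical sketch of the visibility being considered (the package name backup is assumed, not taken from the PR):

// Restricts the ADT to the enclosing backup package instead of making it
// public, which also sidesteps the one-class-per-file question above.
// (This declaration must live inside the backup package to compile.)
private[backup] sealed abstract class BackupStreamPosition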

@jlprat (Contributor) left a comment

Looks good.
Would it be possible to add new commits instead of force-pushing? Reviewing is easier with fresh commits.

@mdedetrich (Contributor, Author) commented Jul 28, 2021

@jlprat Okay, sure, will do; this will be the last time I force-push. Do note that GitHub still keeps conversations on comments even if you have force-pushed; they only get hidden if the actual code changes.

@jlprat (Contributor) commented Jul 28, 2021

LGTM. Let me know when it's not a draft anymore

@@ -117,7 +117,7 @@ trait BackupClientInterface {
(reducedConsumerRecord, period)
}

-        case None => throw new IllegalAccessException("")
+        case None => throw Errors.ExpectedStartOfSource
@mdedetrich (Contributor, Author):

Via testing I have verified that this case isn't possible: a Source must have at least one element in order for it to even start.
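
A hypothetical sketch of what the Errors.ExpectedStartOfSource object might look like; the actual definition is in the PR but is not quoted in this conversation:

import scala.util.control.NoStackTrace

object Errors {
  // Thrown when the stream's head is unexpectedly empty, which testing
  // suggests cannot happen for a Source that has actually started.
  case object ExpectedStartOfSource
      extends Exception("Expected the start of the source")
      with NoStackTrace
}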

-      // TODO Is it possible to hit this branch? I assume if the Stream is started its impossible for
-      // head to be empty
-      ???
+      case None => throw Errors.ExpectedStartOfSource
@mdedetrich (Contributor, Author):

Via testing I have verified that this case isn't possible: a Source must have at least one element in order for it to even start.

@@ -204,5 +228,5 @@ object BackupClientInterface {
reducedConsumerRecord: ReducedConsumerRecord
): Long =
// TODO handle overflow?
-    ChronoUnit.MICROS.between(reducedConsumerRecord.toOffsetDateTime, initialTime) / period.toMicros
+    ChronoUnit.MICROS.between(initialTime, reducedConsumerRecord.toOffsetDateTime) / period.toMicros
@mdedetrich (Contributor, Author):

One bug I found in the initial implementation: it turns out you can get negative time between two temporals!
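
A quick illustration of why the argument order matters (the values are assumed for the example): ChronoUnit.between(a, b) measures the time from a to b, so it is negative when a is after b.

import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit

val initialTime = OffsetDateTime.parse("2021-07-22T00:00:00Z")
val recordTime  = initialTime.plusHours(1) // a record one hour into the stream

// Wrong order: measures from the record back to the start, so the period
// index becomes negative once divided by period.toMicros.
ChronoUnit.MICROS.between(recordTime, initialTime) // -3600000000L

// Fixed order: elapsed time since the start of the stream, as intended.
ChronoUnit.MICROS.between(initialTime, recordTime) //  3600000000L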

@jlprat (Contributor):

Don't really get this change

@mdedetrich marked this pull request as ready for review August 6, 2021 09:05
@mdedetrich merged commit 783bf9c into main on Aug 9, 2021
@mdedetrich deleted the add-s3-backup branch August 9, 2021 09:25
@jlprat (Contributor) left a comment

I have some comments, mostly in the testing area.

import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.duration.{FiniteDuration, _}
import scala.language.postfixOps
@jlprat (Contributor):

This import is not needed

Comment on lines +44 to +46
"BackupClientInterface" can {
"splitAtBoundaryCondition" should {
"BackupStreamPosition.Boundary happy case" in {
@jlprat (Contributor):

I don't know if it has improved since, but nested matchers used to be bad for performance: a big flat spec was several times faster than a nested one.
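
A hypothetical flat restructuring (class and test names assumed) of what this comment suggests, using AnyFlatSpec so the grouping lives in the test names instead of in nesting:

import org.scalatest.flatspec.AnyFlatSpec

class BackupClientInterfaceFlatSpec extends AnyFlatSpec {
  // One level only: subject and behavior are encoded in the strings,
  // avoiding the deep nesting that used to hurt performance.
  "BackupClientInterface.splitAtBoundaryCondition" must
    "handle the BackupStreamPosition.Boundary happy case" in {
      succeed // test body elided
    }
}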

Comment on lines +63 to +65
"calculateBackupStreamPositions" should {

"must always have at least one BackupStreamPosition.Boundary" in {
@jlprat (Contributor):

Nit: either use must instead of should, or remove it from the string.

}
}

"Every ReducedConsumerRecord after a BackupStreamPosition.Boundary must be in the next consecutive time period" in {
@jlprat (Contributor):

That sentence makes no sense when rendered:
"calculateBackupStreamPositions" should "Every ReducedConsumerRecord after a BackupStreamPosition.Boundary must be in the next consecutive time period"

}
}

"The time difference between two consecutive BackupStreamPosition.Middle has to be less then the time period" in {
@jlprat (Contributor):

Same here. Probably means the test belongs somewhere else, or you can structure the nesting differently

import scala.concurrent.duration._
import scala.language.postfixOps

trait ScalaTestConstants {
  val AwaitTimeout = 10 minutes
}
@jlprat (Contributor):

That's huge; seconds would probably be enough. What about 5 seconds?
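
A minimal sketch of the suggested change (still relying on the duration imports above):

val AwaitTimeout = 5 seconds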

@mdedetrich (Contributor, Author) commented Aug 10, 2021

@jlprat

Don't really get this change

In regards to ChronoUnit.MICROS.between, one argument order gives positive values and the other gives negative ones.
