Skip to content

Commit

Permalink
Implement retry logic for fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
juraj-hrivnak committed Oct 11, 2024
1 parent b29f66b commit 85c00a7
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 62 deletions.
110 changes: 67 additions & 43 deletions src/commonMain/kotlin/teksturepako/pakku/api/actions/fetch/Fetch.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,68 +33,92 @@ fun retrieveProjectFiles(
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun List<ProjectFile>.fetch(
onError: suspend (error: ActionError) -> Unit,
onProgress: suspend (advance: Long, total: Long) -> Unit,
onProgress: suspend (completed: Long, total: Long) -> Unit,
onSuccess: suspend (path: Path, projectFile: ProjectFile) -> Unit,
lockFile: LockFile, configFile: ConfigFile?
lockFile: LockFile, configFile: ConfigFile?, retry: Int? = null
) = coroutineScope {
val maxBytes: AtomicLong = atomic(0L)
tailrec suspend fun tryFetch(projectFiles: List<ProjectFile>, retryNumber: Int = 0)
{
val totalBytes: AtomicLong = atomic(0L)
val completedBytes: AtomicLong = atomic(0L)

val fetchChannel = produce {
for (projectFile in projectFiles)
{
launch {
val parentProject = projectFile.getParentProject(lockFile) ?: return@launch

val channel = produce {
for (projectFile in this@fetch)
{
launch {
val parentProject = projectFile.getParentProject(lockFile)?: return@launch
val path = projectFile.getPath(parentProject, configFile)

val path = projectFile.getPath(parentProject, configFile)
if (path.exists())
{
onError(AlreadyExists(path.toString()))
return@launch
}

if (path.exists())
{
onError(AlreadyExists(path.toString()))
return@launch
}
totalBytes += projectFile.size.toLong()
val prevBytes: AtomicLong = atomic(0L)

maxBytes += projectFile.size.toLong()
val prevBytes: AtomicLong = atomic(0L)
val bytes = Http().requestByteArray(projectFile.url!!) { bytesSentTotal, _ ->
completedBytes.getAndAdd(bytesSentTotal - prevBytes.value)

val bytes = Http().requestByteArray(projectFile.url!!) { bytesSentTotal, _ ->
onProgress(bytesSentTotal - prevBytes.value, maxBytes.value)
prevBytes.getAndSet(bytesSentTotal)
}
onProgress(completedBytes.value, totalBytes.value)
prevBytes.getAndSet(bytesSentTotal)
}

if (bytes == null)
{
onError(DownloadFailed(path))
return@launch
}
if (bytes == null)
{
onError(DownloadFailed(path, retryNumber))
send(Err(projectFile))
return@launch
}

projectFile.checkIntegrity(bytes, path)?.let { err ->
onError(err)

projectFile.checkIntegrity(bytes, path)?.let { err ->
onError(err)
if (err is HashMismatch) return@launch
}

if (err is HashMismatch) return@launch
send(Ok(Triple(path, projectFile, bytes)))
}
}
}

send(Triple(path, projectFile, bytes))
val jobs = mutableListOf<Job>()
val fails = mutableListOf<Deferred<ProjectFile>>()

fetchChannel.consumeEach { result ->
result.onSuccess { (path, projectFile, bytes) ->
jobs += launch(Dispatchers.IO) {
runCatching {
path.createParentDirectories()
path.writeBytes(bytes)
}.onSuccess {
onSuccess(path, projectFile)
}.onFailure {
onError(CouldNotSave(path, it.stackTraceToString()))
}
}
}.onFailure { projectFile ->
fails += async {
projectFile
}
}
}
}

val jobs = mutableListOf<Job>()
jobs.joinAll()

channel.consumeEach { (path, projectFile, bytes) ->
jobs += launch(Dispatchers.IO) {
runCatching {
path.createParentDirectories()
path.writeBytes(bytes)
}.onSuccess {
onSuccess(path, projectFile)
}.onFailure {
onError(CouldNotSave(path, it.stackTraceToString()))
}
val filesToRetry = fails.awaitAll()

if (retry != null && retryNumber < retry && retryNumber < 10 && filesToRetry.isNotEmpty() )
{
tryFetch(filesToRetry, retryNumber + 1)
}
}

jobs.joinAll()
this.cancel()
launch {
tryFetch(this@fetch)
}
}

@OptIn(ExperimentalCoroutinesApi::class)
Expand Down
46 changes: 27 additions & 19 deletions src/commonMain/kotlin/teksturepako/pakku/cli/cmd/Fetch.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package teksturepako.pakku.cli.cmd
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.core.Context
import com.github.ajalt.clikt.core.terminal
import com.github.ajalt.clikt.parameters.options.default
import com.github.ajalt.clikt.parameters.options.help
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.options.optionalValue
import com.github.ajalt.clikt.parameters.types.int
import com.github.ajalt.colormath.Color
import com.github.ajalt.colormath.model.RGB
import com.github.ajalt.mordant.animation.coroutines.animateInCoroutine
import com.github.ajalt.mordant.animation.progress.advance
import com.github.ajalt.mordant.rendering.TextStyle
import com.github.ajalt.mordant.terminal.danger
import com.github.ajalt.mordant.widgets.Spinner
Expand All @@ -32,6 +36,12 @@ class Fetch : CliktCommand()
{
override fun help(context: Context) = "Fetch projects to your modpack folder"

private val retryOpt: Int? by option("--retry")
.help("The number of times to retry")
.int()
.optionalValue(2)
.default(0)

override fun run() = runBlocking {
val lockFile = LockFile.readToResult().getOrElse {
terminal.danger(it.message)
Expand Down Expand Up @@ -68,21 +78,21 @@ class Fetch : CliktCommand()

launch { progressBar.execute() }

val fetchJob = launch {
projectFiles.fetch(
onError = { error ->
if (error !is AlreadyExists) terminal.pError(error)
},
onProgress = { advance, total ->
progressBar.advance(advance)
progressBar.update { this.total = total }
},
onSuccess = { path, _ ->
terminal.pSuccess("$path saved")
},
lockFile, configFile
)
}
val fetchJob = projectFiles.fetch(
onError = { error ->
if (error !is AlreadyExists) terminal.pError(error)
},
onProgress = { completed, total ->
progressBar.update {
this.completed = completed
this.total = total
}
},
onSuccess = { path, _ ->
terminal.pSuccess("$path saved")
},
lockFile, configFile, retryOpt
)

// -- OVERRIDES --

Expand Down Expand Up @@ -123,9 +133,7 @@ class Fetch : CliktCommand()
syncJob.join()
oldFilesJob.join()

launch {
progressBar.stop()
}
progressBar.stop()

echo()
}
Expand Down

0 comments on commit 85c00a7

Please sign in to comment.