Skip to content

Commit

Permalink
Apply control loop pattern to task tracking
Browse files Browse the repository at this point in the history
Similar to other control loops (e.g. resources, verifications,
constraints), apply the same pattern to the "task" control loop.

Add a TaskActuator class to separate out the individual task tracking.

The new pattern brings consistency with the other control loops and
includes metrics + error handling
  • Loading branch information
osoriano committed Aug 25, 2023
1 parent 3224569 commit f6dac6b
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 518 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import strikt.assertions.first
import strikt.assertions.isEmpty
import strikt.assertions.isEqualTo
import java.time.Clock
import java.time.Duration

abstract class TaskTrackingRepositoryTests<T : TaskTrackingRepository> {

Expand All @@ -31,28 +32,28 @@ abstract class TaskTrackingRepositoryTests<T : TaskTrackingRepository> {

@Test
fun `returns nothing if there are no in-progress tasks`() {
expectThat(subject.getIncompleteTasks()).isEmpty()
expectThat(subject.getIncompleteTasks(Duration.ZERO, Int.MAX_VALUE)).isEmpty()
}

@Test
fun `in-progress tasks are returned`() {
subject.store(taskRecord1)
expectThat(subject.getIncompleteTasks().size).isEqualTo(1)
expectThat(subject.getIncompleteTasks()).first().get(TaskRecord::id).isEqualTo(taskRecord1.id)
expectThat(subject.getIncompleteTasks(Duration.ZERO, Int.MAX_VALUE).size).isEqualTo(1)
expectThat(subject.getIncompleteTasks(Duration.ZERO, Int.MAX_VALUE)).first().get(TaskRecord::id).isEqualTo(taskRecord1.id)
}

@Test
fun `multiple tasks may be returned`() {
subject.store(taskRecord2)
subject.store(taskRecord1)
expectThat(subject.getIncompleteTasks().size).isEqualTo(2)
expectThat(subject.getIncompleteTasks(Duration.ZERO, Int.MAX_VALUE).size).isEqualTo(2)
}

@Test
fun `completed tasks are not returned`() {
subject.store(taskRecord1)
expectThat(subject.getIncompleteTasks().size).isEqualTo(1)
subject.updateStatus(taskRecord1.id, SUCCEEDED)
expectThat(subject.getIncompleteTasks()).isEmpty()
expectThat(subject.getIncompleteTasks(Duration.ZERO, Int.MAX_VALUE).size).isEqualTo(1)
subject.delete(taskRecord1.id)
expectThat(subject.getIncompleteTasks(Duration.ZERO, Int.MAX_VALUE)).isEmpty()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.netflix.spinnaker.config

import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties(prefix = "keel.task-check")
class TaskCheckConfig: BaseSchedulerConfig() {
// only uses properties from the BaseSchedulerConfig,
// but this is here to give a separate prefix for overriding the values
// via fast property or in the config file.
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import com.netflix.spinnaker.config.EnvironmentDeletionConfig
import com.netflix.spinnaker.config.EnvironmentVerificationConfig
import com.netflix.spinnaker.config.PostDeployActionsConfig
import com.netflix.spinnaker.config.ResourceCheckConfig
import com.netflix.spinnaker.config.TaskCheckConfig
import com.netflix.spinnaker.keel.activation.ApplicationDown
import com.netflix.spinnaker.keel.activation.ApplicationUp
import com.netflix.spinnaker.keel.exceptions.EnvironmentCurrentlyBeingActedOn
import com.netflix.spinnaker.keel.logging.TracingSupport.Companion.blankMDC
import com.netflix.spinnaker.keel.persistence.AgentLockRepository
import com.netflix.spinnaker.keel.persistence.EnvironmentDeletionRepository
import com.netflix.spinnaker.keel.persistence.KeelRepository
import com.netflix.spinnaker.keel.persistence.TaskTrackingRepository
import com.netflix.spinnaker.keel.postdeploy.PostDeployActionRunner
import com.netflix.spinnaker.keel.telemetry.AgentInvocationComplete
import com.netflix.spinnaker.keel.scheduled.TaskActuator
import com.netflix.spinnaker.keel.telemetry.recordDurationPercentile
import com.netflix.spinnaker.keel.telemetry.safeIncrement
import com.netflix.spinnaker.keel.verification.VerificationRunner
Expand Down Expand Up @@ -43,6 +44,7 @@ import kotlin.math.max
@EnableConfigurationProperties(
ArtifactVersionCleanupConfig::class,
ResourceCheckConfig::class,
TaskCheckConfig::class,
EnvironmentDeletionConfig::class,
EnvironmentVerificationConfig::class,
PostDeployActionsConfig::class,
Expand All @@ -58,12 +60,14 @@ class CheckScheduler(
private val postDeployActionRunner: PostDeployActionRunner,
private val artifactVersionCleanupConfig: ArtifactVersionCleanupConfig,
private val resourceCheckConfig: ResourceCheckConfig,
private val taskCheckConfig: TaskCheckConfig,
private val verificationConfig: EnvironmentVerificationConfig,
private val postDeployConfig: PostDeployActionsConfig,
private val environmentDeletionConfig: EnvironmentDeletionConfig,
private val environmentCleaner: EnvironmentCleaner,
private val publisher: ApplicationEventPublisher,
private val agentLockRepository: AgentLockRepository,
private val taskActuator: TaskActuator,
private val taskTrackingRepository: TaskTrackingRepository,
private val clock: Clock,
private val springEnv: Environment,
private val spectator: Registry
Expand Down Expand Up @@ -388,23 +392,53 @@ class CheckScheduler(
}
}

// todo eb: remove this loop in favor of transitioning the [OrcaTaskMonitoringAgent] to a
// [LifecycleMonitor]
@Scheduled(fixedDelayString = "\${keel.scheduled.agent.frequency:PT1M}")
fun invokeAgent() {
@Scheduled(fixedDelayString = "\${keel.task-check.frequency:PT1S}")
fun checkTasks() {
if (enabled.get()) {
val startTime = clock.instant()
agentLockRepository.agents.forEach {
val agentName: String = it.javaClass.simpleName
val lockAcquired = agentLockRepository.tryAcquireLock(agentName, it.lockTimeoutSeconds)
if (lockAcquired) {
runBlocking(blankMDC) {
it.invokeAgent()
val job = launch(blankMDC) {
supervisorScope {
runCatching {
taskTrackingRepository.getIncompleteTasks(taskCheckConfig.minAgeDuration, taskCheckConfig.batchSize)
}
publisher.publishEvent(AgentInvocationComplete(Duration.between(startTime, clock.instant()), agentName))
.onFailure {
log.error("Exception fetching tasks due for check", it)
}
.onSuccess {
it.forEach {
launch {
try {
withTimeout(taskCheckConfig.timeoutDuration.toMillis()) {
taskActuator.checkTask(it)
}
} catch (e: TimeoutCancellationException) {
log.error("Timed out checking task ${it.id}", e)
spectator.counter(
"keel.scheduled.timeout",
listOf(
BasicTag("type", "task")
)
).safeIncrement()
} catch (e: Exception) {
log.error("Failed checking task ${it.id}", e)
spectator.counter(
"keel.scheduled.failure",
listOf(
BasicTag("type", "task")
)
).safeIncrement()
}
}
}
spectator.counter(
"keel.scheduled.batch.size",
listOf(BasicTag("type", "task"))
).increment(it.size.toLong())
}
}
}
recordDuration(startTime, "agent")
runBlocking { job.join() }
recordDuration(startTime, "task")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package com.netflix.spinnaker.keel.persistence

import com.netflix.spinnaker.keel.api.TaskStatus
import com.netflix.spinnaker.keel.api.actuation.SubjectType
import java.time.Duration

interface TaskTrackingRepository {
fun store(task: TaskRecord)
fun getIncompleteTasks(): Set<TaskRecord>
fun updateStatus(taskId: String, status: TaskStatus)
fun getIncompleteTasks(minTimeSinceLastCheck: Duration, limit: Int): Set<TaskRecord>
fun delete(taskId: String)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.netflix.spinnaker.keel.scheduled

import com.netflix.spinnaker.keel.persistence.TaskRecord

interface TaskActuator {

suspend fun checkTask(task: TaskRecord)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@ import com.netflix.spinnaker.config.EnvironmentDeletionConfig
import com.netflix.spinnaker.config.EnvironmentVerificationConfig
import com.netflix.spinnaker.config.PostDeployActionsConfig
import com.netflix.spinnaker.config.ResourceCheckConfig
import com.netflix.spinnaker.config.TaskCheckConfig
import com.netflix.spinnaker.keel.api.Environment
import com.netflix.spinnaker.keel.api.ResourceKind.Companion.parseKind
import com.netflix.spinnaker.keel.api.actuation.SubjectType
import com.netflix.spinnaker.keel.api.artifacts.VirtualMachineOptions
import com.netflix.spinnaker.keel.api.plugins.UnsupportedKind
import com.netflix.spinnaker.keel.artifacts.DebianArtifact
import com.netflix.spinnaker.keel.artifacts.DockerArtifact
import com.netflix.spinnaker.keel.persistence.AgentLockRepository
import com.netflix.spinnaker.keel.persistence.EnvironmentDeletionRepository
import com.netflix.spinnaker.keel.persistence.KeelRepository
import com.netflix.spinnaker.keel.persistence.TaskRecord
import com.netflix.spinnaker.keel.persistence.TaskTrackingRepository
import com.netflix.spinnaker.keel.postdeploy.PostDeployActionRunner
import com.netflix.spinnaker.keel.scheduled.ScheduledAgent
import com.netflix.spinnaker.keel.scheduled.TaskActuator
import com.netflix.spinnaker.keel.test.randomString
import com.netflix.spinnaker.keel.test.resource
import com.netflix.spinnaker.keel.verification.VerificationRunner
import com.netflix.spinnaker.time.MutableClock
Expand All @@ -38,6 +43,7 @@ internal object CheckSchedulerTests : JUnit5Minutests {
private val repository: KeelRepository = mockk()
private val postDeployActionRunner: PostDeployActionRunner = mockk()
private val resourceActuator = mockk<ResourceActuator>(relaxUnitFun = true)
private val taskActuator = mockk<TaskActuator>(relaxUnitFun = true)
private val environmentPromotionChecker = mockk<EnvironmentPromotionChecker>()
private val artifactHandler = mockk<ArtifactHandler>(relaxUnitFun = true)
private val publisher = mockk<ApplicationEventPublisher>(relaxUnitFun = true)
Expand All @@ -47,6 +53,10 @@ internal object CheckSchedulerTests : JUnit5Minutests {
it.minAgeDuration = checkMinAge
it.batchSize = 2
}
private val taskCheckConfig = TaskCheckConfig().also {
it.minAgeDuration = checkMinAge
it.batchSize = 2
}
private val verificationConfig = EnvironmentVerificationConfig().also {
it.minAgeDuration = checkMinAge
it.batchSize = 2
Expand All @@ -64,20 +74,7 @@ internal object CheckSchedulerTests : JUnit5Minutests {
}


class DummyScheduledAgent(override val lockTimeoutSeconds: Long) : ScheduledAgent {
override suspend fun invokeAgent() {
}
}

private val dummyAgent = mockk<DummyScheduledAgent>(relaxUnitFun = true) {
every {
lockTimeoutSeconds
} returns 5
}

private var agentLockRepository = mockk<AgentLockRepository>(relaxUnitFun = true) {
every { agents } returns listOf(dummyAgent)
}
private var taskTrackingRepository = mockk<TaskTrackingRepository>()

private val verificationRunner = mockk<VerificationRunner>()

Expand Down Expand Up @@ -117,6 +114,11 @@ internal object CheckSchedulerTests : JUnit5Minutests {
Environment("my-preview-environment2")
)

private val taskRecords = setOf(
TaskRecord("123", "Upsert server group", SubjectType.RESOURCE, randomString(), randomString(), randomString()),
TaskRecord("456", "Bake", SubjectType.RESOURCE, randomString(), null, null)
)

fun tests() = rootContext<CheckScheduler> {
fixture {
CheckScheduler(
Expand All @@ -128,12 +130,14 @@ internal object CheckSchedulerTests : JUnit5Minutests {
artifactHandlers = listOf(artifactHandler),
artifactVersionCleanupConfig = ArtifactVersionCleanupConfig(),
resourceCheckConfig = resourceCheckConfig,
taskCheckConfig = taskCheckConfig,
verificationConfig = verificationConfig,
postDeployConfig = postDeployConfig,
environmentDeletionConfig = EnvironmentDeletionConfig(),
environmentCleaner = environmentCleaner,
publisher = publisher,
agentLockRepository = agentLockRepository,
taskActuator = taskActuator,
taskTrackingRepository = taskTrackingRepository,
verificationRunner = verificationRunner,
clock = MutableClock(),
springEnv = springEnv,
Expand Down Expand Up @@ -243,20 +247,26 @@ internal object CheckSchedulerTests : JUnit5Minutests {
}
}

context("test invoke agents") {
context("test task checking") {
before {
onApplicationUp()
}

test("invoke a single agent") {
test("invoke a single task tracking iteration") {
every {
agentLockRepository.tryAcquireLock(any(), any())
} returns true
taskTrackingRepository.getIncompleteTasks(any(), any())
} returns taskRecords

invokeAgent()
checkTasks()

taskRecords.forEach { taskRecord ->
verify(timeout = 500) {
taskActuator.checkTask(taskRecord)
}
}

verify {
dummyAgent.invokeAgent()
taskTrackingRepository.getIncompleteTasks(any(), any())
}
}
after {
Expand Down
Loading

0 comments on commit f6dac6b

Please sign in to comment.