Skip to content

Commit 086a55d

Browse files
Merge branch 'master' into feat/excludeExecutionForDisabledPipelines
2 parents 03ee606 + d739295 commit 086a55d

File tree

11 files changed

+801
-69
lines changed

11 files changed

+801
-69
lines changed

orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt

+28
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.springframework.context.ApplicationContext
3030
import org.springframework.context.annotation.Primary
3131
import org.springframework.stereotype.Component
3232
import rx.Observable
33+
import javax.annotation.Nonnull
3334

3435
/**
3536
* Intended for performing red/black Orca deployments which do not share the
@@ -192,6 +193,13 @@ class DualExecutionRepository(
192193
).distinct { it.id }
193194
}
194195

196+
override fun retrievePipelineConfigIdsForApplication(application: String): List<String> {
197+
return (
198+
primary.retrievePipelineConfigIdsForApplication(application) +
199+
previous.retrievePipelineConfigIdsForApplication(application)
200+
).distinct()
201+
}
202+
195203
override fun retrievePipelinesForPipelineConfigId(
196204
pipelineConfigId: String,
197205
criteria: ExecutionCriteria
@@ -202,6 +210,26 @@ class DualExecutionRepository(
202210
).distinct { it.id }
203211
}
204212

213+
override fun retrieveAndFilterPipelineExecutionIdsForApplication(
214+
@Nonnull application: String,
215+
@Nonnull pipelineConfigIds: List<String>,
216+
@Nonnull criteria: ExecutionCriteria
217+
): List<String> {
218+
return primary.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria) +
219+
previous.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria)
220+
}
221+
222+
override fun retrievePipelineExecutionDetailsForApplication(
223+
@Nonnull application: String,
224+
pipelineConfigIds: List<String>,
225+
queryTimeoutSeconds: Int
226+
): Collection<PipelineExecution> {
227+
return (
228+
primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds) +
229+
previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds)
230+
).distinctBy { it.id }
231+
}
232+
205233
override fun retrievePipelinesForPipelineConfigIdsBetweenBuildTimeBoundary(
206234
pipelineConfigIds: MutableList<String>,
207235
buildTimeStartBoundary: Long,

orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java

+15
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,21 @@ Observable<PipelineExecution> retrieve(
9292
Observable<PipelineExecution> retrievePipelinesForPipelineConfigId(
9393
@Nonnull String pipelineConfigId, @Nonnull ExecutionCriteria criteria);
9494

95+
@Nonnull
96+
Collection<String> retrievePipelineConfigIdsForApplication(@Nonnull String application);
97+
98+
@Nonnull
99+
Collection<String> retrieveAndFilterPipelineExecutionIdsForApplication(
100+
@Nonnull String application,
101+
@Nonnull List<String> pipelineConfigIds,
102+
@Nonnull ExecutionCriteria criteria);
103+
104+
@Nonnull
105+
Collection<PipelineExecution> retrievePipelineExecutionDetailsForApplication(
106+
@Nonnull String application,
107+
@Nonnull List<String> pipelineConfigIds,
108+
int queryTimeoutSeconds);
109+
95110
/**
96111
* Returns executions in the time boundary. Redis impl does not respect pageSize or offset params,
97112
* and returns all executions. Sql impl respects these params.

orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt

+30-1
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
2323
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
2424
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator
2525
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria
26+
import rx.Observable
2627
import java.lang.System.currentTimeMillis
2728
import java.time.Instant
2829
import java.util.concurrent.ConcurrentHashMap
29-
import rx.Observable
30+
import javax.annotation.Nonnull
3031

3132
class InMemoryExecutionRepository : ExecutionRepository {
3233

@@ -276,6 +277,34 @@ class InMemoryExecutionRepository : ExecutionRepository {
276277
)
277278
}
278279

280+
override fun retrievePipelineConfigIdsForApplication(application: String): List<String> {
281+
return pipelines.values
282+
.filter { it.application == application }
283+
.map { it.pipelineConfigId }
284+
.distinct()
285+
}
286+
287+
override fun retrieveAndFilterPipelineExecutionIdsForApplication(
288+
@Nonnull application: String,
289+
@Nonnull pipelineConfigIds: List<String>,
290+
@Nonnull criteria: ExecutionCriteria
291+
): List<String> {
292+
return pipelines.values
293+
.filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) }
294+
.applyCriteria(criteria)
295+
.map { it.id }
296+
}
297+
298+
override fun retrievePipelineExecutionDetailsForApplication(
299+
application: String,
300+
pipelineConfigIds: List<String>,
301+
queryTimeoutSeconds: Int
302+
): Collection<PipelineExecution> {
303+
return pipelines.values
304+
.filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) }
305+
.distinctBy { it.id }
306+
}
307+
279308
override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution {
280309
return retrieveByCorrelationId(ORCHESTRATION, correlationId)
281310
}

orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/Front50Service.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ interface Front50Service {
6969
List<Map<String, Object>> getPipelines(@Path("applicationName") String applicationName, @Query("refresh") boolean refresh)
7070

7171
@GET("/pipelines/{applicationName}")
72-
List<Map<String, Object>> getPipelines(@Path("applicationName") String applicationName, @Query("refresh") boolean refresh, @Query("enabledPipelines") boolean enabledPipelines)
72+
List<Map<String, Object>> getPipelines(@Path("applicationName") String applicationName, @Query("refresh") boolean refresh, @Query("pipelineStateFilter") String pipelineStateFilter)
7373

7474
@GET("/pipelines/{pipelineId}/get")
7575
Map<String, Object> getPipeline(@Path("pipelineId") String pipelineId)

orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java

+31
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,37 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List<String> idsToDelet
479479
return currentObservable;
480480
}
481481

482+
@Override
483+
public @Nonnull List<String> retrievePipelineConfigIdsForApplication(
484+
@Nonnull String application) {
485+
// TODO: not implemented yet - this method, at present, is primarily meant for the
486+
// SqlExecutionRepository
487+
// implementation.
488+
return List.of();
489+
}
490+
491+
@Override
492+
public @Nonnull List<String> retrieveAndFilterPipelineExecutionIdsForApplication(
493+
@Nonnull String application,
494+
@Nonnull List<String> pipelineConfigIds,
495+
@Nonnull ExecutionCriteria criteria) {
496+
// TODO: not implemented yet - this method, at present, is primarily meant for the
497+
// SqlExecutionRepository
498+
// implementation.
499+
return List.of();
500+
}
501+
502+
@Override
503+
public @Nonnull List<PipelineExecution> retrievePipelineExecutionDetailsForApplication(
504+
@Nonnull String application,
505+
@Nonnull List<String> pipelineExecutionIds,
506+
int queryTimeoutSeconds) {
507+
// TODO: not implemented yet - this method, at present, is primarily meant for the
508+
// SqlExecutionRepository
509+
// implementation.
510+
return List.of();
511+
}
512+
482513
/*
483514
* There is no guarantee that the returned results will be sorted.
484515
* @param limit and the param @offset are only implemented in SqlExecutionRepository

orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt

+149-2
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.Execu
5353
import com.netflix.spinnaker.orca.pipeline.persistence.UnpausablePipelineException
5454
import com.netflix.spinnaker.orca.pipeline.persistence.UnresumablePipelineException
5555
import de.huxhorn.sulky.ulid.SpinULID
56-
import java.lang.System.currentTimeMillis
57-
import java.security.SecureRandom
5856
import java.time.Duration
5957
import org.jooq.DSLContext
6058
import org.jooq.DatePart
@@ -80,7 +78,33 @@ import org.jooq.impl.DSL.value
8078
import org.slf4j.LoggerFactory
8179
import rx.Observable
8280
import java.io.ByteArrayOutputStream
81+
import java.lang.System.currentTimeMillis
8382
import java.nio.charset.StandardCharsets
83+
import java.security.SecureRandom
84+
import java.util.stream.Collectors.toList
85+
import kotlin.collections.Collection
86+
import kotlin.collections.Iterable
87+
import kotlin.collections.Iterator
88+
import kotlin.collections.List
89+
import kotlin.collections.Map
90+
import kotlin.collections.MutableList
91+
import kotlin.collections.chunked
92+
import kotlin.collections.distinct
93+
import kotlin.collections.firstOrNull
94+
import kotlin.collections.forEach
95+
import kotlin.collections.isEmpty
96+
import kotlin.collections.isNotEmpty
97+
import kotlin.collections.listOf
98+
import kotlin.collections.map
99+
import kotlin.collections.mapOf
100+
import kotlin.collections.mutableListOf
101+
import kotlin.collections.mutableMapOf
102+
import kotlin.collections.plus
103+
import kotlin.collections.set
104+
import kotlin.collections.toList
105+
import kotlin.collections.toMutableList
106+
import kotlin.collections.toMutableMap
107+
import kotlin.collections.toTypedArray
84108

85109
/**
86110
* A generic SQL [ExecutionRepository].
@@ -427,6 +451,129 @@ class SqlExecutionRepository(
427451
)
428452
}
429453

454+
override fun retrievePipelineConfigIdsForApplication(application: String): List<String> =
455+
withPool(poolName) {
456+
return jooq.selectDistinct(field("config_id"))
457+
.from(PIPELINE.tableName)
458+
.where(field("application").eq(application))
459+
.fetch(0, String::class.java)
460+
}
461+
462+
/**
463+
* this function supports the following ExecutionCriteria currently:
464+
* 'limit', a.k.a page size and
465+
* 'statuses'.
466+
*
467+
* It executes the following query to determine how many pipeline executions exist that satisfy the above
468+
* ExecutionCriteria. It then returns a list of all these execution ids.
469+
*
470+
* It does this by executing the following query:
471+
* - If the execution criteria does not contain any statuses:
472+
* SELECT config_id, id
473+
FROM pipelines force index (`pipeline_application_idx`)
474+
WHERE application = "myapp"
475+
ORDER BY
476+
config_id;
477+
* - If the execution criteria contains statuses:
478+
* SELECT config_id, id
479+
FROM pipelines force index (`pipeline_application_status_starttime_idx`)
480+
WHERE (
481+
application = "myapp" and
482+
status in ("status1", "status2")
483+
)
484+
ORDER BY
485+
config_id;
486+
487+
* It then applies the limit execution criteria on the result set obtained above. We observed load issues in the db
488+
* when running a query where the limit was calculated in the query itself. Therefore, we are moving that logic to
489+
* the code below to ease the burden on the db in such circumstances.
490+
*/
491+
override fun retrieveAndFilterPipelineExecutionIdsForApplication(
492+
application: String,
493+
pipelineConfigIds: List<String>,
494+
criteria: ExecutionCriteria
495+
): List<String> {
496+
497+
// baseQueryPredicate for the flow where there are no statuses in the execution criteria
498+
var baseQueryPredicate = field("application").eq(application)
499+
.and(field("config_id").`in`(*pipelineConfigIds.toTypedArray()))
500+
501+
var table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx")
502+
else PIPELINE.tableName
503+
// baseQueryPredicate for the flow with statuses
504+
if (criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus.values().size) {
505+
val statusStrings = criteria.statuses.map { it.toString() }
506+
baseQueryPredicate = baseQueryPredicate
507+
.and(field("status").`in`(*statusStrings.toTypedArray()))
508+
509+
table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_status_starttime_idx")
510+
else PIPELINE.tableName
511+
}
512+
513+
val finalResult: MutableList<String> = mutableListOf()
514+
515+
withPool(poolName) {
516+
val baseQuery = jooq.select(field("config_id"), field("id"))
517+
.from(table)
518+
.where(baseQueryPredicate)
519+
.orderBy(field("config_id"))
520+
.fetch().intoGroups("config_id", "id")
521+
522+
baseQuery.forEach {
523+
val count = it.value.size
524+
if (criteria.pageSize < count) {
525+
finalResult.addAll(it.value
526+
.stream()
527+
.skip((count - criteria.pageSize).toLong())
528+
.collect(toList()) as List<String>
529+
)
530+
} else {
531+
finalResult.addAll(it.value as List<String>)
532+
}
533+
}
534+
}
535+
return finalResult
536+
}
537+
538+
/**
539+
* It executes the following query to get execution details for n executions at a time in a specific application
540+
*
541+
* SELECT id, body, compressed_body, compression_type, `partition`
542+
FROM pipelines force index (`pipeline_application_idx`)
543+
left outer join
544+
pipelines_compressed_executions
545+
using (`id`)
546+
WHERE (
547+
application = "<myapp>" and
548+
id in ('id1', 'id2', 'id3')
549+
);
550+
*
551+
* it then gets all the stage information for all the executions returned from the above query.
552+
*/
553+
override fun retrievePipelineExecutionDetailsForApplication(
554+
application: String,
555+
pipelineExecutions: List<String>,
556+
queryTimeoutSeconds: Int
557+
): Collection<PipelineExecution> {
558+
withPool(poolName) {
559+
val baseQuery = jooq.select(selectExecutionFields(compressionProperties))
560+
.from(
561+
if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx")
562+
else PIPELINE.tableName
563+
)
564+
.leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id"))
565+
.where(
566+
field("application").eq(application)
567+
.and(field("id").`in`(*pipelineExecutions.toTypedArray()))
568+
)
569+
.queryTimeout(queryTimeoutSeconds) // add an explicit timeout so that the query doesn't run forever
570+
.fetch()
571+
572+
log.debug("getting stage information for all the executions found so far")
573+
return ExecutionMapper(mapper, stageReadSize,compressionProperties, pipelineRefEnabled).map(baseQuery.intoResultSet(), jooq)
574+
}
575+
}
576+
430577
override fun retrievePipelinesForPipelineConfigId(
431578
pipelineConfigId: String,
432579
criteria: ExecutionCriteria

orca-web/orca-web.gradle

+10
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,19 @@ dependencies {
9494
testImplementation("io.strikt:strikt-core")
9595
testImplementation("io.mockk:mockk")
9696
testImplementation("org.apache.groovy:groovy-json")
97+
testImplementation("com.nhaarman:mockito-kotlin")
98+
testImplementation("io.spinnaker.kork:kork-sql-test")
99+
testImplementation("org.testcontainers:mysql")
97100
testImplementation ("com.squareup.retrofit2:retrofit-mock")
98101
}
99102

103+
sourceSets {
104+
main {
105+
java { srcDirs = [] } // no source dirs for the java compiler
106+
groovy { srcDirs = ["src/main/java", "src/main/groovy"] } // compile everything in src/ with groovy
107+
}
108+
}
109+
100110
test {
101111
//The Implementation-Version is set in the MANIFEST.MF for the JAR produced via testing so that
102112
//assertions can be made against the version (see orca-plugins-test, for example).

0 commit comments

Comments
 (0)