@@ -53,8 +53,6 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.Execu
53
53
import com.netflix.spinnaker.orca.pipeline.persistence.UnpausablePipelineException
54
54
import com.netflix.spinnaker.orca.pipeline.persistence.UnresumablePipelineException
55
55
import de.huxhorn.sulky.ulid.SpinULID
56
- import java.lang.System.currentTimeMillis
57
- import java.security.SecureRandom
58
56
import java.time.Duration
59
57
import org.jooq.DSLContext
60
58
import org.jooq.DatePart
@@ -80,7 +78,33 @@ import org.jooq.impl.DSL.value
80
78
import org.slf4j.LoggerFactory
81
79
import rx.Observable
82
80
import java.io.ByteArrayOutputStream
81
+ import java.lang.System.currentTimeMillis
83
82
import java.nio.charset.StandardCharsets
83
+ import java.security.SecureRandom
84
+ import java.util.stream.Collectors.toList
85
+ import kotlin.collections.Collection
86
+ import kotlin.collections.Iterable
87
+ import kotlin.collections.Iterator
88
+ import kotlin.collections.List
89
+ import kotlin.collections.Map
90
+ import kotlin.collections.MutableList
91
+ import kotlin.collections.chunked
92
+ import kotlin.collections.distinct
93
+ import kotlin.collections.firstOrNull
94
+ import kotlin.collections.forEach
95
+ import kotlin.collections.isEmpty
96
+ import kotlin.collections.isNotEmpty
97
+ import kotlin.collections.listOf
98
+ import kotlin.collections.map
99
+ import kotlin.collections.mapOf
100
+ import kotlin.collections.mutableListOf
101
+ import kotlin.collections.mutableMapOf
102
+ import kotlin.collections.plus
103
+ import kotlin.collections.set
104
+ import kotlin.collections.toList
105
+ import kotlin.collections.toMutableList
106
+ import kotlin.collections.toMutableMap
107
+ import kotlin.collections.toTypedArray
84
108
85
109
/* *
86
110
* A generic SQL [ExecutionRepository].
@@ -427,6 +451,129 @@ class SqlExecutionRepository(
427
451
)
428
452
}
429
453
454
+ override fun retrievePipelineConfigIdsForApplication (application : String ): List <String > =
455
+ withPool(poolName) {
456
+ return jooq.selectDistinct(field(" config_id" ))
457
+ .from(PIPELINE .tableName)
458
+ .where(field(" application" ).eq(application))
459
+ .fetch(0 , String ::class .java)
460
+ }
461
+
462
+ /* *
463
+ * this function supports the following ExecutionCriteria currently:
464
+ * 'limit', a.k.a page size and
465
+ * 'statuses'.
466
+ *
467
+ * It executes the following query to determine how many pipeline executions exist that satisfy the above
468
+ * ExecutionCriteria. It then returns a list of all these execution ids.
469
+ *
470
+ * It does this by executing the following query:
471
+ * - If the execution criteria does not contain any statuses:
472
+ * SELECT config_id, id
473
+ FROM pipelines force index (`pipeline_application_idx`)
474
+ WHERE application = "myapp"
475
+ ORDER BY
476
+ config_id;
477
+ * - If the execution criteria contains statuses:
478
+ * SELECT config_id, id
479
+ FROM pipelines force index (`pipeline_application_status_starttime_idx`)
480
+ WHERE (
481
+ application = "myapp" and
482
+ status in ("status1", "status2)
483
+ )
484
+ ORDER BY
485
+ config_id;
486
+
487
+ * It then applies the limit execution criteria on the result set obtained above. We observed load issues in the db
488
+ * when running a query where the limit was calculated in the query itself. Therefore, we are moving that logic to
489
+ * the code below to ease the burden on the db in such circumstances.
490
+ */
491
+ override fun retrieveAndFilterPipelineExecutionIdsForApplication (
492
+ application : String ,
493
+ pipelineConfigIds : List <String >,
494
+ criteria : ExecutionCriteria
495
+ ): List <String > {
496
+
497
+ // baseQueryPredicate for the flow where there are no statuses in the execution criteria
498
+ var baseQueryPredicate = field(" application" ).eq(application)
499
+ .and (field(" config_id" ).`in `(* pipelineConfigIds.toTypedArray()))
500
+
501
+ var table = if (jooq.dialect() == SQLDialect .MYSQL ) PIPELINE .tableName.forceIndex(" pipeline_application_idx" )
502
+ else PIPELINE .tableName
503
+ // baseQueryPredicate for the flow with statuses
504
+ if (criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus .values().size) {
505
+ val statusStrings = criteria.statuses.map { it.toString() }
506
+ baseQueryPredicate = baseQueryPredicate
507
+ .and (field(" status" ).`in `(* statusStrings.toTypedArray()))
508
+
509
+ table = if (jooq.dialect() == SQLDialect .MYSQL ) PIPELINE .tableName.forceIndex(" pipeline_application_status_starttime_idx" )
510
+ else PIPELINE .tableName
511
+ }
512
+
513
+ val finalResult: MutableList <String > = mutableListOf ()
514
+
515
+ withPool(poolName) {
516
+ val baseQuery = jooq.select(field(" config_id" ), field(" id" ))
517
+ .from(table)
518
+ .where(baseQueryPredicate)
519
+ .orderBy(field(" config_id" ))
520
+ .fetch().intoGroups(" config_id" , " id" )
521
+
522
+ baseQuery.forEach {
523
+ val count = it.value.size
524
+ if (criteria.pageSize < count) {
525
+ finalResult.addAll(it.value
526
+ .stream()
527
+ .skip((count - criteria.pageSize).toLong())
528
+ .collect(toList()) as List <String >
529
+ )
530
+ } else {
531
+ finalResult.addAll(it.value as List <String >)
532
+ }
533
+ }
534
+ }
535
+ return finalResult
536
+ }
537
+
538
+ /* *
539
+ * It executes the following query to get execution details for n executions at a time in a specific application
540
+ *
541
+ * SELECT id, body, compressed_body, compression_type, `partition`
542
+ FROM pipelines force index (`pipeline_application_idx`)
543
+ left outer join
544
+ pipelines_compressed_executions
545
+ using (`id`)
546
+ WHERE (
547
+ application = "<myapp>" and
548
+ id in ('id1', 'id2', 'id3')
549
+ );
550
+ *
551
+ * it then gets all the stage information for all the executions returned from the above query.
552
+ */
553
+ override fun retrievePipelineExecutionDetailsForApplication (
554
+ application : String ,
555
+ pipelineExecutions : List <String >,
556
+ queryTimeoutSeconds : Int
557
+ ): Collection <PipelineExecution > {
558
+ withPool(poolName) {
559
+ val baseQuery = jooq.select(selectExecutionFields(compressionProperties))
560
+ .from(
561
+ if (jooq.dialect() == SQLDialect .MYSQL ) PIPELINE .tableName.forceIndex(" pipeline_application_idx" )
562
+ else PIPELINE .tableName
563
+ )
564
+ .leftOuterJoin(PIPELINE .tableName.compressedExecTable).using(field(" id" ))
565
+ .where(
566
+ field(" application" ).eq(application)
567
+ .and (field(" id" ).`in `(* pipelineExecutions.toTypedArray()))
568
+ )
569
+ .queryTimeout(queryTimeoutSeconds) // add an explicit timeout so that the query doesn't run forever
570
+ .fetch()
571
+
572
+ log.debug(" getting stage information for all the executions found so far" )
573
+ return ExecutionMapper (mapper, stageReadSize,compressionProperties, pipelineRefEnabled).map(baseQuery.intoResultSet(), jooq)
574
+ }
575
+ }
576
+
430
577
override fun retrievePipelinesForPipelineConfigId (
431
578
pipelineConfigId : String ,
432
579
criteria : ExecutionCriteria
0 commit comments