Skip to content

Commit

Permalink
feat: Add read connection pool support
Browse files Browse the repository at this point in the history
  • Loading branch information
ovidiupopa07 committed Dec 14, 2023
1 parent 5009613 commit a6f801d
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 35 deletions.
35 changes: 35 additions & 0 deletions .github/workflows/branch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Temporary Branch Build

on:
push:
branches:
- read-connection-pool

env:
GRADLE_OPTS: -Dorg.gradle.daemon=false -Xmx6g -Xms6g
CONTAINER_REGISTRY: us-docker.pkg.dev/spinnaker-community/docker

jobs:
branch-build:
# Only run this on repositories in the 'spinnaker' org, not on forks.
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-java@v4
with:
java-version: |
17
11
distribution: 'zulu'
cache: 'gradle'
- name: Prepare build variables
id: build_variables
run: |
echo REPO="${GITHUB_REPOSITORY##*/}" >> $GITHUB_OUTPUT
echo VERSION="$(git describe --tags --abbrev=0 --match='v[0-9]*' | cut -c2-)-dev-${GITHUB_REF_NAME}-$(git rev-parse --short HEAD)-$(date --utc +'%Y%m%d%H%M')" >> $GITHUB_OUTPUT
- name: Build
env:
ORG_GRADLE_PROJECT_version: ${{ steps.build_variables.outputs.VERSION }}
run: ./gradlew build --stacktrace ${{ steps.build_variables.outputs.REPO }}-web:installDist
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2023 Armory, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config;

public enum ConnectionPools {
READ("read"),
WRITE("write");

private final String type;

ConnectionPools(String type) {
this.type = type;
}

public String getType() {
return type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class SqlConfiguration {
properties.retries.transactions,
orcaSqlProperties.batchReadSize,
orcaSqlProperties.stageReadSize,
"",
interlink = interlink.orElse(null),
executionRepositoryListeners = executionRepositoryListeners
).let {
Expand Down Expand Up @@ -155,6 +156,7 @@ class SqlConfiguration {
@Bean
@ConditionalOnProperty("sql.external-lock.enabled")
fun lockProvider(datasource: DataSource): LockProvider {
ConnectionPools.READ.type
return JdbcTemplateLockProvider(datasource)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package com.netflix.spinnaker.orca.sql.pipeline.persistence

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.kork.sql.routing.withPool
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import java.sql.ResultSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.spinnaker.orca.sql.pipeline.persistence

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.kork.core.RetrySupport
import com.netflix.spinnaker.kork.exceptions.ConfigurationException
import com.netflix.spinnaker.kork.exceptions.SystemException
Expand Down Expand Up @@ -50,6 +51,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.Execu
import com.netflix.spinnaker.orca.pipeline.persistence.UnpausablePipelineException
import com.netflix.spinnaker.orca.pipeline.persistence.UnresumablePipelineException
import de.huxhorn.sulky.ulid.SpinULID
import org.checkerframework.checker.units.qual.C
import java.lang.System.currentTimeMillis
import java.security.SecureRandom
import org.jooq.DSLContext
Expand Down Expand Up @@ -245,7 +247,7 @@ class SqlExecutionRepository(
}

override fun isCanceled(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return jooq.fetchExists(
jooq.selectFrom(type.tableName)
.where(id.toWhereCondition())
Expand Down Expand Up @@ -341,25 +343,26 @@ class SqlExecutionRepository(
private fun cleanupOldDeletedExecutions() {
// Note: this runs as part of a delete operation but is not critical (best effort cleanup)
// Hence it doesn't need to be in a transaction and we "eat" the exceptions here

try {
val idsToDelete = jooq
.select(field("id"))
.from(table("deleted_executions"))
.where(field("deleted_at").lt(timestampSub(now(), 1, DatePart.DAY)))
.fetch(field("id"), Int::class.java)

// Perform chunked delete in the rare event that there are many executions to clean up
idsToDelete
.chunked(25)
.forEach { chunk ->
jooq
.deleteFrom(table("deleted_executions"))
.where(field("id").`in`(*chunk.toTypedArray()))
.execute()
}
} catch (e: Exception) {
log.error("Failed to cleanup some deleted_executions", e)
withPool(poolName) {
try {
val idsToDelete = jooq
.select(field("id"))
.from(table("deleted_executions"))
.where(field("deleted_at").lt(timestampSub(now(), 1, DatePart.DAY)))
.fetch(field("id"), Int::class.java)

// Perform chunked delete in the rare event that there are many executions to clean up
idsToDelete
.chunked(25)
.forEach { chunk ->
jooq
.deleteFrom(table("deleted_executions"))
.where(field("id").`in`(*chunk.toTypedArray()))
.execute()
}
} catch (e: Exception) {
log.error("Failed to cleanup some deleted_executions", e)
}
}
}

Expand All @@ -380,7 +383,7 @@ class SqlExecutionRepository(
}

private fun retrieve(type: ExecutionType, criteria: ExecutionCriteria, partition: String?): Observable<PipelineExecution> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = jooq.selectExecutions(
type,
fields = selectFields() + field("status"),
Expand Down Expand Up @@ -409,7 +412,7 @@ class SqlExecutionRepository(
}

override fun retrievePipelinesForApplication(application: String): Observable<PipelineExecution> =
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
Observable.from(
fetchExecutions { pageSize, cursor ->
selectExecutions(PIPELINE, pageSize, cursor) {
Expand All @@ -426,7 +429,7 @@ class SqlExecutionRepository(
// When not filtering by status, provide an index hint to ensure use of `pipeline_config_id_idx` which
// fully satisfies the where clause and order by. Without, some lookups by config_id matching thousands
// of executions triggered costly full table scans.
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = if (criteria.statuses.isEmpty() || criteria.statuses.size == ExecutionStatus.values().size) {
jooq.selectExecutions(
PIPELINE,
Expand Down Expand Up @@ -470,7 +473,7 @@ class SqlExecutionRepository(
criteria: ExecutionCriteria,
sorter: ExecutionComparator?
): MutableList<PipelineExecution> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return jooq.selectExecutions(
ORCHESTRATION,
conditions = {
Expand Down Expand Up @@ -508,7 +511,7 @@ class SqlExecutionRepository(
}

override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val execution = jooq.selectExecution(ORCHESTRATION)
.where(
field("id").eq(
Expand Down Expand Up @@ -536,7 +539,7 @@ class SqlExecutionRepository(
}

override fun retrievePipelineForCorrelationId(correlationId: String): PipelineExecution {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val execution = jooq.selectExecution(PIPELINE)
.where(
field("id").eq(
Expand Down Expand Up @@ -575,7 +578,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllApplicationNames(type: ExecutionType?): List<String> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
Expand All @@ -598,7 +601,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllApplicationNames(type: ExecutionType?, minExecutions: Int): List<String> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
Expand All @@ -624,7 +627,7 @@ class SqlExecutionRepository(
}

override fun countActiveExecutions(): ActiveExecutionsReport {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1))

val orchestrationsQuery = jooq.selectCount()
Expand Down Expand Up @@ -653,7 +656,7 @@ class SqlExecutionRepository(
buildTimeEndBoundary: Long,
executionCriteria: ExecutionCriteria
): List<PipelineExecution> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = jooq.selectExecutions(
PIPELINE,
conditions = {
Expand Down Expand Up @@ -716,7 +719,7 @@ class SqlExecutionRepository(
}

override fun hasExecution(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return jooq.selectCount()
.from(type.tableName)
.where(id.toWhereCondition())
Expand All @@ -725,7 +728,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllExecutionIds(type: ExecutionType): MutableList<String> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return jooq.select(field("id")).from(type.tableName).fetch("id", String::class.java)
}
}
Expand All @@ -745,7 +748,7 @@ class SqlExecutionRepository(
): Pair<String, String?> {
if (isULID(id)) return Pair(id, null)

withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val ts = (timestamp ?: System.currentTimeMillis())
val row = ctx.select(field("id"))
.from(table)
Expand Down Expand Up @@ -971,7 +974,7 @@ class SqlExecutionRepository(
id: String,
forUpdate: Boolean = false
): PipelineExecution? {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = ctx.selectExecution(type).where(id.toWhereCondition())
if (forUpdate) {
select.forUpdate()
Expand All @@ -986,7 +989,7 @@ class SqlExecutionRepository(
cursor: String?,
where: ((SelectJoinStep<Record>) -> SelectConditionStep<Record>)? = null
): Collection<PipelineExecution> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = jooq.selectExecutions(
type,
conditions = {
Expand Down

0 comments on commit a6f801d

Please sign in to comment.