Skip to content

Commit

Permalink
Use index in existing Measurements check when creating Measurements (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tristanvuong2021 authored and ple13 committed Aug 16, 2024
1 parent c7bfb11 commit 2114489
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import org.wfanet.measurement.kingdom.deploy.common.DuchyIds
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.MeasurementNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.MeasurementReader.Companion.getEtag

class MeasurementReader(private val view: Measurement.View) :
class MeasurementReader(private val view: Measurement.View, measurementsIndex: Index = Index.NONE) :
SpannerReader<MeasurementReader.Result>() {

data class Result(
Expand All @@ -56,9 +56,25 @@ class MeasurementReader(private val view: Measurement.View) :
val measurement: Measurement,
)

enum class Index(internal val sql: String) {
NONE(""),
CREATE_REQUEST_ID("@{FORCE_INDEX=MeasurementsByCreateRequestId}"),
}

override val baseSql: String
get() = BASE_SQL

private val BASE_SQL =
"""
@{spanner_emulator.disable_query_null_filtered_index_check=true}
WITH FilteredMeasurements AS (
SELECT *
FROM
Measurements${measurementsIndex.sql}
JOIN MeasurementConsumers USING (MeasurementConsumerId)
"""
.trimIndent()

private var filled = false

/** Optional ORDER BY clause that is appended at the end of the overall query. */
Expand Down Expand Up @@ -258,16 +274,6 @@ class MeasurementReader(private val view: Measurement.View) :
return "W/\"${hash}\""
}

private val BASE_SQL =
"""
WITH FilteredMeasurements AS (
SELECT *
FROM
Measurements
JOIN MeasurementConsumers USING (MeasurementConsumerId)
"""
.trimIndent()

private val DEFAULT_VIEW_SQL =
"""
SELECT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,20 +427,27 @@ class CreateMeasurements(private val requests: List<CreateMeasurementRequest>) :
val whereClause =
"""
WHERE MeasurementConsumerId = @${params.MEASUREMENT_CONSUMER_ID}
AND CreateRequestId IS NOT NULL
AND CreateRequestId IN UNNEST(@${params.CREATE_REQUEST_ID})
"""
.trimIndent()

val requestIds = createMeasurementRequests.map { it.requestId }
return buildMap {
MeasurementReader(Measurement.View.DEFAULT)
.fillStatementBuilder {
appendClause(whereClause)
bind(params.MEASUREMENT_CONSUMER_ID to measurementConsumerId)
bind(params.CREATE_REQUEST_ID)
.toStringArray(createMeasurementRequests.map { it.requestId })
}
.execute(transactionContext)
.collect { put(it.createRequestId, it.measurement) }
if (requestIds.isNotEmpty()) {
MeasurementReader(Measurement.View.DEFAULT, MeasurementReader.Index.CREATE_REQUEST_ID)
.fillStatementBuilder {
appendClause(whereClause)
bind(params.MEASUREMENT_CONSUMER_ID to measurementConsumerId)
bind(params.CREATE_REQUEST_ID).toStringArray(requestIds)
}
.execute(transactionContext)
.collect {
if (it.createRequestId != null) {
put(it.createRequestId, it.measurement)
}
}
}
}
}

Expand Down

0 comments on commit 2114489

Please sign in to comment.