Skip to content

Commit

Permalink
Update getLastUpdatedMeasurement
Browse files Browse the repository at this point in the history
  • Loading branch information
roaminggypsy committed Sep 27, 2024
1 parent f47813b commit 7df2459
Showing 1 changed file with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt
import org.wfanet.measurement.api.v2alpha.EventGroup
import org.wfanet.measurement.api.v2alpha.EventGroupsGrpcKt
import org.wfanet.measurement.api.v2alpha.ListEventGroupsRequestKt
import org.wfanet.measurement.api.v2alpha.ListMeasurementsResponse
import org.wfanet.measurement.api.v2alpha.Measurement
import org.wfanet.measurement.api.v2alpha.MeasurementConsumer
import org.wfanet.measurement.api.v2alpha.MeasurementConsumersGrpcKt
Expand Down Expand Up @@ -212,45 +213,45 @@ class MeasurementSystemProber(
}

private suspend fun shouldCreateNewMeasurement(): Boolean {
val lastMeasurement = getLastCreatedMeasurement()
val lastUpdatedMeasurement = getLastUpdatedMeasurement()

if (lastMeasurement == null) {
if (lastUpdatedMeasurement == null) {
return true
}

if (lastMeasurement.state !in COMPLETED_MEASUREMENT_STATES) {
if (lastUpdatedMeasurement.state !in COMPLETED_MEASUREMENT_STATES) {
return false
}

val updateInstant = lastMeasurement.updateTime.toInstant()
val updateInstant = lastUpdatedMeasurement.updateTime.toInstant()
val nextMeasurementEarliestInstant = updateInstant.plus(durationBetweenMeasurement)
return clock.instant() >= nextMeasurementEarliestInstant
}

private suspend fun getLastCreatedMeasurement(): Measurement? {
var lastCreatedMeasurement: Measurement? = null
try {
var nextPageToken = ""
while (lastCreatedMeasurement == null && nextPageToken.isNotBlank()) {
val response =
private suspend fun getLastUpdatedMeasurement(): Measurement? {
var nextPageToken: String? = ""
while (nextPageToken != null) {
val response: ListMeasurementsResponse =
try {
measurementsStub.listMeasurements(
listMeasurementsRequest {
parent = measurementConsumerName
this.pageSize = 1
pageToken = nextPageToken
pageToken = nextPageToken as String
}
)
lastCreatedMeasurement = response.measurementsList[0]
nextPageToken = response.nextPageToken
} catch (e: StatusException) {
throw Exception(
"Unable to list measurements for measurement consumer $measurementConsumerName",
e,
)
}
if (response.measurementsList.isNotEmpty()) {
return response.measurementsList.single()
}
} catch (e: StatusException) {
throw Exception(
"Unable to list measurements for measurement consumer $measurementConsumerName",
e,
)
nextPageToken = response.nextPageToken
}

return lastCreatedMeasurement
return null
}

private suspend fun getDataProviderEntry(
Expand Down

0 comments on commit 7df2459

Please sign in to comment.