diff --git a/src/main/k8s/testing/secretfiles/BUILD.bazel b/src/main/k8s/testing/secretfiles/BUILD.bazel index 0ab444525dd..a910d0048d4 100644 --- a/src/main/k8s/testing/secretfiles/BUILD.bazel +++ b/src/main/k8s/testing/secretfiles/BUILD.bazel @@ -47,6 +47,7 @@ filegroup( srcs = [ "aggregator_root.pem", "kingdom_root.pem", + "pdp1_root.pem", ] + glob(["*edp*_root.pem"]), ) @@ -216,6 +217,10 @@ SECRET_FILES = [ "exchange_workflow.textproto", "reporting_tls.key", "reporting_tls.pem", + "pdp1_cs_cert.der", + "pdp1_cs_private.der", + "pdp1_enc_private.tink", + "pdp1_enc_public.tink", ] filegroup( diff --git a/src/main/k8s/testing/secretfiles/pdp1_cs_cert.der b/src/main/k8s/testing/secretfiles/pdp1_cs_cert.der new file mode 100644 index 00000000000..89ec6e456d9 Binary files /dev/null and b/src/main/k8s/testing/secretfiles/pdp1_cs_cert.der differ diff --git a/src/main/k8s/testing/secretfiles/pdp1_cs_private.der b/src/main/k8s/testing/secretfiles/pdp1_cs_private.der new file mode 100644 index 00000000000..5d4aa66ffa6 Binary files /dev/null and b/src/main/k8s/testing/secretfiles/pdp1_cs_private.der differ diff --git a/src/main/k8s/testing/secretfiles/pdp1_enc_private.tink b/src/main/k8s/testing/secretfiles/pdp1_enc_private.tink new file mode 100644 index 00000000000..b2dac1265f7 Binary files /dev/null and b/src/main/k8s/testing/secretfiles/pdp1_enc_private.tink differ diff --git a/src/main/k8s/testing/secretfiles/pdp1_enc_public.tink b/src/main/k8s/testing/secretfiles/pdp1_enc_public.tink new file mode 100644 index 00000000000..1952a415241 Binary files /dev/null and b/src/main/k8s/testing/secretfiles/pdp1_enc_public.tink differ diff --git a/src/main/k8s/testing/secretfiles/pdp1_root.key b/src/main/k8s/testing/secretfiles/pdp1_root.key new file mode 100755 index 00000000000..f86ab6dd94a --- /dev/null +++ b/src/main/k8s/testing/secretfiles/pdp1_root.key @@ -0,0 +1,5 @@ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgkxY2b6z6khhcfMse +mahhvwEcV7iNwmcAhmIdleR7goihRANCAAQCiTgBO2Qe6kSVcdP51lDa13Q7hxoP +pDvgZa07LT26/apLhGADvKajOT6nfpeXjnUa+myjuhlP25mY24Lh/Dgq +-----END PRIVATE KEY----- diff --git a/src/main/k8s/testing/secretfiles/pdp1_root.pem b/src/main/k8s/testing/secretfiles/pdp1_root.pem new file mode 100755 index 00000000000..47179d2669a --- /dev/null +++ b/src/main/k8s/testing/secretfiles/pdp1_root.pem @@ -0,0 +1,12 @@ +-----BEGIN CERTIFICATE----- +MIIB2zCCAYGgAwIBAgIUOUhTyf/lbnXnRh41LP3m8D4GpNEwCgYIKoZIzj0EAwIw +KTEVMBMGA1UECgwMSGFsbyBDTU0gRGV2MRAwDgYDVQQDDAdQZHAxIENBMB4XDTI0 +MDYwNTE5MzYzMVoXDTM0MDYwMzE5MzYzMVowKTEVMBMGA1UECgwMSGFsbyBDTU0g +RGV2MRAwDgYDVQQDDAdQZHAxIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE +Aok4ATtkHupElXHT+dZQ2td0O4caD6Q74GWtOy09uv2qS4RgA7ymozk+p36Xl451 +Gvpso7oZT9uZmNuC4fw4KqOBhjCBgzAdBgNVHQ4EFgQUlQzFwajKpHfpj+5I8eFe +OMzfrbMwHwYDVR0jBBgwFoAUlQzFwajKpHfpj+5I8eFeOMzfrbMwDwYDVR0TAQH/ +BAUwAwEB/zALBgNVHQ8EBAMCAYYwIwYDVR0RBBwwGoIYY2EucGRwMS5kZXYuaGFs +by1jbW0ub3JnMAoGCCqGSM49BAMCA0gAMEUCIHGO5/B9qsb+u/0s7cCoEiD7go2Z +iUJsy2LH69LJORrmAiEAmp9zPpNcE63MT0eNA3hU5fZXE34LqHdnRq+dNq9YWCo= +-----END CERTIFICATE----- diff --git a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/PopulationSpecValidator.kt b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/PopulationSpecValidator.kt index 5e8e0921587..a29f326fc00 100644 --- a/src/main/kotlin/org/wfanet/measurement/api/v2alpha/PopulationSpecValidator.kt +++ b/src/main/kotlin/org/wfanet/measurement/api/v2alpha/PopulationSpecValidator.kt @@ -156,3 +156,11 @@ object PopulationSpecValidator { return details } } + +/** + * Returns the size of a [VidRange] by calculating the difference between the start and end of the + * range. + */ +fun VidRange.size(): Long { + return this.endVidInclusive - this.startVid + 1 +} diff --git a/src/main/kotlin/org/wfanet/measurement/dataprovider/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/dataprovider/BUILD.bazel new file mode 100644 index 00000000000..a61718f0aa3 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/dataprovider/BUILD.bazel @@ -0,0 +1,32 @@ +load("@wfa_rules_kotlin_jvm//kotlin:defs.bzl", "kt_jvm_library") + +package( + default_visibility = [ + "//src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider:__subpackages__", + "//src/main/kotlin/org/wfanet/measurement/populationdataprovider:__subpackages__", + ], +) + +kt_jvm_library( + name = "requisition_fulfiller", + srcs = ["RequisitionFulfiller.kt"], + deps = [ + "//src/main/kotlin/org/wfanet/measurement/api/v2alpha:packed_messages", + "//src/main/kotlin/org/wfanet/measurement/api/v2alpha:resource_key", + "//src/main/proto/wfa/measurement/api/v2alpha:certificates_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:crypto_kt_jvm_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:data_providers_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:direct_computation_kt_jvm_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:measurement_consumers_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:measurements_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:requisitions_service_kt_jvm_grpc_proto", + "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto:key_storage", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto/tink", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/identity", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/throttler", + "@wfa_consent_signaling_client//src/main/kotlin/org/wfanet/measurement/consent/client/common:verification_exception", + "@wfa_consent_signaling_client//src/main/kotlin/org/wfanet/measurement/consent/client/dataprovider", + ], +) diff --git a/src/main/kotlin/org/wfanet/measurement/dataprovider/RequisitionFulfiller.kt b/src/main/kotlin/org/wfanet/measurement/dataprovider/RequisitionFulfiller.kt new file mode 100644 index 00000000000..c2449d7311b --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/dataprovider/RequisitionFulfiller.kt @@ -0,0 +1,256 @@ +// Copyright 2024 The Cross-Media Measurement Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.wfanet.measurement.dataprovider + +import com.google.protobuf.ByteString +import com.google.protobuf.kotlin.unpack +import io.grpc.StatusException +import java.security.GeneralSecurityException +import java.security.SignatureException +import java.security.cert.CertPathValidatorException +import java.security.cert.X509Certificate +import java.util.logging.Level +import java.util.logging.Logger +import org.wfanet.measurement.api.v2alpha.Certificate +import org.wfanet.measurement.api.v2alpha.CertificatesGrpcKt.CertificatesCoroutineStub +import org.wfanet.measurement.api.v2alpha.DataProviderCertificateKey +import org.wfanet.measurement.api.v2alpha.EncryptedMessage +import org.wfanet.measurement.api.v2alpha.EncryptionPublicKey +import org.wfanet.measurement.api.v2alpha.ListRequisitionsRequestKt.filter +import org.wfanet.measurement.api.v2alpha.Measurement +import org.wfanet.measurement.api.v2alpha.MeasurementSpec +import org.wfanet.measurement.api.v2alpha.Requisition +import org.wfanet.measurement.api.v2alpha.RequisitionKt.refusal +import org.wfanet.measurement.api.v2alpha.RequisitionSpec +import org.wfanet.measurement.api.v2alpha.RequisitionsGrpcKt.RequisitionsCoroutineStub +import org.wfanet.measurement.api.v2alpha.SignedMessage +import org.wfanet.measurement.api.v2alpha.fulfillDirectRequisitionRequest +import org.wfanet.measurement.api.v2alpha.getCertificateRequest +import org.wfanet.measurement.api.v2alpha.listRequisitionsRequest +import org.wfanet.measurement.api.v2alpha.refuseRequisitionRequest +import org.wfanet.measurement.api.v2alpha.unpack +import org.wfanet.measurement.common.crypto.PrivateKeyHandle +import org.wfanet.measurement.common.crypto.SigningKeyHandle +import org.wfanet.measurement.common.crypto.authorityKeyIdentifier +import org.wfanet.measurement.common.crypto.readCertificate +import org.wfanet.measurement.common.throttler.Throttler +import org.wfanet.measurement.consent.client.common.NonceMismatchException +import org.wfanet.measurement.consent.client.common.PublicKeyMismatchException +import org.wfanet.measurement.consent.client.common.toEncryptionPublicKey +import org.wfanet.measurement.consent.client.dataprovider.decryptRequisitionSpec +import org.wfanet.measurement.consent.client.dataprovider.encryptResult +import org.wfanet.measurement.consent.client.dataprovider.signResult +import org.wfanet.measurement.consent.client.dataprovider.verifyMeasurementSpec +import org.wfanet.measurement.consent.client.dataprovider.verifyRequisitionSpec + +data class DataProviderData( + /** The DataProvider's public API resource name. */ + val name: String, + /** The DataProvider's display name. */ + val displayName: String, + /** The DataProvider's decryption key. */ + val privateEncryptionKey: PrivateKeyHandle, + /** The DataProvider's consent signaling signing key. */ + val signingKeyHandle: SigningKeyHandle, + /** The CertificateKey to use for result signing. */ + val certificateKey: DataProviderCertificateKey, +) + +abstract class RequisitionFulfiller( + val dataProviderData: DataProviderData, + private val certificatesStub: CertificatesCoroutineStub, + private val requisitionsStub: RequisitionsCoroutineStub, + val throttler: Throttler, + private val trustedCertificates: Map, + protected val measurementConsumerName: String, +) { + protected data class Specifications( + val measurementSpec: MeasurementSpec, + val requisitionSpec: RequisitionSpec, + ) + + protected class RequisitionRefusalException( + val justification: Requisition.Refusal.Justification, + message: String, + ) : Exception(message) + + protected class InvalidConsentSignalException(message: String? = null, cause: Throwable? = null) : + GeneralSecurityException(message, cause) + + protected class InvalidSpecException(message: String, cause: Throwable? = null) : + Exception(message, cause) + + /** A sequence of operations done in the simulator. */ + abstract suspend fun run() + + /** Executes the requisition fulfillment workflow. */ + abstract suspend fun executeRequisitionFulfillingWorkflow() + + protected fun verifySpecifications( + requisition: Requisition, + measurementConsumerCertificate: Certificate, + ): Specifications { + val x509Certificate = readCertificate(measurementConsumerCertificate.x509Der) + // Look up the trusted issuer certificate for this MC certificate. Note that this doesn't + // confirm that this is the trusted issuer for the right MC. In a production environment, + // consider having a mapping of MC to root/CA cert. + val trustedIssuer = + trustedCertificates[checkNotNull(x509Certificate.authorityKeyIdentifier)] + ?: throw InvalidConsentSignalException( + "Issuer of ${measurementConsumerCertificate.name} is not trusted" + ) + + try { + verifyMeasurementSpec(requisition.measurementSpec, x509Certificate, trustedIssuer) + } catch (e: CertPathValidatorException) { + throw InvalidConsentSignalException( + "Certificate path for ${measurementConsumerCertificate.name} is invalid", + e, + ) + } catch (e: SignatureException) { + throw InvalidConsentSignalException("MeasurementSpec signature is invalid", e) + } + + val measurementSpec: MeasurementSpec = requisition.measurementSpec.message.unpack() + + val publicKey = requisition.dataProviderPublicKey.unpack(EncryptionPublicKey::class.java)!! + check(publicKey == dataProviderData.privateEncryptionKey.publicKey.toEncryptionPublicKey()) { + "Unable to decrypt for this public key" + } + val signedRequisitionSpec: SignedMessage = + try { + decryptRequisitionSpec( + requisition.encryptedRequisitionSpec, + dataProviderData.privateEncryptionKey, + ) + } catch (e: GeneralSecurityException) { + throw InvalidConsentSignalException("RequisitionSpec decryption failed", e) + } + val requisitionSpec: RequisitionSpec = signedRequisitionSpec.unpack() + + try { + verifyRequisitionSpec( + signedRequisitionSpec, + requisitionSpec, + measurementSpec, + x509Certificate, + trustedIssuer, + ) + } catch (e: CertPathValidatorException) { + throw InvalidConsentSignalException( + "Certificate path for ${measurementConsumerCertificate.name} is invalid", + e, + ) + } catch (e: SignatureException) { + throw InvalidConsentSignalException("RequisitionSpec signature is invalid", e) + } catch (e: NonceMismatchException) { + throw InvalidConsentSignalException(e.message, e) + } catch (e: PublicKeyMismatchException) { + throw InvalidConsentSignalException(e.message, e) + } + + // TODO(@uakyol): Validate that collection interval is not outside of privacy landscape. + + return Specifications(measurementSpec, requisitionSpec) + } + + protected suspend fun getCertificate(resourceName: String): Certificate { + return try { + certificatesStub.getCertificate(getCertificateRequest { name = resourceName }) + } catch (e: StatusException) { + throw Exception("Error fetching certificate $resourceName", e) + } + } + + protected suspend fun refuseRequisition( + requisitionName: String, + justification: Requisition.Refusal.Justification, + message: String, + ): Requisition { + try { + return requisitionsStub.refuseRequisition( + refuseRequisitionRequest { + name = requisitionName + refusal = refusal { + this.justification = justification + this.message = message + } + } + ) + } catch (e: StatusException) { + throw Exception("Error refusing requisition $requisitionName", e) + } + } + + protected suspend fun getRequisitions(): List { + val request = listRequisitionsRequest { + parent = dataProviderData.name + filter = filter { + states += Requisition.State.UNFULFILLED + measurementStates += Measurement.State.AWAITING_REQUISITION_FULFILLMENT + } + } + + try { + return requisitionsStub.listRequisitions(request).requisitionsList + } catch (e: StatusException) { + throw Exception("Error listing requisitions", e) + } + } + + protected suspend fun fulfillDirectMeasurement( + requisition: Requisition, + measurementSpec: MeasurementSpec, + nonce: Long, + measurementResult: Measurement.Result, + ) { + logger.log(Level.INFO, "Direct MeasurementSpec:\n$measurementSpec") + logger.log(Level.INFO, "Direct MeasurementResult:\n$measurementResult") + + DataProviderCertificateKey.fromName(requisition.dataProviderCertificate) + ?: throw RequisitionRefusalException( + Requisition.Refusal.Justification.UNFULFILLABLE, + "Invalid data provider certificate", + ) + val measurementEncryptionPublicKey: EncryptionPublicKey = + if (measurementSpec.hasMeasurementPublicKey()) { + measurementSpec.measurementPublicKey.unpack() + } else { + @Suppress("DEPRECATION") // Handle legacy resources. + EncryptionPublicKey.parseFrom(measurementSpec.serializedMeasurementPublicKey) + } + val signedResult: SignedMessage = + signResult(measurementResult, dataProviderData.signingKeyHandle) + val encryptedResult: EncryptedMessage = + encryptResult(signedResult, measurementEncryptionPublicKey) + + try { + requisitionsStub.fulfillDirectRequisition( + fulfillDirectRequisitionRequest { + name = requisition.name + this.encryptedResult = encryptedResult + this.nonce = nonce + this.certificate = dataProviderData.certificateKey.toName() + } + ) + } catch (e: StatusException) { + throw Exception("Error fulfilling direct requisition ${requisition.name}", e) + } + } + + companion object { + val logger: Logger = Logger.getLogger(this::class.java.name) + } +} diff --git a/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessEdpSimulator.kt b/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessEdpSimulator.kt index 7064d54168a..6e7e3ef6de7 100644 --- a/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessEdpSimulator.kt +++ b/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessEdpSimulator.kt @@ -45,11 +45,11 @@ import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.Synthetic import org.wfanet.measurement.api.v2alpha.event_templates.testing.TestEvent import org.wfanet.measurement.common.identity.withPrincipalName import org.wfanet.measurement.common.throttler.MinimumIntervalThrottler +import org.wfanet.measurement.dataprovider.DataProviderData import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.InMemoryBackingStore import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.PrivacyBucketFilter import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.PrivacyBudgetManager import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.testing.TestPrivacyBucketMapper -import org.wfanet.measurement.loadtest.dataprovider.EdpData import org.wfanet.measurement.loadtest.dataprovider.EdpSimulator import org.wfanet.measurement.loadtest.dataprovider.SyntheticGeneratorEventQuery import org.wfanet.measurement.loadtest.dataprovider.VidToIndexMapGenerator @@ -145,10 +145,12 @@ class InProcessEdpSimulator( suspend fun ensureEventGroup() = delegate.ensureEventGroup(EVENT_TEMPLATES, syntheticDataSpec) - /** Builds a [EdpData] object for the Edp with a certain [displayName] and [resourceName]. */ + /** + * Builds a [DataProviderData] object for the Edp with a certain [displayName] and [resourceName]. + */ @Blocking private fun createEdpData(displayName: String, resourceName: String) = - EdpData( + DataProviderData( name = resourceName, displayName = displayName, certificateKey = certificateKey, diff --git a/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/BUILD.bazel index f2094ef0bbd..4b2e8ae68f1 100644 --- a/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/BUILD.bazel @@ -146,6 +146,7 @@ kt_jvm_library( ":vid_to_index_map_generator", "//src/main/kotlin/org/wfanet/measurement/api/v2alpha:packed_messages", "//src/main/kotlin/org/wfanet/measurement/api/v2alpha:resource_key", + "//src/main/kotlin/org/wfanet/measurement/dataprovider:requisition_fulfiller", "//src/main/kotlin/org/wfanet/measurement/eventdataprovider/noiser", "//src/main/kotlin/org/wfanet/measurement/eventdataprovider/privacybudgetmanagement:privacy_budget_manager", "//src/main/kotlin/org/wfanet/measurement/eventdataprovider/privacybudgetmanagement/api/v2alpha:privacy_query_mapper", @@ -165,25 +166,13 @@ kt_jvm_library( "//src/main/proto/wfa/measurement/api/v2alpha:event_group_kt_jvm_proto", "//src/main/proto/wfa/measurement/api/v2alpha:event_group_metadata_descriptors_service_kt_jvm_grpc_proto", "//src/main/proto/wfa/measurement/api/v2alpha:event_groups_service_kt_jvm_grpc_proto", - "//src/main/proto/wfa/measurement/api/v2alpha:measurement_consumers_service_kt_jvm_grpc_proto", - "//src/main/proto/wfa/measurement/api/v2alpha:measurements_service_kt_jvm_grpc_proto", "//src/main/proto/wfa/measurement/api/v2alpha:requisition_fulfillment_service_kt_jvm_grpc_proto", - "//src/main/proto/wfa/measurement/api/v2alpha:requisitions_service_kt_jvm_grpc_proto", "//src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing:simulator_synthetic_data_spec_kt_jvm_proto", "//src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing:test_metadata_message_kt_jvm_proto", "//src/main/proto/wfa/measurement/api/v2alpha/event_templates/testing:test_event_kt_jvm_proto", "@any_sketch_java//src/main/java/org/wfanet/anysketch:sketch_proto_converter", "@any_sketch_java//src/main/java/org/wfanet/frequencycount:secret_share_generator_adapter", "@any_sketch_java//src/main/java/org/wfanet/sampling:vid_sampler", - "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", - "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", - "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto:key_storage", - "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto/testing", - "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto/tink", - "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/identity", - "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/throttler", - "@wfa_consent_signaling_client//src/main/kotlin/org/wfanet/measurement/consent/client/common:verification_exception", - "@wfa_consent_signaling_client//src/main/kotlin/org/wfanet/measurement/consent/client/dataprovider", "@wfa_consent_signaling_client//src/main/kotlin/org/wfanet/measurement/consent/client/measurementconsumer", ], ) diff --git a/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulator.kt b/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulator.kt index e5e41219f76..e0f74c69c73 100644 --- a/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulator.kt +++ b/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulator.kt @@ -23,7 +23,6 @@ import com.google.protobuf.timestamp import com.google.type.interval import io.grpc.Status import io.grpc.StatusException -import java.security.GeneralSecurityException import java.security.SignatureException import java.security.cert.CertPathValidatorException import java.security.cert.X509Certificate @@ -52,7 +51,6 @@ import org.wfanet.frequencycount.secretShareGeneratorRequest import org.wfanet.measurement.api.v2alpha.Certificate import org.wfanet.measurement.api.v2alpha.CertificatesGrpcKt.CertificatesCoroutineStub import org.wfanet.measurement.api.v2alpha.CustomDirectMethodologyKt.variance -import org.wfanet.measurement.api.v2alpha.DataProviderCertificateKey import org.wfanet.measurement.api.v2alpha.DataProviderKey import org.wfanet.measurement.api.v2alpha.DataProviderKt import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt.DataProvidersCoroutineStub @@ -76,7 +74,6 @@ import org.wfanet.measurement.api.v2alpha.FulfillRequisitionRequestKt.HeaderKt.h import org.wfanet.measurement.api.v2alpha.FulfillRequisitionRequestKt.bodyChunk import org.wfanet.measurement.api.v2alpha.FulfillRequisitionRequestKt.header import org.wfanet.measurement.api.v2alpha.ListEventGroupsRequestKt -import org.wfanet.measurement.api.v2alpha.ListRequisitionsRequestKt.filter import org.wfanet.measurement.api.v2alpha.Measurement import org.wfanet.measurement.api.v2alpha.MeasurementConsumer import org.wfanet.measurement.api.v2alpha.MeasurementConsumerKey @@ -93,7 +90,6 @@ import org.wfanet.measurement.api.v2alpha.ProtocolConfig.NoiseMechanism import org.wfanet.measurement.api.v2alpha.Requisition import org.wfanet.measurement.api.v2alpha.Requisition.DuchyEntry import org.wfanet.measurement.api.v2alpha.RequisitionFulfillmentGrpcKt.RequisitionFulfillmentCoroutineStub -import org.wfanet.measurement.api.v2alpha.RequisitionKt.refusal import org.wfanet.measurement.api.v2alpha.RequisitionSpec import org.wfanet.measurement.api.v2alpha.RequisitionsGrpcKt.RequisitionsCoroutineStub import org.wfanet.measurement.api.v2alpha.SignedMessage @@ -103,15 +99,11 @@ import org.wfanet.measurement.api.v2alpha.createEventGroupRequest import org.wfanet.measurement.api.v2alpha.customDirectMethodology import org.wfanet.measurement.api.v2alpha.eventGroup import org.wfanet.measurement.api.v2alpha.eventGroupMetadataDescriptor -import org.wfanet.measurement.api.v2alpha.fulfillDirectRequisitionRequest import org.wfanet.measurement.api.v2alpha.fulfillRequisitionRequest -import org.wfanet.measurement.api.v2alpha.getCertificateRequest import org.wfanet.measurement.api.v2alpha.getEventGroupRequest import org.wfanet.measurement.api.v2alpha.getMeasurementConsumerRequest import org.wfanet.measurement.api.v2alpha.listEventGroupsRequest -import org.wfanet.measurement.api.v2alpha.listRequisitionsRequest import org.wfanet.measurement.api.v2alpha.randomSeed -import org.wfanet.measurement.api.v2alpha.refuseRequisitionRequest import org.wfanet.measurement.api.v2alpha.replaceDataAvailabilityIntervalRequest import org.wfanet.measurement.api.v2alpha.replaceDataProviderCapabilitiesRequest import org.wfanet.measurement.api.v2alpha.unpack @@ -119,28 +111,20 @@ import org.wfanet.measurement.api.v2alpha.updateEventGroupMetadataDescriptorRequ import org.wfanet.measurement.api.v2alpha.updateEventGroupRequest import org.wfanet.measurement.common.ProtoReflection import org.wfanet.measurement.common.asBufferedFlow -import org.wfanet.measurement.common.crypto.PrivateKeyHandle -import org.wfanet.measurement.common.crypto.SigningKeyHandle import org.wfanet.measurement.common.crypto.authorityKeyIdentifier import org.wfanet.measurement.common.crypto.readCertificate import org.wfanet.measurement.common.identity.apiIdToExternalId import org.wfanet.measurement.common.pack import org.wfanet.measurement.common.throttler.Throttler import org.wfanet.measurement.common.toProtoTime -import org.wfanet.measurement.consent.client.common.NonceMismatchException -import org.wfanet.measurement.consent.client.common.PublicKeyMismatchException -import org.wfanet.measurement.consent.client.common.toEncryptionPublicKey import org.wfanet.measurement.consent.client.dataprovider.computeRequisitionFingerprint -import org.wfanet.measurement.consent.client.dataprovider.decryptRequisitionSpec import org.wfanet.measurement.consent.client.dataprovider.encryptMetadata import org.wfanet.measurement.consent.client.dataprovider.encryptRandomSeed -import org.wfanet.measurement.consent.client.dataprovider.encryptResult import org.wfanet.measurement.consent.client.dataprovider.signRandomSeed -import org.wfanet.measurement.consent.client.dataprovider.signResult import org.wfanet.measurement.consent.client.dataprovider.verifyElGamalPublicKey -import org.wfanet.measurement.consent.client.dataprovider.verifyMeasurementSpec -import org.wfanet.measurement.consent.client.dataprovider.verifyRequisitionSpec import org.wfanet.measurement.consent.client.measurementconsumer.verifyEncryptionPublicKey +import org.wfanet.measurement.dataprovider.DataProviderData +import org.wfanet.measurement.dataprovider.RequisitionFulfiller import org.wfanet.measurement.eventdataprovider.eventfiltration.validation.EventFilterValidationException import org.wfanet.measurement.eventdataprovider.noiser.AbstractNoiser import org.wfanet.measurement.eventdataprovider.noiser.DirectNoiseMechanism @@ -157,33 +141,20 @@ import org.wfanet.measurement.loadtest.common.sampleVids import org.wfanet.measurement.loadtest.config.TestIdentifiers.SIMULATOR_EVENT_GROUP_REFERENCE_ID_PREFIX import org.wfanet.measurement.loadtest.dataprovider.MeasurementResults.computeImpression -data class EdpData( - /** The EDP's public API resource name. */ - val name: String, - /** The EDP's display name. */ - val displayName: String, - /** The EDP's decryption key. */ - val privateEncryptionKey: PrivateKeyHandle, - /** The EDP's consent signaling signing key. */ - val signingKeyHandle: SigningKeyHandle, - /** The CertificateKey to use for result signing. */ - val certificateKey: DataProviderCertificateKey, -) - /** A simulator handling EDP businesses. */ class EdpSimulator( - private val edpData: EdpData, - private val measurementConsumerName: String, + private val edpData: DataProviderData, + measurementConsumerName: String, private val measurementConsumersStub: MeasurementConsumersCoroutineStub, - private val certificatesStub: CertificatesCoroutineStub, + certificatesStub: CertificatesCoroutineStub, private val dataProvidersStub: DataProvidersCoroutineStub, private val eventGroupsStub: EventGroupsCoroutineStub, private val eventGroupMetadataDescriptorsStub: EventGroupMetadataDescriptorsCoroutineStub, - private val requisitionsStub: RequisitionsCoroutineStub, + requisitionsStub: RequisitionsCoroutineStub, private val requisitionFulfillmentStubsByDuchyName: Map, private val eventQuery: EventQuery, - private val throttler: Throttler, + throttler: Throttler, private val privacyBudgetManager: PrivacyBudgetManager, private val trustedCertificates: Map, /** @@ -203,7 +174,15 @@ class EdpSimulator( private val sketchEncrypter: SketchEncrypter = SketchEncrypter.Default, private val random: Random = Random, private val logSketchDetails: Boolean = false, -) { +) : + RequisitionFulfiller( + edpData, + certificatesStub, + requisitionsStub, + throttler, + trustedCertificates, + measurementConsumerName, + ) { val eventGroupReferenceIdPrefix = getEventGroupReferenceIdPrefix(edpData.displayName) val supportedProtocols = buildSet { @@ -215,10 +194,10 @@ class EdpSimulator( } /** A sequence of operations done in the simulator. */ - suspend fun run() { + override suspend fun run() { dataProvidersStub.replaceDataAvailabilityInterval( replaceDataAvailabilityIntervalRequest { - name = edpData.name + name = dataProviderData.name dataAvailabilityInterval = interval { startTime = timestamp { seconds = 1577865600 // January 1, 2020 12:00:00 AM, America/Los_Angeles @@ -422,87 +401,6 @@ class EdpSimulator( } } - private data class Specifications( - val measurementSpec: MeasurementSpec, - val requisitionSpec: RequisitionSpec, - ) - - private class RequisitionRefusalException( - val justification: Requisition.Refusal.Justification, - message: String, - ) : Exception(message) - - private class InvalidConsentSignalException(message: String? = null, cause: Throwable? = null) : - GeneralSecurityException(message, cause) - - private class InvalidSpecException(message: String, cause: Throwable? = null) : - Exception(message, cause) - - private fun verifySpecifications( - requisition: Requisition, - measurementConsumerCertificate: Certificate, - ): Specifications { - val x509Certificate = readCertificate(measurementConsumerCertificate.x509Der) - // Look up the trusted issuer certificate for this MC certificate. Note that this doesn't - // confirm that this is the trusted issuer for the right MC. In a production environment, - // consider having a mapping of MC to root/CA cert. - val trustedIssuer = - trustedCertificates[checkNotNull(x509Certificate.authorityKeyIdentifier)] - ?: throw InvalidConsentSignalException( - "Issuer of ${measurementConsumerCertificate.name} is not trusted" - ) - - try { - verifyMeasurementSpec(requisition.measurementSpec, x509Certificate, trustedIssuer) - } catch (e: CertPathValidatorException) { - throw InvalidConsentSignalException( - "Certificate path for ${measurementConsumerCertificate.name} is invalid", - e, - ) - } catch (e: SignatureException) { - throw InvalidConsentSignalException("MeasurementSpec signature is invalid", e) - } - - val measurementSpec: MeasurementSpec = requisition.measurementSpec.message.unpack() - - val publicKey = requisition.dataProviderPublicKey.unpack(EncryptionPublicKey::class.java)!! - check(publicKey == edpData.privateEncryptionKey.publicKey.toEncryptionPublicKey()) { - "Unable to decrypt for this public key" - } - val signedRequisitionSpec: SignedMessage = - try { - decryptRequisitionSpec(requisition.encryptedRequisitionSpec, edpData.privateEncryptionKey) - } catch (e: GeneralSecurityException) { - throw InvalidConsentSignalException("RequisitionSpec decryption failed", e) - } - val requisitionSpec: RequisitionSpec = signedRequisitionSpec.unpack() - - try { - verifyRequisitionSpec( - signedRequisitionSpec, - requisitionSpec, - measurementSpec, - x509Certificate, - trustedIssuer, - ) - } catch (e: CertPathValidatorException) { - throw InvalidConsentSignalException( - "Certificate path for ${measurementConsumerCertificate.name} is invalid", - e, - ) - } catch (e: SignatureException) { - throw InvalidConsentSignalException("RequisitionSpec signature is invalid", e) - } catch (e: NonceMismatchException) { - throw InvalidConsentSignalException(e.message, e) - } catch (e: PublicKeyMismatchException) { - throw InvalidConsentSignalException(e.message, e) - } - - // TODO(@uakyol): Validate that collection interval is not outside of privacy landscape. - - return Specifications(measurementSpec, requisitionSpec) - } - private fun verifyProtocolConfig( requsitionName: String, protocol: ProtocolConfig.Protocol.ProtocolCase, @@ -649,16 +547,8 @@ class EdpSimulator( } } - private suspend fun getCertificate(resourceName: String): Certificate { - return try { - certificatesStub.getCertificate(getCertificateRequest { name = resourceName }) - } catch (e: StatusException) { - throw Exception("Error fetching certificate $resourceName", e) - } - } - /** Executes the requisition fulfillment workflow. */ - suspend fun executeRequisitionFulfillingWorkflow() { + override suspend fun executeRequisitionFulfillingWorkflow() { logger.info("Executing requisitionFulfillingWorkflow...") val requisitions = getRequisitions().filter { @@ -931,26 +821,6 @@ class EdpSimulator( } } - private suspend fun refuseRequisition( - requisitionName: String, - justification: Requisition.Refusal.Justification, - message: String, - ): Requisition { - try { - return requisitionsStub.refuseRequisition( - refuseRequisitionRequest { - name = requisitionName - refusal = refusal { - this.justification = justification - this.message = message - } - } - ) - } catch (e: StatusException) { - throw Exception("Error refusing requisition $requisitionName", e) - } - } - private suspend fun chargeMpcPrivacyBudget( requisitionName: String, measurementSpec: MeasurementSpec, @@ -1430,22 +1300,6 @@ class EdpSimulator( return SketchEncrypter.combineElGamalPublicKeys(curveId, elGamalPublicKeys) } - private suspend fun getRequisitions(): List { - val request = listRequisitionsRequest { - parent = edpData.name - filter = filter { - states += Requisition.State.UNFULFILLED - measurementStates += Measurement.State.AWAITING_REQUISITION_FULFILLMENT - } - } - - try { - return requisitionsStub.listRequisitions(request).requisitionsList - } catch (e: StatusException) { - throw Exception("Error listing requisitions", e) - } - } - /** * Calculate direct reach and frequency for measurement with single EDP by summing up VIDs * directly and fulfillDirectMeasurement @@ -1837,45 +1691,6 @@ class EdpSimulator( fulfillDirectMeasurement(requisition, measurementSpec, requisitionSpec.nonce, measurementResult) } - private suspend fun fulfillDirectMeasurement( - requisition: Requisition, - measurementSpec: MeasurementSpec, - nonce: Long, - measurementResult: Measurement.Result, - ) { - logger.log(Level.INFO, "Direct MeasurementSpec:\n$measurementSpec") - logger.log(Level.INFO, "Direct MeasurementResult:\n$measurementResult") - - DataProviderCertificateKey.fromName(requisition.dataProviderCertificate) - ?: throw RequisitionRefusalException( - Requisition.Refusal.Justification.UNFULFILLABLE, - "Invalid data provider certificate", - ) - val measurementEncryptionPublicKey: EncryptionPublicKey = - if (measurementSpec.hasMeasurementPublicKey()) { - measurementSpec.measurementPublicKey.unpack() - } else { - @Suppress("DEPRECATION") // Handle legacy resources. - EncryptionPublicKey.parseFrom(measurementSpec.serializedMeasurementPublicKey) - } - val signedResult: SignedMessage = signResult(measurementResult, edpData.signingKeyHandle) - val encryptedResult: EncryptedMessage = - encryptResult(signedResult, measurementEncryptionPublicKey) - - try { - requisitionsStub.fulfillDirectRequisition( - fulfillDirectRequisitionRequest { - name = requisition.name - this.encryptedResult = encryptedResult - this.nonce = nonce - this.certificate = edpData.certificateKey.toName() - } - ) - } catch (e: StatusException) { - throw Exception("Error fulfilling direct requisition ${requisition.name}", e) - } - } - companion object { init { System.loadLibrary("secret_share_generator_adapter") diff --git a/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulatorRunner.kt b/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulatorRunner.kt index 3adf532144c..4e9eaf84339 100644 --- a/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulatorRunner.kt +++ b/src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulatorRunner.kt @@ -35,6 +35,7 @@ import org.wfanet.measurement.common.crypto.testing.loadSigningKey import org.wfanet.measurement.common.crypto.tink.loadPrivateKey import org.wfanet.measurement.common.grpc.buildMutualTlsChannel import org.wfanet.measurement.common.throttler.MinimumIntervalThrottler +import org.wfanet.measurement.dataprovider.DataProviderData import org.wfanet.measurement.loadtest.config.PrivacyBudgets.createNoOpPrivacyBudgetManager import picocli.CommandLine @@ -83,7 +84,7 @@ abstract class EdpSimulatorRunner : Runnable { val certificateKey = DataProviderCertificateKey.fromName(flags.dataProviderCertificateResourceName)!! val edpData = - EdpData( + DataProviderData( flags.dataProviderResourceName, flags.dataProviderDisplayName, loadPrivateKey(flags.edpEncryptionPrivateKeyset), diff --git a/src/main/kotlin/org/wfanet/measurement/populationdataprovider/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/populationdataprovider/BUILD.bazel new file mode 100644 index 00000000000..ed2a4108553 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/populationdataprovider/BUILD.bazel @@ -0,0 +1,22 @@ +load("@wfa_rules_kotlin_jvm//kotlin:defs.bzl", "kt_jvm_library") + +package( + default_visibility = [ + "//src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider:__subpackages__", + "//src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider:__subpackages__", + ], +) + +kt_jvm_library( + name = "population_requisition_fulfiller", + srcs = ["PopulationRequisitionFulfiller.kt"], + deps = [ + "//src/main/kotlin/org/wfanet/measurement/api/v2alpha:population_spec_validator", + "//src/main/kotlin/org/wfanet/measurement/dataprovider:requisition_fulfiller", + "//src/main/kotlin/org/wfanet/measurement/eventdataprovider/eventfiltration:event_filters", + "//src/main/proto/wfa/measurement/api/v2alpha:certificates_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:model_releases_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:model_rollouts_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:population_spec_kt_jvm_proto", + ], +) diff --git a/src/main/kotlin/org/wfanet/measurement/populationdataprovider/PopulationRequisitionFulfiller.kt b/src/main/kotlin/org/wfanet/measurement/populationdataprovider/PopulationRequisitionFulfiller.kt new file mode 100644 index 00000000000..199b6f17072 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/populationdataprovider/PopulationRequisitionFulfiller.kt @@ -0,0 +1,349 @@ +// Copyright 2024 The Cross-Media Measurement Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.wfanet.measurement.populationdataprovider + +import com.google.protobuf.Any +import com.google.protobuf.ByteString +import com.google.protobuf.Descriptors.Descriptor +import com.google.protobuf.Descriptors.FieldDescriptor +import com.google.protobuf.DynamicMessage +import com.google.protobuf.TypeRegistry +import io.grpc.Status +import io.grpc.StatusException +import java.security.cert.X509Certificate +import java.util.logging.Level +import org.projectnessie.cel.Program +import org.wfanet.measurement.api.v2alpha.Certificate +import org.wfanet.measurement.api.v2alpha.CertificatesGrpcKt.CertificatesCoroutineStub +import org.wfanet.measurement.api.v2alpha.DeterministicCount +import org.wfanet.measurement.api.v2alpha.EventAnnotationsProto +import org.wfanet.measurement.api.v2alpha.Measurement +import org.wfanet.measurement.api.v2alpha.MeasurementConsumerKey +import org.wfanet.measurement.api.v2alpha.MeasurementKey +import org.wfanet.measurement.api.v2alpha.MeasurementKt +import org.wfanet.measurement.api.v2alpha.MeasurementSpec +import org.wfanet.measurement.api.v2alpha.ModelRelease +import org.wfanet.measurement.api.v2alpha.ModelReleasesGrpcKt.ModelReleasesCoroutineStub +import org.wfanet.measurement.api.v2alpha.ModelRolloutsGrpcKt.ModelRolloutsCoroutineStub +import org.wfanet.measurement.api.v2alpha.PopulationKey +import org.wfanet.measurement.api.v2alpha.PopulationSpec +import org.wfanet.measurement.api.v2alpha.PopulationSpecValidator +import org.wfanet.measurement.api.v2alpha.Requisition +import org.wfanet.measurement.api.v2alpha.RequisitionSpec +import org.wfanet.measurement.api.v2alpha.RequisitionsGrpcKt.RequisitionsCoroutineStub +import org.wfanet.measurement.api.v2alpha.getModelReleaseRequest +import org.wfanet.measurement.api.v2alpha.listModelRolloutsRequest +import org.wfanet.measurement.api.v2alpha.size +import org.wfanet.measurement.common.throttler.Throttler +import org.wfanet.measurement.common.toLocalDate +import org.wfanet.measurement.dataprovider.DataProviderData +import org.wfanet.measurement.dataprovider.RequisitionFulfiller +import org.wfanet.measurement.eventdataprovider.eventfiltration.EventFilters + +/** + * Data class associated with a population. + * + * @param populationSpec The [PopulationSpec] that contains 1) information on the attributes of a + * population, and 2) vid ranges that are used to calculate the size of the population. + * @param eventMessageDescriptor The [Descriptor] of the event message that wraps the event template + * which is used by the CEL program to filter out irrelevant populations and calculate the size. + * The event template should contain the same types provided in the attributes list of the + * [PopulationSpec]. + */ +data class PopulationInfo( + val populationSpec: PopulationSpec, + // TODO(jojijac0b): Dynamically generate an Event message type for each EventTemplate by building + // a DescriptorProto. + val eventMessageDescriptor: Descriptor, +) +/** A requisition fulfiller for PDP businesses. */ +class PopulationRequisitionFulfiller( + pdpData: DataProviderData, + certificatesStub: CertificatesCoroutineStub, + requisitionsStub: RequisitionsCoroutineStub, + throttler: Throttler, + trustedCertificates: Map, + measurementConsumerName: String, + private val modelRolloutsStub: ModelRolloutsCoroutineStub, + private val modelReleasesStub: ModelReleasesCoroutineStub, + private val populationInfoMap: Map, + private val typeRegistry: TypeRegistry, +) : + RequisitionFulfiller( + pdpData, + certificatesStub, + requisitionsStub, + throttler, + trustedCertificates, + measurementConsumerName, + ) { + + /** A sequence of operations done in the simulator. */ + override suspend fun run() { + throttler.loopOnReady { executeRequisitionFulfillingWorkflow() } + } + /** Executes the requisition fulfillment workflow. */ + override suspend fun executeRequisitionFulfillingWorkflow() { + logger.info("Executing requisitionFulfillingWorkflow...") + val requisitions = + getRequisitions().filter { + checkNotNull(MeasurementKey.fromName(it.measurement)).measurementConsumerId == + checkNotNull(MeasurementConsumerKey.fromName(measurementConsumerName)) + .measurementConsumerId + } + + if (requisitions.isEmpty()) { + logger.fine("No unfulfilled requisition. Polling again later...") + return + } + + for (requisition in requisitions) { + try { + logger.info("Processing requisition ${requisition.name}...") + + // TODO(@SanjayVas): Verify that DataProvider public key in Requisition matches private key + // in pdpData. A real PDP would look up the matching private key. + + val measurementConsumerCertificate: Certificate = + getCertificate(requisition.measurementConsumerCertificate) + + val (measurementSpec, requisitionSpec) = + try { + verifySpecifications(requisition, measurementConsumerCertificate) + } catch (e: InvalidConsentSignalException) { + logger.log(Level.WARNING, e) { + "Consent signaling verification failed for ${requisition.name}" + } + throw RequisitionRefusalException( + Requisition.Refusal.Justification.CONSENT_SIGNAL_INVALID, + e.message.orEmpty(), + ) + } + + logger.log(Level.INFO, "MeasurementSpec:\n$measurementSpec") + logger.log(Level.INFO, "RequisitionSpec:\n$requisitionSpec") + + val modelRelease: ModelRelease = getModelRelease(measurementSpec) + + val populationId: PopulationKey = + requireNotNull(PopulationKey.fromName(modelRelease.population)) { + throw InvalidSpecException( + "Measurement spec model line does not contain a valid Population for the model release of its latest model rollout." + ) + } + + val populationInfo: PopulationInfo = populationInfoMap.getValue(populationId) + populationInfo.eventMessageDescriptor.fields + PopulationSpecValidator.validateVidRangesList(populationInfo.populationSpec).getOrThrow() + + val requisitionFilterExpression = requisitionSpec.population.filter.expression + + fulfillPopulationMeasurement( + requisition, + requisitionSpec, + measurementSpec, + requisitionFilterExpression, + populationInfo, + typeRegistry, + ) + } catch (refusalException: RequisitionRefusalException) { + refuseRequisition( + requisition.name, + refusalException.justification, + refusalException.message ?: "Refuse to fulfill requisition.", + ) + } + } + } + + /** + * Returns the [ModelRelease] associated with the latest `ModelRollout` that is connected to the + * `ModelLine` provided in the MeasurementSpec` + */ + private suspend fun getModelRelease(measurementSpec: MeasurementSpec): ModelRelease { + // TODO(@jojijac0b): Handle case where measurement spans across one or more model outages. + // Should use HoldbackModelLine in this case to reflect what is done with measurement reports. + + val measurementSpecModelLineName = measurementSpec.modelLine + + // Returns list of ModelRollouts. + val listModelRolloutsResponse = + try { + modelRolloutsStub.listModelRollouts( + listModelRolloutsRequest { parent = measurementSpecModelLineName } + ) + } catch (e: StatusException) { + throw when (e.status.code) { + Status.Code.NOT_FOUND -> + InvalidSpecException("ModelLine $measurementSpecModelLineName not found", e) + else -> Exception("Error retrieving ModelLine $measurementSpecModelLineName", e) + } + } + + // Sort list of ModelRollouts by descending updateTime. + val sortedModelRolloutsList = + listModelRolloutsResponse.modelRolloutsList.sortedWith { a, b -> + val aDate = + if (a.hasGradualRolloutPeriod()) a.gradualRolloutPeriod.endDate else a.instantRolloutDate + val bDate = + if (b.hasGradualRolloutPeriod()) b.gradualRolloutPeriod.endDate else b.instantRolloutDate + if (aDate.toLocalDate().isBefore(bDate.toLocalDate())) -1 else 1 + } + + // Retrieves latest ModelRollout from list. + val latestModelRollout = sortedModelRolloutsList.first() + val modelReleaseName = latestModelRollout.modelRelease + + // Returns ModelRelease associated with latest ModelRollout. + return try { + modelReleasesStub.getModelRelease(getModelReleaseRequest { name = modelReleaseName }) + } catch (e: StatusException) { + throw when (e.status.code) { + Status.Code.NOT_FOUND -> InvalidSpecException("ModelRelease $modelReleaseName not found", e) + else -> Exception("Error retrieving ModelLine $modelReleaseName", e) + } + } + } + + /** Fulfills a population measurement. */ + private suspend fun fulfillPopulationMeasurement( + requisition: Requisition, + requisitionSpec: RequisitionSpec, + measurementSpec: MeasurementSpec, + filterExpression: String, + populationInfo: PopulationInfo, + typeRegistry: TypeRegistry, + ) { + + val operativeFields = getPopulationOperativeFields(populationInfo.eventMessageDescriptor) + + // CEL program that will check the event against the filter expression + val program: Program = + EventFilters.compileProgram( + populationInfo.eventMessageDescriptor, + filterExpression, + operativeFields, + ) + + // Filters populationBucketsList through a CEL program and sums the result. + val populationSum = + populationInfo.populationSpec.subpopulationsList.sumOf { + val attributesList = it.attributesList + val vidRanges = it.vidRangesList + val shouldSumPopulation = + isValidAttributesList(attributesList, populationInfo, program, typeRegistry) + if (shouldSumPopulation) { + vidRanges.sumOf { jt -> jt.size() } + } else { + 0L + } + } + + // Create measurement result with sum of valid populations. + val measurementResult: Measurement.Result = + MeasurementKt.result { + population = + MeasurementKt.ResultKt.population { + value = populationSum + deterministicCount = DeterministicCount.getDefaultInstance() + } + } + + // Fulfill the measurement. + fulfillDirectMeasurement(requisition, measurementSpec, requisitionSpec.nonce, measurementResult) + } + + /** + * Returns a [Set] of operative fields derived from a [Descriptor]. Only fields that have the + * population attribute set to true will be returned. + */ + private fun getPopulationOperativeFields(eventMessageDescriptor: Descriptor): Set { + // TODO(jojijac0b): Pass in specific template descriptor instead of entire event message + // descriptor. + return eventMessageDescriptor.fields + .flatMap { templateField -> + templateField.messageType.fields.map { templateFieldDescriptor -> + if ( + templateFieldDescriptor.options + .getExtension(EventAnnotationsProto.templateField) + .populationAttribute + ) { + "${templateField.name}.${templateFieldDescriptor.name}" + } else null + } + } + .filterNotNull() + .toSet() + } + + /** + * Returns a [Boolean] representing whether the attributes in the list are 1) the correct type and + * 2) pass a check against the filter expression after being run through a CEL program. + */ + private fun isValidAttributesList( + attributeList: List, + populationInfo: PopulationInfo, + program: Program, + typeRegistry: TypeRegistry, + ): Boolean { + val eventMessageDescriptor: Descriptor = populationInfo.eventMessageDescriptor + + // Event message that will be passed to CEL program + val eventMessage: DynamicMessage.Builder = DynamicMessage.newBuilder(eventMessageDescriptor) + + // Populate event message that will be used in the program if attribute is valid + attributeList.forEach { attribute -> + val attributeDescriptor: Descriptor = typeRegistry.getDescriptorForTypeUrl(attribute.typeUrl) + val requiredAttributes: List = + attributeDescriptor.fields.filter { + it.options.getExtension(EventAnnotationsProto.templateField).populationAttribute + } + + // Create the attribute message of the type specified in attribute descriptor using the type + // registry. + val descriptor: Descriptor = typeRegistry.getDescriptorForTypeUrl(attribute.typeUrl) + val attributeMessage: DynamicMessage = DynamicMessage.parseFrom(descriptor, attribute.value) + + // If the attribute type is not a field in the event message, it is not valid. + val isAttributeFieldInEvent = + eventMessageDescriptor.fields.any { + it.messageType.name === attributeMessage.descriptorForType.name + } + require(isAttributeFieldInEvent) { + throw InvalidSpecException( + "Subpopulation attribute is not a field in the event descriptor." + ) + } + + // If the population_attribute option in the attribute message is set to true, we do not allow + // the value to be unspecified. + val isValidAttribute = attributeMessage.allFields.keys.containsAll(requiredAttributes) + require(isValidAttribute) { + throw InvalidSpecException("Subpopulation population attribute cannot be unspecified.") + } + + // Find corresponding field descriptor for this attribute. + val fieldDescriptor: FieldDescriptor = + eventMessageDescriptor.fields.first { eventField -> + eventField.messageType.name === attributeDescriptor.name + } + + // Set field in event message with typed attribute message. + eventMessage.setField(fieldDescriptor, attributeMessage) + } + + return EventFilters.matches(eventMessage.build(), program) + } +} diff --git a/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/BUILD.bazel b/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/BUILD.bazel index 5da222976a6..c4d85dd90b5 100644 --- a/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/BUILD.bazel +++ b/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/BUILD.bazel @@ -80,6 +80,53 @@ kt_jvm_test( ], ) +kt_jvm_library( + name = "population_requisition_fulfiller_test", + srcs = ["PopulationRequisitionFulfillerTest.kt"], + data = [ + "//src/main/k8s/testing/secretfiles:all_configs", + "//src/main/k8s/testing/secretfiles:all_der_files", + "//src/main/k8s/testing/secretfiles:all_tink_keysets", + "//src/main/k8s/testing/secretfiles:edp_trusted_certs.pem", + ], + deps = [ + "//src/main/kotlin/org/wfanet/measurement/api/v2alpha/testing", + "//src/main/kotlin/org/wfanet/measurement/eventdataprovider/eventfiltration:event_filters", + "//src/main/kotlin/org/wfanet/measurement/eventdataprovider/privacybudgetmanagement:privacy_budget_manager", + "//src/main/kotlin/org/wfanet/measurement/eventdataprovider/privacybudgetmanagement/testing", + "//src/main/kotlin/org/wfanet/measurement/integration/common:configs", + "//src/main/kotlin/org/wfanet/measurement/integration/common:synthetic_generation_specs", + "//src/main/kotlin/org/wfanet/measurement/loadtest/common:sample_vids", + "//src/main/kotlin/org/wfanet/measurement/loadtest/config:event_group_metadata", + "//src/main/kotlin/org/wfanet/measurement/loadtest/config:test_identifiers", + "//src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider:in_memory_event_query", + "//src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider:measurement_results", + "//src/main/kotlin/org/wfanet/measurement/loadtest/dataprovider:synthetic_generator_event_query", + "//src/main/kotlin/org/wfanet/measurement/populationdataprovider:population_requisition_fulfiller", + "//src/main/proto/wfa/measurement/api/v2alpha:model_releases_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:model_rollouts_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:protocol_config_kt_jvm_proto", + "//src/main/proto/wfa/measurement/api/v2alpha/event_templates/testing:bad_templates_kt_jvm_proto", + "//src/main/proto/wfa/measurement/api/v2alpha/event_templates/testing:test_event_kt_jvm_proto", + "@wfa_common_jvm//imports/java/com/google/common/truth", + "@wfa_common_jvm//imports/java/com/google/common/truth/extensions/proto", + "@wfa_common_jvm//imports/java/org/junit", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto/testing", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto/tink", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc/testing", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/testing", + "@wfa_consent_signaling_client//src/main/kotlin/org/wfanet/measurement/consent/client/duchy", + "@wfa_consent_signaling_client//src/main/kotlin/org/wfanet/measurement/consent/client/measurementconsumer", + ], +) + +# TODO(bazelbuild/rules_kotlin#1088): Use kt_jvm_test when fixed. +java_test( + name = "PopulationRequisitionFulfillerTest", + test_class = "org.wfanet.measurement.loadtest.dataprovider.PopulationRequisitionFulfillerTest", + runtime_deps = [":population_requisition_fulfiller_test"], +) + kt_jvm_test( name = "VidToIndexMapGeneratorTest", srcs = ["VidToIndexMapGeneratorTest.kt"], diff --git a/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulatorTest.kt b/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulatorTest.kt index 5c6c7d1bead..51daad4e2fc 100644 --- a/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulatorTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/EdpSimulatorTest.kt @@ -161,6 +161,7 @@ import org.wfanet.measurement.consent.client.measurementconsumer.encryptRequisit import org.wfanet.measurement.consent.client.measurementconsumer.signEncryptionPublicKey import org.wfanet.measurement.consent.client.measurementconsumer.signMeasurementSpec import org.wfanet.measurement.consent.client.measurementconsumer.signRequisitionSpec +import org.wfanet.measurement.dataprovider.DataProviderData import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.AcdpCharge import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.AgeGroup as PrivacyLandscapeAge import org.wfanet.measurement.eventdataprovider.privacybudgetmanagement.Gender as PrivacyLandscapeGender @@ -2964,7 +2965,7 @@ class EdpSimulatorTest { subjectKeyIdentifier = EDP_RESULT_SIGNING_KEY.certificate.subjectKeyIdentifier!! } private val EDP_DATA = - EdpData( + DataProviderData( EDP_NAME, EDP_DISPLAY_NAME, loadEncryptionPrivateKey("${EDP_DISPLAY_NAME}_enc_private.tink"), diff --git a/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/PopulationRequisitionFulfillerTest.kt b/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/PopulationRequisitionFulfillerTest.kt new file mode 100644 index 00000000000..d09f1e38ec1 --- /dev/null +++ b/src/test/kotlin/org/wfanet/measurement/loadtest/dataprovider/PopulationRequisitionFulfillerTest.kt @@ -0,0 +1,915 @@ +// Copyright 2024 The Cross-Media Measurement Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.wfanet.measurement.loadtest.dataprovider + +import com.google.common.truth.Truth.assertThat +import com.google.protobuf.ByteString +import com.google.protobuf.Timestamp +import com.google.protobuf.TypeRegistry +import com.google.protobuf.kotlin.toByteString +import java.lang.UnsupportedOperationException +import java.nio.file.Path +import java.nio.file.Paths +import java.security.cert.X509Certificate +import java.time.Instant +import kotlin.random.Random +import kotlin.test.assertFailsWith +import kotlinx.coroutines.runBlocking +import org.junit.Rule +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.mockito.kotlin.any +import org.mockito.kotlin.eq +import org.mockito.kotlin.stub +import org.wfanet.measurement.api.v2alpha.Certificate +import org.wfanet.measurement.api.v2alpha.CertificatesGrpcKt.CertificatesCoroutineImplBase +import org.wfanet.measurement.api.v2alpha.CertificatesGrpcKt.CertificatesCoroutineStub +import org.wfanet.measurement.api.v2alpha.DataProviderCertificateKey +import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt.DataProvidersCoroutineImplBase +import org.wfanet.measurement.api.v2alpha.EncryptionPublicKey +import org.wfanet.measurement.api.v2alpha.FulfillDirectRequisitionRequest +import org.wfanet.measurement.api.v2alpha.Measurement +import org.wfanet.measurement.api.v2alpha.MeasurementSpecKt.reachAndFrequency +import org.wfanet.measurement.api.v2alpha.MeasurementSpecKt.vidSamplingInterval +import org.wfanet.measurement.api.v2alpha.ModelReleasesGrpcKt.ModelReleasesCoroutineImplBase +import org.wfanet.measurement.api.v2alpha.ModelReleasesGrpcKt.ModelReleasesCoroutineStub +import org.wfanet.measurement.api.v2alpha.ModelRolloutsGrpcKt.ModelRolloutsCoroutineImplBase +import org.wfanet.measurement.api.v2alpha.ModelRolloutsGrpcKt.ModelRolloutsCoroutineStub +import org.wfanet.measurement.api.v2alpha.PopulationKey +import org.wfanet.measurement.api.v2alpha.PopulationSpecKt.subPopulation +import org.wfanet.measurement.api.v2alpha.PopulationSpecKt.vidRange +import org.wfanet.measurement.api.v2alpha.ProtocolConfig +import org.wfanet.measurement.api.v2alpha.ProtocolConfigKt +import org.wfanet.measurement.api.v2alpha.ReplaceDataAvailabilityIntervalRequest +import org.wfanet.measurement.api.v2alpha.Requisition +import org.wfanet.measurement.api.v2alpha.RequisitionSpecKt +import org.wfanet.measurement.api.v2alpha.RequisitionSpecKt.eventFilter +import org.wfanet.measurement.api.v2alpha.RequisitionsGrpcKt.RequisitionsCoroutineImplBase +import org.wfanet.measurement.api.v2alpha.RequisitionsGrpcKt.RequisitionsCoroutineStub +import org.wfanet.measurement.api.v2alpha.certificate +import org.wfanet.measurement.api.v2alpha.copy +import org.wfanet.measurement.api.v2alpha.dataProvider +import org.wfanet.measurement.api.v2alpha.differentialPrivacyParams +import org.wfanet.measurement.api.v2alpha.event_templates.testing.Dummy +import org.wfanet.measurement.api.v2alpha.event_templates.testing.Person +import org.wfanet.measurement.api.v2alpha.event_templates.testing.TestEvent +import org.wfanet.measurement.api.v2alpha.event_templates.testing.person +import org.wfanet.measurement.api.v2alpha.fulfillDirectRequisitionResponse +import org.wfanet.measurement.api.v2alpha.getCertificateRequest +import org.wfanet.measurement.api.v2alpha.listModelRolloutsResponse +import org.wfanet.measurement.api.v2alpha.listRequisitionsResponse +import org.wfanet.measurement.api.v2alpha.measurementSpec +import org.wfanet.measurement.api.v2alpha.modelRelease +import org.wfanet.measurement.api.v2alpha.modelRollout +import org.wfanet.measurement.api.v2alpha.populationSpec +import org.wfanet.measurement.api.v2alpha.protocolConfig +import org.wfanet.measurement.api.v2alpha.requisition +import org.wfanet.measurement.api.v2alpha.requisitionSpec +import org.wfanet.measurement.api.v2alpha.size +import org.wfanet.measurement.api.v2alpha.unpack +import org.wfanet.measurement.common.crypto.Hashing +import org.wfanet.measurement.common.crypto.SigningKeyHandle +import org.wfanet.measurement.common.crypto.authorityKeyIdentifier +import org.wfanet.measurement.common.crypto.readCertificateCollection +import org.wfanet.measurement.common.crypto.subjectKeyIdentifier +import org.wfanet.measurement.common.crypto.testing.loadSigningKey +import org.wfanet.measurement.common.crypto.tink.TinkPrivateKeyHandle +import org.wfanet.measurement.common.crypto.tink.loadPrivateKey +import org.wfanet.measurement.common.crypto.tink.loadPublicKey +import org.wfanet.measurement.common.getRuntimePath +import org.wfanet.measurement.common.grpc.testing.GrpcTestServerRule +import org.wfanet.measurement.common.grpc.testing.mockService +import org.wfanet.measurement.common.identity.externalIdToApiId +import org.wfanet.measurement.common.pack +import org.wfanet.measurement.common.readByteString +import org.wfanet.measurement.common.testing.verifyAndCapture +import org.wfanet.measurement.common.throttler.Throttler +import org.wfanet.measurement.common.toProtoTime +import org.wfanet.measurement.consent.client.common.toEncryptionPublicKey +import org.wfanet.measurement.consent.client.measurementconsumer.decryptResult +import org.wfanet.measurement.consent.client.measurementconsumer.encryptRequisitionSpec +import org.wfanet.measurement.consent.client.measurementconsumer.signMeasurementSpec +import org.wfanet.measurement.consent.client.measurementconsumer.signRequisitionSpec +import org.wfanet.measurement.dataprovider.DataProviderData +import org.wfanet.measurement.eventdataprovider.eventfiltration.validation.EventFilterValidationException +import org.wfanet.measurement.populationdataprovider.PopulationInfo +import org.wfanet.measurement.populationdataprovider.PopulationRequisitionFulfiller + +@RunWith(JUnit4::class) +class PopulationRequisitionFulfillerTest { + private val certificatesServiceMock: CertificatesCoroutineImplBase = mockService { + onBlocking { + getCertificate(eq(getCertificateRequest { name = MEASUREMENT_CONSUMER_CERTIFICATE_NAME })) + } + .thenReturn(MEASUREMENT_CONSUMER_CERTIFICATE) + onBlocking { + getCertificate(eq(getCertificateRequest { name = DATA_PROVIDER_CERTIFICATE.name })) + } + .thenReturn(DATA_PROVIDER_CERTIFICATE) + onBlocking { + getCertificate(eq(getCertificateRequest { name = DATA_PROVIDER_CERTIFICATE.name })) + } + .thenReturn(DATA_PROVIDER_CERTIFICATE) + } + private val dataProvidersServiceMock: DataProvidersCoroutineImplBase = mockService { + onBlocking { replaceDataAvailabilityInterval(any()) } + .thenAnswer { + val request = it.arguments[0] as ReplaceDataAvailabilityIntervalRequest + dataProvider { dataAvailabilityInterval = request.dataAvailabilityInterval } + } + } + + private val requisitionsServiceMock: RequisitionsCoroutineImplBase = mockService { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += REQUISITION }) + onBlocking { fulfillDirectRequisition(any()) }.thenReturn(fulfillDirectRequisitionResponse {}) + } + + private val modelRolloutsServiceStub: ModelRolloutsCoroutineImplBase = mockService { + onBlocking { listModelRollouts(any()) } + .thenReturn(listModelRolloutsResponse { modelRollouts += listOf(MODEL_ROLLOUT) }) + } + + private val modelReleasesServiceStub: ModelReleasesCoroutineImplBase = mockService { + onBlocking { getModelRelease(any()) }.thenReturn(MODEL_RELEASE_1) + } + + @get:Rule + val grpcTestServerRule = GrpcTestServerRule { + addService(certificatesServiceMock) + addService(dataProvidersServiceMock) + addService(requisitionsServiceMock) + addService(modelRolloutsServiceStub) + addService(modelReleasesServiceStub) + } + + private val certificatesStub: CertificatesCoroutineStub by lazy { + CertificatesCoroutineStub(grpcTestServerRule.channel) + } + + private val requisitionsStub: RequisitionsCoroutineStub by lazy { + RequisitionsCoroutineStub(grpcTestServerRule.channel) + } + + private val modelRolloutsStub: ModelRolloutsCoroutineStub by lazy { + ModelRolloutsCoroutineStub(grpcTestServerRule.channel) + } + + private val modelReleasesStub: ModelReleasesCoroutineStub by lazy { + ModelReleasesCoroutineStub(grpcTestServerRule.channel) + } + + @Test + fun `fulfills requisition for 18-34 age range`() { + requisitionsServiceMock.stub { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += REQUISITION }) + } + + val requisitionFulfiller = + PopulationRequisitionFulfiller( + PDP_DATA, + certificatesStub, + requisitionsStub, + dummyThrottler, + TRUSTED_CERTIFICATES, + MC_NAME, + modelRolloutsStub, + modelReleasesStub, + POPULATION_INFO_MAP, + TYPE_REGISTRY, + ) + + runBlocking { requisitionFulfiller.executeRequisitionFulfillingWorkflow() } + + val request: FulfillDirectRequisitionRequest = + verifyAndCapture( + requisitionsServiceMock, + RequisitionsCoroutineImplBase::fulfillDirectRequisition, + ) + val result: Measurement.Result = decryptResult(request.encryptedResult, MC_PRIVATE_KEY).unpack() + + // Result should be the value of SUB_POPULATION_1 + assertThat(result.population.value).isEqualTo(VID_RANGE_1.size()) + } + + @Test + fun `fulfills requisition for males`() { + val requisitionSpec = + REQUISITION_SPEC.copy { + population = + RequisitionSpecKt.population { + filter = eventFilter { expression = "person.gender == ${Person.Gender.MALE_VALUE}" } + } + } + + val encryptedRequisitionSpec = + encryptRequisitionSpec( + signRequisitionSpec(requisitionSpec, MC_SIGNING_KEY), + DATA_PROVIDER_PUBLIC_KEY, + ) + + val requisition = REQUISITION.copy { this.encryptedRequisitionSpec = encryptedRequisitionSpec } + + requisitionsServiceMock.stub { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += requisition }) + } + + val requisitionFulfiller = + PopulationRequisitionFulfiller( + PDP_DATA, + certificatesStub, + requisitionsStub, + dummyThrottler, + TRUSTED_CERTIFICATES, + MC_NAME, + modelRolloutsStub, + modelReleasesStub, + POPULATION_INFO_MAP, + TYPE_REGISTRY, + ) + + runBlocking { requisitionFulfiller.executeRequisitionFulfillingWorkflow() } + + val request: FulfillDirectRequisitionRequest = + verifyAndCapture( + requisitionsServiceMock, + RequisitionsCoroutineImplBase::fulfillDirectRequisition, + ) + val result: Measurement.Result = decryptResult(request.encryptedResult, MC_PRIVATE_KEY).unpack() + + // Result should be the sum of SUB_POPULATION_1 and SUB_POPULATION_2 + assertThat(result.population.value).isEqualTo(VID_RANGE_1.size() + VID_RANGE_2.size()) + } + + @Test + fun `fulfills requisition for males 35-54`() { + val requisitionSpec = + REQUISITION_SPEC.copy { + population = + RequisitionSpecKt.population { + filter = eventFilter { + expression = + "person.gender == ${Person.Gender.MALE_VALUE} && person.age_group == ${Person.AgeGroup.YEARS_35_TO_54_VALUE}" + } + } + } + + val encryptedRequisitionSpec = + encryptRequisitionSpec( + signRequisitionSpec(requisitionSpec, MC_SIGNING_KEY), + DATA_PROVIDER_PUBLIC_KEY, + ) + + val requisition = REQUISITION.copy { this.encryptedRequisitionSpec = encryptedRequisitionSpec } + + requisitionsServiceMock.stub { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += requisition }) + } + + val requisitionFulfiller = + PopulationRequisitionFulfiller( + PDP_DATA, + certificatesStub, + requisitionsStub, + dummyThrottler, + TRUSTED_CERTIFICATES, + MC_NAME, + modelRolloutsStub, + modelReleasesStub, + POPULATION_INFO_MAP, + TYPE_REGISTRY, + ) + + runBlocking { requisitionFulfiller.executeRequisitionFulfillingWorkflow() } + + val request: FulfillDirectRequisitionRequest = + verifyAndCapture( + requisitionsServiceMock, + RequisitionsCoroutineImplBase::fulfillDirectRequisition, + ) + val result: Measurement.Result = decryptResult(request.encryptedResult, MC_PRIVATE_KEY).unpack() + + // Result should be the value of SUB_POPULATION_2 + assertThat(result.population.value).isEqualTo(VID_RANGE_2.size()) + } + + @Test + fun `fulfills requisition for females using different ModelRelease`() { + modelReleasesServiceStub.stub { + onBlocking { getModelRelease(any()) }.thenReturn(MODEL_RELEASE_2) + } + + val requisitionSpec = + REQUISITION_SPEC.copy { + population = + RequisitionSpecKt.population { + filter = eventFilter { expression = "person.gender == ${Person.Gender.FEMALE_VALUE}" } + } + } + + val encryptedRequisitionSpec = + encryptRequisitionSpec( + signRequisitionSpec(requisitionSpec, MC_SIGNING_KEY), + DATA_PROVIDER_PUBLIC_KEY, + ) + + val requisition = REQUISITION.copy { this.encryptedRequisitionSpec = encryptedRequisitionSpec } + + requisitionsServiceMock.stub { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += requisition }) + } + + val requisitionFulfiller = + PopulationRequisitionFulfiller( + PDP_DATA, + certificatesStub, + requisitionsStub, + dummyThrottler, + TRUSTED_CERTIFICATES, + MC_NAME, + modelRolloutsStub, + modelReleasesStub, + POPULATION_INFO_MAP, + TYPE_REGISTRY, + ) + + runBlocking { requisitionFulfiller.executeRequisitionFulfillingWorkflow() } + + val request: FulfillDirectRequisitionRequest = + verifyAndCapture( + requisitionsServiceMock, + RequisitionsCoroutineImplBase::fulfillDirectRequisition, + ) + val result: Measurement.Result = decryptResult(request.encryptedResult, MC_PRIVATE_KEY).unpack() + + // Result should be SUB_POPULATION_3 + assertThat(result.population.value).isEqualTo(VID_RANGE_3.size()) + } + + @Test + fun `fulfills requisition for females using field not part of population spec`() { + modelReleasesServiceStub.stub { + onBlocking { getModelRelease(any()) }.thenReturn(MODEL_RELEASE_2) + } + + val requisitionSpec = + REQUISITION_SPEC.copy { + population = + RequisitionSpecKt.population { + filter = eventFilter { + expression = + "person.gender == ${Person.Gender.FEMALE_VALUE} && banner_ad.viewable == true" + } + } + } + + val encryptedRequisitionSpec = + encryptRequisitionSpec( + signRequisitionSpec(requisitionSpec, MC_SIGNING_KEY), + DATA_PROVIDER_PUBLIC_KEY, + ) + + val requisition = REQUISITION.copy { this.encryptedRequisitionSpec = encryptedRequisitionSpec } + + requisitionsServiceMock.stub { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += requisition }) + } + + val requisitionFulfiller = + PopulationRequisitionFulfiller( + PDP_DATA, + certificatesStub, + requisitionsStub, + dummyThrottler, + TRUSTED_CERTIFICATES, + MC_NAME, + modelRolloutsStub, + modelReleasesStub, + POPULATION_INFO_MAP, + TYPE_REGISTRY, + ) + + runBlocking { requisitionFulfiller.executeRequisitionFulfillingWorkflow() } + + val request: FulfillDirectRequisitionRequest = + verifyAndCapture( + requisitionsServiceMock, + RequisitionsCoroutineImplBase::fulfillDirectRequisition, + ) + val result: Measurement.Result = decryptResult(request.encryptedResult, MC_PRIVATE_KEY).unpack() + + // Population value calculated with requisition spec that contains banner_ad field in the filter + // expression should be equal to that which does not contain that field. banner_ad field should + // just be ignored. The result of this test should be the same as test `fulfills requisition for + // females using different ModelRelease` + assertThat(result.population.value).isEqualTo(VID_RANGE_3.size()) + } + + @Test + fun `throws error when attribute is not part of event descriptor`() { + val requisitionSpec = + REQUISITION_SPEC.copy { + population = + RequisitionSpecKt.population { + filter = eventFilter { expression = "person.gender == ${Person.Gender.FEMALE_VALUE}" } + } + } + + val encryptedRequisitionSpec = + encryptRequisitionSpec( + signRequisitionSpec(requisitionSpec, MC_SIGNING_KEY), + DATA_PROVIDER_PUBLIC_KEY, + ) + + val requisition = REQUISITION.copy { this.encryptedRequisitionSpec = encryptedRequisitionSpec } + + requisitionsServiceMock.stub { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += requisition }) + } + + val invalidPopulationInfoMap = mapOf(POPULATION_ID_1 to INVALID_POPULATION_INFO_1) + + val requisitionFulfiller = + PopulationRequisitionFulfiller( + PDP_DATA, + certificatesStub, + requisitionsStub, + dummyThrottler, + TRUSTED_CERTIFICATES, + MC_NAME, + modelRolloutsStub, + modelReleasesStub, + invalidPopulationInfoMap, + TYPE_REGISTRY, + ) + + val exception = + assertFailsWith { + runBlocking { requisitionFulfiller.executeRequisitionFulfillingWorkflow() } + } + + // Result should throw InvalidSpecException + assertThat(exception) + .hasMessageThat() + .contains("Subpopulation attribute is not a field in the event descriptor.") + } + + @Test + fun `throws error when population attribute is unspecified`() { + val requisitionSpec = + REQUISITION_SPEC.copy { + population = + RequisitionSpecKt.population { + filter = eventFilter { expression = "person.gender == ${Person.Gender.FEMALE_VALUE}" } + } + } + + val encryptedRequisitionSpec = + encryptRequisitionSpec( + signRequisitionSpec(requisitionSpec, MC_SIGNING_KEY), + DATA_PROVIDER_PUBLIC_KEY, + ) + + val requisition = REQUISITION.copy { this.encryptedRequisitionSpec = encryptedRequisitionSpec } + + requisitionsServiceMock.stub { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += requisition }) + } + + val INVALID_POPULATION_INFO_MAP = mapOf(POPULATION_ID_1 to INVALID_POPULATION_INFO_2) + + val requisitionFulfiller = + PopulationRequisitionFulfiller( + PDP_DATA, + certificatesStub, + requisitionsStub, + dummyThrottler, + TRUSTED_CERTIFICATES, + MC_NAME, + modelRolloutsStub, + modelReleasesStub, + INVALID_POPULATION_INFO_MAP, + TYPE_REGISTRY, + ) + + val exception = + assertFailsWith { + runBlocking { requisitionFulfiller.executeRequisitionFulfillingWorkflow() } + } + + // Result should throw InvalidSpecException + assertThat(exception) + .hasMessageThat() + .contains("Subpopulation population attribute cannot be unspecified.") + } + + @Test + fun `throws error when ModelRelease population id is not found in PopulationInfoMap`() { + val requisitionSpec = + REQUISITION_SPEC.copy { + population = + RequisitionSpecKt.population { + filter = eventFilter { expression = "person.gender == ${Person.Gender.FEMALE_VALUE}" } + } + } + + val encryptedRequisitionSpec = + encryptRequisitionSpec( + signRequisitionSpec(requisitionSpec, MC_SIGNING_KEY), + DATA_PROVIDER_PUBLIC_KEY, + ) + + val requisition = REQUISITION.copy { this.encryptedRequisitionSpec = encryptedRequisitionSpec } + + requisitionsServiceMock.stub { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += requisition }) + } + + val INVALID_POPULATION_ID = PopulationKey.defaultValue + val INVALID_POPULATION_INFO_MAP = mapOf(INVALID_POPULATION_ID to POPULATION_INFO_2) + + val requisitionFulfiller = + PopulationRequisitionFulfiller( + PDP_DATA, + certificatesStub, + requisitionsStub, + dummyThrottler, + TRUSTED_CERTIFICATES, + MC_NAME, + modelRolloutsStub, + modelReleasesStub, + INVALID_POPULATION_INFO_MAP, + TYPE_REGISTRY, + ) + + // Result should throw NoSuchElementException + val exception = + assertFailsWith { + runBlocking { requisitionFulfiller.executeRequisitionFulfillingWorkflow() } + } + + assertThat(exception).hasMessageThat().contains("Key $POPULATION_ID_1 is missing in the map.") + } + + @Test + fun `throws error when filter expression is not in operative field in population info`() { + val requisitionSpec = + REQUISITION_SPEC.copy { + population = + RequisitionSpecKt.population { + filter = eventFilter { + expression = + "person.gender == ${Person.Gender.FEMALE_VALUE} && other_field.value == other_value" + } + } + } + + val encryptedRequisitionSpec = + encryptRequisitionSpec( + signRequisitionSpec(requisitionSpec, MC_SIGNING_KEY), + DATA_PROVIDER_PUBLIC_KEY, + ) + + val requisition = REQUISITION.copy { this.encryptedRequisitionSpec = encryptedRequisitionSpec } + + requisitionsServiceMock.stub { + onBlocking { listRequisitions(any()) } + .thenReturn(listRequisitionsResponse { requisitions += requisition }) + } + + val requisitionFulfiller = + PopulationRequisitionFulfiller( + PDP_DATA, + certificatesStub, + requisitionsStub, + dummyThrottler, + TRUSTED_CERTIFICATES, + MC_NAME, + modelRolloutsStub, + modelReleasesStub, + POPULATION_INFO_MAP, + TYPE_REGISTRY, + ) + + // Result should throw EventFilterValidationException + val exception = + assertFailsWith { + runBlocking { requisitionFulfiller.executeRequisitionFulfillingWorkflow() } + } + + assertThat(exception) + .hasMessageThat() + .contains( + "undeclared reference to 'other_field' (in container 'wfa.measurement.api.v2alpha.event_templates.testing.TestEvent')" + ) + } + + companion object { + private const val MC_ID = "mc" + private const val MC_NAME = "measurementConsumers/$MC_ID" + private const val PDP_DISPLAY_NAME = "pdp1" + private val SECRET_FILES_PATH: Path = + checkNotNull( + getRuntimePath( + Paths.get("wfa_measurement_system", "src", "main", "k8s", "testing", "secretfiles") + ) + ) + private const val PDP_ID = "somePopulationDataProvider" + private const val PDP_NAME = "dataProviders/$PDP_ID" + + private val MEASUREMENT_CONSUMER_CERTIFICATE_DER = + SECRET_FILES_PATH.resolve("mc_cs_cert.der").toFile().readByteString() + private const val MEASUREMENT_CONSUMER_NAME = "measurementConsumers/AAAAAAAAAHs" + private const val MEASUREMENT_NAME = "$MC_NAME/measurements/BBBBBBBBBHs" + private const val MEASUREMENT_CONSUMER_CERTIFICATE_NAME = + "$MEASUREMENT_CONSUMER_NAME/certificates/AAAAAAAAAcg" + private val MEASUREMENT_CONSUMER_CERTIFICATE = certificate { + name = MEASUREMENT_CONSUMER_CERTIFICATE_NAME + x509Der = MEASUREMENT_CONSUMER_CERTIFICATE_DER + } + + private val CREATE_TIME_1: Timestamp = Instant.ofEpochSecond(123).toProtoTime() + private val CREATE_TIME_2: Timestamp = Instant.ofEpochSecond(456).toProtoTime() + + private const val DATA_PROVIDER_NAME = "dataProviders/AAAAAAAAAHs" + private const val POPULATION_NAME_1 = "$DATA_PROVIDER_NAME/populations/AAAAAAAAAHs" + private const val POPULATION_NAME_2 = "$DATA_PROVIDER_NAME/populations/AAAAAAAAAJs" + + private const val MODEL_PROVIDER_NAME = "modelProviders/AAAAAAAAAHs" + private const val MODEL_SUITE_NAME = "$MODEL_PROVIDER_NAME/modelSuites/AAAAAAAAAHs" + private const val MODEL_RELEASE_NAME_1 = "$MODEL_SUITE_NAME/modelReleases/AAAAAAAAAHs" + private const val MODEL_RELEASE_NAME_2 = "$MODEL_SUITE_NAME/modelReleases/AAAAAAAAAJs" + + private const val MODEL_LINE_NAME = "${MODEL_SUITE_NAME}/modelLines/AAAAAAAAAHs" + private const val MODEL_ROLLOUT_NAME = "${MODEL_LINE_NAME}/modelRollouts/AAAAAAAAAHs" + + private val MODEL_RELEASE_1 = modelRelease { + name = MODEL_RELEASE_NAME_1 + createTime = CREATE_TIME_1 + population = POPULATION_NAME_1 + } + + private val MODEL_RELEASE_2 = modelRelease { + name = MODEL_RELEASE_NAME_2 + createTime = CREATE_TIME_2 + population = POPULATION_NAME_2 + } + + private val MODEL_ROLLOUT = modelRollout { + name = MODEL_ROLLOUT_NAME + modelRelease = MODEL_RELEASE_NAME_1 + } + + private val PERSON_1 = person { + ageGroup = Person.AgeGroup.YEARS_18_TO_34 + gender = Person.Gender.MALE + socialGradeGroup = Person.SocialGradeGroup.A_B_C1 + } + + private val PERSON_2 = person { + ageGroup = Person.AgeGroup.YEARS_35_TO_54 + gender = Person.Gender.MALE + socialGradeGroup = Person.SocialGradeGroup.A_B_C1 + } + + private val PERSON_3 = person { + ageGroup = Person.AgeGroup.YEARS_18_TO_34 + gender = Person.Gender.FEMALE + socialGradeGroup = Person.SocialGradeGroup.A_B_C1 + } + + private val INVALID_PERSON = person { + gender = Person.Gender.FEMALE + ageGroup = Person.AgeGroup.AGE_GROUP_UNSPECIFIED + socialGradeGroup = Person.SocialGradeGroup.A_B_C1 + } + + private val ATTRIBUTE_1 = PERSON_1.pack() + + private val ATTRIBUTE_2 = PERSON_2.pack() + + private val ATTRIBUTE_3 = PERSON_3.pack() + + private val INVALID_ATTRIBUTE_1 = Dummy.getDefaultInstance().pack() + + private val INVALID_ATTRIBUTE_2 = INVALID_PERSON.pack() + + private val VID_RANGE_1 = vidRange { + startVid = 1 + endVidInclusive = 100 + } + + private val VID_RANGE_2 = vidRange { + startVid = 101 + endVidInclusive = 300 + } + + private val VID_RANGE_3 = vidRange { + startVid = 301 + endVidInclusive = 600 + } + + // Male 18-34 + private val SUB_POPULATION_1 = subPopulation { + attributes += listOf(ATTRIBUTE_1) + vidRanges += listOf(VID_RANGE_1) + } + + // Male 35-54 + private val SUB_POPULATION_2 = subPopulation { + attributes += listOf(ATTRIBUTE_2) + vidRanges += listOf(VID_RANGE_2) + } + + // Female 18-34 + private val SUB_POPULATION_3 = subPopulation { + attributes += listOf(ATTRIBUTE_3) + vidRanges += listOf(VID_RANGE_3) + } + + private val POPULATION_SPEC_1 = populationSpec { + subpopulations += listOf(SUB_POPULATION_1, SUB_POPULATION_2) + } + + private val POPULATION_SPEC_2 = populationSpec { subpopulations += listOf(SUB_POPULATION_3) } + + private val INVALID_POPULATION_SPEC_1 = populationSpec { + subpopulations += + listOf( + subPopulation { + attributes += listOf(INVALID_ATTRIBUTE_1) + vidRanges += listOf(VID_RANGE_1) + } + ) + } + + private val INVALID_POPULATION_SPEC_2 = populationSpec { + subpopulations += + listOf( + subPopulation { + attributes += listOf(INVALID_ATTRIBUTE_2) + vidRanges += listOf(VID_RANGE_1) + } + ) + } + + private val POPULATION_ID_1: PopulationKey = + requireNotNull(PopulationKey.fromName(POPULATION_NAME_1)) + private val POPULATION_ID_2: PopulationKey = + requireNotNull(PopulationKey.fromName(POPULATION_NAME_2)) + + private val POPULATION_INFO_1 = PopulationInfo(POPULATION_SPEC_1, TestEvent.getDescriptor()) + + private val POPULATION_INFO_2 = PopulationInfo(POPULATION_SPEC_2, TestEvent.getDescriptor()) + + private val INVALID_POPULATION_INFO_1 = + PopulationInfo(INVALID_POPULATION_SPEC_1, TestEvent.getDescriptor()) + + private val INVALID_POPULATION_INFO_2 = + PopulationInfo(INVALID_POPULATION_SPEC_2, TestEvent.getDescriptor()) + + private val POPULATION_INFO_MAP = + mapOf( + POPULATION_ID_1 to POPULATION_INFO_1, + POPULATION_ID_2 to POPULATION_INFO_2, + ) + + private val TYPE_REGISTRY = + TypeRegistry.newBuilder().add(Person.getDescriptor()).add(Dummy.getDescriptor()).build() + private val MC_SIGNING_KEY: SigningKeyHandle = + loadSigningKey("${MC_ID}_cs_cert.der", "${MC_ID}_cs_private.der") + private val PDP_SIGNING_KEY: SigningKeyHandle = + loadSigningKey("${PDP_DISPLAY_NAME}_cs_cert.der", "${PDP_DISPLAY_NAME}_cs_private.der") + private val DATA_PROVIDER_CERTIFICATE_KEY: DataProviderCertificateKey = + DataProviderCertificateKey(PDP_ID, externalIdToApiId(8L)) + + private val DATA_PROVIDER_CERTIFICATE: Certificate = certificate { + name = DATA_PROVIDER_CERTIFICATE_KEY.toName() + x509Der = PDP_SIGNING_KEY.certificate.encoded.toByteString() + subjectKeyIdentifier = PDP_SIGNING_KEY.certificate.subjectKeyIdentifier!! + } + + private val PDP_DATA = + DataProviderData( + PDP_NAME, + PDP_DISPLAY_NAME, + loadEncryptionPrivateKey("${PDP_DISPLAY_NAME}_enc_private.tink"), + PDP_SIGNING_KEY, + DATA_PROVIDER_CERTIFICATE_KEY, + ) + + private val MC_PUBLIC_KEY: EncryptionPublicKey = + loadPublicKey(SECRET_FILES_PATH.resolve("mc_enc_public.tink").toFile()) + .toEncryptionPublicKey() + private val MC_PRIVATE_KEY: TinkPrivateKeyHandle = + loadPrivateKey(SECRET_FILES_PATH.resolve("mc_enc_private.tink").toFile()) + private val DATA_PROVIDER_PUBLIC_KEY: EncryptionPublicKey = + loadPublicKey(SECRET_FILES_PATH.resolve("${PDP_DISPLAY_NAME}_enc_public.tink").toFile()) + .toEncryptionPublicKey() + + private val REQUISITION_SPEC = requisitionSpec { + population = + RequisitionSpecKt.population { + filter = eventFilter { + expression = "person.age_group == ${Person.AgeGroup.YEARS_18_TO_34_VALUE}" + } + } + measurementPublicKey = MC_PUBLIC_KEY.pack() + nonce = Random.Default.nextLong() + } + private val ENCRYPTED_REQUISITION_SPEC = + encryptRequisitionSpec( + signRequisitionSpec(REQUISITION_SPEC, MC_SIGNING_KEY), + DATA_PROVIDER_PUBLIC_KEY, + ) + + private val OUTPUT_DP_PARAMS = differentialPrivacyParams { + epsilon = 1.0 + delta = 1E-12 + } + private val MEASUREMENT_SPEC = measurementSpec { + measurementPublicKey = MC_PUBLIC_KEY.pack() + reachAndFrequency = reachAndFrequency { + reachPrivacyParams = OUTPUT_DP_PARAMS + frequencyPrivacyParams = OUTPUT_DP_PARAMS + maximumFrequency = 10 + } + vidSamplingInterval = vidSamplingInterval { + start = 0.0f + width = 1.0f + } + nonceHashes += Hashing.hashSha256(REQUISITION_SPEC.nonce) + modelLine = MODEL_LINE_NAME + } + + private val REQUISITION = requisition { + name = "${PDP_NAME}/requisitions/foo" + measurement = MEASUREMENT_NAME + state = Requisition.State.UNFULFILLED + measurementConsumerCertificate = MEASUREMENT_CONSUMER_CERTIFICATE_NAME + measurementSpec = signMeasurementSpec(MEASUREMENT_SPEC, MC_SIGNING_KEY) + encryptedRequisitionSpec = ENCRYPTED_REQUISITION_SPEC + protocolConfig = protocolConfig { + protocols += + ProtocolConfigKt.protocol { + direct = + ProtocolConfigKt.direct { + deterministicCountDistinct = + ProtocolConfig.Direct.DeterministicCountDistinct.getDefaultInstance() + deterministicDistribution = + ProtocolConfig.Direct.DeterministicDistribution.getDefaultInstance() + } + } + } + dataProviderCertificate = DATA_PROVIDER_CERTIFICATE.name + dataProviderPublicKey = DATA_PROVIDER_PUBLIC_KEY.pack() + } + + private val TRUSTED_CERTIFICATES: Map = + readCertificateCollection(SECRET_FILES_PATH.resolve("edp_trusted_certs.pem").toFile()) + .associateBy { requireNotNull(it.authorityKeyIdentifier) } + + /** Dummy [Throttler] for satisfying signatures without being used. */ + private val dummyThrottler = + object : Throttler { + override suspend fun onReady(block: suspend () -> T): T { + throw UnsupportedOperationException("Should not be called") + } + } + + private fun loadSigningKey( + certDerFileName: String, + privateKeyDerFileName: String, + ): SigningKeyHandle { + return loadSigningKey( + SECRET_FILES_PATH.resolve(certDerFileName).toFile(), + SECRET_FILES_PATH.resolve(privateKeyDerFileName).toFile(), + ) + } + + private fun loadEncryptionPrivateKey(fileName: String): TinkPrivateKeyHandle { + return loadPrivateKey(SECRET_FILES_PATH.resolve(fileName).toFile()) + } + } +}