diff --git a/.fluence/aqua/services.aqua b/.fluence/aqua/services.aqua index 87b80ab..572e3b5 100644 --- a/.fluence/aqua/services.aqua +++ b/.fluence/aqua/services.aqua @@ -7,6 +7,7 @@ data ExecutionConfig: private_key: string ceramic_endpoint: string checkpointer_endpoint: string + attestation_issuer: string attestation_model_id: string materialization_model_id: string diff --git a/Cargo.toml b/Cargo.toml index 630c6dd..bd01590 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,6 @@ members = [ anyhow = "1.0.80" async-trait = "0.1.77" base64 = "0.21.7" - clap = { version = "=4.4.18", default-features = false, features = ["derive", "std"] } curve25519-dalek = "=4.1.1" hmac = "0.12.1" jwt = "0.16.0" @@ -25,6 +24,11 @@ members = [ tracing-appender = "0.2.3" url = "2.5.0" + [workspace.dependencies.clap] + version = "=4.4.18" + default-features = false + features = [ "derive", "std" ] + [workspace.dependencies.chrono] version = "0.4.34" features = [ "serde" ] diff --git a/calculator/src/calculator.rs b/calculator/src/calculator.rs index 0ae0c67..f943cb9 100644 --- a/calculator/src/calculator.rs +++ b/calculator/src/calculator.rs @@ -10,6 +10,7 @@ use std::str::FromStr; #[derive(Clone, Debug)] pub struct CalculatorParameters { + pub attestation_issuer: DidDocument, pub attestation_model_id: StreamId, pub materialization_model_id: StreamId, } @@ -23,7 +24,12 @@ impl CalculatorParameters { std::env::var("MATERIALIZATION_MODEL_ID").unwrap_or_else(|_| { "kjzl6hvfrbw6c88slfzg2mw6jvin2hgv2v24tbl9u0xc97f4pr4755xjr2l6sck".to_string() }); + let attestation_issuer = + DidDocument::new(&std::env::var("ATTESTATION_ISSUER").unwrap_or_else(|_| { + "did:key:z6MkhER5181mt9PBCrnVvL9AcdWyzSzj4PLgGVKSFjJ8obMN".to_string() + })); Ok(Self { + attestation_issuer, attestation_model_id: StreamId::from_str(&attestation_model_id)?, materialization_model_id: StreamId::from_str(&materialization_model_id)?, }) @@ -56,9 +62,16 @@ impl Calculator { let attestation_stream_id = StreamId::from_str(&event.commit_id)?; match serde_json::from_str::(&event.content) { Ok(attestation) => { + if attestation.issuer != self.params.attestation_issuer.id { + tracing::warn!( + "Attestation issuer {} does not match expected {}", + attestation.issuer, + self.params.attestation_issuer.id + ); + return Ok(()); + } if let Err(e) = validate_attestation(&attestation).await { tracing::warn!("Error validating attestation: {}", e); - return Ok(()); } unique_events( &mut self.cache, diff --git a/fluence.yaml b/fluence.yaml index 6ad33f6..6c14dbd 100644 --- a/fluence.yaml +++ b/fluence.yaml @@ -9,8 +9,8 @@ version: 8 deployments: myDeployment: targetWorkers: 1 - pricePerWorkerEpoch: 0.00001 - initialBalance: 0.001 + pricePerWorkerEpoch: "0.00001" + initialBalance: "0.001" services: [ event_joiner ] spells: [ event_joiner_periodic ] diff --git a/fluence/src/services/event_joiner/modules/event_joiner/src/main.rs b/fluence/src/services/event_joiner/modules/event_joiner/src/main.rs index cca2969..89db2c3 100644 --- a/fluence/src/services/event_joiner/modules/event_joiner/src/main.rs +++ b/fluence/src/services/event_joiner/modules/event_joiner/src/main.rs @@ -26,6 +26,7 @@ pub struct ExecutionConfig { pub private_key: String, pub ceramic_endpoint: String, pub checkpointer_endpoint: String, + pub attestation_issuer: String, pub attestation_model_id: String, pub materialization_model_id: String, } @@ -64,12 +65,14 @@ async fn try_process_events(cfg: ExecutionConfig) -> Result = Box::new(Ceramic::new(did.clone(), &cfg.private_key, ceramic_endpoint).await?); let mut calculator = calculator::Calculator::new( calculator::CalculatorParameters { + attestation_issuer, attestation_model_id, materialization_model_id, }, diff --git a/fluence/src/spells/event_joiner_periodic/spell.aqua b/fluence/src/spells/event_joiner_periodic/spell.aqua index 0a95193..9df4c68 100644 --- a/fluence/src/spells/event_joiner_periodic/spell.aqua +++ b/fluence/src/spells/event_joiner_periodic/spell.aqua @@ -12,6 +12,7 @@ func spell(): private_key = "pk", ceramic_endpoint = "http://localhost:7007", checkpointer_endpoint = "http://localhost:8080", + attestation_issuer = "did:key:z6MkhER5181mt9PBCrnVvL9AcdWyzSzj4PLgGVKSFjJ8obMN", attestation_model_id = "kjz", materialization_model_id = "kjz" ))