Update telemetry + traces + cleanup (#788)
Dzejkop authored Oct 15, 2024
1 parent cf5e035 commit b81f759
Showing 22 changed files with 3,055 additions and 2,394 deletions.
2,321 changes: 1,389 additions & 932 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
@@ -25,12 +25,12 @@ default = []
anyhow = { version = "1.0.68" }
async-stream = "0.3.3"
async-trait = "0.1.64"
axum = "0.6.4"
axum-server = "0.4.4"
axum = "0.7.7"
axum-server = "0.7.1"
bytes = "1.4.0"
chrono = { version = "0.4.19", features = ["serde"] }
clap = { version = "4.0", features = ["derive", "env"] }
telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries.git", rev = "12cc036234b4e9b86f22ff7e35d499e2ff1e6304" }
telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries.git", rev = "901ea26e478c81e10d5d4355ac628ab7b15afca7" }
config = "0.13.4"
ethers = { version = "2.0.10", features = ["ws", "ipc", "openssl", "abigen"] }
ethers-solc = "2.0.10"
@@ -41,13 +41,13 @@ hex = "0.4.3"
hex-literal = "0.4.1"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = { version = "^0.14.17", features = ["server", "tcp", "http1", "http2"] }
hyper = { version = "1.4.1", features = ["server", "http1", "http2"] }
indoc = "2.0.4"
once_cell = "1.8"
oz-api = { path = "crates/oz-api" }
# We need upstream PR#465 to fix #272.
prometheus = "0.13.3"
reqwest = { version = "0.11.18", features = ["json"] }
reqwest = { version = "0.12.8", features = ["json"] }
ruint = { version = "1.12.3", features = ["primitive-types", "sqlx"] }
semaphore = { git = "https://github.com/worldcoin/semaphore-rs", rev = "251e908d89d598c976901306bc29f06ab59e799d", features = [
"depth_30",
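Note on the framework bumps above: axum 0.6 → 0.7 moves the stack onto hyper 1.x, and axum-server 0.7 follows suit, so server bootstrap code has to change alongside the dependency versions. A minimal sketch of binding an axum 0.7 router through axum-server 0.7 (the route, handler and port are illustrative only, and a tokio runtime with the macros feature is assumed):

use axum::{routing::get, Router};
use std::net::SocketAddr;

#[tokio::main]
async fn main() {
    // Illustrative route only; the real service wires up its own handlers.
    let app = Router::new().route("/health", get(|| async { "ok" }));

    let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
    axum_server::bind(addr)
        .serve(app.into_make_service())
        .await
        .expect("server failed");
}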
4 changes: 3 additions & 1 deletion crates/tx-sitter-client/Cargo.toml
@@ -5,9 +5,11 @@ edition = "2021"
publish = false

[dependencies]
telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries.git", rev = "901ea26e478c81e10d5d4355ac628ab7b15afca7" }

anyhow = "1.0"
ethers = { version = "2.0.10", features = [ ] }
reqwest = "0.11.14"
reqwest = { version = "0.12.8", features = ["json"] }
serde = { version = "1.0.154", features = ["derive"] }
serde_json = "1.0.94"
strum = { version = "0.25", features = ["derive"] }
25 changes: 20 additions & 5 deletions crates/tx-sitter-client/src/lib.rs
@@ -1,28 +1,41 @@
use data::{GetTxResponse, SendTxRequest, SendTxResponse, TxStatus};
use reqwest::Response;
use reqwest::header::HeaderMap;
use reqwest::{RequestBuilder, Response};
use telemetry_batteries::tracing::trace_to_headers;
use tracing::instrument;

pub mod data;

pub struct TxSitterClient {
client: reqwest::Client,
url: String,
url: String,
}

impl TxSitterClient {
pub fn new(url: impl ToString) -> Self {
Self {
client: reqwest::Client::new(),
url: url.to_string(),
url: url.to_string(),
}
}

fn inject_tracing_headers(req_builder: RequestBuilder) -> RequestBuilder {
let mut headers = HeaderMap::new();

trace_to_headers(&mut headers);

req_builder.headers(headers)
}

async fn json_post<T, R>(&self, url: &str, body: T) -> anyhow::Result<R>
where
T: serde::Serialize,
R: serde::de::DeserializeOwned,
{
let response = self.client.post(url).json(&body).send().await?;
let response = Self::inject_tracing_headers(self.client.post(url))
.json(&body)
.send()
.await?;

let response = Self::validate_response(response).await?;

@@ -33,7 +46,9 @@ impl TxSitterClient {
where
R: serde::de::DeserializeOwned,
{
let response = self.client.get(url).send().await?;
let response = Self::inject_tracing_headers(self.client.get(url))
.send()
.await?;

let response = Self::validate_response(response).await?;

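The inject_tracing_headers change above is the heart of the tracing work: the current span's context is written into HTTP headers before every request, so the receiving service can join the same distributed trace. A stand-alone sketch of that pattern, using the same trace_to_headers helper from telemetry-batteries that the diff imports (the URL parameter and return type are illustrative):

use reqwest::header::HeaderMap;
use telemetry_batteries::tracing::trace_to_headers;

async fn traced_get(client: &reqwest::Client, url: &str) -> anyhow::Result<String> {
    // Serialize the active trace context into a header map and attach it
    // to the outgoing request before sending.
    let mut headers = HeaderMap::new();
    trace_to_headers(&mut headers);

    let body = client.get(url).headers(headers).send().await?.text().await?;
    Ok(body)
}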
8 changes: 4 additions & 4 deletions src/bin/tool.rs
@@ -141,7 +141,7 @@ async fn main() -> anyhow::Result<()> {
let client = reqwest::Client::new();

let response = client
.post(&format!("{}/insertIdentity", args.sequencer_url))
.post(format!("{}/insertIdentity", args.sequencer_url))
.basic_auth(basic_auth_username, Some(basic_auth_password))
.json(&InsertCommitmentRequest {
identity_commitment,
@@ -167,7 +167,7 @@ async fn main() -> anyhow::Result<()> {
let client = reqwest::Client::new();

let response = client
.post(&format!("{}/inclusionProof", args.sequencer_url))
.post(format!("{}/inclusionProof", args.sequencer_url))
.json(&InclusionProofRequest {
identity_commitment,
})
@@ -199,7 +199,7 @@ async fn main() -> anyhow::Result<()> {
let client = reqwest::Client::new();

let response = client
.post(&format!("{}/verifySemaphoreProof", args.sequencer_url))
.post(format!("{}/verifySemaphoreProof", args.sequencer_url))
.json(&proof_request)
.send()
.await?;
@@ -218,7 +218,7 @@ async fn main() -> anyhow::Result<()> {
let client = reqwest::Client::new();

let response = client
.post(&format!("{}/inclusionProof", args.sequencer_url))
.post(format!("{}/inclusionProof", args.sequencer_url))
.json(&InclusionProofRequest {
identity_commitment: identity.commitment(),
})
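The tool.rs edits are pure cleanup: format! already returns an owned String, and reqwest's post() accepts any IntoUrl, so the & that was applied to each URL was a redundant borrow. A tiny sketch with a hypothetical endpoint:

async fn ping(client: &reqwest::Client, base: &str) -> anyhow::Result<()> {
    // Owned String from format!, passed straight to post() with no borrow.
    let url = format!("{}/health", base);
    let _response = client.post(url).send().await?;
    Ok(())
}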
94 changes: 5 additions & 89 deletions src/database/query.rs
@@ -6,9 +6,7 @@ use sqlx::{Executor, Postgres, Row};
use tracing::instrument;
use types::{DeletionEntry, LatestDeletionEntry, RecoveryEntry};

use crate::database::types::{
BatchEntry, BatchEntryData, BatchType, LatestInsertionEntry, TransactionEntry,
};
use crate::database::types::{BatchEntry, BatchEntryData, BatchType, LatestInsertionEntry};
use crate::database::{types, Error};
use crate::identity_tree::{
Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus,
@@ -186,17 +184,6 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
.collect())
}

async fn get_latest_root(self) -> Result<Option<Hash>, Error> {
Ok(sqlx::query(
r#"
SELECT root FROM identities ORDER BY id DESC LIMIT 1
"#,
)
.fetch_optional(self)
.await?
.map(|r| r.get::<Hash, _>(0)))
}

async fn get_latest_root_by_status(
self,
status: ProcessedStatus,
@@ -439,6 +426,7 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
Ok(())
}

#[cfg(test)]
async fn get_all_recoveries(self) -> Result<Vec<RecoveryEntry>, Error> {
Ok(
sqlx::query_as::<_, RecoveryEntry>("SELECT * FROM recoveries")
@@ -620,7 +608,7 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
.bind(BatchType::Insertion)
.bind(sqlx::types::Json::from(BatchEntryData {
identities: vec![],
indexes: vec![],
indexes: vec![],
}));

self.execute(query).await?;
@@ -652,13 +640,14 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
.bind(batch_type)
.bind(sqlx::types::Json::from(BatchEntryData {
identities: identities.to_vec(),
indexes: indexes.to_vec(),
indexes: indexes.to_vec(),
}));

self.execute(query).await?;
Ok(())
}

#[cfg(test)]
async fn get_next_batch(self, prev_root: &Hash) -> Result<Option<BatchEntry>, Error> {
let res = sqlx::query_as::<_, BatchEntry>(
r#"
@@ -701,29 +690,6 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
Ok(res)
}

async fn get_latest_batch_with_transaction(self) -> Result<Option<BatchEntry>, Error> {
let res = sqlx::query_as::<_, BatchEntry>(
r#"
SELECT
batches.id,
batches.next_root,
batches.prev_root,
batches.created_at,
batches.batch_type,
batches.data
FROM batches
LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root
WHERE transactions.batch_next_root IS NOT NULL AND batches.prev_root IS NOT NULL
ORDER BY batches.id DESC
LIMIT 1
"#,
)
.fetch_optional(self)
.await?;

Ok(res)
}

async fn get_next_batch_without_transaction(self) -> Result<Option<BatchEntry>, Error> {
let res = sqlx::query_as::<_, BatchEntry>(
r#"
@@ -767,26 +733,6 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
Ok(res)
}

async fn get_all_batches_after(self, id: i64) -> Result<Vec<BatchEntry>, Error> {
let res = sqlx::query_as::<_, BatchEntry>(
r#"
SELECT
id,
next_root,
prev_root,
created_at,
batch_type,
data
FROM batches WHERE id >= $1 ORDER BY id ASC
"#,
)
.bind(id)
.fetch_all(self)
.await?;

Ok(res)
}

#[instrument(skip(self), level = "debug")]
async fn delete_batches_after_root(self, root: &Hash) -> Result<(), Error> {
let query = sqlx::query(
@@ -813,15 +759,6 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
Ok(())
}

async fn root_in_batch_chain(self, root: &Hash) -> Result<bool, Error> {
let query = sqlx::query(
r#"SELECT exists(SELECT 1 FROM batches where prev_root = $1 OR next_root = $1)"#,
)
.bind(root);
let row_unprocessed = self.fetch_one(query).await?;
Ok(row_unprocessed.get::<bool, _>(0))
}

async fn insert_new_transaction(
self,
transaction_id: &String,
@@ -842,25 +779,4 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
self.execute(query).await?;
Ok(())
}

async fn get_transaction_for_batch(
self,
next_root: &Hash,
) -> Result<Option<TransactionEntry>, Error> {
let res = sqlx::query_as::<_, TransactionEntry>(
r#"
SELECT
transaction_id,
batch_next_root,
created_at
FROM transactions WHERE batch_next_root = $1
LIMIT 1
"#,
)
.bind(next_root)
.fetch_optional(self)
.await?;

Ok(res)
}
}
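Two patterns recur in the query.rs changes above: queries with no remaining callers are deleted outright, while queries that only back tests are kept but gated behind #[cfg(test)] so they disappear from release builds. A minimal sketch of that gating pattern (table, struct and pool type are illustrative; sqlx's derive macros and postgres support are assumed):

#[cfg(test)]
#[derive(sqlx::FromRow)]
struct ExampleRow {
    id: i64,
}

// Compiled only for `cargo test`; release builds never see this query.
#[cfg(test)]
async fn get_all_examples(pool: &sqlx::PgPool) -> Result<Vec<ExampleRow>, sqlx::Error> {
    sqlx::query_as::<_, ExampleRow>("SELECT id FROM examples")
        .fetch_all(pool)
        .await
}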
32 changes: 15 additions & 17 deletions src/database/types.rs
@@ -10,18 +10,22 @@ use crate::identity_tree::{Hash, UnprocessedStatus};
use crate::prover::identity::Identity;

pub struct UnprocessedCommitment {
pub commitment: Hash,
pub status: UnprocessedStatus,
pub created_at: DateTime<Utc>,
pub processed_at: Option<DateTime<Utc>>,
pub error_message: Option<String>,
pub commitment: Hash,
pub status: UnprocessedStatus,
pub created_at: DateTime<Utc>,
pub processed_at: Option<DateTime<Utc>>,
pub error_message: Option<String>,
pub eligibility_timestamp: DateTime<Utc>,
}

#[derive(FromRow)]
pub struct RecoveryEntry {
// existing commitment is used in tests only, but recoveries in general
// are used in production code via the FromRow trait
// so removing this field would break the production code
#[allow(unused)]
pub existing_commitment: Hash,
pub new_commitment: Hash,
pub new_commitment: Hash,
}

pub struct LatestInsertionEntry {
@@ -68,26 +72,20 @@ impl std::fmt::Display for BatchType {

#[derive(Debug, Clone, FromRow)]
pub struct BatchEntry {
pub id: i64,
pub next_root: Hash,
pub id: i64,
pub next_root: Hash,
// In general prev_root is present all the time except the first row (head of the batches
// chain)
pub prev_root: Option<Hash>,
pub prev_root: Option<Hash>,
pub created_at: DateTime<Utc>,
pub batch_type: BatchType,
pub data: sqlx::types::Json<BatchEntryData>,
pub data: sqlx::types::Json<BatchEntryData>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BatchEntryData {
pub identities: Vec<Identity>,
pub indexes: Vec<usize>,
}

#[derive(Debug, Clone, FromRow)]
pub struct TransactionEntry {
pub batch_next_root: Hash,
pub transaction_id: String,
pub indexes: Vec<usize>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
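The BatchEntry shape above leans on sqlx's Json<T> wrapper: the data column is stored as JSON/JSONB in Postgres but surfaces as a plain serde struct in Rust. A stand-alone sketch of that mapping (table and field names are illustrative; sqlx's postgres and json features plus serde's derive feature are assumed):

use serde::{Deserialize, Serialize};
use sqlx::types::Json;

#[derive(Debug, Serialize, Deserialize)]
struct Payload {
    identities: Vec<String>,
    indexes: Vec<usize>,
}

#[derive(Debug, sqlx::FromRow)]
struct Row {
    id: i64,
    // Decoded from a jsonb column via serde; no manual parsing needed.
    data: Json<Payload>,
}

async fn load(pool: &sqlx::PgPool, id: i64) -> Result<Option<Row>, sqlx::Error> {
    sqlx::query_as::<_, Row>("SELECT id, data FROM example_batches WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await
}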