Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ci/Dockerfile.bridge
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ ENV CELESTIA_HOME=/root
RUN apk update && apk add --no-cache bash jq

# Copy in the binary
COPY --from=ghcr.io/celestiaorg/celestia-node:v0.13.1 /bin/celestia /bin/celestia
COPY --from=ghcr.io/celestiaorg/celestia-node:v0.13.1 /bin/cel-key /bin/cel-key
COPY --from=ghcr.io/celestiaorg/celestia-node:v0.15.0 /bin/celestia /bin/celestia
COPY --from=ghcr.io/celestiaorg/celestia-node:v0.15.0 /bin/cel-key /bin/cel-key

COPY ./run-bridge.sh /opt/entrypoint.sh

Expand Down
2 changes: 1 addition & 1 deletion ci/Dockerfile.validator
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ENV CELESTIA_HOME=/root
RUN apk update && apk add --no-cache bash jq

# Copy in the binary
COPY --from=ghcr.io/celestiaorg/celestia-app:v1.7.0 /bin/celestia-appd /bin/celestia-appd
COPY --from=ghcr.io/celestiaorg/celestia-app:v2.0.0 /bin/celestia-appd /bin/celestia-appd

COPY ./run-validator.sh /opt/entrypoint.sh

Expand Down
2 changes: 1 addition & 1 deletion ci/run-bridge.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ main() {
--rpc.skip-auth=$SKIP_AUTH \
--rpc.addr 0.0.0.0 \
--core.ip validator \
--keyring.accname "$NODE_NAME" \
--keyring.keyname "$NODE_NAME" \
--p2p.network "$P2P_NETWORK"
}

Expand Down
19 changes: 3 additions & 16 deletions ci/run-validator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ setup_private_validator() {
celestia-appd add-genesis-account "$validator_addr" "$VALIDATOR_COINS"
# Generate a genesis transaction that creates a validator with a self-delegation
celestia-appd gentx "$NODE_NAME" 5000000000utia \
--fees 500utia \
--keyring-backend="test" \
--chain-id "$P2P_NETWORK"
# Collect the genesis transactions and form a genesis.json
Expand All @@ -135,22 +136,8 @@ setup_private_validator() {
# bringing this value too low results in errors
sed -i'.bak' 's|^timeout_commit.*|timeout_commit = "1s"|g' "$CONFIG_DIR/config/config.toml"

# Register the validator EVM address
{
# wait for the genesis
wait_for_block 1

# private key: da6ed55cb2894ac2c9c10209c09de8e8b9d109b910338d5bf3d747a7e1fc9eb9
celestia-appd tx qgb register \
"$(celestia-appd keys show "$NODE_NAME" --bech val -a)" \
0x966e6f22781EF6a6A82BBB4DB3df8E225DfD9488 \
--from "$NODE_NAME" \
--fees 30000utia \
-b block \
-y

echo "Registered validator's EVM address"
} &
# Set app version to 1
sed -i'.bak' 's|"app_version": "2"|"app_version": "1"|g' "$CONFIG_DIR/config/genesis.json"
}

main() {
Expand Down
4 changes: 2 additions & 2 deletions rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This crate builds on top of the [`jsonrpsee`](https://docs.rs/jsonrpsee) clients
```rust,no_run
use celestia_rpc::{BlobClient, Client};
use celestia_types::{Blob, nmt::Namespace};
use celestia_types::blob::GasPrice;
use celestia_types::TxConfig;

async fn submit_blob() {
// create a client to the celestia node
Expand All @@ -22,7 +22,7 @@ async fn submit_blob() {
.expect("Failed to create a blob");

// submit it
client.blob_submit(&[blob], GasPrice::default())
client.blob_submit(&[blob], TxConfig::default())
.await
.expect("Failed submitting the blob");
}
Expand Down
32 changes: 28 additions & 4 deletions rpc/src/blob.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
//! celestia-node rpc types and methods related to blobs

use celestia_types::nmt::{Namespace, NamespaceProof};
use celestia_types::{blob::GasPrice, Blob, Commitment};
use celestia_types::{Blob, Commitment, TxConfig};
use jsonrpsee::proc_macros::rpc;
use serde::{Deserialize, Serialize};

/// Response type for [`BlobClient::blob_subscribe`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct BlobsAtHeight {
/// Blobs submitted at given height.
pub blobs: Option<Vec<Blob>>,
/// A height for which the blobs were returned.
pub height: u64,
}

#[rpc(client)]
pub trait Blob {
Expand All @@ -15,8 +28,11 @@ pub trait Blob {

/// GetAll returns all blobs under the given namespaces and height.
#[method(name = "blob.GetAll")]
async fn blob_get_all(&self, height: u64, namespaces: &[Namespace])
-> Result<Vec<Blob>, Error>;
async fn blob_get_all(
&self,
height: u64,
namespaces: &[Namespace],
) -> Result<Option<Vec<Blob>>, Error>;

/// GetProof retrieves proofs in the given namespaces at the given height by commitment.
#[method(name = "blob.GetProof")]
Expand All @@ -39,5 +55,13 @@ pub trait Blob {

/// Submit sends Blobs and reports the height in which they were included. Allows sending multiple Blobs atomically synchronously. Uses default wallet registered on the Node.
#[method(name = "blob.Submit")]
async fn blob_submit(&self, blobs: &[Blob], gas_price: GasPrice) -> Result<u64, Error>;
async fn blob_submit(&self, blobs: &[Blob], opts: TxConfig) -> Result<u64, Error>;

/// Subscribe to published blobs from the given namespace as they are included.
///
/// # Notes
///
/// Unsubscribe is not implemented by Celestia nodes.
#[subscription(name = "blob.Subscribe", unsubscribe = "blob.Unsubscribe", item = BlobsAtHeight)]
async fn blob_subscribe(&self, namespace: Namespace) -> SubcriptionResult;
}
2 changes: 1 addition & 1 deletion rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![doc = include_str!("../README.md")]

mod blob;
pub mod blob;
pub mod client;
mod error;
mod header;
Expand Down
26 changes: 8 additions & 18 deletions rpc/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use celestia_types::state::{
AccAddress, Address, Balance, QueryDelegationResponse, QueryRedelegationsResponse,
QueryUnbondingDelegationResponse, RawTx, TxResponse, Uint, ValAddress,
QueryUnbondingDelegationResponse, TxResponse, Uint, ValAddress,
};
use celestia_types::Blob;
use celestia_types::{Blob, TxConfig};
use jsonrpsee::proc_macros::rpc;

#[rpc(client)]
Expand Down Expand Up @@ -30,8 +30,7 @@ pub trait State {
src: &ValAddress,
dest: &ValAddress,
amount: Uint,
fee: Uint,
gas_limit: u64,
config: TxConfig,
) -> Result<TxResponse, Error>;

/// CancelUnbondingDelegation cancels a user's pending undelegation from a validator.
Expand All @@ -41,8 +40,7 @@ pub trait State {
addr: &ValAddress,
amount: Uint,
height: Uint,
fee: Uint,
gas_limit: u64,
config: TxConfig,
) -> Result<TxResponse, Error>;

/// Delegate sends a user's liquid tokens to a validator for delegation.
Expand All @@ -51,8 +49,7 @@ pub trait State {
&self,
addr: &ValAddress,
amount: Uint,
fee: Uint,
gas_limit: u64,
config: TxConfig,
) -> Result<TxResponse, Error>;

/// IsStopped checks if the Module's context has been stopped.
Expand Down Expand Up @@ -85,23 +82,17 @@ pub trait State {
#[method(name = "state.SubmitPayForBlob")]
async fn state_submit_pay_for_blob(
&self,
fee: Uint,
gas_limit: u64,
blobs: &[Blob],
config: TxConfig,
) -> Result<TxResponse, Error>;

/// SubmitTx submits the given transaction/message to the Celestia network and blocks until the tx is included in a block.
#[method(name = "state.SubmitTx")]
async fn state_submit_tx(&self, tx: &RawTx) -> Result<TxResponse, Error>;

/// Transfer sends the given amount of coins from default wallet of the node to the given account address.
#[method(name = "state.Transfer")]
async fn state_transfer(
&self,
to: &AccAddress,
amount: Uint,
fee: Uint,
gas_limit: u64,
config: TxConfig,
) -> Result<TxResponse, Error>;

/// Undelegate undelegates a user's delegated tokens, unbonding them from the current validator.
Expand All @@ -110,7 +101,6 @@ pub trait State {
&self,
addr: &ValAddress,
amount: Uint,
fee: Uint,
gas_limit: u64,
config: TxConfig,
) -> Result<TxResponse, Error>;
}
66 changes: 66 additions & 0 deletions rpc/tests/blob.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#![cfg(not(target_arch = "wasm32"))]

use std::cmp::Ordering;
use std::time::Duration;

use celestia_rpc::blob::BlobsAtHeight;
use celestia_rpc::prelude::*;
use celestia_types::{Blob, Commitment};
use jsonrpsee::core::client::Subscription;

pub mod utils;

Expand Down Expand Up @@ -63,6 +66,7 @@ async fn blob_submit_and_get_all() {
let received_blobs = client
.blob_get_all(submitted_height, namespaces)
.await
.unwrap()
.unwrap();

assert_eq!(received_blobs.len(), 2);
Expand Down Expand Up @@ -113,6 +117,46 @@ async fn blob_submit_and_get_large() {
// because without it we can't know how many shares there are in each row
}

#[tokio::test]
async fn blob_subscribe() {
let client = new_test_client(AuthLevel::Write).await.unwrap();
let namespace = random_ns();

let mut incoming_blobs = client.blob_subscribe(namespace).await.unwrap();

// nothing was submitted
let received_blobs = incoming_blobs.next().await.unwrap().unwrap();
assert!(received_blobs.blobs.is_none());

// submit and receive blob
let blob = Blob::new(namespace, random_bytes(10)).unwrap();
let current_height = blob_submit(&client, &[blob.clone()]).await.unwrap();

let received = blobs_at_height(current_height, &mut incoming_blobs).await;
assert_eq!(received.len(), 1);
assert_blob_equal_to_sent(&received[0], &blob);

// submit blob to another ns
let blob_another_ns = Blob::new(random_ns(), random_bytes(10)).unwrap();
let current_height = blob_submit(&client, &[blob_another_ns]).await.unwrap();

let received = blobs_at_height(current_height, &mut incoming_blobs).await;
assert!(received.is_empty());

// submit and receive few blobs
let blob1 = Blob::new(namespace, random_bytes(10)).unwrap();
let blob2 = Blob::new(random_ns(), random_bytes(10)).unwrap(); // different ns
let blob3 = Blob::new(namespace, random_bytes(10)).unwrap();
let current_height = blob_submit(&client, &[blob1.clone(), blob2, blob3.clone()])
.await
.unwrap();

let received = blobs_at_height(current_height, &mut incoming_blobs).await;
assert_eq!(received.len(), 2);
assert_blob_equal_to_sent(&received[0], &blob1);
assert_blob_equal_to_sent(&received[1], &blob3);
}

#[tokio::test]
async fn blob_submit_too_large() {
let client = new_test_client(AuthLevel::Write).await.unwrap();
Expand Down Expand Up @@ -164,6 +208,28 @@ async fn blob_get_get_proof_wrong_commitment() {
.unwrap_err();
}

#[tokio::test]
async fn blob_get_all_with_no_blobs() {
let client = new_test_client(AuthLevel::Read).await.unwrap();

let blobs = client.blob_get_all(3, &[random_ns()]).await.unwrap();

assert!(blobs.is_none());
}

// Skips blobs at height subscription until provided height is reached, then return blobs for the height
async fn blobs_at_height(height: u64, sub: &mut Subscription<BlobsAtHeight>) -> Vec<Blob> {
while let Some(received) = sub.next().await {
let received = received.unwrap();
match received.height.cmp(&height) {
Ordering::Less => continue,
Ordering::Equal => return received.blobs.unwrap_or_default(),
Ordering::Greater => panic!("height {height} missed"),
}
}
panic!("subscription error");
}

/// Blobs received from chain have index field set, so to
/// compare if they are equal to the ones we sent, we need
/// to overwrite the index field with received one.
Expand Down
9 changes: 5 additions & 4 deletions rpc/tests/utils/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::sync::OnceLock;
use anyhow::Result;
use celestia_rpc::prelude::*;
use celestia_rpc::Client;
use celestia_types::{blob::GasPrice, Blob};
use jsonrpsee::core::client::ClientT;
use celestia_types::Blob;
use celestia_types::TxConfig;
use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::core::ClientError;
use tokio::sync::{Mutex, MutexGuard};

Expand Down Expand Up @@ -52,8 +53,8 @@ pub async fn new_test_client(auth_level: AuthLevel) -> Result<Client> {

pub async fn blob_submit<C>(client: &C, blobs: &[Blob]) -> Result<u64, ClientError>
where
C: ClientT + Sync,
C: SubscriptionClientT + Sync,
{
let _guard = write_lock().await;
client.blob_submit(blobs, GasPrice::default()).await
client.blob_submit(blobs, TxConfig::default()).await
}
45 changes: 0 additions & 45 deletions types/src/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,6 @@ use crate::consts::appconsts;
use crate::nmt::Namespace;
use crate::{bail_validation, Error, Result, Share};

/// GasPrice represents the amount to be paid per gas unit.
///
/// Fee is set by multiplying GasPrice by GasLimit, which is determined by the blob sizes.
/// If no value is provided, then this will be serialized to `-1.0` which means the node that
/// receives the request will calculate the GasPrice for given blob.
/// Read more about the mechanisms of fees and gas usage in [`submitting data blobs`].
///
/// [`submitting data blobs`]: https://docs.celestia.org/developers/submit-data#fees-and-gas-limits
#[derive(Debug, Default, Copy, Clone, PartialEq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct GasPrice(#[serde(with = "gas_prize_serde")] Option<f64>);

impl From<f64> for GasPrice {
fn from(value: f64) -> Self {
Self(Some(value))
}
}

impl From<Option<f64>> for GasPrice {
fn from(value: Option<f64>) -> Self {
Self(value)
}
}

/// Arbitrary data that can be stored in the network within certain [`Namespace`].
// NOTE: We don't use the `serde(try_from)` pattern for this type
// becase JSON representation needs to have `commitment` field but
Expand Down Expand Up @@ -192,27 +168,6 @@ impl From<Blob> for RawBlob {
}
}

mod gas_prize_serde {
use serde::{Deserialize, Deserializer, Serializer};

/// Serialize [`Option<f64>`] with `None` represented as `-1`
pub fn serialize<S>(value: &Option<f64>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let x = value.unwrap_or(-1.0);
serializer.serialize_f64(x)
}

/// Deserialize [`Option<f64>`] with an error when the value is not present.
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<f64>, D::Error>
where
D: Deserializer<'de>,
{
f64::deserialize(deserializer).map(Some)
}
}

mod index_serde {
use serde::ser::Error;
use serde::{Deserialize, Deserializer, Serializer};
Expand Down
Loading