Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/examples/six-sigma/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ mod tests {
.and_then(|log_event| match log_event {
LogEvent::ProcessedBridgeEvent(LogBridgeEvent::Regular {
bridge_event_id: event_id,
tx_hash: _,
account_id,
}) => (event_id == bridge_event_id).then_some(account_id),
_ => unreachable!(),
Expand Down
13 changes: 10 additions & 3 deletions packages/kolme/src/api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,15 @@ async fn handle_websocket<App: KolmeApp>(
};
ApiNotification::NewBlock { block, logs }
}
Notification::GenesisInstantiation { chain, contract } => {
ApiNotification::GenesisInstantiation { chain, contract }
}
Notification::GenesisInstantiation {
chain,
tx_hash,
contract,
} => ApiNotification::GenesisInstantiation {
chain,
tx_hash,
contract,
},
Notification::FailedTransaction(failed) => ApiNotification::FailedTransaction(failed),
Notification::LatestBlock(latest_block) => ApiNotification::LatestBlock(latest_block),
};
Expand Down Expand Up @@ -206,6 +212,7 @@ pub enum ApiNotification<AppMessage> {
/// A claim by a submitter that it has instantiated a bridge contract.
GenesisInstantiation {
chain: ExternalChain,
tx_hash: Option<ExternalTxHash>,
contract: String,
},
/// A transaction failed in the processor.
Expand Down
6 changes: 5 additions & 1 deletion packages/kolme/src/core/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ impl<App: KolmeApp> ExecutionContext<'_, App> {
}
Message::Listener {
chain,
tx_hash,
event,
event_id,
} => {
self.listener(*chain, event, *event_id)?;
self.listener(*chain, tx_hash.clone(), event, *event_id)?;
}
Message::Approve {
chain,
Expand All @@ -196,6 +197,7 @@ impl<App: KolmeApp> ExecutionContext<'_, App> {
fn listener(
&mut self,
chain: ExternalChain,
tx_hash: Option<ExternalTxHash>,
event: &BridgeEvent,
event_id: BridgeEventId,
) -> Result<()> {
Expand All @@ -219,6 +221,7 @@ impl<App: KolmeApp> ExecutionContext<'_, App> {
event_id,
PendingBridgeEvent {
event: event.clone(),
tx_hash,
attestations: BTreeSet::new(),
},
);
Expand Down Expand Up @@ -335,6 +338,7 @@ impl<App: KolmeApp> ExecutionContext<'_, App> {
.deposit(asset_config.asset_id, amount)?;
}
self.log_event(LogEvent::ProcessedBridgeEvent(LogBridgeEvent::Regular {
tx_hash: pending.tx_hash,
bridge_event_id: event_id,
account_id,
}))?;
Expand Down
39 changes: 38 additions & 1 deletion packages/kolme/src/core/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,26 @@ pub use error::KolmeError;
pub type SolanaClient = solana_client::nonblocking::rpc_client::RpcClient;
pub type SolanaPubsubClient = solana_client::nonblocking::pubsub_client::PubsubClient;

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct ExternalTxHash(pub String);

impl MerkleSerializeRaw for ExternalTxHash {
fn merkle_serialize_raw(
&self,
serializer: &mut MerkleSerializer,
) -> std::result::Result<(), MerkleSerialError> {
serializer.store(&self.0)
}
}

impl MerkleDeserializeRaw for ExternalTxHash {
fn merkle_deserialize_raw(
deserializer: &mut MerkleDeserializer,
) -> std::result::Result<Self, MerkleSerialError> {
Ok(ExternalTxHash(deserializer.load()?))
}
}

#[derive(
serde::Serialize,
serde::Deserialize,
Expand Down Expand Up @@ -420,30 +440,44 @@ impl MerkleDeserialize for PendingBridgeAction {
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
pub struct PendingBridgeEvent {
pub event: BridgeEvent,
pub tx_hash: Option<ExternalTxHash>,
/// Attestations from the listeners
pub attestations: BTreeSet<PublicKey>,
}

const VERSION_WITH_TX_HASH: usize = 1;

impl MerkleSerialize for PendingBridgeEvent {
fn merkle_serialize(&self, serializer: &mut MerkleSerializer) -> Result<(), MerkleSerialError> {
let Self {
event,
tx_hash,
attestations,
} = self;
serializer.store_json(event)?;
serializer.store(attestations)?;
serializer.store(tx_hash)?;
Ok(())
}

fn merkle_version() -> usize {
VERSION_WITH_TX_HASH
}
}

impl MerkleDeserialize for PendingBridgeEvent {
fn merkle_deserialize(
deserializer: &mut MerkleDeserializer,
_version: usize,
version: usize,
) -> Result<Self, MerkleSerialError> {
Ok(Self {
event: deserializer.load_json()?,
attestations: deserializer.load()?,
tx_hash: if version >= VERSION_WITH_TX_HASH {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh, I like this approach of using consts.

deserializer.load()?
} else {
None
},
})
}
}
Expand Down Expand Up @@ -1042,6 +1076,7 @@ pub enum Message<AppMessage> {
App(AppMessage),
Listener {
chain: ExternalChain,
tx_hash: Option<ExternalTxHash>,
event_id: BridgeEventId,
event: BridgeEvent,
},
Expand Down Expand Up @@ -1642,6 +1677,7 @@ pub enum Notification<AppMessage> {
/// A claim by a submitter that it has instantiated a bridge contract.
GenesisInstantiation {
chain: ExternalChain,
tx_hash: Option<ExternalTxHash>,
contract: String,
},
/// A transaction failed in the processor.
Expand Down Expand Up @@ -1686,6 +1722,7 @@ pub enum LogEvent {
#[serde(rename_all = "snake_case")]
pub enum LogBridgeEvent {
Regular {
tx_hash: Option<ExternalTxHash>,
bridge_event_id: BridgeEventId,
account_id: AccountId,
},
Expand Down
2 changes: 2 additions & 0 deletions packages/kolme/src/listener/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub(crate) fn to_kolme_message<T>(

Message::Listener {
chain,
tx_hash: None,
event_id,
event: BridgeEvent::Regular {
wallet: Wallet(wallet),
Expand All @@ -130,6 +131,7 @@ pub(crate) fn to_kolme_message<T>(
}
BridgeEventMessage::Signed { wallet, action_id } => Message::Listener {
chain,
tx_hash: None,
event_id,
event: BridgeEvent::Signed {
wallet: Wallet(wallet),
Expand Down
8 changes: 7 additions & 1 deletion packages/kolme/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ impl<App: KolmeApp> Listener<App> {
return Ok(contracts);
}

if let Notification::GenesisInstantiation { chain, contract } = receiver.recv().await? {
if let Notification::GenesisInstantiation {
chain,
tx_hash,
contract,
} = receiver.recv().await?
{
if chain.name() != name {
continue;
}
Expand Down Expand Up @@ -147,6 +152,7 @@ impl<App: KolmeApp> Listener<App> {
&self.secret,
vec![Message::Listener {
chain,
tx_hash,
event: BridgeEvent::Instantiated { contract },
event_id: BridgeEventId::start(),
}],
Expand Down
37 changes: 24 additions & 13 deletions packages/kolme/src/listener/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ async fn listen_internal<App: KolmeApp>(
continue;
}

let Some(msg) = extract_bridge_message_from_logs(&resp.value.logs)? else {
let Some((tx_hash, msg)) =
extract_bridge_message_from_logs(&resp.value.signature, &resp.value.logs)?
else {
tracing::warn!(
"No bridge message data log was found in TX {} logs.",
resp.value.signature
Expand All @@ -144,7 +146,7 @@ async fn listen_internal<App: KolmeApp>(
.expect("should have at least one TX processed.")
.next();
} else {
let msg = to_kolme_message::<App::Message>(msg, chain);
let msg = to_kolme_message::<App::Message>(tx_hash, msg, chain);

kolme
.sign_propose_await_transaction(secret, vec![msg])
Expand All @@ -168,7 +170,7 @@ async fn catch_up<App: KolmeApp>(
) -> Result<Option<BridgeEventId>> {
tracing::info!("Catching up on missing bridge events until {}.", last_seen);

let mut messages = vec![];
let mut messages_with_hashes = vec![];
let txs = client.get_signatures_for_address(contract).await?;

// First entry is the latest transaction, we want to work up until to the target ID provided.
Expand Down Expand Up @@ -196,7 +198,8 @@ async fn catch_up<App: KolmeApp>(
continue;
};

let Some(msg) = extract_bridge_message_from_logs(&logs)? else {
let Some((tx_hash, msg)) = extract_bridge_message_from_logs(&sig.to_string(), &logs)?
else {
tracing::warn!("No bridge message data log was found in TX {} logs.", sig);

continue;
Expand All @@ -206,24 +209,24 @@ async fn catch_up<App: KolmeApp>(
break;
}

messages.push(msg);
messages_with_hashes.push((tx_hash, msg));
}

assert!(messages.is_sorted_by(|a, b| a.id > b.id));
let Some(latest_id) = messages.first().map(|x| x.id) else {
assert!(messages_with_hashes.is_sorted_by(|(_, a), (_, b)| a.id > b.id));
let Some(latest_id) = messages_with_hashes.first().map(|(_, x)| x.id) else {
return Ok(None);
};

tracing::info!(
"Found {} missed bridge events. Proposing Kolme transaction...",
messages.len()
messages_with_hashes.len()
);

// Now process in reverse insertion order - from oldest to newest.
let kolme_messages: Vec<Message<App::Message>> = messages
let kolme_messages: Vec<Message<App::Message>> = messages_with_hashes
.into_iter()
.rev()
.map(|x| to_kolme_message(x, chain))
.map(|(tx_hash, x)| to_kolme_message(tx_hash, x, chain))
.collect();

kolme
Expand All @@ -233,7 +236,10 @@ async fn catch_up<App: KolmeApp>(
Ok(Some(BridgeEventId(latest_id)))
}

fn extract_bridge_message_from_logs(logs: &[String]) -> Result<Option<BridgeMessage>> {
fn extract_bridge_message_from_logs(
signature: &str,
logs: &[String],
) -> Result<Option<(ExternalTxHash, BridgeMessage)>> {
const PROGRAM_DATA_LOG: &str = "Program data: ";

// Our program data should always be the last "Program data:" entry even if CPI was invoked.
Expand All @@ -253,7 +259,7 @@ fn extract_bridge_message_from_logs(logs: &[String]) -> Result<Option<BridgeMess
});

match result {
Ok(result) => return Ok(Some(result)),
Ok(result) => return Ok(Some((ExternalTxHash(signature.to_owned()), result))),
Err(e) => {
if logs.iter().any(|x| x.contains("Instruction: Initialize")) {
tracing::info!(
Expand All @@ -270,7 +276,11 @@ fn extract_bridge_message_from_logs(logs: &[String]) -> Result<Option<BridgeMess
Ok(None)
}

fn to_kolme_message<T>(msg: BridgeMessage, chain: SolanaChain) -> Message<T> {
fn to_kolme_message<T>(
tx_hash: ExternalTxHash,
msg: BridgeMessage,
chain: SolanaChain,
) -> Message<T> {
let event_id = BridgeEventId(msg.id);
let wallet = Pubkey::new_from_array(msg.wallet).to_string();
let event = match msg.ty {
Expand Down Expand Up @@ -299,6 +309,7 @@ fn to_kolme_message<T>(msg: BridgeMessage, chain: SolanaChain) -> Message<T> {

Message::Listener {
chain: chain.into(),
tx_hash: Some(tx_hash),
event_id,
event,
}
Expand Down
10 changes: 6 additions & 4 deletions packages/kolme/src/submitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl<App: KolmeApp> Submitter<App> {
Notification::NewBlock(_) => (),
Notification::GenesisInstantiation {
chain: _,
tx_hash: _,
contract: _,
} => continue,
Notification::FailedTransaction { .. } => continue,
Expand Down Expand Up @@ -128,7 +129,7 @@ impl<App: KolmeApp> Submitter<App> {
}

async fn handle_genesis(&mut self, genesis_action: GenesisAction) -> Result<()> {
let (contract_addr, chain) = match genesis_action {
let (contract_addr, txhash, chain) = match genesis_action {
GenesisAction::InstantiateCosmos {
chain,
code_id,
Expand All @@ -146,7 +147,7 @@ impl<App: KolmeApp> Submitter<App> {

let addr = cosmos::instantiate(&cosmos, seed_phrase, code_id, args).await?;

(addr, chain.into())
(addr, None, chain.into())
}
GenesisAction::InstantiateSolana {
chain,
Expand All @@ -163,14 +164,15 @@ impl<App: KolmeApp> Submitter<App> {

let client = self.kolme.read().get_solana_client(chain).await;

solana::instantiate(&client, keypair, &program_id, args).await?;
let txhash = solana::instantiate(&client, keypair, &program_id, args).await?;

(program_id, chain.into())
(program_id, Some(txhash), chain.into())
}
};

self.kolme.notify(Notification::GenesisInstantiation {
chain,
tx_hash: txhash,
contract: contract_addr,
});
self.genesis_created.insert(chain);
Expand Down
6 changes: 3 additions & 3 deletions packages/kolme/src/submitter/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ pub async fn instantiate(
keypair: &Keypair,
program_id: &str,
set: ValidatorSet,
) -> Result<()> {
) -> Result<ExternalTxHash> {
let data = InitializeIxData { set };

let program_pubkey = Pubkey::from_str(program_id)?;
let blockhash = client.get_latest_blockhash().await?;
let tx = init_tx(program_pubkey, blockhash, keypair, &data).map_err(|x| anyhow::anyhow!(x))?;

client.send_and_confirm_transaction(&tx).await?;
let signature = client.send_and_confirm_transaction(&tx).await?;

Ok(())
Ok(ExternalTxHash(signature.to_string()))
}

pub async fn execute(
Expand Down
Loading