Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/examples/six-sigma/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ mod tests {
.and_then(|log_event| match log_event {
LogEvent::ProcessedBridgeEvent(LogBridgeEvent::Regular {
bridge_event_id: event_id,
tx_hash: _,
account_id,
}) => (event_id == bridge_event_id).then_some(account_id),
_ => unreachable!(),
Expand Down
13 changes: 10 additions & 3 deletions packages/kolme/src/api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,15 @@ async fn handle_websocket<App: KolmeApp>(
};
ApiNotification::NewBlock { block, logs }
}
Notification::GenesisInstantiation { chain, contract } => {
ApiNotification::GenesisInstantiation { chain, contract }
}
Notification::GenesisInstantiation {
chain,
tx_hash,
contract,
} => ApiNotification::GenesisInstantiation {
chain,
tx_hash,
contract,
},
Notification::FailedTransaction(failed) => ApiNotification::FailedTransaction(failed),
Notification::LatestBlock(latest_block) => ApiNotification::LatestBlock(latest_block),
};
Expand Down Expand Up @@ -206,6 +212,7 @@ pub enum ApiNotification<AppMessage> {
/// A claim by a submitter that it has instantiated a bridge contract.
GenesisInstantiation {
chain: ExternalChain,
tx_hash: Option<ExternalTxHash>,
contract: String,
},
/// A transaction failed in the processor.
Expand Down
6 changes: 5 additions & 1 deletion packages/kolme/src/core/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ impl<App: KolmeApp> ExecutionContext<'_, App> {
}
Message::Listener {
chain,
tx_hash,
event,
event_id,
} => {
self.listener(*chain, event, *event_id)?;
self.listener(*chain, tx_hash.clone(), event, *event_id)?;
}
Message::Approve {
chain,
Expand All @@ -196,6 +197,7 @@ impl<App: KolmeApp> ExecutionContext<'_, App> {
fn listener(
&mut self,
chain: ExternalChain,
tx_hash: Option<ExternalTxHash>,
event: &BridgeEvent,
event_id: BridgeEventId,
) -> Result<()> {
Expand All @@ -219,6 +221,7 @@ impl<App: KolmeApp> ExecutionContext<'_, App> {
event_id,
PendingBridgeEvent {
event: event.clone(),
tx_hash,
attestations: BTreeSet::new(),
},
);
Expand Down Expand Up @@ -335,6 +338,7 @@ impl<App: KolmeApp> ExecutionContext<'_, App> {
.deposit(asset_config.asset_id, amount)?;
}
self.log_event(LogEvent::ProcessedBridgeEvent(LogBridgeEvent::Regular {
tx_hash: pending.tx_hash,
bridge_event_id: event_id,
account_id,
}))?;
Expand Down
28 changes: 28 additions & 0 deletions packages/kolme/src/core/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,27 @@ pub use error::KolmeError;
pub type SolanaClient = solana_client::nonblocking::rpc_client::RpcClient;
pub type SolanaPubsubClient = solana_client::nonblocking::pubsub_client::PubsubClient;

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct ExternalTxHash(pub String);

impl MerkleSerialize for ExternalTxHash {
fn merkle_serialize(
&self,
serializer: &mut MerkleSerializer,
) -> std::result::Result<(), MerkleSerialError> {
serializer.store(&self.0)
}
}

impl MerkleDeserialize for ExternalTxHash {
fn merkle_deserialize(
deserializer: &mut MerkleDeserializer,
_version: usize,
) -> Result<Self, MerkleSerialError> {
Ok(ExternalTxHash(deserializer.load()?))
}
}

#[derive(
serde::Serialize,
serde::Deserialize,
Expand Down Expand Up @@ -420,6 +441,7 @@ impl MerkleDeserialize for PendingBridgeAction {
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
pub struct PendingBridgeEvent {
pub event: BridgeEvent,
pub tx_hash: Option<ExternalTxHash>,
/// Attestations from the listeners
pub attestations: BTreeSet<PublicKey>,
}
Expand All @@ -428,9 +450,11 @@ impl MerkleSerialize for PendingBridgeEvent {
fn merkle_serialize(&self, serializer: &mut MerkleSerializer) -> Result<(), MerkleSerialError> {
let Self {
event,
tx_hash,
attestations,
} = self;
serializer.store_json(event)?;
serializer.store(tx_hash)?;
serializer.store(attestations)?;
Ok(())
}
Expand All @@ -443,6 +467,7 @@ impl MerkleDeserialize for PendingBridgeEvent {
) -> Result<Self, MerkleSerialError> {
Ok(Self {
event: deserializer.load_json()?,
tx_hash: deserializer.load()?,
attestations: deserializer.load()?,
})
}
Expand Down Expand Up @@ -1042,6 +1067,7 @@ pub enum Message<AppMessage> {
App(AppMessage),
Listener {
chain: ExternalChain,
tx_hash: Option<ExternalTxHash>,
event_id: BridgeEventId,
event: BridgeEvent,
},
Expand Down Expand Up @@ -1642,6 +1668,7 @@ pub enum Notification<AppMessage> {
/// A claim by a submitter that it has instantiated a bridge contract.
GenesisInstantiation {
chain: ExternalChain,
tx_hash: Option<ExternalTxHash>,
contract: String,
},
/// A transaction failed in the processor.
Expand Down Expand Up @@ -1686,6 +1713,7 @@ pub enum LogEvent {
#[serde(rename_all = "snake_case")]
pub enum LogBridgeEvent {
Regular {
tx_hash: Option<ExternalTxHash>,
bridge_event_id: BridgeEventId,
account_id: AccountId,
},
Expand Down
2 changes: 2 additions & 0 deletions packages/kolme/src/listener/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub(crate) fn to_kolme_message<T>(

Message::Listener {
chain,
tx_hash: None,
event_id,
event: BridgeEvent::Regular {
wallet: Wallet(wallet),
Expand All @@ -130,6 +131,7 @@ pub(crate) fn to_kolme_message<T>(
}
BridgeEventMessage::Signed { wallet, action_id } => Message::Listener {
chain,
tx_hash: None,
event_id,
event: BridgeEvent::Signed {
wallet: Wallet(wallet),
Expand Down
8 changes: 7 additions & 1 deletion packages/kolme/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ impl<App: KolmeApp> Listener<App> {
return Ok(contracts);
}

if let Notification::GenesisInstantiation { chain, contract } = receiver.recv().await? {
if let Notification::GenesisInstantiation {
chain,
tx_hash,
contract,
} = receiver.recv().await?
{
if chain.name() != name {
continue;
}
Expand Down Expand Up @@ -147,6 +152,7 @@ impl<App: KolmeApp> Listener<App> {
&self.secret,
vec![Message::Listener {
chain,
tx_hash,
event: BridgeEvent::Instantiated { contract },
event_id: BridgeEventId::start(),
}],
Expand Down
37 changes: 24 additions & 13 deletions packages/kolme/src/listener/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ async fn listen_internal<App: KolmeApp>(
continue;
}

let Some(msg) = extract_bridge_message_from_logs(&resp.value.logs)? else {
let Some((tx_hash, msg)) =
extract_bridge_message_from_logs(&resp.value.signature, &resp.value.logs)?
else {
tracing::warn!(
"No bridge message data log was found in TX {} logs.",
resp.value.signature
Expand All @@ -144,7 +146,7 @@ async fn listen_internal<App: KolmeApp>(
.expect("should have at least one TX processed.")
.next();
} else {
let msg = to_kolme_message::<App::Message>(msg, chain);
let msg = to_kolme_message::<App::Message>(tx_hash, msg, chain);

kolme
.sign_propose_await_transaction(secret, vec![msg])
Expand All @@ -168,7 +170,7 @@ async fn catch_up<App: KolmeApp>(
) -> Result<Option<BridgeEventId>> {
tracing::info!("Catching up on missing bridge events until {}.", last_seen);

let mut messages = vec![];
let mut messages_with_hashes = vec![];
let txs = client.get_signatures_for_address(contract).await?;

// First entry is the latest transaction, we want to work up until to the target ID provided.
Expand Down Expand Up @@ -196,7 +198,8 @@ async fn catch_up<App: KolmeApp>(
continue;
};

let Some(msg) = extract_bridge_message_from_logs(&logs)? else {
let Some((tx_hash, msg)) = extract_bridge_message_from_logs(&sig.to_string(), &logs)?
else {
tracing::warn!("No bridge message data log was found in TX {} logs.", sig);

continue;
Expand All @@ -206,24 +209,24 @@ async fn catch_up<App: KolmeApp>(
break;
}

messages.push(msg);
messages_with_hashes.push((tx_hash, msg));
}

assert!(messages.is_sorted_by(|a, b| a.id > b.id));
let Some(latest_id) = messages.first().map(|x| x.id) else {
assert!(messages_with_hashes.is_sorted_by(|(_, a), (_, b)| a.id > b.id));
let Some(latest_id) = messages_with_hashes.first().map(|(_, x)| x.id) else {
return Ok(None);
};

tracing::info!(
"Found {} missed bridge events. Proposing Kolme transaction...",
messages.len()
messages_with_hashes.len()
);

// Now process in reverse insertion order - from oldest to newest.
let kolme_messages: Vec<Message<App::Message>> = messages
let kolme_messages: Vec<Message<App::Message>> = messages_with_hashes
.into_iter()
.rev()
.map(|x| to_kolme_message(x, chain))
.map(|(tx_hash, x)| to_kolme_message(tx_hash, x, chain))
.collect();

kolme
Expand All @@ -233,7 +236,10 @@ async fn catch_up<App: KolmeApp>(
Ok(Some(BridgeEventId(latest_id)))
}

fn extract_bridge_message_from_logs(logs: &[String]) -> Result<Option<BridgeMessage>> {
fn extract_bridge_message_from_logs(
signature: &str,
logs: &[String],
) -> Result<Option<(ExternalTxHash, BridgeMessage)>> {
const PROGRAM_DATA_LOG: &str = "Program data: ";

// Our program data should always be the last "Program data:" entry even if CPI was invoked.
Expand All @@ -253,7 +259,7 @@ fn extract_bridge_message_from_logs(logs: &[String]) -> Result<Option<BridgeMess
});

match result {
Ok(result) => return Ok(Some(result)),
Ok(result) => return Ok(Some((ExternalTxHash(signature.to_owned()), result))),
Err(e) => {
if logs.iter().any(|x| x.contains("Instruction: Initialize")) {
tracing::info!(
Expand All @@ -270,7 +276,11 @@ fn extract_bridge_message_from_logs(logs: &[String]) -> Result<Option<BridgeMess
Ok(None)
}

fn to_kolme_message<T>(msg: BridgeMessage, chain: SolanaChain) -> Message<T> {
fn to_kolme_message<T>(
tx_hash: ExternalTxHash,
msg: BridgeMessage,
chain: SolanaChain,
) -> Message<T> {
let event_id = BridgeEventId(msg.id);
let wallet = Pubkey::new_from_array(msg.wallet).to_string();
let event = match msg.ty {
Expand Down Expand Up @@ -299,6 +309,7 @@ fn to_kolme_message<T>(msg: BridgeMessage, chain: SolanaChain) -> Message<T> {

Message::Listener {
chain: chain.into(),
tx_hash: Some(tx_hash),
event_id,
event,
}
Expand Down
10 changes: 6 additions & 4 deletions packages/kolme/src/submitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl<App: KolmeApp> Submitter<App> {
Notification::NewBlock(_) => (),
Notification::GenesisInstantiation {
chain: _,
tx_hash: _,
contract: _,
} => continue,
Notification::FailedTransaction { .. } => continue,
Expand Down Expand Up @@ -128,7 +129,7 @@ impl<App: KolmeApp> Submitter<App> {
}

async fn handle_genesis(&mut self, genesis_action: GenesisAction) -> Result<()> {
let (contract_addr, chain) = match genesis_action {
let (contract_addr, txhash, chain) = match genesis_action {
GenesisAction::InstantiateCosmos {
chain,
code_id,
Expand All @@ -146,7 +147,7 @@ impl<App: KolmeApp> Submitter<App> {

let addr = cosmos::instantiate(&cosmos, seed_phrase, code_id, args).await?;

(addr, chain.into())
(addr, None, chain.into())
}
GenesisAction::InstantiateSolana {
chain,
Expand All @@ -163,14 +164,15 @@ impl<App: KolmeApp> Submitter<App> {

let client = self.kolme.read().get_solana_client(chain).await;

solana::instantiate(&client, keypair, &program_id, args).await?;
let txhash = solana::instantiate(&client, keypair, &program_id, args).await?;

(program_id, chain.into())
(program_id, Some(txhash), chain.into())
}
};

self.kolme.notify(Notification::GenesisInstantiation {
chain,
tx_hash: txhash,
contract: contract_addr,
});
self.genesis_created.insert(chain);
Expand Down
6 changes: 3 additions & 3 deletions packages/kolme/src/submitter/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ pub async fn instantiate(
keypair: &Keypair,
program_id: &str,
set: ValidatorSet,
) -> Result<()> {
) -> Result<ExternalTxHash> {
let data = InitializeIxData { set };

let program_pubkey = Pubkey::from_str(program_id)?;
let blockhash = client.get_latest_blockhash().await?;
let tx = init_tx(program_pubkey, blockhash, keypair, &data).map_err(|x| anyhow::anyhow!(x))?;

client.send_and_confirm_transaction(&tx).await?;
let signature = client.send_and_confirm_transaction(&tx).await?;

Ok(())
Ok(ExternalTxHash(signature.to_string()))
}

pub async fn execute(
Expand Down
Loading