Skip to content

Commit 1089484

Browse files
committed
fix: avoid latest_polled_point reset if streamer polled nothing
1 parent 8d6acc1 commit 1089484

File tree

2 files changed

+84
-2
lines changed

2 files changed

+84
-2
lines changed

mithril-aggregator/src/services/cardano_transactions_importer.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ impl CardanoTransactionsImporter {
8080
highest_stored_chain_point: &Option<ChainPoint>,
8181
) -> StdResult<Option<RawCardanoPoint>> {
8282
let last_polled_point = self.last_polled_point.lock().await.clone();
83+
if last_polled_point.is_none() {
84+
debug!(
85+
self.logger,
86+
"No last polled point available, falling back to the highest stored chain point"
87+
);
88+
}
89+
8390
Ok(last_polled_point.or(highest_stored_chain_point
8491
.as_ref()
8592
.map(RawCardanoPoint::from)))
@@ -138,7 +145,9 @@ impl CardanoTransactionsImporter {
138145
}
139146
}
140147

141-
*self.last_polled_point.lock().await = streamer.last_polled_point();
148+
if let Some(point) = streamer.last_polled_point() {
149+
*self.last_polled_point.lock().await = Some(point);
150+
}
142151

143152
Ok(())
144153
}
@@ -962,6 +971,38 @@ mod tests {
962971
start_point_after_import
963972
);
964973
}
974+
975+
#[tokio::test]
976+
async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() {
977+
let connection = cardano_tx_db_connection().unwrap();
978+
let importer = CardanoTransactionsImporter {
979+
last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new(
980+
SlotNumber(15),
981+
"block_hash-1",
982+
)))),
983+
..CardanoTransactionsImporter::new_for_test(
984+
Arc::new(DumbBlockScanner::new()),
985+
Arc::new(CardanoTransactionRepository::new(Arc::new(
986+
SqliteConnectionPool::build_from_connection(connection),
987+
))),
988+
)
989+
};
990+
let highest_stored_block_number = None;
991+
992+
importer
993+
.import_transactions(BlockNumber(1000))
994+
.await
995+
.unwrap();
996+
997+
let start_point_after_import = importer
998+
.start_point(&highest_stored_block_number)
999+
.await
1000+
.unwrap();
1001+
assert_eq!(
1002+
Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
1003+
start_point_after_import
1004+
);
1005+
}
9651006
}
9661007

9671008
#[tokio::test]

mithril-signer/src/services/cardano_transactions/importer/service.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ impl CardanoTransactionsImporter {
8080
highest_stored_chain_point: &Option<ChainPoint>,
8181
) -> StdResult<Option<RawCardanoPoint>> {
8282
let last_polled_point = self.last_polled_point.lock().await.clone();
83+
if last_polled_point.is_none() {
84+
debug!(
85+
self.logger,
86+
"No last polled point available, falling back to the highest stored chain point"
87+
);
88+
}
89+
8390
Ok(last_polled_point.or(highest_stored_chain_point
8491
.as_ref()
8592
.map(RawCardanoPoint::from)))
@@ -138,7 +145,9 @@ impl CardanoTransactionsImporter {
138145
}
139146
}
140147

141-
*self.last_polled_point.lock().await = streamer.last_polled_point();
148+
if let Some(point) = streamer.last_polled_point() {
149+
*self.last_polled_point.lock().await = Some(point);
150+
}
142151

143152
Ok(())
144153
}
@@ -962,6 +971,38 @@ mod tests {
962971
start_point_after_import
963972
);
964973
}
974+
975+
#[tokio::test]
976+
async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() {
977+
let connection = cardano_tx_db_connection().unwrap();
978+
let importer = CardanoTransactionsImporter {
979+
last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new(
980+
SlotNumber(15),
981+
"block_hash-1",
982+
)))),
983+
..CardanoTransactionsImporter::new_for_test(
984+
Arc::new(DumbBlockScanner::new()),
985+
Arc::new(CardanoTransactionRepository::new(Arc::new(
986+
SqliteConnectionPool::build_from_connection(connection),
987+
))),
988+
)
989+
};
990+
let highest_stored_block_number = None;
991+
992+
importer
993+
.import_transactions(BlockNumber(1000))
994+
.await
995+
.unwrap();
996+
997+
let start_point_after_import = importer
998+
.start_point(&highest_stored_block_number)
999+
.await
1000+
.unwrap();
1001+
assert_eq!(
1002+
Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
1003+
start_point_after_import
1004+
);
1005+
}
9651006
}
9661007

9671008
#[tokio::test]

0 commit comments

Comments
 (0)