Skip to content

Commit

Permalink
RabbitMQ connection testing (#808)
Browse files Browse the repository at this point in the history
  • Loading branch information
ecarrara authored Dec 12, 2024
1 parent 1bd7217 commit 0828ff7
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions crates/arroyo-connectors/src/rabbitmq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,33 @@ impl Connector for RabbitmqConnector {
fn test(
&self,
_: &str,
_: Self::ProfileT,
_: Self::TableT,
_: Option<&arroyo_rpc::api_types::connections::ConnectionSchema>,
config: Self::ProfileT,
table: Self::TableT,
_schema: Option<&arroyo_rpc::api_types::connections::ConnectionSchema>,
tx: tokio::sync::mpsc::Sender<arroyo_rpc::api_types::connections::TestSourceMessage>,
) {
// TODO
tokio::task::spawn(async move {
let message = TestSourceMessage {
error: false,
done: true,
message: "Successfully validated connection".to_string(),
let message = match config.get_environment().await {
Ok(environment) => {
if let Err(e) = environment.consumer().build(&table.stream).await {
TestSourceMessage {
error: true,
done: true,
message: e.to_string(),
}
} else {
TestSourceMessage {
error: false,
done: true,
message: "Successfully validated connection".to_string(),
}
}
}
Err(e) => TestSourceMessage {
error: true,
done: true,
message: e.to_string(),
},
};
tx.send(message).await.unwrap();
});
Expand Down Expand Up @@ -227,7 +243,7 @@ impl Connector for RabbitmqConnector {
}

impl RabbitmqStreamConfig {
async fn get_environment(&mut self) -> anyhow::Result<Environment> {
async fn get_environment(&self) -> anyhow::Result<Environment> {
let builder = Environment::builder()
.host(&self.host.clone().unwrap_or("localhost".to_owned()))
.username(&self.username.clone().unwrap_or("guest".to_owned()))
Expand Down

0 comments on commit 0828ff7

Please sign in to comment.