Skip to content

Commit d774f69

Browse files
committed
experiments
Signed-off-by: Marc-Antoine Perennou <[email protected]>
1 parent e518428 commit d774f69

File tree

4 files changed

+274
-0
lines changed

4 files changed

+274
-0
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ waker-fn = "^1.1"
7878

7979
[dev-dependencies]
8080
async-global-executor = "^3.1"
81+
async-io = "^2.0"
8182
futures-lite = "^2.0"
8283
serde_json = "^1.0"
8384
waker-fn = "^1.1"

examples/c.rs

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use futures_lite::StreamExt;
2+
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
3+
use tracing::info;
4+
5+
fn main() {
6+
if std::env::var("RUST_LOG").is_err() {
7+
unsafe { std::env::set_var("RUST_LOG", "info") };
8+
}
9+
10+
tracing_subscriber::fmt::init();
11+
12+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
13+
14+
async_global_executor::block_on(async {
15+
let conn = Connection::connect(&addr, ConnectionProperties::default())
16+
.await
17+
.expect("connection error");
18+
19+
info!("CONNECTED");
20+
21+
//receive channel
22+
let channel = conn.create_channel().await.expect("create_channel");
23+
info!(state=?conn.status().state());
24+
25+
let queue = channel
26+
.queue_declare(
27+
"hello-recover",
28+
QueueDeclareOptions::default(),
29+
FieldTable::default(),
30+
)
31+
.await
32+
.expect("queue_declare");
33+
info!(state=?conn.status().state());
34+
info!(?queue, "Declared queue");
35+
36+
info!("will consume");
37+
let mut consumer = channel
38+
.basic_consume(
39+
"hello-recover",
40+
"my_consumer",
41+
BasicConsumeOptions::default(),
42+
FieldTable::default(),
43+
)
44+
.await
45+
.expect("basic_consume");
46+
info!(state=?conn.status().state());
47+
48+
while let Some(delivery) = consumer.next().await {
49+
info!(message=?delivery, "received message");
50+
if let Ok(delivery) = delivery {
51+
delivery
52+
.ack(BasicAckOptions::default())
53+
.await
54+
.expect("basic_ack");
55+
}
56+
}
57+
})
58+
}

examples/p.rs

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
2+
use tracing::info;
3+
4+
fn main() {
5+
if std::env::var("RUST_LOG").is_err() {
6+
std::env::set_var("RUST_LOG", "info");
7+
}
8+
9+
tracing_subscriber::fmt::init();
10+
11+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
12+
let recovery_config = lapin::experimental::RecoveryConfig::default().auto_recover_channels();
13+
14+
async_global_executor::block_on(async {
15+
let conn = Connection::connect(
16+
&addr,
17+
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
18+
)
19+
.await
20+
.expect("connection error");
21+
22+
info!("CONNECTED");
23+
24+
let channel1 = conn.create_channel().await.expect("create_channel");
25+
channel1
26+
.confirm_select(ConfirmSelectOptions::default())
27+
.await
28+
.expect("confirm_select");
29+
channel1
30+
.queue_declare(
31+
"hello-recover",
32+
QueueDeclareOptions::default(),
33+
FieldTable::default(),
34+
)
35+
.await
36+
.expect("queue_declare");
37+
38+
let ch = channel1.clone();
39+
async_global_executor::spawn(async move {
40+
loop {
41+
async_io::Timer::after(std::time::Duration::from_secs(1)).await;
42+
info!("Trigger failure");
43+
assert!(ch
44+
.queue_declare(
45+
"fake queue",
46+
QueueDeclareOptions {
47+
passive: true,
48+
..QueueDeclareOptions::default()
49+
},
50+
FieldTable::default(),
51+
)
52+
.await
53+
.is_err());
54+
}
55+
})
56+
.detach();
57+
58+
let mut published = 0;
59+
let mut errors = 0;
60+
info!("will publish");
61+
loop {
62+
let res = channel1
63+
.basic_publish(
64+
"",
65+
"recover-test",
66+
BasicPublishOptions::default(),
67+
b"before",
68+
BasicProperties::default(),
69+
)
70+
.await;
71+
let res = if let Ok(res) = res {
72+
res.await.map(|_| ())
73+
} else {
74+
res.map(|_| ())
75+
};
76+
match res {
77+
Ok(()) => {
78+
println!("GOT OK");
79+
published += 1;
80+
}
81+
Err(err) => {
82+
println!("GOT ERROR");
83+
if !err.is_amqp_soft_error() {
84+
panic!("{}", err);
85+
}
86+
errors += 1;
87+
if let Some(notifier) = err.notifier() {
88+
notifier.await
89+
}
90+
}
91+
}
92+
println!("Published {} with {} errors", published, errors);
93+
}
94+
});
95+
}

examples/t.rs

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use lapin::{
2+
message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable,
3+
BasicProperties, Connection, ConnectionProperties,
4+
};
5+
use tracing::info;
6+
7+
fn main() {
8+
if std::env::var("RUST_LOG").is_err() {
9+
std::env::set_var("RUST_LOG", "info");
10+
}
11+
12+
tracing_subscriber::fmt::init();
13+
14+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
15+
let recovery_config = lapin::experimental::RecoveryConfig::default().auto_recover_channels();
16+
17+
async_global_executor::block_on(async {
18+
let conn = Connection::connect(
19+
&addr,
20+
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
21+
)
22+
.await
23+
.expect("connection error");
24+
25+
info!("CONNECTED");
26+
27+
{
28+
let channel1 = conn.create_channel().await.expect("create_channel");
29+
let channel2 = conn.create_channel().await.expect("create_channel");
30+
channel1
31+
.confirm_select(ConfirmSelectOptions::default())
32+
.await
33+
.expect("confirm_select");
34+
channel1
35+
.queue_declare(
36+
"recover-test",
37+
QueueDeclareOptions::default(),
38+
FieldTable::default(),
39+
)
40+
.await
41+
.expect("queue_declare");
42+
43+
info!("will consume");
44+
let channel = channel2.clone();
45+
channel2
46+
.basic_consume(
47+
"recover-test",
48+
"my_consumer",
49+
BasicConsumeOptions::default(),
50+
FieldTable::default(),
51+
)
52+
.await
53+
.expect("basic_consume")
54+
.set_delegate(move |delivery: DeliveryResult| {
55+
let channel = channel.clone();
56+
async move {
57+
info!(message=?delivery, "received message");
58+
if let Ok(Some(delivery)) = delivery {
59+
delivery
60+
.ack(BasicAckOptions::default())
61+
.await
62+
.expect("basic_ack");
63+
if &delivery.data[..] == b"after" {
64+
channel
65+
.basic_cancel("my_consumer", BasicCancelOptions::default())
66+
.await
67+
.expect("basic_cancel");
68+
}
69+
}
70+
}
71+
});
72+
73+
info!("will publish");
74+
let confirm = channel1
75+
.basic_publish(
76+
"",
77+
"recover-test",
78+
BasicPublishOptions::default(),
79+
b"before",
80+
BasicProperties::default(),
81+
)
82+
.await
83+
.expect("basic_publish")
84+
.await
85+
.expect("publisher-confirms");
86+
assert_eq!(confirm, Confirmation::Ack(None));
87+
88+
info!("before fail");
89+
assert!(channel1
90+
.queue_declare(
91+
"fake queue",
92+
QueueDeclareOptions {
93+
passive: true,
94+
..QueueDeclareOptions::default()
95+
},
96+
FieldTable::default(),
97+
)
98+
.await
99+
.is_err());
100+
info!("after fail");
101+
102+
info!("publish after");
103+
let confirm = channel1
104+
.basic_publish(
105+
"",
106+
"recover-test",
107+
BasicPublishOptions::default(),
108+
b"after",
109+
BasicProperties::default(),
110+
)
111+
.await
112+
.expect("basic_publish")
113+
.await
114+
.expect("publisher-confirms");
115+
assert_eq!(confirm, Confirmation::Ack(None));
116+
}
117+
118+
conn.run().expect("conn.run");
119+
});
120+
}

0 commit comments

Comments
 (0)