Skip to content

Commit

Permalink
Update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemyev Vyacheslav authored and Artemyev Vyacheslav committed Nov 10, 2023
1 parent 156badc commit b097576
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.viartemev.thewhiterabbit.common

import com.rabbitmq.client.AMQP

fun AMQP.Connection.withConnection() {

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.viartemev.thewhiterabbit.consumer.flow

import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
Expand All @@ -17,7 +16,9 @@ class ConsumerFlow(

/**
* The consumerAutoAckFlow function establishes a cold Flow for consuming messages from an AMQP queue with automatic acknowledgment enabled.
* This flow is designed to emit messages to the downstream consumers as they arrive from the queue.
*
* @param prefetchSize The maximum number of unacknowledged messages that the consumer can receive at once. Defaults to 0.
* @return A Flow of Delivery objects representing the messages received from the queue.
*/
suspend fun consumerAutoAckFlow(prefetchSize: Int = 0): Flow<Delivery> = callbackFlow {
if (prefetchSize != 0) {
Expand Down Expand Up @@ -46,14 +47,16 @@ class ConsumerFlow(
* The messages are not automatically acknowledged after being received.
* Instead, acknowledgments are manually sent to the AMQP server after the messages are successfully emitted to the downstream flow collector.
*
* @param prefetchSize The number of messages to prefetch from the server. Default is 0 which means no prefetching.
* @return A cold Flow that consumes messages from the AMQP queue.
*/
suspend fun consumerConfirmAckFlow(prefetchSize: Int = 0): Flow<Delivery> = callbackFlow {
suspend fun consumerConfirmAckFlow(prefetchSize: Int = 0) = callbackFlow {
if (prefetchSize != 0) {
amqpChannel.basicQos(prefetchSize, false)
}
val deliverCallback: (consumerTag: String, message: Delivery) -> Unit = { _, message ->
try {
logger.debug { "Trying to send a message from the flow consumer to the flow ${Thread.currentThread().name}" }
logger.debug { "Trying to send a message from the flow consumer to the flow" }
trySendBlocking(message)
logger.debug { "The message was successfully sent to the flow" }
amqpChannel.basicAck(message.envelope.deliveryTag, false)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,10 @@ abstract class AbstractTestContainersTest {
val rabbitmq = RabbitMQContainer()
}

lateinit var factory: ConnectionFactory
lateinit var httpRabbitMQClient: Client
val DEFAULT_VHOST = "/"

@BeforeAll
fun setUp() {
factory = ConnectionFactory()
factory.host = rabbitmq.host.toString()
factory.port = rabbitmq.connectionPort()
httpRabbitMQClient =
Client(URL("http://${rabbitmq.host}:${rabbitmq.managementPort()}/api/"), "guest", "guest")
val factory: ConnectionFactory = ConnectionFactory().apply {
host = rabbitmq.host.toString()
port = rabbitmq.connectionPort()
}
var httpRabbitMQClient = Client(URL("http://${rabbitmq.host}:${rabbitmq.managementPort()}/api/"), "guest", "guest")
val DEFAULT_VHOST = "/"
}
Original file line number Diff line number Diff line change
@@ -1,55 +1,60 @@
package com.viartemev.thewhiterabbit.consumer.flow

import com.rabbitmq.client.MessageProperties
import com.viartemev.thewhiterabbit.AbstractTestContainersTest
import com.viartemev.thewhiterabbit.channel.confirmChannel
import com.viartemev.thewhiterabbit.channel.publish
import com.viartemev.thewhiterabbit.channel.channel
import com.viartemev.thewhiterabbit.queue.QueueSpecification
import com.viartemev.thewhiterabbit.queue.declareQueue
import com.viartemev.thewhiterabbit.utils.createMessage
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

class ConsumerFlowTest : AbstractTestContainersTest() {
private val QUEUE_NAME = "test_queue"
private val connection = factory.newConnection()

private suspend fun generateMessages(count: Int) = coroutineScope {
connection.channel {
declareQueue(QueueSpecification(QUEUE_NAME))
}
connection.channel {
(1..count).map {
basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, "Hello #$it".toByteArray())
}
}
}

@Test
fun testAutoAckFlow(): Unit = runBlocking {
factory.newConnection().use { connection ->
connection.confirmChannel {
declareQueue(QueueSpecification(QUEUE_NAME))
publish {
(1..10).map { createMessage(queue = QUEUE_NAME, body = "1") }
.map { m -> async { publishWithConfirm(m) } }.awaitAll()
}
ConsumerFlow(this, QUEUE_NAME).consumerAutoAckFlow(2).take(10)
.collect { delivery -> println(String(delivery.body)) }
}
val messagesCount = 100
generateMessages(messagesCount)
connection.channel {
ConsumerFlow(this, QUEUE_NAME)
.consumerAutoAckFlow(2)
.take(messagesCount)
.collect { delivery -> println(String(delivery.body)) }
}
}

@Test
fun testConfirmAckFlow(): Unit = runBlocking {
factory.newConnection().use { connection ->
connection.confirmChannel {
declareQueue(QueueSpecification(QUEUE_NAME))
publish {
(1..10).map { i -> createMessage(queue = QUEUE_NAME, body = i.toString()) }
.map { m -> async { publishWithConfirm(m) } }.awaitAll()
val messagesCount = 10
generateMessages(messagesCount)
connection.channel {
ConsumerFlow(this, QUEUE_NAME)
.consumerConfirmAckFlow(2)
.flowOn(Dispatchers.IO)
.take(messagesCount)
.catch { e -> println("Caught exception: $e") }.collect { delivery ->
println("Got the message: ${delivery.body}")
}
val delivery = ConsumerFlow(this, QUEUE_NAME)
.consumerConfirmAckFlow(2)
.flowOn(Dispatchers.IO)
.catch { e -> println("Caught exception: $e") }
.single()
println("Got the message: ${delivery.body}")
}
}
}

}

0 comments on commit b097576

Please sign in to comment.