Skip to content

Commit

Permalink
move message to common package
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemyev Vyacheslav authored and Artemyev Vyacheslav committed Nov 30, 2023
1 parent 154a9f9 commit c9fea04
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.rabbitmq.client.MessageProperties
import io.github.viartemev.rabbitmq.channel.ConfirmChannel
import io.github.viartemev.rabbitmq.channel.createConfirmChannel
import io.github.viartemev.rabbitmq.publisher.ConfirmPublisher
import io.github.viartemev.rabbitmq.publisher.OutboundMessage
import io.github.viartemev.rabbitmq.common.OutboundMessage
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.MessageProperties
import io.github.viartemev.rabbitmq.channel.confirmChannel
import io.github.viartemev.rabbitmq.channel.publish
import io.github.viartemev.rabbitmq.publisher.OutboundMessage
import io.github.viartemev.rabbitmq.common.OutboundMessage
import io.github.viartemev.rabbitmq.queue.QueueSpecification
import io.github.viartemev.rabbitmq.queue.declareQueue
import kotlinx.coroutines.Dispatchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.MessageProperties
import io.github.viartemev.rabbitmq.channel.channel
import io.github.viartemev.rabbitmq.channel.rpc
import io.github.viartemev.rabbitmq.publisher.OutboundMessage
import io.github.viartemev.rabbitmq.common.OutboundMessage
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.github.viartemev.rabbitmq.channel

import com.rabbitmq.client.Channel
import io.github.viartemev.rabbitmq.consumer.ConfirmConsumer
import io.github.viartemev.rabbitmq.publisher.OutboundMessage
import io.github.viartemev.rabbitmq.common.OutboundMessage
import mu.KotlinLogging
import java.util.concurrent.atomic.AtomicBoolean

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.viartemev.rabbitmq.publisher
package io.github.viartemev.rabbitmq.common

import com.rabbitmq.client.AMQP.BasicProperties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package io.github.viartemev.rabbitmq.consumer

import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import kotlinx.coroutines.cancel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import mu.KotlinLogging
import java.io.Closeable
Expand Down Expand Up @@ -61,7 +58,9 @@ class ConfirmConsumer internal constructor(
val deliveryTag = delivery.envelope.deliveryTag
try {
handler(delivery)
amqpChannel.basicAck(deliveryTag, false)
runInterruptible(Dispatchers.IO) {
amqpChannel.basicAck(deliveryTag, false)
}
} catch (e: Exception) {
val errorMessage = "Can't ack a message with deliveryTag: $deliveryTag"
logger.error(e) { errorMessage }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.viartemev.rabbitmq.publisher

import com.rabbitmq.client.Channel
import io.github.viartemev.rabbitmq.common.OutboundMessage
import kotlinx.coroutines.suspendCancellableCoroutine
import mu.KotlinLogging
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -16,7 +17,7 @@ private val logger = KotlinLogging.logger {}
* @constructor Creates a ConfirmPublisher with the specified channel.
*/
class ConfirmPublisher internal constructor(private val channel: Channel) {
internal val continuations = ConcurrentHashMap<Long, Continuation<Boolean>>()
private val continuations = ConcurrentHashMap<Long, Continuation<Boolean>>()

init {
channel.addConfirmListener(AckListener(continuations))
Expand All @@ -34,7 +35,7 @@ class ConfirmPublisher internal constructor(private val channel: Channel) {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation { continuations.remove(messageSequenceNumber) }
continuations[messageSequenceNumber] = continuation
message.apply { channel.basicPublish(exchange, routingKey, properties, msg) }
channel.basicPublish(message.exchange, message.routingKey, message.properties, message.msg)
logger.debug { "Message successfully published" }
}
}
Expand Down
32 changes: 15 additions & 17 deletions src/main/kotlin/io/github/viartemev/rabbitmq/rpc/RpcClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package io.github.viartemev.rabbitmq.rpc

import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import io.github.viartemev.rabbitmq.publisher.OutboundMessage
import io.github.viartemev.rabbitmq.common.OutboundMessage
import io.github.viartemev.rabbitmq.queue.DeleteQueueSpecification
import io.github.viartemev.rabbitmq.queue.declareQueue
import io.github.viartemev.rabbitmq.queue.deleteQueue
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import kotlinx.coroutines.*
import mu.KotlinLogging
import java.io.IOException
import java.util.*
Expand Down Expand Up @@ -37,24 +34,25 @@ class RpcClient(val channel: Channel) {
val corrId = UUID.randomUUID().toString()
val properties = outboundMessage.properties.builder().correlationId(corrId).replyTo(replyQueueName).build()

withContext(Dispatchers.IO) {
channel.basicPublish(
outboundMessage.exchange, outboundMessage.routingKey, properties, outboundMessage.msg
)
runInterruptible(Dispatchers.IO) {
channel.basicPublish(outboundMessage.exchange, outboundMessage.routingKey, properties, outboundMessage.msg)
}

var consumerTag: String? = null
try {
return suspendCancellableCoroutine { continuation ->
val deliverCallback: (consumerTag: String, message: Delivery) -> Unit = { _, delivery ->
if (corrId == delivery.properties.correlationId) {
channel.basicAck(delivery.envelope.deliveryTag, false)
continuation.resume(delivery)
}
}
val cancelCallback: (consumerTag: String) -> Unit = { consumerTag ->
logger.debug { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" }
}

try {
consumerTag = channel.basicConsume(replyQueueName, false, { _, delivery ->
if (corrId == delivery.properties.correlationId) {
continuation.resume(delivery)
channel.basicAck(delivery.envelope.deliveryTag, false)
}
}, { consumerTag ->
logger.debug { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" }
})
consumerTag = channel.basicConsume(replyQueueName, false, deliverCallback, cancelCallback)
} catch (e: Exception) {
continuation.resumeWithException(e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import com.rabbitmq.client.MessageProperties
import io.github.viartemev.rabbitmq.AbstractTestContainersTest
import io.github.viartemev.rabbitmq.publisher.OutboundMessage
import io.github.viartemev.rabbitmq.common.OutboundMessage
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.github.viartemev.rabbitmq.utils

import com.rabbitmq.client.MessageProperties
import io.github.viartemev.rabbitmq.publisher.OutboundMessage
import io.github.viartemev.rabbitmq.common.OutboundMessage

fun createMessage(exchange: String = "", queue: String = "test_queue", body: String) =
OutboundMessage(exchange, queue, MessageProperties.PERSISTENT_BASIC, body)

0 comments on commit c9fea04

Please sign in to comment.