Skip to content

Commit

Permalink
Changed base package name to io.github.dobrynya.zio.jms.
Browse files Browse the repository at this point in the history
Changed DestinationFactory to return IO instead of Destination.
Changed target repository to Maven Central.
  • Loading branch information
dobrynya committed Nov 6, 2021
1 parent 68c721b commit 9490315
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 67 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ ZIO-JMS adapts JMS API to ZIO streams and makes it working more conveniently and

---
# Add library to your project
* Add maven repository [https://dl.bintray.com/dobrynya/maven](https://dl.bintray.com/dobrynya/maven) to your build tool
* Add dependency on the library "com.gh.dobrynya" %% "zio-jms" % "0.1"
* Add Maven Central Repository to your build script
* Add dependency on the library "io.github.dobrynya" %% "zio-jms" % "0.2"

# Receive messages

Expand All @@ -16,7 +16,7 @@ For receiving messages it needs to create a message consumer using utility metho
import zio.{ZIO, Has, Chunk}
import zio.blocking._
import javax.jms.{Connection, Message, JMSException}
import com.gh.dobrynya.zio.jms._
import io.github.dobrynya.zio.jms._

val received: ZIO[Has[Connection] with Blocking, JMSException, Chunk[Message]] =
JmsConsumer.consume(Queue("test-queue"))
Expand All @@ -31,7 +31,7 @@ You can process a stream of messages transactionally like follows
import zio.{ZIO, Has, Chunk, UIO}
import zio.blocking._
import javax.jms.{Connection, Message, JMSException}
import com.gh.dobrynya.zio.jms._
import io.github.dobrynya.zio.jms._

def messageProcessor(message: Message): UIO[Unit] = ???

Expand All @@ -50,7 +50,7 @@ import zio.{ZIO, Has}
import zio.blocking._
import zio.console._
import javax.jms.{Connection, Message, JMSException}
import com.gh.dobrynya.zio.jms._
import io.github.dobrynya.zio.jms._

def someMessageProcessor(message: Message): ZIO[Console, Exception, Unit] =
putStrLn(s"Received message $message")
Expand All @@ -66,7 +66,7 @@ when it is processed successfully and rolls back when it fails
import zio.{ZIO, IO, Has}
import zio.blocking._
import javax.jms.{Connection, Message, JMSException}
import com.gh.dobrynya.zio.jms._
import io.github.dobrynya.zio.jms._

def someMessageProcessor(message: Message): IO[String, Unit] =
IO.fail(s"Error occurred during processing a message $message")
Expand All @@ -81,7 +81,7 @@ For sending messages it needs to create sinks providing a destination and a mess

```scala
import zio.stream.ZStream
import com.gh.dobrynya.zio.jms._
import io.github.dobrynya.zio.jms._

val messages = (1 to 100).map(i => s"Message $i")

Expand All @@ -96,7 +96,7 @@ The last thing is to provide a connection like follows
import zio.{ZLayer, Has}
import zio.blocking._
import javax.jms.{Connection, ConnectionFactory, JMSException}
import com.gh.dobrynya.zio.jms._
import io.github.dobrynya.zio.jms._

def connectionFactory: ConnectionFactory = ???

Expand All @@ -112,7 +112,7 @@ val consuming = JmsConsumer.consume(Queue("test-queue")).runDrain
## From a client's perspective

```scala
import com.gh.dobrynya.zio.jms._
import io.github.dobrynya.zio.jms._
import zio.stream._

val request = Queue("request")
Expand All @@ -127,7 +127,7 @@ ZStream.fromIterable(messages)
## From a server's perspective

```scala
import com.gh.dobrynya.zio.jms._
import io.github.dobrynya.zio.jms._
import zio.stream._

val request = Queue("request")
Expand Down
36 changes: 25 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
name := "zio-jms"
organization := "com.gh.dobrynya"
version := "0.1"
scalaVersion := "3.0.0-RC3"
organization := "io.github.dobrynya"
version := "0.2"
homepage := Some(url("https://github.com/dobrynya/zio-jms"))
developers += Developer("dobrynya", "Dmitry Dobrynin", "[email protected]", url("https://gitlab.com/dobrynya"))
scmInfo := Some(
ScmInfo(
url("https://github.com/dobrynya/zio-jms"),
"scm:[email protected]:dobrynya/zio-jms.git"
)
)
licenses += ("APACHE2.0", url("https://opensource.org/licenses/Apache-2.0"))
ThisBuild / versionScheme := Some("early-semver")
crossPaths := true
publishMavenStyle := true
publishTo := Some("releases" at "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2")

scalaVersion := "3.1.0"
crossScalaVersions := List("2.13.7", "3.1.0")

libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "[1.0.7,)",
"dev.zio" %% "zio-streams" % "[1.0.7,)",
"dev.zio" %% "zio-test" % "[1.0.7,)" % Test,
"dev.zio" %% "zio-test-sbt" % "[1.0.7,)" % Test,
"org.apache.geronimo.specs" % "geronimo-jms_1.1_spec" % "1.1",
"org.apache.activemq" % "activemq-broker" % "5.15.12" % Test,
"org.apache.activemq" % "activemq-kahadb-store" % "5.15.12" % Test,
"ch.qos.logback" % "logback-classic" % "[1.2,)" % Test
"dev.zio" %% "zio" % "1.0.12",
"dev.zio" %% "zio-streams" % "1.0.12",
"dev.zio" %% "zio-test" % "1.0.12" % Test,
"dev.zio" %% "zio-test-sbt" % "1.0.12" % Test,
"org.apache.geronimo.specs" % "geronimo-jms_1.1_spec" % "1.1.1" % Provided,
"org.apache.activemq" % "activemq-broker" % "5.16.2" % Test,
"org.apache.activemq" % "activemq-kahadb-store" % "5.16.2" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.6" % Test
)

testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.5.2
sbt.version = 1.5.5
1 change: 0 additions & 1 deletion project/plugin.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.6.0-RC4")
addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.6")
19 changes: 0 additions & 19 deletions src/main/scala/com/gh/dobrynya/zio/jms/destinations.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.gh.dobrynya.zio.jms
package io.github.dobrynya.zio.jms

import javax.jms.{ JMSException, Message, MessageConsumer, Session, Connection => JMSConnection }
import zio._
Expand All @@ -16,16 +16,14 @@ class JmsConsumer[T](session: Session, consumer: MessageConsumer, semaphore: Sem
effectBlockingInterrupt(enrich(consumer.receive(), this)).refineToOrDie
))

def commit: IO[JMSException, Unit] =
semaphore.withPermit(Task(session.commit()).refineToOrDie)
def commitSession: ZIO[Blocking, JMSException, Unit] = semaphore.withPermit(commit(session))

def rollback: IO[JMSException, Unit] =
semaphore.withPermit(Task(session.rollback()).refineToOrDie)
def rollbackSession: ZIO[Blocking, JMSException, Unit] = semaphore.withPermit(rollback(session))
}

class TransactionalMessage(val message: Message, consumer: JmsConsumer[TransactionalMessage]) {
def commit: IO[JMSException, Unit] = consumer.commit
def rollback: IO[JMSException, Unit] = consumer.rollback
class TxMessage(val message: Message, consumer: JmsConsumer[TxMessage]) {
def commit: ZIO[Blocking, JMSException, Unit] = consumer.commitSession
def rollback: ZIO[Blocking, JMSException, Unit] = consumer.rollbackSession
}

object JmsConsumer {
Expand All @@ -42,14 +40,15 @@ object JmsConsumer {
for {
connection <- ZIO.service[JMSConnection].toManaged_
session <- session(connection, transacted, acknowledgementMode)
mc <- consumer(session, destination(session))
d <- destination(session).toManaged_
mc <- consumer(session, d)
semaphore <- Semaphore.make(1).toManaged_
} yield new JmsConsumer(session, mc, semaphore)

def consumeTx(destination: DestinationFactory): ZStream[BlockingConnection, JMSException, TransactionalMessage] =
def consumeTx(destination: DestinationFactory): ZStream[BlockingConnection, JMSException, TxMessage] =
ZStream
.managed(make[TransactionalMessage](destination, transacted = true, Session.SESSION_TRANSACTED))
.flatMap(_.consume(new TransactionalMessage(_, _)))
.managed(make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED))
.flatMap(_.consume(new TxMessage(_, _)))

def consumeAndReplyWith[R, E >: JMSException](
destination: DestinationFactory,
Expand All @@ -68,7 +67,8 @@ object JmsConsumer {
val consumerAndProducer = for {
connection <- ZIO.service[JMSConnection].toManaged_
session <- session(connection, transacted, acknowledgementMode)
mc <- consumer(session, destination(session))
d <- destination(session).toManaged_
mc <- consumer(session, d)
mp <- producer(session)
semaphore <- Semaphore.make(1).toManaged_
} yield (session, mc, mp, semaphore)
Expand Down Expand Up @@ -120,9 +120,9 @@ object JmsConsumer {
destination: DestinationFactory,
processor: Message => ZIO[R, E, Any],
): ZIO[R with BlockingConnection, E, Unit] =
make[TransactionalMessage](destination, transacted = true, Session.SESSION_TRANSACTED)
make[TxMessage](destination, transacted = true, Session.SESSION_TRANSACTED)
.use(
_.consume(new TransactionalMessage(_, _)).foreach { tm =>
_.consume(new TxMessage(_, _)).foreach { tm =>
processor(tm.message).tapBoth(_ => tm.rollback, _ => tm.commit)
}.unit
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.gh.dobrynya.zio.jms
package io.github.dobrynya.zio.jms

import javax.jms.{ Destination, JMSException, Message, Session, Connection => JMSConnection }
import zio._
Expand Down Expand Up @@ -49,7 +49,7 @@ object JmsProducer {
for {
connection <- ZIO.service[JMSConnection].toManaged_
session <- session(connection, transacted, acknowledgementMode)
d = destination(session)
d <- destination(session).toManaged_
mp <- producer(session)
semaphore <- Semaphore.make(1).toManaged_
} yield
Expand Down Expand Up @@ -112,8 +112,8 @@ object JmsProducer {
session <- session(connection, transacted, acknowledgementMode)
mp <- producer(session)
semaphore <- Semaphore.make(1).toManaged_
d = destination(session)
replyHeader = replyTo(session)
d <- destination(session).toManaged_
replyHeader <- replyTo(session).toManaged_
} yield
new JmsProducer[R, E, A](session,
message =>
Expand Down
24 changes: 24 additions & 0 deletions src/main/scala/io/github/dobrynya/zio/jms/destinations.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.github.dobrynya.zio.jms

import zio.{ IO, Task }
import javax.jms._

case class Queue(name: String) extends DestinationFactory {
override def apply(session: Session): IO[JMSException, Destination] =
Task(session.createQueue(name)).refineToOrDie[JMSException]
}

case class Topic(name: String) extends DestinationFactory {
override def apply(session: Session): IO[JMSException, Destination] =
Task(session.createTopic(name)).refineToOrDie[JMSException]
}

case object TemporaryQueue extends DestinationFactory {
override def apply(session: Session): IO[JMSException, Destination] =
Task(session.createTemporaryQueue()).refineToOrDie[JMSException]
}

case object TemporaryTopic extends DestinationFactory {
override def apply(session: Session): IO[JMSException, Destination] =
Task(session.createTemporaryQueue()).refineToOrDie[JMSException]
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.gh.dobrynya.zio
package io.github.dobrynya.zio

import javax.jms._
import zio._
import zio.blocking._

package object jms {

type DestinationFactory = Session => Destination
type MessageFactory[T] = (T, Session) => Message
type DestinationFactory = Session => IO[JMSException, Destination]
type BlockingConnection = Blocking with Has[Connection]

def connection(connectionFactory: ConnectionFactory,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.gh.dobrynya.zio.jms
package io.github.dobrynya.zio.jms

import javax.jms.{Connection, JMSException}
import org.apache.activemq.broker.BrokerService
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.gh.dobrynya.zio.jms
package io.github.dobrynya.zio.jms

import javax.jms.{Queue => _, _}
import zio.{Queue => ZQueue, _}
Expand Down Expand Up @@ -218,7 +218,7 @@ object JmsComponentsSpec extends DefaultRunnableSpec with ConnectionAware {
for {
c <- managedConnection
s <- session(c, transacted, acknowledgementMode)
d = Queue(dest)(s)
d <- Queue(dest)(s).toManaged_
p <- producer(s)
mc <- consumer(s, d)
} yield (s, p, mc, d)
Expand Down

0 comments on commit 9490315

Please sign in to comment.