diff --git a/readme.md b/readme.md index 0e486fa..95c9d83 100644 --- a/readme.md +++ b/readme.md @@ -71,6 +71,10 @@ class ServiceImpl(session: Session) extends Service { override def put(value: Model) = insertQuery(value).execute.unit.provide(Has(session)) override def get(id: Int) = selectQuery(id).selectFirst.provide(Has(session)) + + // alternatively, to avoid providing environment each time + def insert(value: Model) = session.execute(insertQuery(value)).unit + def select(id: Int) = session.selectFirst(selectQuery(id)) } ``` diff --git a/src/it/scala/zio/cassandra/session/SessionSpec.scala b/src/it/scala/zio/cassandra/session/SessionSpec.scala index cdadca8..373a947 100644 --- a/src/it/scala/zio/cassandra/session/SessionSpec.scala +++ b/src/it/scala/zio/cassandra/session/SessionSpec.scala @@ -4,11 +4,14 @@ import com.datastax.oss.driver.api.core.CqlSession import com.datastax.oss.driver.api.core.cql.SimpleStatement import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException import com.dimafeng.testcontainers.CassandraContainer +import zio.cassandra.session.cql.CqlStringContext +import zio.cassandra.session.cql.unsafe.lift import zio.test.Assertion._ import zio.test._ import zio.{ Chunk, Task, ZIO } import java.net.InetSocketAddress +import java.util.UUID object SessionSpec extends CassandraSpecUtils { @@ -54,6 +57,23 @@ object SessionSpec extends CassandraSpecUtils { .runCollect } yield assertTrue(results == Chunk("one", "two", "three")) }, + testM("select interpolated query (cqlConst) should return prepared data") { + for { + session <- ZIO.service[Session] + results <- session + .select(cqlConst"select data FROM $keyspace.test_data WHERE id IN (1,2,3)".as[String]) + .runCollect + } yield assertTrue(results == Chunk("one", "two", "three")) + }, + testM("select interpolated query (cql) should return prepared data") { + for { + session <- ZIO.service[Session] + ids = List(1L, 2L, 3L) + results <- session + .select(cql"select data FROM ${lift(keyspace)}.test_data WHERE id IN $ids".as[String]) + .runCollect + } yield assertTrue(results == Chunk("one", "two", "three")) + }, testM("select should be pure stream") { for { session <- ZIO.service[Session] @@ -65,7 +85,7 @@ object SessionSpec extends CassandraSpecUtils { results <- selectStream } yield assertTrue(results == Chunk("one", "two", "three")) }, - testM("selectOne should return None on empty result") { + testM("selectFirst should return None on empty result") { for { session <- ZIO.service[Session] result <- session @@ -73,7 +93,7 @@ object SessionSpec extends CassandraSpecUtils { .map(_.map(_.getString(0))) } yield assertTrue(result.isEmpty) }, - testM("selectOne should return Some for one") { + testM("selectFirst should return Some for one") { for { session <- ZIO.service[Session] result <- session @@ -88,6 +108,19 @@ object SessionSpec extends CassandraSpecUtils { .map(_.map(_.getString(0))) } yield assertTrue(result.contains(null)) }, + testM("selectFirst interpolated query (cqlConst) should return Some") { + for { + session <- ZIO.service[Session] + result <- session.selectFirst(cqlConst"select data FROM $keyspace.test_data WHERE id = 1".as[String]) + } yield assertTrue(result.contains("one")) + }, + testM("selectFirst interpolated query (cql) should return Some") { + for { + session <- ZIO.service[Session] + id = 1L + result <- session.selectFirst(cql"select data FROM ${lift(keyspace)}.test_data WHERE id = $id".as[String]) + } yield assertTrue(result.contains("one")) + }, testM("select will emit in chunks sized equal to statement pageSize") { val st = SimpleStatement.newInstance(s"select data from $keyspace.test_data").setPageSize(2) for { @@ -95,6 +128,13 @@ object SessionSpec extends CassandraSpecUtils { stream = session.select(st) chunkSizes <- stream.mapChunks(ch => Chunk.single(ch.size)).runCollect } yield assert(chunkSizes)(forall(equalTo(2))) && assertTrue(chunkSizes.size > 1) + }, + testM("execute will create a table") { + for { + session <- ZIO.service[Session] + table = UUID.randomUUID().toString.replaceAll("-", "_") + created <- session.execute(cqlConst"create table $keyspace.$table(id text primary key)") + } yield assertTrue(created) } ) } diff --git a/src/main/scala/zio/cassandra/session/Session.scala b/src/main/scala/zio/cassandra/session/Session.scala index 69767e4..2a97194 100644 --- a/src/main/scala/zio/cassandra/session/Session.scala +++ b/src/main/scala/zio/cassandra/session/Session.scala @@ -5,7 +5,9 @@ import com.datastax.oss.driver.api.core.cql._ import com.datastax.oss.driver.api.core.metadata.Metadata import com.datastax.oss.driver.api.core.metrics.Metrics import com.datastax.oss.driver.api.core.{ CqlIdentifier, CqlSession, CqlSessionBuilder } +import shapeless.HList import zio._ +import zio.cassandra.session.cql.query.{ ParameterizedQuery, PreparedQuery, Query, QueryTemplate } import zio.macros.accessible import zio.stream.Stream import zio.stream.ZStream.Pull @@ -15,6 +17,7 @@ import scala.jdk.OptionConverters.RichOptional @accessible trait Session { + def prepare(stmt: String): Task[PreparedStatement] def execute(stmt: Statement[_]): Task[AsyncResultSet] @@ -26,6 +29,23 @@ trait Session { // short-cuts def selectFirst(stmt: Statement[_]): Task[Option[Row]] + final def prepare[V <: HList, R](query: QueryTemplate[V, R]): Task[PreparedQuery[V, R]] = { + import query.{ binder, reads } + prepare(query.query).map(new PreparedQuery[V, R](this, _, query.config)) + } + + final def prepare[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Query[R]] = + prepare(query.template).map(_.applyProduct(query.values)) + + final def execute[V <: HList](query: ParameterizedQuery[V, _]): Task[Boolean] = + prepare(query).flatMap(_.execute) + + final def select[V <: HList, R](query: ParameterizedQuery[V, R]): Stream[Throwable, R] = + Stream.fromEffect(prepare(query)).flatMap(_.select) + + final def selectFirst[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Option[R]] = + prepare(query).flatMap(_.selectFirst) + // other methods def metrics: Option[Metrics] def name: String @@ -36,6 +56,7 @@ trait Session { def context: DriverContext def keyspace: Option[CqlIdentifier] + } object Session { diff --git a/src/main/scala/zio/cassandra/session/cql/FieldName.scala b/src/main/scala/zio/cassandra/session/cql/FieldName.scala deleted file mode 100644 index 4c3eb9e..0000000 --- a/src/main/scala/zio/cassandra/session/cql/FieldName.scala +++ /dev/null @@ -1,10 +0,0 @@ -package zio.cassandra.session.cql - -/** This type is used by FromUdtValue and ToUdtValue to decide whether to utilize schema data when reading and writing - * data from the Datastax DataType - */ -private[cql] sealed trait FieldName -object FieldName { - case object Unused extends FieldName - final case class Labelled(value: String) extends FieldName -} diff --git a/src/main/scala/zio/cassandra/session/cql/query/ParameterizedQuery.scala b/src/main/scala/zio/cassandra/session/cql/query/ParameterizedQuery.scala index 212c970..e0bd32b 100644 --- a/src/main/scala/zio/cassandra/session/cql/query/ParameterizedQuery.scala +++ b/src/main/scala/zio/cassandra/session/cql/query/ParameterizedQuery.scala @@ -7,20 +7,17 @@ import zio.cassandra.session.Session import zio.cassandra.session.cql.Binder import zio.cassandra.session.cql.codec.Reads import zio.stream.{ Stream, ZStream } -import zio.{ Has, RIO } +import zio.{ Has, RIO, ZIO } case class ParameterizedQuery[V <: HList: Binder, R: Reads] private (template: QueryTemplate[V, R], values: V) { def +(that: String): ParameterizedQuery[V, R] = ParameterizedQuery[V, R](this.template + that, this.values) def as[R1: Reads]: ParameterizedQuery[V, R1] = ParameterizedQuery[V, R1](template.as[R1], values) - def select: ZStream[Has[Session], Throwable, R] = - Stream.unwrap(template.prepare.map(_.applyProduct(values).select)) + def select: ZStream[Has[Session], Throwable, R] = Stream.serviceWithStream(_.select(this)) - def selectFirst: RIO[Has[Session], Option[R]] = - template.prepare.flatMap(_.applyProduct(values).selectFirst) + def selectFirst: RIO[Has[Session], Option[R]] = ZIO.serviceWith(_.selectFirst(this)) - def execute: RIO[Has[Session], Boolean] = - template.prepare.map(_.applyProduct(values)).flatMap(_.execute) + def execute: RIO[Has[Session], Boolean] = ZIO.serviceWith(_.execute(this)) def config(config: BoundStatement => BoundStatement): ParameterizedQuery[V, R] = ParameterizedQuery[V, R](template.config(config), values) diff --git a/src/main/scala/zio/cassandra/session/cql/query/PreparedQuery.scala b/src/main/scala/zio/cassandra/session/cql/query/PreparedQuery.scala index 2ad6151..39ef4ae 100644 --- a/src/main/scala/zio/cassandra/session/cql/query/PreparedQuery.scala +++ b/src/main/scala/zio/cassandra/session/cql/query/PreparedQuery.scala @@ -6,7 +6,7 @@ import zio.cassandra.session.cql.Binder import zio.cassandra.session.Session import zio.cassandra.session.cql.codec.Reads -class PreparedQuery[V <: HList: Binder, R: Reads] private[cql] ( +class PreparedQuery[V <: HList: Binder, R: Reads] private[session] ( session: Session, statement: PreparedStatement, config: BoundStatement => BoundStatement diff --git a/src/main/scala/zio/cassandra/session/cql/query/Query.scala b/src/main/scala/zio/cassandra/session/cql/query/Query.scala index 87ab306..f3db009 100644 --- a/src/main/scala/zio/cassandra/session/cql/query/Query.scala +++ b/src/main/scala/zio/cassandra/session/cql/query/Query.scala @@ -1,10 +1,10 @@ package zio.cassandra.session.cql.query import com.datastax.oss.driver.api.core.cql.BoundStatement -import zio.{ Task, ZIO } -import zio.stream.Stream import zio.cassandra.session.Session import zio.cassandra.session.cql.codec.Reads +import zio.stream.Stream +import zio.{ Task, ZIO } class Query[R: Reads] private[cql] ( session: Session, diff --git a/src/main/scala/zio/cassandra/session/cql/query/QueryTemplate.scala b/src/main/scala/zio/cassandra/session/cql/query/QueryTemplate.scala index 0cf2dd7..9462c56 100644 --- a/src/main/scala/zio/cassandra/session/cql/query/QueryTemplate.scala +++ b/src/main/scala/zio/cassandra/session/cql/query/QueryTemplate.scala @@ -8,17 +8,14 @@ import zio.cassandra.session.cql.Binder import zio.cassandra.session.cql.codec.Reads import zio.{ Has, RIO, ZIO } -case class QueryTemplate[V <: HList: Binder, R: Reads] private[cql] ( +case class QueryTemplate[V <: HList, R] private[cql] ( query: String, config: BoundStatement => BoundStatement -) { +)(implicit val binder: Binder[V], val reads: Reads[R]) { def +(that: String): QueryTemplate[V, R] = QueryTemplate[V, R](this.query + that, config) def as[R1: Reads]: QueryTemplate[V, R1] = QueryTemplate[V, R1](query, config) - def prepare: RIO[Has[Session], PreparedQuery[V, R]] = - ZIO.accessM[Has[Session]] { session => - session.get.prepare(query).map(new PreparedQuery(session.get, _, config)) - } + def prepare: RIO[Has[Session], PreparedQuery[V, R]] = ZIO.serviceWith(_.prepare(this)) def config(config: BoundStatement => BoundStatement): QueryTemplate[V, R] = QueryTemplate[V, R](this.query, this.config andThen config)