From df8618784aa783c205544a63360a854843037e7c Mon Sep 17 00:00:00 2001 From: "nikita.myazin" Date: Sat, 24 Sep 2022 14:32:11 +0300 Subject: [PATCH] add more powerful versions of execute, select and selectFirst --- readme.md | 5 +++ .../zio/cassandra/session/SessionSpec.scala | 44 ++++++++++++++++++- .../scala/zio/cassandra/session/Session.scala | 23 +++++++++- .../zio/cassandra/session/cql/FieldName.scala | 10 ----- .../cql/query/ParameterizedQuery.scala | 11 ++--- .../session/cql/query/PreparedQuery.scala | 2 +- .../cassandra/session/cql/query/Query.scala | 10 ++--- .../session/cql/query/QueryTemplate.scala | 9 ++-- 8 files changed, 82 insertions(+), 32 deletions(-) delete mode 100644 src/main/scala/zio/cassandra/session/cql/FieldName.scala diff --git a/readme.md b/readme.md index 4763928..953eec4 100644 --- a/readme.md +++ b/readme.md @@ -70,6 +70,11 @@ class ServiceImpl(session: Session) extends Service { override def put(value: Model) = insertQuery(value).execute.unit.provide(ZLayer.succeed(session)) override def get(id: Int) = selectQuery(id).selectFirst.provideSome(ZLayer.succeed(session)) + + // alternatively, to avoid providing environment each time + def insert(value: Model) = session.execute(insertQuery(value)).unit + def select(id: Int) = session.selectFirst(selectQuery(id)) + } ``` diff --git a/src/it/scala/zio/cassandra/session/SessionSpec.scala b/src/it/scala/zio/cassandra/session/SessionSpec.scala index fd5e38a..c505087 100644 --- a/src/it/scala/zio/cassandra/session/SessionSpec.scala +++ b/src/it/scala/zio/cassandra/session/SessionSpec.scala @@ -4,11 +4,14 @@ import com.datastax.oss.driver.api.core.CqlSession import com.datastax.oss.driver.api.core.cql.SimpleStatement import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException import com.dimafeng.testcontainers.CassandraContainer +import zio.cassandra.session.cql.CqlStringContext +import zio.cassandra.session.cql.unsafe.lift import zio.test.Assertion._ import zio.test._ import zio.{ Chunk, Scope, ZIO } import java.net.InetSocketAddress +import java.util.UUID object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils { @@ -54,6 +57,23 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils { .runCollect } yield assertTrue(results == Chunk("one", "two", "three")) }, + test("select interpolated query (cqlConst) should return prepared data") { + for { + session <- ZIO.service[Session] + results <- session + .select(cqlConst"select data FROM $keyspace.test_data WHERE id IN (1,2,3)".as[String]) + .runCollect + } yield assertTrue(results == Chunk("one", "two", "three")) + }, + test("select interpolated query (cql) should return prepared data") { + for { + session <- ZIO.service[Session] + ids = List(1L, 2L, 3L) + results <- session + .select(cql"select data FROM ${lift(keyspace)}.test_data WHERE id IN $ids".as[String]) + .runCollect + } yield assertTrue(results == Chunk("one", "two", "three")) + }, test("select should be pure stream") { for { session <- ZIO.service[Session] @@ -65,7 +85,7 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils { results <- selectStream } yield assertTrue(results == Chunk("one", "two", "three")) }, - test("selectOne should return None on empty result") { + test("selectFirst should return None on empty result") { for { session <- ZIO.service[Session] result <- session @@ -73,7 +93,7 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils { .map(_.map(_.getString(0))) } yield assertTrue(result.isEmpty) }, - test("selectOne should return Some for one") { + test("selectFirst should return Some for one") { for { session <- ZIO.service[Session] result <- session @@ -88,6 +108,19 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils { .map(_.map(_.getString(0))) } yield assertTrue(result.contains(null)) }, + test("selectFirst interpolated query (cqlConst) should return Some") { + for { + session <- ZIO.service[Session] + result <- session.selectFirst(cqlConst"select data FROM $keyspace.test_data WHERE id = 1".as[String]) + } yield assertTrue(result.contains("one")) + }, + test("selectFirst interpolated query (cql) should return Some") { + for { + session <- ZIO.service[Session] + id = 1L + result <- session.selectFirst(cql"select data FROM ${lift(keyspace)}.test_data WHERE id = $id".as[String]) + } yield assertTrue(result.contains("one")) + }, test("select will emit in chunks sized equal to statement pageSize") { val st = SimpleStatement.newInstance(s"select data from $keyspace.test_data").setPageSize(2) for { @@ -102,6 +135,13 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils { session <- ZIO.service[Session] results <- session.select(st).map(_.getString(0)).runCollect } yield assert(results)(hasSameElements(Chunk("one", "two", "three"))) + }, + test("execute will create a table") { + for { + session <- ZIO.service[Session] + table = UUID.randomUUID().toString.replaceAll("-", "_") + created <- session.execute(cqlConst"create table $keyspace.$table(id text primary key)") + } yield assertTrue(created) } ) } diff --git a/src/main/scala/zio/cassandra/session/Session.scala b/src/main/scala/zio/cassandra/session/Session.scala index 11bf17e..9ec44a5 100644 --- a/src/main/scala/zio/cassandra/session/Session.scala +++ b/src/main/scala/zio/cassandra/session/Session.scala @@ -5,16 +5,19 @@ import com.datastax.oss.driver.api.core.cql._ import com.datastax.oss.driver.api.core.metadata.Metadata import com.datastax.oss.driver.api.core.metrics.Metrics import com.datastax.oss.driver.api.core.{ CqlIdentifier, CqlSession, CqlSessionBuilder } +import shapeless.HList import zio._ +import zio.cassandra.session.cql.query.{ ParameterizedQuery, PreparedQuery, Query, QueryTemplate } import zio.macros.accessible -import zio.stream.{ Stream, ZStream } import zio.stream.ZStream.Pull +import zio.stream.{ Stream, ZStream } import scala.jdk.CollectionConverters.IterableHasAsScala import scala.jdk.OptionConverters.RichOptional @accessible trait Session { + def prepare(stmt: String): Task[PreparedStatement] def execute(stmt: Statement[_]): Task[AsyncResultSet] @@ -26,6 +29,23 @@ trait Session { // short-cuts def selectFirst(stmt: Statement[_]): Task[Option[Row]] + final def prepare[V <: HList, R](query: QueryTemplate[V, R]): Task[PreparedQuery[V, R]] = { + import query.{ binder, reads } + prepare(query.query).map(new PreparedQuery[V, R](this, _, query.config)) + } + + final def prepare[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Query[R]] = + prepare(query.template).map(_.applyProduct(query.values)) + + final def execute[V <: HList](query: ParameterizedQuery[V, _]): Task[Boolean] = + prepare(query).flatMap(_.execute) + + final def select[V <: HList, R](query: ParameterizedQuery[V, R]): Stream[Throwable, R] = + ZStream.fromZIO(prepare(query)).flatMap(_.select) + + final def selectFirst[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Option[R]] = + prepare(query).flatMap(_.selectFirst) + // other methods def metrics: Option[Metrics] def name: String @@ -36,6 +56,7 @@ trait Session { def context: DriverContext def keyspace: Option[CqlIdentifier] + } object Session { diff --git a/src/main/scala/zio/cassandra/session/cql/FieldName.scala b/src/main/scala/zio/cassandra/session/cql/FieldName.scala deleted file mode 100644 index 4c3eb9e..0000000 --- a/src/main/scala/zio/cassandra/session/cql/FieldName.scala +++ /dev/null @@ -1,10 +0,0 @@ -package zio.cassandra.session.cql - -/** This type is used by FromUdtValue and ToUdtValue to decide whether to utilize schema data when reading and writing - * data from the Datastax DataType - */ -private[cql] sealed trait FieldName -object FieldName { - case object Unused extends FieldName - final case class Labelled(value: String) extends FieldName -} diff --git a/src/main/scala/zio/cassandra/session/cql/query/ParameterizedQuery.scala b/src/main/scala/zio/cassandra/session/cql/query/ParameterizedQuery.scala index 07f2c3b..87b70ef 100644 --- a/src/main/scala/zio/cassandra/session/cql/query/ParameterizedQuery.scala +++ b/src/main/scala/zio/cassandra/session/cql/query/ParameterizedQuery.scala @@ -3,7 +3,7 @@ package zio.cassandra.session.cql.query import com.datastax.oss.driver.api.core.cql.BoundStatement import shapeless.HList import shapeless.ops.hlist.Prepend -import zio.RIO +import zio._ import zio.cassandra.session.Session import zio.cassandra.session.cql.Binder import zio.cassandra.session.cql.codec.Reads @@ -13,14 +13,11 @@ case class ParameterizedQuery[V <: HList: Binder, R: Reads] private (template: Q def +(that: String): ParameterizedQuery[V, R] = ParameterizedQuery[V, R](this.template + that, this.values) def as[R1: Reads]: ParameterizedQuery[V, R1] = ParameterizedQuery[V, R1](template.as[R1], values) - def select: ZStream[Session, Throwable, R] = - ZStream.unwrap(template.prepare.map(_.applyProduct(values).select)) + def select: ZStream[Session, Throwable, R] = ZStream.serviceWithStream(_.select(this)) - def selectFirst: RIO[Session, Option[R]] = - template.prepare.flatMap(_.applyProduct(values).selectFirst) + def selectFirst: RIO[Session, Option[R]] = ZIO.serviceWithZIO(_.selectFirst(this)) - def execute: RIO[Session, Boolean] = - template.prepare.map(_.applyProduct(values)).flatMap(_.execute) + def execute: RIO[Session, Boolean] = ZIO.serviceWithZIO(_.execute(this)) def config(config: BoundStatement => BoundStatement): ParameterizedQuery[V, R] = ParameterizedQuery[V, R](template.config(config), values) diff --git a/src/main/scala/zio/cassandra/session/cql/query/PreparedQuery.scala b/src/main/scala/zio/cassandra/session/cql/query/PreparedQuery.scala index 2ad6151..39ef4ae 100644 --- a/src/main/scala/zio/cassandra/session/cql/query/PreparedQuery.scala +++ b/src/main/scala/zio/cassandra/session/cql/query/PreparedQuery.scala @@ -6,7 +6,7 @@ import zio.cassandra.session.cql.Binder import zio.cassandra.session.Session import zio.cassandra.session.cql.codec.Reads -class PreparedQuery[V <: HList: Binder, R: Reads] private[cql] ( +class PreparedQuery[V <: HList: Binder, R: Reads] private[session] ( session: Session, statement: PreparedStatement, config: BoundStatement => BoundStatement diff --git a/src/main/scala/zio/cassandra/session/cql/query/Query.scala b/src/main/scala/zio/cassandra/session/cql/query/Query.scala index b4e66ce..93af321 100644 --- a/src/main/scala/zio/cassandra/session/cql/query/Query.scala +++ b/src/main/scala/zio/cassandra/session/cql/query/Query.scala @@ -1,10 +1,10 @@ package zio.cassandra.session.cql.query import com.datastax.oss.driver.api.core.cql.BoundStatement -import zio.{ Task, ZIO } -import zio.stream.Stream import zio.cassandra.session.Session import zio.cassandra.session.cql.codec.Reads +import zio.stream.Stream +import zio.{ Task, ZIO } class Query[R: Reads] private[cql] ( session: Session, @@ -12,14 +12,14 @@ class Query[R: Reads] private[cql] ( ) { def config(statement: BoundStatement => BoundStatement) = new Query[R](session, statement(this.statement)) - def select: Stream[Throwable, R] = session.select(statement).mapChunksZIO { chunk => + def select: Stream[Throwable, R] = session.select(statement).mapChunksZIO { chunk => chunk.mapZIO(row => ZIO.attempt(Reads[R].read(row))) } - def selectFirst: Task[Option[R]] = session.selectFirst(statement).flatMap { + def selectFirst: Task[Option[R]] = session.selectFirst(statement).flatMap { case None => ZIO.none case Some(row) => ZIO.attempt(Reads[R].read(row)).map(Some(_)) } - def execute: Task[Boolean] = session.execute(statement).map(_.wasApplied) + def execute: Task[Boolean] = session.execute(statement).map(_.wasApplied) } diff --git a/src/main/scala/zio/cassandra/session/cql/query/QueryTemplate.scala b/src/main/scala/zio/cassandra/session/cql/query/QueryTemplate.scala index c909c6f..b1e218e 100644 --- a/src/main/scala/zio/cassandra/session/cql/query/QueryTemplate.scala +++ b/src/main/scala/zio/cassandra/session/cql/query/QueryTemplate.scala @@ -8,17 +8,14 @@ import zio.cassandra.session.cql.Binder import zio.cassandra.session.cql.codec.Reads import zio.{ RIO, ZIO } -case class QueryTemplate[V <: HList: Binder, R: Reads] private[cql] ( +case class QueryTemplate[V <: HList, R] private[cql] ( query: String, config: BoundStatement => BoundStatement -) { +)(implicit val binder: Binder[V], val reads: Reads[R]) { def +(that: String): QueryTemplate[V, R] = QueryTemplate[V, R](this.query + that, config) def as[R1: Reads]: QueryTemplate[V, R1] = QueryTemplate[V, R1](query, config) - def prepare: RIO[Session, PreparedQuery[V, R]] = - ZIO.serviceWithZIO { session => - session.prepare(query).map(new PreparedQuery(session, _, config)) - } + def prepare: RIO[Session, PreparedQuery[V, R]] = ZIO.serviceWithZIO(_.prepare(this)) def config(config: BoundStatement => BoundStatement): QueryTemplate[V, R] = QueryTemplate[V, R](this.query, this.config andThen config)