Skip to content

Commit

Permalink
Merge pull request #21 from narma/add_more_powerful_versions_of_execu…
Browse files Browse the repository at this point in the history
…te_select_selectFirst_zio-1.x

add more powerful versions of execute, select and selectFirst
  • Loading branch information
myazinn authored Sep 26, 2022
2 parents da99d21 + b9b2e28 commit 4f9fc28
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 28 deletions.
4 changes: 4 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class ServiceImpl(session: Session) extends Service {

override def put(value: Model) = insertQuery(value).execute.unit.provide(Has(session))
override def get(id: Int) = selectQuery(id).selectFirst.provide(Has(session))

// alternatively, to avoid providing environment each time
def insert(value: Model) = session.execute(insertQuery(value)).unit
def select(id: Int) = session.selectFirst(selectQuery(id))
}
```

Expand Down
44 changes: 42 additions & 2 deletions src/it/scala/zio/cassandra/session/SessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.SimpleStatement
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException
import com.dimafeng.testcontainers.CassandraContainer
import zio.cassandra.session.cql.CqlStringContext
import zio.cassandra.session.cql.unsafe.lift
import zio.test.Assertion._
import zio.test._
import zio.{ Chunk, Task, ZIO }

import java.net.InetSocketAddress
import java.util.UUID

object SessionSpec extends CassandraSpecUtils {

Expand Down Expand Up @@ -54,6 +57,23 @@ object SessionSpec extends CassandraSpecUtils {
.runCollect
} yield assertTrue(results == Chunk("one", "two", "three"))
},
testM("select interpolated query (cqlConst) should return prepared data") {
for {
session <- ZIO.service[Session]
results <- session
.select(cqlConst"select data FROM $keyspace.test_data WHERE id IN (1,2,3)".as[String])
.runCollect
} yield assertTrue(results == Chunk("one", "two", "three"))
},
testM("select interpolated query (cql) should return prepared data") {
for {
session <- ZIO.service[Session]
ids = List(1L, 2L, 3L)
results <- session
.select(cql"select data FROM ${lift(keyspace)}.test_data WHERE id IN $ids".as[String])
.runCollect
} yield assertTrue(results == Chunk("one", "two", "three"))
},
testM("select should be pure stream") {
for {
session <- ZIO.service[Session]
Expand All @@ -65,15 +85,15 @@ object SessionSpec extends CassandraSpecUtils {
results <- selectStream
} yield assertTrue(results == Chunk("one", "two", "three"))
},
testM("selectOne should return None on empty result") {
testM("selectFirst should return None on empty result") {
for {
session <- ZIO.service[Session]
result <- session
.selectFirst(s"select data FROM $keyspace.test_data WHERE id = 404")
.map(_.map(_.getString(0)))
} yield assertTrue(result.isEmpty)
},
testM("selectOne should return Some for one") {
testM("selectFirst should return Some for one") {
for {
session <- ZIO.service[Session]
result <- session
Expand All @@ -88,13 +108,33 @@ object SessionSpec extends CassandraSpecUtils {
.map(_.map(_.getString(0)))
} yield assertTrue(result.contains(null))
},
testM("selectFirst interpolated query (cqlConst) should return Some") {
for {
session <- ZIO.service[Session]
result <- session.selectFirst(cqlConst"select data FROM $keyspace.test_data WHERE id = 1".as[String])
} yield assertTrue(result.contains("one"))
},
testM("selectFirst interpolated query (cql) should return Some") {
for {
session <- ZIO.service[Session]
id = 1L
result <- session.selectFirst(cql"select data FROM ${lift(keyspace)}.test_data WHERE id = $id".as[String])
} yield assertTrue(result.contains("one"))
},
testM("select will emit in chunks sized equal to statement pageSize") {
val st = SimpleStatement.newInstance(s"select data from $keyspace.test_data").setPageSize(2)
for {
session <- ZIO.service[Session]
stream = session.select(st)
chunkSizes <- stream.mapChunks(ch => Chunk.single(ch.size)).runCollect
} yield assert(chunkSizes)(forall(equalTo(2))) && assertTrue(chunkSizes.size > 1)
},
testM("execute will create a table") {
for {
session <- ZIO.service[Session]
table = UUID.randomUUID().toString.replaceAll("-", "_")
created <- session.execute(cqlConst"create table $keyspace.$table(id text primary key)")
} yield assertTrue(created)
}
)
}
21 changes: 21 additions & 0 deletions src/main/scala/zio/cassandra/session/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import com.datastax.oss.driver.api.core.cql._
import com.datastax.oss.driver.api.core.metadata.Metadata
import com.datastax.oss.driver.api.core.metrics.Metrics
import com.datastax.oss.driver.api.core.{ CqlIdentifier, CqlSession, CqlSessionBuilder }
import shapeless.HList
import zio._
import zio.cassandra.session.cql.query.{ ParameterizedQuery, PreparedQuery, Query, QueryTemplate }
import zio.macros.accessible
import zio.stream.Stream
import zio.stream.ZStream.Pull
Expand All @@ -15,6 +17,7 @@ import scala.jdk.OptionConverters.RichOptional

@accessible
trait Session {

def prepare(stmt: String): Task[PreparedStatement]

def execute(stmt: Statement[_]): Task[AsyncResultSet]
Expand All @@ -26,6 +29,23 @@ trait Session {
// short-cuts
def selectFirst(stmt: Statement[_]): Task[Option[Row]]

final def prepare[V <: HList, R](query: QueryTemplate[V, R]): Task[PreparedQuery[V, R]] = {
import query.{ binder, reads }
prepare(query.query).map(new PreparedQuery[V, R](this, _, query.config))
}

final def prepare[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Query[R]] =
prepare(query.template).map(_.applyProduct(query.values))

final def execute[V <: HList](query: ParameterizedQuery[V, _]): Task[Boolean] =
prepare(query).flatMap(_.execute)

final def select[V <: HList, R](query: ParameterizedQuery[V, R]): Stream[Throwable, R] =
Stream.fromEffect(prepare(query)).flatMap(_.select)

final def selectFirst[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Option[R]] =
prepare(query).flatMap(_.selectFirst)

// other methods
def metrics: Option[Metrics]
def name: String
Expand All @@ -36,6 +56,7 @@ trait Session {

def context: DriverContext
def keyspace: Option[CqlIdentifier]

}

object Session {
Expand Down
10 changes: 0 additions & 10 deletions src/main/scala/zio/cassandra/session/cql/FieldName.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@ import zio.cassandra.session.Session
import zio.cassandra.session.cql.Binder
import zio.cassandra.session.cql.codec.Reads
import zio.stream.{ Stream, ZStream }
import zio.{ Has, RIO }
import zio.{ Has, RIO, ZIO }

case class ParameterizedQuery[V <: HList: Binder, R: Reads] private (template: QueryTemplate[V, R], values: V) {
def +(that: String): ParameterizedQuery[V, R] = ParameterizedQuery[V, R](this.template + that, this.values)
def as[R1: Reads]: ParameterizedQuery[V, R1] = ParameterizedQuery[V, R1](template.as[R1], values)

def select: ZStream[Has[Session], Throwable, R] =
Stream.unwrap(template.prepare.map(_.applyProduct(values).select))
def select: ZStream[Has[Session], Throwable, R] = Stream.serviceWithStream(_.select(this))

def selectFirst: RIO[Has[Session], Option[R]] =
template.prepare.flatMap(_.applyProduct(values).selectFirst)
def selectFirst: RIO[Has[Session], Option[R]] = ZIO.serviceWith(_.selectFirst(this))

def execute: RIO[Has[Session], Boolean] =
template.prepare.map(_.applyProduct(values)).flatMap(_.execute)
def execute: RIO[Has[Session], Boolean] = ZIO.serviceWith(_.execute(this))

def config(config: BoundStatement => BoundStatement): ParameterizedQuery[V, R] =
ParameterizedQuery[V, R](template.config(config), values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import zio.cassandra.session.cql.Binder
import zio.cassandra.session.Session
import zio.cassandra.session.cql.codec.Reads

class PreparedQuery[V <: HList: Binder, R: Reads] private[cql] (
class PreparedQuery[V <: HList: Binder, R: Reads] private[session] (
session: Session,
statement: PreparedStatement,
config: BoundStatement => BoundStatement
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/zio/cassandra/session/cql/query/Query.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package zio.cassandra.session.cql.query

import com.datastax.oss.driver.api.core.cql.BoundStatement
import zio.{ Task, ZIO }
import zio.stream.Stream
import zio.cassandra.session.Session
import zio.cassandra.session.cql.codec.Reads
import zio.stream.Stream
import zio.{ Task, ZIO }

class Query[R: Reads] private[cql] (
session: Session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import zio.cassandra.session.cql.Binder
import zio.cassandra.session.cql.codec.Reads
import zio.{ Has, RIO, ZIO }

case class QueryTemplate[V <: HList: Binder, R: Reads] private[cql] (
case class QueryTemplate[V <: HList, R] private[cql] (
query: String,
config: BoundStatement => BoundStatement
) {
)(implicit val binder: Binder[V], val reads: Reads[R]) {
def +(that: String): QueryTemplate[V, R] = QueryTemplate[V, R](this.query + that, config)
def as[R1: Reads]: QueryTemplate[V, R1] = QueryTemplate[V, R1](query, config)

def prepare: RIO[Has[Session], PreparedQuery[V, R]] =
ZIO.accessM[Has[Session]] { session =>
session.get.prepare(query).map(new PreparedQuery(session.get, _, config))
}
def prepare: RIO[Has[Session], PreparedQuery[V, R]] = ZIO.serviceWith(_.prepare(this))

def config(config: BoundStatement => BoundStatement): QueryTemplate[V, R] =
QueryTemplate[V, R](this.query, this.config andThen config)
Expand Down

0 comments on commit 4f9fc28

Please sign in to comment.