Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more powerful versions of execute, select and selectFirst #20

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class ServiceImpl(session: Session) extends Service {

override def put(value: Model) = insertQuery(value).execute.unit.provide(ZLayer.succeed(session))
override def get(id: Int) = selectQuery(id).selectFirst.provideSome(ZLayer.succeed(session))

// alternatively, to avoid providing environment each time
def insert(value: Model) = session.execute(insertQuery(value)).unit
def select(id: Int) = session.selectFirst(selectQuery(id))

}
```

Expand Down
44 changes: 42 additions & 2 deletions src/it/scala/zio/cassandra/session/SessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.SimpleStatement
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException
import com.dimafeng.testcontainers.CassandraContainer
import zio.cassandra.session.cql.CqlStringContext
import zio.cassandra.session.cql.unsafe.lift
import zio.test.Assertion._
import zio.test._
import zio.{ Chunk, Scope, ZIO }

import java.net.InetSocketAddress
import java.util.UUID

object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {

Expand Down Expand Up @@ -54,6 +57,23 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
.runCollect
} yield assertTrue(results == Chunk("one", "two", "three"))
},
test("select interpolated query (cqlConst) should return prepared data") {
for {
session <- ZIO.service[Session]
results <- session
.select(cqlConst"select data FROM $keyspace.test_data WHERE id IN (1,2,3)".as[String])
.runCollect
} yield assertTrue(results == Chunk("one", "two", "three"))
},
test("select interpolated query (cql) should return prepared data") {
for {
session <- ZIO.service[Session]
ids = List(1L, 2L, 3L)
results <- session
.select(cql"select data FROM ${lift(keyspace)}.test_data WHERE id IN $ids".as[String])
.runCollect
} yield assertTrue(results == Chunk("one", "two", "three"))
},
test("select should be pure stream") {
for {
session <- ZIO.service[Session]
Expand All @@ -65,15 +85,15 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
results <- selectStream
} yield assertTrue(results == Chunk("one", "two", "three"))
},
test("selectOne should return None on empty result") {
test("selectFirst should return None on empty result") {
for {
session <- ZIO.service[Session]
result <- session
.selectFirst(s"select data FROM $keyspace.test_data WHERE id = 404")
.map(_.map(_.getString(0)))
} yield assertTrue(result.isEmpty)
},
test("selectOne should return Some for one") {
test("selectFirst should return Some for one") {
for {
session <- ZIO.service[Session]
result <- session
Expand All @@ -88,6 +108,19 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
.map(_.map(_.getString(0)))
} yield assertTrue(result.contains(null))
},
test("selectFirst interpolated query (cqlConst) should return Some") {
for {
session <- ZIO.service[Session]
result <- session.selectFirst(cqlConst"select data FROM $keyspace.test_data WHERE id = 1".as[String])
} yield assertTrue(result.contains("one"))
},
test("selectFirst interpolated query (cql) should return Some") {
for {
session <- ZIO.service[Session]
id = 1L
result <- session.selectFirst(cql"select data FROM ${lift(keyspace)}.test_data WHERE id = $id".as[String])
} yield assertTrue(result.contains("one"))
},
test("select will emit in chunks sized equal to statement pageSize") {
val st = SimpleStatement.newInstance(s"select data from $keyspace.test_data").setPageSize(2)
for {
Expand All @@ -102,6 +135,13 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
session <- ZIO.service[Session]
results <- session.select(st).map(_.getString(0)).runCollect
} yield assert(results)(hasSameElements(Chunk("one", "two", "three")))
},
test("execute will create a table") {
for {
session <- ZIO.service[Session]
table = UUID.randomUUID().toString.replaceAll("-", "_")
created <- session.execute(cqlConst"create table $keyspace.$table(id text primary key)")
} yield assertTrue(created)
}
)
}
23 changes: 22 additions & 1 deletion src/main/scala/zio/cassandra/session/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ import com.datastax.oss.driver.api.core.cql._
import com.datastax.oss.driver.api.core.metadata.Metadata
import com.datastax.oss.driver.api.core.metrics.Metrics
import com.datastax.oss.driver.api.core.{ CqlIdentifier, CqlSession, CqlSessionBuilder }
import shapeless.HList
import zio._
import zio.cassandra.session.cql.query.{ ParameterizedQuery, PreparedQuery, Query, QueryTemplate }
import zio.macros.accessible
import zio.stream.{ Stream, ZStream }
import zio.stream.ZStream.Pull
import zio.stream.{ Stream, ZStream }

import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.jdk.OptionConverters.RichOptional

@accessible
trait Session {

def prepare(stmt: String): Task[PreparedStatement]

def execute(stmt: Statement[_]): Task[AsyncResultSet]
Expand All @@ -26,6 +29,23 @@ trait Session {
// short-cuts
def selectFirst(stmt: Statement[_]): Task[Option[Row]]

final def prepare[V <: HList, R](query: QueryTemplate[V, R]): Task[PreparedQuery[V, R]] = {
import query.{ binder, reads }
prepare(query.query).map(new PreparedQuery[V, R](this, _, query.config))
}

final def prepare[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Query[R]] =
prepare(query.template).map(_.applyProduct(query.values))

final def execute[V <: HList](query: ParameterizedQuery[V, _]): Task[Boolean] =
prepare(query).flatMap(_.execute)

final def select[V <: HList, R](query: ParameterizedQuery[V, R]): Stream[Throwable, R] =
ZStream.fromZIO(prepare(query)).flatMap(_.select)

final def selectFirst[V <: HList, R](query: ParameterizedQuery[V, R]): Task[Option[R]] =
prepare(query).flatMap(_.selectFirst)

// other methods
def metrics: Option[Metrics]
def name: String
Expand All @@ -36,6 +56,7 @@ trait Session {

def context: DriverContext
def keyspace: Option[CqlIdentifier]

}

object Session {
Expand Down
10 changes: 0 additions & 10 deletions src/main/scala/zio/cassandra/session/cql/FieldName.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.cassandra.session.cql.query
import com.datastax.oss.driver.api.core.cql.BoundStatement
import shapeless.HList
import shapeless.ops.hlist.Prepend
import zio.RIO
import zio._
import zio.cassandra.session.Session
import zio.cassandra.session.cql.Binder
import zio.cassandra.session.cql.codec.Reads
Expand All @@ -13,14 +13,11 @@ case class ParameterizedQuery[V <: HList: Binder, R: Reads] private (template: Q
def +(that: String): ParameterizedQuery[V, R] = ParameterizedQuery[V, R](this.template + that, this.values)
def as[R1: Reads]: ParameterizedQuery[V, R1] = ParameterizedQuery[V, R1](template.as[R1], values)

def select: ZStream[Session, Throwable, R] =
ZStream.unwrap(template.prepare.map(_.applyProduct(values).select))
def select: ZStream[Session, Throwable, R] = ZStream.serviceWithStream(_.select(this))

def selectFirst: RIO[Session, Option[R]] =
template.prepare.flatMap(_.applyProduct(values).selectFirst)
def selectFirst: RIO[Session, Option[R]] = ZIO.serviceWithZIO(_.selectFirst(this))

def execute: RIO[Session, Boolean] =
template.prepare.map(_.applyProduct(values)).flatMap(_.execute)
def execute: RIO[Session, Boolean] = ZIO.serviceWithZIO(_.execute(this))

def config(config: BoundStatement => BoundStatement): ParameterizedQuery[V, R] =
ParameterizedQuery[V, R](template.config(config), values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import zio.cassandra.session.cql.Binder
import zio.cassandra.session.Session
import zio.cassandra.session.cql.codec.Reads

class PreparedQuery[V <: HList: Binder, R: Reads] private[cql] (
class PreparedQuery[V <: HList: Binder, R: Reads] private[session] (
session: Session,
statement: PreparedStatement,
config: BoundStatement => BoundStatement
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/zio/cassandra/session/cql/query/Query.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package zio.cassandra.session.cql.query

import com.datastax.oss.driver.api.core.cql.BoundStatement
import zio.{ Task, ZIO }
import zio.stream.Stream
import zio.cassandra.session.Session
import zio.cassandra.session.cql.codec.Reads
import zio.stream.Stream
import zio.{ Task, ZIO }

class Query[R: Reads] private[cql] (
session: Session,
private[cql] val statement: BoundStatement
) {
def config(statement: BoundStatement => BoundStatement) = new Query[R](session, statement(this.statement))

def select: Stream[Throwable, R] = session.select(statement).mapChunksZIO { chunk =>
def select: Stream[Throwable, R] = session.select(statement).mapChunksZIO { chunk =>
chunk.mapZIO(row => ZIO.attempt(Reads[R].read(row)))
}

def selectFirst: Task[Option[R]] = session.selectFirst(statement).flatMap {
def selectFirst: Task[Option[R]] = session.selectFirst(statement).flatMap {
case None => ZIO.none
case Some(row) => ZIO.attempt(Reads[R].read(row)).map(Some(_))
}

def execute: Task[Boolean] = session.execute(statement).map(_.wasApplied)
def execute: Task[Boolean] = session.execute(statement).map(_.wasApplied)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import zio.cassandra.session.cql.Binder
import zio.cassandra.session.cql.codec.Reads
import zio.{ RIO, ZIO }

case class QueryTemplate[V <: HList: Binder, R: Reads] private[cql] (
case class QueryTemplate[V <: HList, R] private[cql] (
query: String,
config: BoundStatement => BoundStatement
) {
)(implicit val binder: Binder[V], val reads: Reads[R]) {
def +(that: String): QueryTemplate[V, R] = QueryTemplate[V, R](this.query + that, config)
def as[R1: Reads]: QueryTemplate[V, R1] = QueryTemplate[V, R1](query, config)

def prepare: RIO[Session, PreparedQuery[V, R]] =
ZIO.serviceWithZIO { session =>
session.prepare(query).map(new PreparedQuery(session, _, config))
}
def prepare: RIO[Session, PreparedQuery[V, R]] = ZIO.serviceWithZIO(_.prepare(this))

def config(config: BoundStatement => BoundStatement): QueryTemplate[V, R] =
QueryTemplate[V, R](this.query, this.config andThen config)
Expand Down