@@ -50,7 +50,7 @@ abstract class ClusterManagerTestBase(s: String)
bootProps.setProperty("log-level", "config")
// Easier to switch ON traces; that's why this was added.
// bootProps.setProperty("gemfirexd.debug.true",
// "QueryDistribution,TraceExecution,TraceActivation")
// "QueryDistribution,TraceExecution,TraceActivation,TraceTran")
bootProps.setProperty("statistic-archive-file", "snappyStore.gfs")
bootProps.setProperty("spark.executor.cores",
TestUtils.defaultCores.toString)
@@ -25,7 +25,7 @@ import scala.language.postfixOps
import com.gemstone.gemfire.internal.cache.PartitionedRegion
import com.pivotal.gemfirexd.internal.engine.Misc
import com.pivotal.gemfirexd.internal.shared.common.reference.SQLState
- import io.snappydata.SnappyTableStatsProviderService
+ import io.snappydata.{Property, SnappyTableStatsProviderService}
import io.snappydata.core.{TestData, TestData2}
import io.snappydata.store.ClusterSnappyJoinSuite
import io.snappydata.test.dunit.{AvailablePortHelper, SerializableRunnable}
@@ -68,6 +68,12 @@ class SplitSnappyClusterDUnitTest(s: String)
super.afterClass()
}

def testCreateTablesFromOtherTables(): Unit = {
  // Runs createTablesFromOtherTablesTest (defined in the companion object
  // below) inside the vm3 JVM of the dunit cluster.
  vm3.invoke(getClass, "createTablesFromOtherTablesTest",
    startArgs :+ Int.box(locatorClientPort))
}

override protected def locatorClientPort = { locatorNetPort }

override protected def startNetworkServers(): Unit = {
@@ -728,10 +734,116 @@ object SplitSnappyClusterDUnitTest
rs = resultDF.collect()
case e: Exception => throw e
}
- assert(rs.length == 5)
+ assert(rs.length == 5, s"Expected 5 but got ${rs.length}")
assert(rs(0).getAs[String]("COL1").equals("AA"))
assert(rs(0).getAs[String]("COL2").equals("AA"))

connectorSnc.dropTable("APP.T1")
}

def createTablesFromOtherTablesTest(locatorPort: Int,
prop: Properties,
locatorClientPort: Int): Unit = {

val props = Map.empty[String, String]
val snc = getSnappyContextForConnector(locatorPort,
locatorClientPort)
val rowTable = "rowTable"
val colTable = "colTable"

// Keep the column batch size tiny so the 113999 rows span many column batches.
Property.ColumnBatchSize.set(snc.sessionState.conf, 30)
val rdd = sc.parallelize(
(1 to 113999).map(i => new TestRecord(i, i + 1, i + 2)))
val dataDF = snc.createDataFrame(rdd)

snc.createTable(rowTable, "row", dataDF.schema, props)
dataDF.write.format("row").mode(SaveMode.Append).options(props).saveAsTable(rowTable)

snc.createTable(colTable, "column", dataDF.schema, props + ("BUCKETS" -> "17"))
dataDF.write.format("column").mode(SaveMode.Append).options(props).saveAsTable(colTable)

executeTestWithOptions(locatorPort, locatorClientPort)
executeTestWithOptions(locatorPort, locatorClientPort,
  Map.empty, Map.empty + ("BUCKETS" -> "17"), "",
  "BUCKETS '13', PARTITION_BY 'COL1', REDUNDANCY '1'")

}
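For reference, the second invocation above is the one that exercises explicit table options; with those arguments the helper below effectively issues a statement of this shape (a sketch only, reusing the testcolTable1/colTable names from the helper rather than introducing anything new):

    // Expansion of the string concatenation in executeTestWithOptions for the
    // tempColTableOptions argument passed above.
    snc.sql("CREATE TABLE testcolTable1 USING COLUMN " +
      "OPTIONS (BUCKETS '13', PARTITION_BY 'COL1', REDUNDANCY '1') " +
      "AS (SELECT col1, col2 FROM colTable)")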

def executeTestWithOptions(locatorPort: Int, locatorClientPort: Int,
    rowTableOptions: Map[String, String] = Map.empty[String, String],
    colTableOptions: Map[String, String] = Map.empty[String, String],
    tempRowTableOptions: String = "",
    tempColTableOptions: String = ""): Unit = {

val snc = getSnappyContextForConnector(locatorPort,
locatorClientPort)
val rowTable = "rowTable"
val colTable = "colTable"


snc.sql("DROP TABLE IF EXISTS " + rowTable)
snc.sql("DROP TABLE IF EXISTS " + colTable)
Property.ColumnBatchSize.set(snc.sessionState.conf, 30)
val rdd = sc.parallelize(
(1 to 113999).map(i => new TestRecord(i, i + 1, i + 2)))
val dataDF = snc.createDataFrame(rdd)

snc.createTable(rowTable, "row", dataDF.schema, rowTableOptions)
dataDF.write.format("row").mode(SaveMode.Append).options(rowTableOptions).saveAsTable(rowTable)

snc.createTable(colTable, "column", dataDF.schema, colTableOptions)
dataDF.write.format("column").mode(SaveMode.Append).options(colTableOptions).saveAsTable(colTable)

val tempRowTableName = "testRowTable1"
val tempColTableName = "testcolTable1"


snc.sql("DROP TABLE IF EXISTS " + tempRowTableName)
snc.sql(s"CREATE TABLE " + tempRowTableName + s" using row options($tempRowTableOptions) AS" +
s" (SELECT col1 ,col2 FROM " + rowTable + ")")
val testResults1 = snc.sql("SELECT * FROM " + tempRowTableName).collect
assert(testResults1.length == 113999, s"Expected row count is 113999 while actual count is " +
s"${testResults1.length}")


snc.sql("DROP TABLE IF EXISTS " + tempRowTableName)
snc.sql("CREATE TABLE " + tempRowTableName + s" using row options($tempRowTableOptions) AS " +
s"(SELECT col1 ,col2 FROM " + colTable + ")")
val testResults2 = snc.sql("SELECT * FROM " + tempRowTableName).collect()
assert(testResults2.length == 113999, s"Expected row count is 113999 while actual count is " +
s"${testResults2.length}")

snc.sql("DROP TABLE IF EXISTS " + tempColTableName)
snc.sql("CREATE TABLE " + tempColTableName + s" USING COLUMN OPTIONS($tempColTableOptions) AS (SELECT " +
s"col1 ,col2 FROM " + rowTable + ")")

val testResults3 = snc.sql("SELECT * FROM " + tempColTableName).collect
assert(testResults3.length == 113999, s"Expected row count is 113999 while actual count is " +
s"${testResults3.length}")

snc.sql("DROP TABLE IF EXISTS " + tempColTableName)
snc.sql("CREATE TABLE " + tempColTableName + s" USING COLUMN OPTIONS($tempColTableOptions) AS (SELECT " +
"col1 ,col2 FROM " + colTable + ")")

val testResults4 = snc.sql("SELECT * FROM " + tempColTableName).collect
assert(testResults4.length == 113999, s"Expected row count is 113999 while actual count is" +
s"${testResults4.length}")

snc.sql("DROP TABLE IF EXISTS " + tempColTableName)

snc.sql("CREATE TABLE " + tempColTableName + s" USING COLUMN OPTIONS($tempColTableOptions) AS (SELECT " +
"t1.col1 ,t1.col2 FROM " + colTable + " t1," + rowTable + " t2 where t1.col1=t2.col2)")
// Expected count will be 113998 as first row will not match
val testResults5 = snc.sql("SELECT * FROM " + tempColTableName).collect

assert(testResults5.length == 113998, s"Expected row count is 113998 while actual count is" +
s"${testResults5.length}")

snc.sql("DROP TABLE IF EXISTS " + tempColTableName)
snc.sql("DROP TABLE IF EXISTS " + tempRowTableName)

snc.sql("DROP TABLE IF EXISTS " + rowTable)
snc.sql("DROP TABLE IF EXISTS " + colTable)

}
}
@@ -72,7 +72,7 @@ final class SparkShellRDDHelper
}

def executeQuery(conn: Connection, tableName: String,
-   split: Partition, query: String, relDestroyVersion: Int): (Statement, ResultSet) = {
+   split: Partition, query: String, relDestroyVersion: Int): (Statement, ResultSet, String) = {
DriverRegistry.register(Constant.JDBC_CLIENT_DRIVER)
val resolvedName = StoreUtils.lookupName(tableName, conn.getSchema)

@@ -110,7 +110,7 @@ final class SparkShellRDDHelper
}

val rs = statement.executeQuery(query)
- (statement, rs)
+ (statement, rs, txId)
}
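With the widened return type, a caller can keep the transaction id alongside the statement and result set, and commit the snapshot once the results are consumed. A minimal caller-side sketch, assuming conn, tableName, split, query and relDestroyVersion are already in scope (only executeQuery and sys.COMMIT_SNAPSHOT_TXID come from this diff; the rest is illustrative):

    val helper = new SparkShellRDDHelper
    val (statement, rs, txId) = helper.executeQuery(
      conn, tableName, split, query, relDestroyVersion)
    try {
      while (rs.next()) { /* consume rows under the snapshot */ }
    } finally {
      rs.close()
      statement.close()
      if ((txId ne null) && !txId.equals("null")) {
        // Commit the snapshot transaction that the query started.
        val ps = conn.prepareStatement("call sys.COMMIT_SNAPSHOT_TXID(?)")
        ps.setString(1, txId)
        ps.executeUpdate()
        ps.close()
      }
    }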

def getConnection(connectionProperties: ConnectionProperties,
@@ -526,20 +526,21 @@ final class SmartConnectorColumnRDD(
val (fetchStatsQuery, fetchColQuery) = helper.getSQLStatement(resolvedTableName,
split.index, requiredColumns.map(_.replace(store.columnPrefix, "")), schema)
// fetch the stats
- val (statement, rs) = helper.executeQuery(conn, tableName, split,
+ val (statement, rs, txId) = helper.executeQuery(conn, tableName, split,
fetchStatsQuery, relDestroyVersion)
val itr = new ColumnBatchIteratorOnRS(conn, requiredColumns, statement, rs,
context, fetchColQuery)

if (context ne null) {
context.addTaskCompletionListener { _ =>
- val txid = SparkShellRDDHelper.snapshotTxId.get()
- if ((txid ne null) && !txid.equals("null")
+ logDebug(s"The txid going to be committed is $txId " + tableName)
+
+ if ((txId ne null) && !txId.equals("null")
  /* && !(tx.asInstanceOf[TXStateProxy]).isClosed() */ ) {
  val ps = conn.prepareStatement(s"call sys.COMMIT_SNAPSHOT_TXID(?)")
- ps.setString(1, txid)
+ ps.setString(1, txId)
  ps.executeUpdate()
- logDebug(s"The txid being committed is $txid")
+ logDebug(s"The txid being committed is $txId")
ps.close()
SparkShellRDDHelper.snapshotTxId.set(null)
}
@@ -595,6 +596,24 @@ class SmartConnectorRowRDD(_session: SnappySession,
pushProjections = true, useResultSet = true, _connProperties,
_filters, _partEval, _commitTx) {


override def commitTxBeforeTaskCompletion(conn: Option[Connection], context: TaskContext) = {
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => {
val txId = SparkShellRDDHelper.snapshotTxId.get
logDebug(s"The txid going to be committed is $txId " + tableName)
if ((txId ne null) && !txId.equals("null")
/* && !(tx.asInstanceOf[TXStateProxy]).isClosed() */ ) {
val ps = conn.get.prepareStatement(s"call sys.COMMIT_SNAPSHOT_TXID(?)")
ps.setString(1, txId)
ps.executeUpdate()
logDebug(s"The txid being committed is $txId")
ps.close()
SparkShellRDDHelper.snapshotTxId.set(null)
}
}
))
}

Review comment (Contributor):
Shouldn't this be done after all tasks are done? Won't a commit in the middle of another task's execution cause trouble?

Reply (Contributor, author):
Yes, in general. But for read operations, if the txState is already being used by an iterator, we compare against the stored snapshot even if the state is closed.
For write operations this would cause issues if a task committed the transaction prematurely; however, I haven't yet found such a scenario in this task.

Reply (Contributor):
@suranjan Even then, a read in a colocated join may not even be scheduled (or scheduled by Spark but not by the OS) while another task commits. In that case the two will use different snapshots? I don't see what kind of semantics this change is trying to provide.

override def computeResultSet(
thePart: Partition): (Connection, Statement, ResultSet) = {
val helper = new SparkShellRDDHelper
@@ -627,17 +646,28 @@
stmt.setFetchSize(fetchSize.toInt)
}

val txId = SparkShellRDDHelper.snapshotTxId.get
if (txId != null) {
if (!txId.equals("null")) {
val statement = conn.createStatement()
statement.execute(
s"call sys.USE_SNAPSHOT_TXID('$txId')")
}
}

val rs = stmt.executeQuery()

// get the txid which was used to take the snapshot.
if (!_commitTx) {
val getSnapshotTXId = conn.prepareCall(s"call sys.GET_SNAPSHOT_TXID (?)")
getSnapshotTXId.registerOutParameter(1, java.sql.Types.VARCHAR)
- getSnapshotTXId.execute
+ getSnapshotTXId.execute()
val txid: String = getSnapshotTXId.getString(1)
getSnapshotTXId.close()
SparkShellRDDHelper.snapshotTxId.set(txid)
logDebug(s"The snapshot tx id is ${txid} and tablename is ${tableName}")
}
logDebug(s"The previous snapshot tx id is ${txId} and tablename is ${tableName}")
(conn, stmt, rs)
}
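Taken together, the row and column RDD changes drive the snapshot through three system procedures. A condensed plain-JDBC sketch of that lifecycle, assuming a Connection conn is already open (only the sys.* calls are taken from this diff; everything else is illustrative):

    // 1. After the first query, fetch the id of the snapshot it started.
    val getTx = conn.prepareCall("call sys.GET_SNAPSHOT_TXID(?)")
    getTx.registerOutParameter(1, java.sql.Types.VARCHAR)
    getTx.execute()
    val txId = getTx.getString(1)
    getTx.close()

    // 2. A later statement (e.g. for another partition) joins the same snapshot.
    conn.createStatement().execute(s"call sys.USE_SNAPSHOT_TXID('$txId')")

    // 3. On task completion, commit and release the snapshot.
    val ps = conn.prepareStatement("call sys.COMMIT_SNAPSHOT_TXID(?)")
    ps.setString(1, txId)
    ps.executeUpdate()
    ps.close()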

Submodule store updated from aa8737 to 6e547b