@@ -615,7 +615,7 @@ object JdbcUtils extends Logging {
batchSize: Int,
dialect: JdbcDialect,
isolationLevel: Int,
- options: JDBCOptions): Iterator[Byte] = {
+ options: JDBCOptions): Long = {
Member

Do we need to update this here, instead of updating the metric inside this method?

Contributor Author

I guess it would be cleaner to handle the metric outside of the method, since the metric won't be updated if savePartition throws an exception. To do the same inside savePartition we'd have to add the metric update at the end of the finally block, which doesn't seem any cleaner.

In other words, this approach doesn't support incremental updates of the metric, and nothing is recorded when the output is partially written and then fails. Not updating makes perfect sense when the connection supports transactions, but if it doesn't and some records are left behind on failure, I'm not sure whether we should update the metric or not. How are we dealing with partial output?

Contributor Author

I've just revisited SparkHadoopWriter and realized it updates the metric regardless of whether the task succeeds or not. Got it. I'll move the metric update into the savePartition method. Thanks!

Member

I thought we'd take care of the metric only if the transaction committed, like this:

    } finally {
      if (!committed) {
        // The stage must fail.  We got here through an exception path, so
        // let the exception through unless rollback() or close() want to
        // tell the user about another problem.
        if (supportsTransactions) {
          conn.rollback()
        }
        conn.close()
      } else {
        // If the transaction committed, update the metric
        outputMetrics.setRecordsWritten(recordsWritten)

        // The stage must succeed.  We cannot propagate any exception close() might throw.
        try {
          conn.close()
        } catch {
          case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
        }
      }

cc: @HyukjinKwon @wangyum

Contributor Author

Yeah, but it looks like SparkHadoopWriter just updates the metric for any output that gets written - maybe because there's no notion of a transaction there. If we take transactions into account, it would make sense to only update the metric when the transaction is committed, but we might also want to update it when both committed and supportsTransactions are false, to reflect the dirty output. WDYT?

Member

Yeah, that looks reasonable to me. So, can you brush up the code based on that?

Contributor Author

OK let me update the patch. Thanks!
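For reference, a minimal sketch of the policy this thread converges on: update the records-written metric when the transaction committed, and also when the connection does not support transactions at all, so that partially written output is still reflected. This is an illustration only, not the actual patch; the object and method names are hypothetical, and the sketch has to sit under an org.apache.spark package because OutputMetrics.setRecordsWritten is package-private to Spark.

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.Connection

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging

// Hypothetical helper, for illustration of the metric policy discussed above.
object MetricPolicySketch extends Logging {
  // Assumes it runs on an executor, inside a task, right after the write loop.
  def closeAndRecord(
      conn: Connection,
      committed: Boolean,
      supportsTransactions: Boolean,
      totalUpdatedRows: Long): Unit = {
    if (committed || !supportsTransactions) {
      // Either the transaction committed, or there was no transaction to roll back,
      // so some rows may already be visible in the target table: record them.
      TaskContext.get().taskMetrics().outputMetrics.setRecordsWritten(totalUpdatedRows)
    }
    if (!committed && supportsTransactions) {
      // Failure path with a real transaction: roll back and report nothing.
      conn.rollback()
    }
    try {
      conn.close()
    } catch {
      case e: Exception => logWarning("Closing the JDBC connection failed", e)
    }
  }
}

(The patch itself ends up doing the metric update in the foreachPartition caller, as shown in the saveTable diff further down.)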

val conn = getConnection()
var committed = false

@@ -643,7 +643,7 @@ object JdbcUtils extends Logging {
}
}
val supportsTransactions = finalIsolationLevel != Connection.TRANSACTION_NONE

var totalUpdatedRows = 0
try {
if (supportsTransactions) {
conn.setAutoCommit(false) // Everything in the same db transaction.
@@ -673,12 +673,12 @@ object JdbcUtils extends Logging {
stmt.addBatch()
rowCount += 1
Member

Can't we just move rowCount outside the try and then use it for the metric?

Contributor Author (@HeartSaVioR), Oct 24, 2019

It's used to determine whether one more flush is needed at the end of iterating (it's reset to 0 after each executed batch, so it only tracks the current batch). It could just be a boolean flag, but we'd need a dedicated variable to track this either way.

Member

Ur, I see.

Member

Can you leave a comment somewhere about the policy for collecting the metrics?

Contributor Author

Just added it. 6e908d1

Member

Thanks!

if (rowCount % batchSize == 0) {
- stmt.executeBatch()
+ totalUpdatedRows += stmt.executeBatch().sum
rowCount = 0
}
}
if (rowCount > 0) {
- stmt.executeBatch()
+ totalUpdatedRows += stmt.executeBatch().sum
Member

Can't we just sum up rowCount?

Contributor Author

I took this approach to make sure we only count actual updates, but I'm not sure how Spark handles this for other data sources. The same goes for the number of bytes written: I was actually asked to update that as well, but there's no way to get the actual value from JDBC, so I skipped it.

Please let me know how Spark has been updating these metrics and I'll follow that approach. Thanks!

Member

SparkHadoopWriter uses a row count as the metric.

Since the values returned by stmt.executeBatch seem to be JDBC-implementation-specific, IMHO it's OK to just do the same as SparkHadoopWriter.

Contributor Author

OK, thanks for the guidance! What about the number of bytes? Reading the length of a file is easy, but measuring the size of every row seems nontrivial.
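For background on the executeBatch return values mentioned above: per the java.sql.Statement javadoc, each element of the int array returned by executeBatch() is either the update count for that command or Statement.SUCCESS_NO_INFO (-2), meaning the command succeeded but the driver cannot report how many rows were affected. Summing the raw array is therefore driver-dependent, which is one more argument for counting batched rows the way SparkHadoopWriter counts records. A hypothetical helper (not part of the patch) that still wanted to use those values would have to special-case the constant:

import java.sql.{PreparedStatement, Statement}

// Sketch only: sums executeBatch() results, treating SUCCESS_NO_INFO as one
// batched statement rather than its sentinel value of -2.
object BatchUpdateCounter {
  def countBatchUpdates(stmt: PreparedStatement): Long = {
    stmt.executeBatch().map {
      case Statement.SUCCESS_NO_INFO => 1L
      case updateCount => updateCount.toLong
    }.sum
  }
}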

}
} finally {
stmt.close()
@@ -687,7 +687,7 @@ object JdbcUtils extends Logging {
conn.commit()
}
committed = true
- Iterator.empty
+ totalUpdatedRows
} catch {
case e: SQLException =>
val cause = e.getNextException
@@ -840,10 +840,13 @@ object JdbcUtils extends Logging {
case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
case _ => df
}
- repartitionedDF.rdd.foreachPartition(iterator => savePartition(
- getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
- options)
- )
+ repartitionedDF.rdd.foreachPartition { iterator =>
+ val outMetrics = TaskContext.get().taskMetrics().outputMetrics
+ val totalUpdatedRows = savePartition(
+ getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
+ options)
+ outMetrics.setRecordsWritten(outMetrics.recordsWritten + totalUpdatedRows)
Member

outMetrics.setRecordsWritten(totalUpdatedRows)?

}
}

/**
JDBCWriteSuite.scala
@@ -21,10 +21,12 @@ import java.sql.DriverManager
import java.util.Properties

import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import scala.collection.mutable.ArrayBuffer

import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -543,4 +545,58 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
}.getMessage
assert(errMsg.contains("Statement was canceled or the session timed out"))
}

test("metrics") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)

runAndVerifyRecordsWritten(2) {
df.write.mode(SaveMode.Append).jdbc(url, "TEST.BASICCREATETEST", new Properties())
}

runAndVerifyRecordsWritten(1) {
df2.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.BASICCREATETEST", new Properties())
}

runAndVerifyRecordsWritten(1) {
df2.write.mode(SaveMode.Overwrite).option("truncate", true)
.jdbc(url, "TEST.BASICCREATETEST", new Properties())
}

runAndVerifyRecordsWritten(0) {
intercept[AnalysisException] {
df2.write.mode(SaveMode.ErrorIfExists).jdbc(url, "TEST.BASICCREATETEST", new Properties())
}
}

runAndVerifyRecordsWritten(0) {
df.write.mode(SaveMode.Ignore).jdbc(url, "TEST.BASICCREATETEST", new Properties())
}
}

private def runAndVerifyRecordsWritten(expected: Long)(job: => Unit): Unit = {
assert(expected === runAndReturnMetrics(job, _.taskMetrics.outputMetrics.recordsWritten))
}

private def runAndReturnMetrics(job: => Unit, collector: (SparkListenerTaskEnd) => Long): Long = {
Contributor Author

This is copied from InputOutputMetricsSuite - please let me know if it should be extracted into a shared utility class/object.

val taskMetrics = new ArrayBuffer[Long]()

// Avoid receiving earlier taskEnd events
sparkContext.listenerBus.waitUntilEmpty()

val listener = new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskMetrics += collector(taskEnd)
}
}
sparkContext.addSparkListener(listener)

job

sparkContext.listenerBus.waitUntilEmpty()

sparkContext.removeSparkListener(listener)
taskMetrics.sum
}

Member

nit: remove this blank.

}