
Conversation

@hellozepp (Contributor) commented Nov 18, 2021

In the following PR, Spark added support for setting the JDBC parameter fetch_size to a negative number, namely to allow fetching data in a streaming manner (row-by-row fetch) from a MySQL database:

apache/spark#26244

Building on that, this PR provides a default value of fetchsize that uses row-by-row fetch, which avoids loading so much data at once that the job runs out of memory.

Note: this change takes effect with spark-sql_2.12 (Spark 3). If you are using a version earlier than Spark 3, the following exception occurs when a negative number is used, so on those versions we open a cursor instead (namely, useCursorFetch=true) and set the default value of fetchsize to 1000.

requirement failed: Invalid value `-2147483648` for parameter `fetchsize`. The minimum value is 0. When the value is 0, the JDBC driver ignores the value and does the estimates.
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:152)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:35)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at streaming.core.datasource.impl.MLSQLDirectJDBC.load(MLSQLDirectJDBC.scala:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at tech.mlsql.dsl.adaptor.LoadPRocessing$$anonfun$parse$2.apply(LoadAdaptor.scala:117)
at tech.mlsql.dsl.adaptor.LoadPRocessing$$anonfun$parse$2.apply(LoadAdaptor.scala:115)
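
For reference, a minimal sketch of the two code paths described above; the helper and the `isSpark3` flag are illustrative names, not the PR's actual implementation:

```scala
import org.apache.spark.sql.DataFrameReader

// Sketch only: how the default could branch per Spark version.
// `isSpark3` is a hypothetical flag standing in for real version detection.
def applyDefaultFetchSize(reader: DataFrameReader, url: String, isSpark3: Boolean): DataFrameReader = {
  if (isSpark3) {
    // Spark 3 accepts a negative fetchsize; Integer.MIN_VALUE tells
    // MySQL Connector/J to stream results row by row.
    reader.option("fetchsize", Int.MinValue.toString)
  } else {
    // Earlier Spark rejects negative values ("The minimum value is 0"),
    // so fall back to a server-side cursor with a moderate batch size.
    val cursorUrl = if (url.contains("useCursorFetch")) url else s"$url&useCursorFetch=true"
    reader.option("url", cursorUrl).option("fetchsize", "1000")
  }
}
```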

@hellozepp (Contributor, Author) commented Nov 18, 2021

Reproduction

I inserted more than 512 MB of data into the database and set the following JVM options:

-Xms512m -Xmx512m

Loading the full table through the directQuery method, I got the following exception:

Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 265) (192.168.0.102 executor driver): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3181)
at java.util.ArrayList.grow(ArrayList.java:265)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:239)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:231)
at java.util.ArrayList.add(ArrayList.java:462)
at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3417)
at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:471)
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3115)
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2344)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2739)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2716/53833777.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
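
The readAllResults frame above is the culprit: by default, MySQL Connector/J materializes the entire result set on the client before Spark sees a single row. A minimal plain-JDBC sketch of the streaming mode that a negative fetchsize enables (connection details taken from the test case below):

```scala
import java.sql.DriverManager

val conn = DriverManager.getConnection(
  "jdbc:mysql://127.0.0.1:13306/test", "root", "--")
val stmt = conn.createStatement()
// With the default fetch size (0), the driver buffers the whole result set
// in the client heap, which is exactly what blows a 512m heap here.
// Integer.MIN_VALUE switches Connector/J to row-by-row streaming instead.
stmt.setFetchSize(Integer.MIN_VALUE)
val rs = stmt.executeQuery("select * from test.student")
while (rs.next()) {
  // each row is pulled from the server as we iterate; memory stays flat
}
rs.close(); stmt.close(); conn.close()
```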

@hellozepp (Contributor, Author) commented Nov 18, 2021

Test Case

connect jdbc where
url="jdbc:mysql://127.0.0.1:13306/test?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&autoReconnect=true&failOverReadOnly=false"
and driver="com.mysql.jdbc.Driver"
and user="root"
and password="--"
as db1; 


load jdbc.`db1.student`
where directQuery="select * from test.student"
as data1;

Without specifying fetchsize in the SQL, I can now use load to pull the full table data normally on both Spark 2.4 and 3.0.
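If a different value is wanted, the new option can presumably be overridden in the where clause, following MLSQL's usual option syntax (the value here is just an example):

```sql
load jdbc.`db1.student`
where directQuery="select * from test.student"
and fetchsize="1000"
as data1;
```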

@hellozepp changed the title from "add default option fetchsize of MLSQLJDBC" to "[Data Source] Add default option fetchsize of MLSQLJDBC" on Nov 18, 2021

trait JDBCSource {

def fillJDBCOptions(reader: DataFrameReader, config: DataSourceConfig, formatName: String, dbSplitter: String): (Option[String], Option[String]) = {
@allwefantasy (Owner)
Notice that this only happens with the MySQL JDBC driver. With your refactor, the change will affect all JDBC data sources.

@hellozepp (Contributor, Author)
Yes, I overlooked the other data sources; I've changed it.

url = options.get("url")
})
}
url = url.map(x => if (x.contains("useCursorFetch")) x else s"$x&useCursorFetch=true")
@allwefantasy (Owner)
These url parameters are only supported by the MySQL JDBC driver; we shouldn't apply them to every data source like this.
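
A sketch of the MySQL-only guard being asked for here; gating on the jdbc:mysql URL prefix is one plausible way to scope it, and may differ from what was actually merged:

```scala
// Only MySQL Connector/J understands useCursorFetch, so rewrite the URL
// only for MySQL instead of for every JDBC data source.
url = url.map { x =>
  if (x.startsWith("jdbc:mysql") && !x.contains("useCursorFetch"))
    s"$x&useCursorFetch=true"
  else x
}
```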

@chncaesar (Contributor)
Please update mlsql-docs accordingly.

Fix fetchsize related configuration only for mysql setting
import org.apache.spark.sql.DataFrameReader
import streaming.dsl.{ConnectMeta, DBMappingKey}

trait JDBCSource {
@allwefantasy (Owner)
I guess we can do better with the trait name and the function names in this trait.

@allwefantasy allwefantasy merged commit 6da921b into allwefantasy:master Nov 21, 2021