1717
1818package org .apache .spark .rdd
1919
20- import java .sql .{Connection , ResultSet }
20+ import java .sql .{PreparedStatement , Connection , ResultSet }
2121
2222import scala .reflect .ClassTag
2323
@@ -70,20 +70,30 @@ class JdbcRDD[T: ClassTag](
7070 override def compute (thePart : Partition , context : TaskContext ) = new NextIterator [T ] {
7171 context.addOnCompleteCallback{ () => closeIfNeeded() }
7272 val part = thePart.asInstanceOf [JdbcPartition ]
// conn and stmt start out as null vars so that close() can be invoked safely even when
// acquiring the connection or preparing the statement fails part-way through.
var conn: Connection = _
var stmt: PreparedStatement = _

// rs is bound to the value of the try expression. Declaring it inside the try block would
// limit its scope to that block and leave getNext without a result set to read from.
val rs: ResultSet = try {
  conn = getConnection()
  stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

  // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
  // rather than pulling entire resultset into memory.
  // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
  // String.matches must match the whole URL, so the pattern has no leading whitespace.
  if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
    stmt.setFetchSize(Integer.MIN_VALUE)
    logInfo(" statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
  }

  // Bind the partition bounds to the two placeholders of the query, then run it.
  stmt.setLong(1, part.lower)
  stmt.setLong(2, part.upper)
  stmt.executeQuery()
} catch {
  case e: Exception =>
    logError(" Exception occurred on creating connection/preparedStatement", e)
    // Release whatever was opened before the failure. rs is still null at this point.
    // NOTE(review): assumes close() null-checks each resource before closing it; confirm.
    close()
    // Rethrow so the failure reaches the caller instead of yielding an iterator
    // that has no result set behind it.
    throw e
}
8797
8898 override def getNext : T = {
8999 if (rs.next()) {
@@ -106,7 +116,7 @@ class JdbcRDD[T: ClassTag](
106116 case e : Exception => logWarning(" Exception closing statement" , e)
107117 }
108118 try {
// Guard on the connection's own state. Checking stmt here would skip conn.close() whenever
// the statement is already closed, and would throw a NullPointerException when the
// statement was never created, leaking the connection in both cases.
if (null != conn && !conn.isClosed()) conn.close()
110120 logInfo(" closed connection" )
111121 } catch {
112122 case e : Exception => logWarning(" Exception closing connection" , e)
@@ -120,3 +130,4 @@ object JdbcRDD {
120130 Array .tabulate[Object ](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1 ))
121131 }
122132}
133+
0 commit comments