@@ -155,16 +155,23 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
155155 })
156156
157157 val conf = new SparkConf ()
158+ val shortProp = " spark.rpc.short.timeout"
158159 conf.set(" spark.rpc.retry.wait" , " 0" )
159160 conf.set(" spark.rpc.numRetries" , " 1" )
160161 val anotherEnv = createRpcEnv(conf, " remote" , 13345 )
161162 // Use anotherEnv to find out the RpcEndpointRef
162163 val rpcEndpointRef = anotherEnv.setupEndpointRef(" local" , env.address, " ask-timeout" )
163164 try {
164165 val e = intercept[Exception ] {
165- rpcEndpointRef.askWithRetry[String ](" hello" , 1 millis)
166+ rpcEndpointRef.askWithRetry[String ](" hello" , new RpcTimeout ( 1 millis, shortProp) )
166167 }
167168 assert(e.isInstanceOf [TimeoutException ] || e.getCause.isInstanceOf [TimeoutException ])
169+ e match {
170+ case te : TimeoutException =>
171+ assert(te.getMessage().contains(shortProp))
172+ case e : Exception =>
173+ assert(e.getCause().getMessage().contains(shortProp))
174+ }
168175 } finally {
169176 anotherEnv.shutdown()
170177 anotherEnv.awaitTermination()
@@ -539,6 +546,22 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
539546 }
540547 }
541548
549+ test(" construction of RpcTimeout using properties" ) {
550+ val conf = new SparkConf
551+
552+ val testProp = " spark.ask.test.timeout"
553+ val testDurationSeconds = 30
554+
555+ conf.set(testProp, testDurationSeconds.toString + " s" )
556+
557+ val rt = RpcTimeout (conf, testProp)
558+ assert( testDurationSeconds === rt.duration.toSeconds )
559+
560+ val ex = intercept[Throwable ] {
561+ RpcTimeout (conf, " spark.ask.invalid.timeout" )
562+ }
563+ }
564+
542565}
543566
544567class UnserializableClass
0 commit comments