Skip to content

Commit 8c5a222

Browse files
harishreedharan authored and tdas committed
[SPARK-3054][STREAMING] Add unit tests for Spark Sink.
This patch adds unit tests for Spark Sink. It also removes the private[flume] for Spark Sink, since the sink is instantiated from Flume configuration (looks like this is ignored by reflection which is used by Flume, but we should still remove it anyway). Author: Hari Shreedharan <[email protected]> Author: Hari Shreedharan <[email protected]> Closes apache#1958 from harishreedharan/spark-sink-test and squashes the following commits: e3110b9 [Hari Shreedharan] Add a sleep to allow sink to commit the transactions 120b81e [Hari Shreedharan] Fix complexity in threading model in test 4df5be6 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into spark-sink-test c9190d1 [Hari Shreedharan] Indentation and spaces changes 7fedc5a [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into spark-sink-test abc20cb [Hari Shreedharan] Minor test changes 7b9b649 [Hari Shreedharan] Merge branch 'master' into spark-sink-test f2c56c9 [Hari Shreedharan] Update SparkSinkSuite.scala a24aac8 [Hari Shreedharan] Remove unused var c86d615 [Hari Shreedharan] [SPARK-3054][STREAMING] Add unit tests for Spark Sink.
1 parent 0a7ef63 commit 8c5a222

File tree

4 files changed

+212
-2
lines changed

4 files changed

+212
-2
lines changed

external/flume-sink/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@
7272
<groupId>org.scalatest</groupId>
7373
<artifactId>scalatest_${scala.binary.version}</artifactId>
7474
</dependency>
75+
<dependency>
76+
<groupId>org.apache.spark</groupId>
77+
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
78+
<version>${project.version}</version>
79+
<type>test-jar</type>
80+
<scope>test</scope> <!-- Need it only for tests, don't package it -->
81+
</dependency>
7582
</dependencies>
7683
<build>
7784
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ import org.apache.flume.sink.AbstractSink
5353
*
5454
*/
5555

56-
private[flume]
5756
class SparkSink extends AbstractSink with Logging with Configurable {
5857

5958
// Size of the pool to use for holding transaction processors.
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.streaming.flume.sink
18+
19+
import java.net.InetSocketAddress
20+
import java.util.concurrent.atomic.AtomicInteger
21+
import java.util.concurrent.{TimeUnit, CountDownLatch, Executors}
22+
23+
import scala.collection.JavaConversions._
24+
import scala.concurrent.{ExecutionContext, Future}
25+
import scala.util.{Failure, Success}
26+
27+
import com.google.common.util.concurrent.ThreadFactoryBuilder
28+
import org.apache.avro.ipc.NettyTransceiver
29+
import org.apache.avro.ipc.specific.SpecificRequestor
30+
import org.apache.flume.Context
31+
import org.apache.flume.channel.MemoryChannel
32+
import org.apache.flume.event.EventBuilder
33+
import org.apache.spark.streaming.TestSuiteBase
34+
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
35+
36+
/**
 * Tests for [[SparkSink]]. Each test loads a Flume `MemoryChannel` with events, pulls batches
 * from the sink through Avro `SparkFlumeProtocol.Callback` clients, and then checks how many
 * slots of the channel are free.
 */
class SparkSinkSuite extends TestSuiteBase {
  // Events written per channel transaction and requested per getEventBatch call.
  val eventsPerBatch = 1000
  // Capacity the memory channel is configured with.
  val channelCapacity = 5000

  // Longest time to wait for the sink to finish committing acknowledged batches.
  private val settleTimeoutMillis = 5000L
  // Longest time the multi-consumer test waits for all of its consumers to finish.
  private val consumerTimeoutSeconds = 60L

  test("Success") {
    withLoadedSink(consumers = 1) { (channel, clients) =>
      val client = clients.head
      val events = client.getEventBatch(eventsPerBatch)
      client.ack(events.getSequenceNumber)
      assert(events.getEvents.size() === eventsPerBatch)
      assertChannelIsEmpty(channel)
    }
  }

  test("Nack") {
    withLoadedSink(consumers = 1) { (channel, clients) =>
      val client = clients.head
      val events = client.getEventBatch(eventsPerBatch)
      assert(events.getEvents.size() === eventsPerBatch)
      client.nack(events.getSequenceNumber)
      // The rejected batch must still occupy its slots in the channel.
      assert(availableChannelSlots(channel) === channelCapacity - eventsPerBatch)
    }
  }

  test("Timeout") {
    val overrides = Map(SparkSinkConfig.CONF_TRANSACTION_TIMEOUT -> 1.toString)
    withLoadedSink(consumers = 1, overrides = overrides) { (channel, clients) =>
      val events = clients.head.getEventBatch(eventsPerBatch)
      assert(events.getEvents.size() === eventsPerBatch)
      // Send neither ack nor nack and give the sink's transaction timeout a chance to fire.
      Thread.sleep(1000)
      // The unacknowledged batch must still occupy its slots in the channel.
      assert(availableChannelSlots(channel) === channelCapacity - eventsPerBatch)
    }
  }

  test("Multiple consumers") {
    testMultipleConsumers(failSome = false)
  }

  test("Multiple consumers with some failures") {
    testMultipleConsumers(failSome = true)
  }

  /**
   * Pulls one batch from the sink with each of five concurrent consumers.
   *
   * @param failSome when true, every second consumer (in the order they reach the ack/nack
   *                 decision) sends a nack instead of an ack and fails its future.
   */
  def testMultipleConsumers(failSome: Boolean): Unit = {
    val consumerCount = 5
    implicit val executorContext = ExecutionContext
      .fromExecutorService(Executors.newFixedThreadPool(consumerCount))
    try {
      withLoadedSink(consumers = consumerCount) { (channel, clients) =>
        val batchCounter = new CountDownLatch(consumerCount)
        val counter = new AtomicInteger(0)
        val pulls = clients.map { client =>
          val pull = Future {
            val events = client.getEventBatch(eventsPerBatch)
            if (!failSome || counter.getAndIncrement() % 2 == 0) {
              client.ack(events.getSequenceNumber)
            } else {
              client.nack(events.getSequenceNumber)
              throw new RuntimeException("Sending NACK for failure!")
            }
            events
          }
          // Only count down here. Assertions made on a pool thread would be swallowed and
          // would skip the count-down, leaving the test thread blocked on the latch.
          // Failures are not re-thrown either, to avoid a needless stack trace on stdout.
          pull.onComplete(_ => batchCounter.countDown())
          pull
        }
        assert(batchCounter.await(consumerTimeoutSeconds, TimeUnit.SECONDS),
          "Timed out waiting for the consumers to finish")

        // The latch is released from the onComplete callbacks, so every future has a value.
        val ackedBatches = pulls.map(_.value).collect { case Some(Success(events)) => events }
        // Counter values 0, 2, 4, ... ack; the remaining consumers nack.
        val expectedAcks = if (failSome) (consumerCount + 1) / 2 else consumerCount
        assert(ackedBatches.size === expectedAcks)
        ackedBatches.foreach(events => assert(events.getEvents.size() === eventsPerBatch))

        // Batches that were not acked stay in the channel. Poll, because the sink commits
        // acknowledged transactions some time after the ack call has returned.
        val unackedEvents = (consumerCount - expectedAcks) * eventsPerBatch
        assertAvailableSlotsEventually(channel, channelCapacity - unackedEvents)
      }
    } finally {
      executorContext.shutdown()
    }
  }

  /**
   * Runs `body` against a started channel/sink pair that holds one batch of `eventsPerBatch`
   * events per consumer, with one connected client per consumer. The transceivers, the sink and
   * the channel are always shut down afterwards, even when `body` throws (for example on a
   * failed assertion).
   *
   * @param consumers number of batches to load and of clients to connect
   * @param overrides extra configuration forwarded to `initializeChannelAndSink`
   * @param body      test logic, given the channel and the connected clients
   */
  private def withLoadedSink(consumers: Int, overrides: Map[String, String] = Map.empty)(
      body: (MemoryChannel, Seq[SparkFlumeProtocol.Callback]) => Unit): Unit = {
    val (channel, sink) = initializeChannelAndSink(overrides)
    channel.start()
    sink.start()
    try {
      (1 to consumers).foreach(_ => putEvents(channel, eventsPerBatch))
      // The sink is configured with port 0, so ask it which port it actually uses.
      val address = new InetSocketAddress("0.0.0.0", sink.getPort)
      val transceiversAndClients = getTransceiverAndClient(address, consumers)
      try {
        body(channel, transceiversAndClients.map(_._2))
      } finally {
        transceiversAndClients.foreach(_._1.close())
      }
    } finally {
      sink.stop()
      channel.stop()
    }
  }

  /**
   * Creates a configured (but not started) memory channel and a Spark sink attached to it.
   *
   * @param overrides extra settings applied on top of the defaults. They are copied into both
   *                  the channel context and the sink context, so that sink settings such as
   *                  `SparkSinkConfig.CONF_TRANSACTION_TIMEOUT` actually reach the sink.
   * @return the channel and the sink reading from it
   */
  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty): (MemoryChannel,
    SparkSink) = {
    val channel = new MemoryChannel()
    val channelContext = new Context()

    channelContext.put("capacity", channelCapacity.toString)
    // putEvents writes a whole batch in a single transaction.
    channelContext.put("transactionCapacity", eventsPerBatch.toString)
    channelContext.put("keep-alive", 0.toString)
    channelContext.putAll(overrides)
    channel.configure(channelContext)

    val sink = new SparkSink()
    val sinkContext = new Context()
    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
    sinkContext.putAll(overrides)
    sink.configure(sinkContext)
    sink.setChannel(channel)
    (channel, sink)
  }

  /**
   * Writes `count` events, whose bodies are the numbers 1 to `count` as strings, to the channel
   * in one transaction.
   */
  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
    val tx = ch.getTransaction
    tx.begin()
    (1 to count).foreach(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
    tx.commit()
    tx.close()
  }

  /**
   * Connects `count` independent Avro clients to the sink listening at `address`.
   *
   * @return one (transceiver, client) pair per connection; callers must close the transceivers
   */
  private def getTransceiverAndClient(address: InetSocketAddress,
    count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {

    (1 to count).map(_ => {
      // Each connection gets its own daemon thread pool for its Netty channel factory.
      val channelFactoryExecutor =
        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
          setNameFormat("Flume Receiver Channel Thread - %d").build())
      val channelFactory =
        new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
      val transceiver = new NettyTransceiver(address, channelFactory)
      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
      (transceiver, client)
    })
  }

  /** Asserts that all `channelCapacity` slots become free within the settle timeout. */
  private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
    assertAvailableSlotsEventually(channel, channelCapacity)
  }

  /**
   * Polls the channel until it reports `expected` free slots or `settleTimeoutMillis` has
   * elapsed, then asserts on the last observed value.
   */
  private def assertAvailableSlotsEventually(channel: MemoryChannel, expected: Int): Unit = {
    val deadline = System.currentTimeMillis() + settleTimeoutMillis
    while (availableChannelSlots(channel) != expected && System.currentTimeMillis() < deadline) {
      Thread.sleep(10)
    }
    assert(availableChannelSlots(channel) === expected)
  }

  /**
   * Returns the number of free slots in the channel by reflectively reading its private
   * `queueRemaining` field and invoking `availablePermits` on it.
   */
  private def availableChannelSlots(channel: MemoryChannel): Int = {
    val field = channel.getClass.getDeclaredField("queueRemaining")
    field.setAccessible(true)
    val queueRemaining = field.get(channel)
    val m = queueRemaining.getClass.getDeclaredMethod("availablePermits")
    m.invoke(queueRemaining).asInstanceOf[Int]
  }
}

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
198198
}
199199

200200
def assertChannelIsEmpty(channel: MemoryChannel) = {
201-
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining");
201+
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
202202
queueRemaining.setAccessible(true)
203203
val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
204204
assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)

0 commit comments

Comments
 (0)