Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.container.IllegalContainerStateException;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainerListener;
import org.apache.samza.coordinator.JobCoordinator;
Expand Down Expand Up @@ -101,10 +101,10 @@ public class StreamProcessor {
private final Config config;
private final long taskShutdownMs;
private final String processorId;
private final ExecutorService executorService;
private final ExecutorService containerExcecutorService;
private final Object lock = new Object();

private Throwable containerException = null;
private volatile Throwable containerException = null;

volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1);

Expand Down Expand Up @@ -198,7 +198,7 @@ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetrics
this.jobCoordinatorListener = createJobCoordinatorListener();
this.jobCoordinator.setListener(jobCoordinatorListener);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.containerExcecutorService = Executors.newSingleThreadExecutor(threadFactory);
// TODO: remove the dependency on jobCoordinator for processorId after fixing SAMZA-1835
this.processorId = this.jobCoordinator.getProcessorId();
this.processorListener = listenerFactory.createInstance(this);
Expand Down Expand Up @@ -258,7 +258,7 @@ public void stop() {
boolean hasContainerShutdown = stopSamzaContainer();
if (!hasContainerShutdown) {
LOGGER.info("Interrupting the container: {} thread to die.", container);
executorService.shutdownNow();
containerExcecutorService.shutdownNow();
}
} catch (Throwable throwable) {
LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable);
Expand Down Expand Up @@ -298,22 +298,28 @@ private JobCoordinator createJobCoordinator() {
private boolean stopSamzaContainer() {
boolean hasContainerShutdown = true;
if (container != null) {
if (!container.hasStopped()) {
try {
container.shutdown();
LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container);
hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
} catch (IllegalContainerStateException icse) {
LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
} catch (Exception e) {
LOGGER.error("Exception occurred when shutting down the container: {}.", container, e);
hasContainerShutdown = false;
try {
container.shutdown();
LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container);
hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.error("Exception occurred when shutting down the container: {}.", container, e);
hasContainerShutdown = false;
if (containerException != null) {
containerException = e;
}
LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown));
} else {
LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
}
LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown));
}

// We want to propagate TimeoutException when container shutdown times out. It is possible that the timeout exception
// we propagate to the application runner maybe overwritten by container failure cause in case of interleaved execution.
// It is acceptable since container exception is much more useful compared to timeout exception.
// We can infer from the logs about the fact that container shutdown timed out or not for additional inference.
if (!hasContainerShutdown && containerException == null) {
containerException = new TimeoutException("Container shutdown timed out after " + taskShutdownMs + " ms.");
}

return hasContainerShutdown;
}

Expand Down Expand Up @@ -348,7 +354,7 @@ public void onNewJobModel(String processorId, JobModel jobModel) {
container = createSamzaContainer(processorId, jobModel);
container.setContainerListener(new ContainerListener());
LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
executorService.submit(container);
containerExcecutorService.submit(container);
} else {
LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE);
}
Expand All @@ -359,8 +365,12 @@ public void onNewJobModel(String processorId, JobModel jobModel) {
public void onCoordinatorStop() {
synchronized (lock) {
LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId);
stopSamzaContainer();
executorService.shutdownNow();
boolean hasContainerShutdown = stopSamzaContainer();

// we only want to interrupt when container shutdown times out.
if (!hasContainerShutdown) {
containerExcecutorService.shutdownNow();
}
state = State.STOPPED;
}
if (containerException != null)
Expand All @@ -374,8 +384,12 @@ public void onCoordinatorStop() {
public void onCoordinatorFailure(Throwable throwable) {
synchronized (lock) {
LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable);
stopSamzaContainer();
executorService.shutdownNow();
boolean hasContainerShutdown = stopSamzaContainer();

// we only want to interrupt when container shutdown times out.
if (!hasContainerShutdown) {
containerExcecutorService.shutdownNow();
}
state = State.STOPPED;
}
processorListener.afterFailure(throwable);
Expand Down Expand Up @@ -413,6 +427,7 @@ public void afterStart() {
@Override
public void afterStop() {
containerShutdownLatch.countDown();

synchronized (lock) {
if (state == State.IN_REBALANCE) {
LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
Expand All @@ -426,11 +441,12 @@ public void afterStop() {

@Override
public void afterFailure(Throwable t) {
containerException = t;
containerShutdownLatch.countDown();

synchronized (lock) {
LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), t);
state = State.STOPPING;
containerException = t;
jobCoordinator.stop();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,9 +878,11 @@ class SamzaContainer(
* @throws SamzaException, Thrown when the container has already been stopped or failed
*/
def shutdown(): Unit = {
if (status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED) {
throw new IllegalContainerStateException("Cannot shutdown a container with status " + status)
if (status == SamzaContainerStatus.FAILED || status == SamzaContainerStatus.STOPPED) {
warn("Shutdown is no-op since the container is already in state: " + status)
return
}

shutdownRunLoop()
}

Expand Down Expand Up @@ -1182,14 +1184,3 @@ class SamzaContainer(
}
}
}

/**
* Exception thrown when the SamzaContainer tries to transition to an illegal state.
* {@link SamzaContainerStatus} has more details on the state transitions.
*
* @param s String, Message associated with the exception
* @param t Throwable, Wrapped error/exception thrown, if any.
*/
class IllegalContainerStateException(s: String, t: Throwable) extends SamzaException(s, t) {
def this(s: String) = this(s, null)
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.powermock.api.mockito.PowerMockito;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -392,14 +392,15 @@ public void testOnNewJobModelShouldResultInValidStateTransitions() throws Except
ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator));
StreamProcessor streamProcessor = new TestableStreamProcessor(config, new HashMap<>(), null,
lifecycleListener, mockJobCoordinator, mockSamzaContainer);

streamProcessor.container = mockSamzaContainer;
streamProcessor.state = State.IN_REBALANCE;
Mockito.doNothing().when(mockSamzaContainer).run();

streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", new JobModel(new MapConfig(), new HashMap<>()));

Mockito.verify(mockSamzaContainer, Mockito.times(1)).setContainerListener(any());
Mockito.verify(mockSamzaContainer, Mockito.atMost(1)).run();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.framework;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.application.TaskApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.kafka.KafkaInputDescriptor;
import org.apache.samza.system.kafka.KafkaSystemDescriptor;
import org.apache.samza.task.ClosableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.test.operator.data.PageView;
import org.junit.Test;

import static org.junit.Assert.*;


public class FaultInjectionTest extends StreamApplicationIntegrationTestHarness {
private static final String PAGE_VIEWS = "page-views";

@Test
public void testRaceCondition() throws InterruptedException {
int taskShutdownInMs = (int) (Math.random() * 10000);

CountDownLatch containerShutdownLatch = new CountDownLatch(1);
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.zk.ZkCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put(FaultInjectionStreamApp.INPUT_TOPIC_NAME_PROP, "page-views");
configs.put(TaskConfig.INPUT_STREAMS(), "kafka.page-views");
configs.put(ZkConfig.ZK_CONNECT, zkConnect());
configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "5000");

// we purposefully randomize the task.shutdown.ms to make sure we can consistently verify if status is unsuccessfulFinish
// even though the reason for failure can either be container exception or container shutdown timing out.
configs.put("task.shutdown.ms", Integer.toString(taskShutdownInMs));
configs.put(JobConfig.PROCESSOR_ID(), "0");

createTopic(PAGE_VIEWS, 2);

// create events for the following user activity.
// userId: (viewId, pageId, (adIds))
// u1: (v1, p1, (a1)), (v2, p2, (a3))
// u2: (v3, p1, (a1)), (v4, p3, (a5))
produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");

FaultInjectionStreamApp app = new FaultInjectionStreamApp();
FaultInjectionStreamApp.containerShutdownLatch = containerShutdownLatch;
RunApplicationContext context =
runApplication(app, "fault-injection-app", configs);

containerShutdownLatch.await();
context.getRunner().kill();
context.getRunner().waitForFinish();
assertEquals(context.getRunner().status(), ApplicationStatus.UnsuccessfulFinish);
}

private static class FaultInjectionStreamApp implements TaskApplication {
public static final String SYSTEM = "kafka";
public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName";
private static transient CountDownLatch containerShutdownLatch;

@Override
public void describe(TaskApplicationDescriptor appDesc) {
Config config = appDesc.getConfig();
String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);

final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, serde);
appDesc.addInputStream(isd);
appDesc.setTaskFactory((StreamTaskFactory) () -> new FaultInjectionTask(containerShutdownLatch));
}

private static class FaultInjectionTask implements StreamTask, ClosableTask {
private final transient CountDownLatch containerShutdownLatch;

public FaultInjectionTask(CountDownLatch containerShutdownLatch) {
this.containerShutdownLatch = containerShutdownLatch;
}

@Override
public void close() throws Exception {
containerShutdownLatch.countDown();
}

@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
throws Exception {
throw new RuntimeException("Failed");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
Expand All @@ -48,7 +48,7 @@
import scala.Option$;

/**
* Harness for writing integration tests for {@link StreamApplication}s.
* Harness for writing integration tests for {@link SamzaApplication}s.
*
* <p> This provides the following features for its sub-classes:
* <ul>
Expand All @@ -74,7 +74,7 @@
* State persistence: {@link #tearDown()} clears all associated state (including topics and metadata) in Kafka and
* Zookeeper. Hence, the state is not durable across invocations of {@link #tearDown()} <br/>
*
* Execution model: {@link StreamApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s.
* Execution model: {@link SamzaApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s.
* Similarly, embedded Kafka servers and Zookeeper servers are run as their own threads.
* {@link #produceMessage(String, int, String, String)} and {@link #consumeMessages(Collection, int)} are blocking calls.
*
Expand Down Expand Up @@ -217,7 +217,7 @@ public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> t
* @return RunApplicationContext which contains objects created within runApplication, to be used for verification
* if necessary
*/
protected RunApplicationContext runApplication(StreamApplication streamApplication,
protected RunApplicationContext runApplication(SamzaApplication streamApplication,
String appName,
Map<String, String> overriddenConfigs) {
Map<String, String> configMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,9 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine
// Trigger re-balancing phase, by manually adding a new processor.

configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);

// Reset the task shutdown ms for 3rd application to give it ample time to shutdown cleanly
configMap.put(TaskConfig.SHUTDOWN_MS(), TASK_SHUTDOWN_MS);
Config applicationConfig3 = new MapConfig(configMap);

CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
Expand All @@ -629,13 +632,14 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine
publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

processedMessagesLatch3.await();
appRunner1.waitForFinish();
appRunner2.waitForFinish();

/**
* If the processing has started in the third stream processor, then other two stream processors should be stopped.
*/
// TODO: This is a bug! Status should be unsuccessful finish.
assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status());
assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner1.status());
assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner2.status());

appRunner3.kill();
appRunner3.waitForFinish();
Expand Down