diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 791021604d..26e52f2510 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -28,11 +28,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfigJava; -import org.apache.samza.container.IllegalContainerStateException; import org.apache.samza.container.SamzaContainer; import org.apache.samza.container.SamzaContainerListener; import org.apache.samza.coordinator.JobCoordinator; @@ -101,10 +101,10 @@ public class StreamProcessor { private final Config config; private final long taskShutdownMs; private final String processorId; - private final ExecutorService executorService; + private final ExecutorService containerExcecutorService; private final Object lock = new Object(); - private Throwable containerException = null; + private volatile Throwable containerException = null; volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1); @@ -198,7 +198,7 @@ public StreamProcessor(Config config, Map customMetrics this.jobCoordinatorListener = createJobCoordinatorListener(); this.jobCoordinator.setListener(jobCoordinatorListener); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build(); - this.executorService = Executors.newSingleThreadExecutor(threadFactory); + this.containerExcecutorService = Executors.newSingleThreadExecutor(threadFactory); // TODO: remove the dependency on jobCoordinator for processorId after fixing 
SAMZA-1835 this.processorId = this.jobCoordinator.getProcessorId(); this.processorListener = listenerFactory.createInstance(this); @@ -258,7 +258,7 @@ public void stop() { boolean hasContainerShutdown = stopSamzaContainer(); if (!hasContainerShutdown) { LOGGER.info("Interrupting the container: {} thread to die.", container); - executorService.shutdownNow(); + containerExcecutorService.shutdownNow(); } } catch (Throwable throwable) { LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable); @@ -298,22 +298,28 @@ private JobCoordinator createJobCoordinator() { private boolean stopSamzaContainer() { boolean hasContainerShutdown = true; if (container != null) { - if (!container.hasStopped()) { - try { - container.shutdown(); - LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container); - hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); - } catch (IllegalContainerStateException icse) { - LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. 
Container is not running.", container, processorId), icse); - } catch (Exception e) { - LOGGER.error("Exception occurred when shutting down the container: {}.", container, e); - hasContainerShutdown = false; + try { + container.shutdown(); + LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container); + hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.error("Exception occurred when shutting down the container: {}.", container, e); + hasContainerShutdown = false; + if (containerException == null) { + containerException = e; } - LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown)); - } else { - LOGGER.info("Container is not instantiated for stream processor: {}.", processorId); } + LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown)); + } + + // We want to propagate TimeoutException when container shutdown times out. It is possible that the timeout exception + // we propagate to the application runner may be overwritten by the container failure cause in case of interleaved execution. + // That is acceptable since the container exception is much more useful than the timeout exception. + // The logs still show whether the container shutdown timed out, should that be needed for diagnosis. 
+ if (!hasContainerShutdown && containerException == null) { + containerException = new TimeoutException("Container shutdown timed out after " + taskShutdownMs + " ms."); } + return hasContainerShutdown; } @@ -348,7 +354,7 @@ public void onNewJobModel(String processorId, JobModel jobModel) { container = createSamzaContainer(processorId, jobModel); container.setContainerListener(new ContainerListener()); LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); - executorService.submit(container); + containerExcecutorService.submit(container); } else { LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE); } @@ -359,8 +365,12 @@ public void onNewJobModel(String processorId, JobModel jobModel) { public void onCoordinatorStop() { synchronized (lock) { LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId); - stopSamzaContainer(); - executorService.shutdownNow(); + boolean hasContainerShutdown = stopSamzaContainer(); + + // we only want to interrupt when container shutdown times out. + if (!hasContainerShutdown) { + containerExcecutorService.shutdownNow(); + } state = State.STOPPED; } if (containerException != null) @@ -374,8 +384,12 @@ public void onCoordinatorStop() { public void onCoordinatorFailure(Throwable throwable) { synchronized (lock) { LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable); - stopSamzaContainer(); - executorService.shutdownNow(); + boolean hasContainerShutdown = stopSamzaContainer(); + + // we only want to interrupt when container shutdown times out. 
+ if (!hasContainerShutdown) { + containerExcecutorService.shutdownNow(); + } state = State.STOPPED; } processorListener.afterFailure(throwable); @@ -413,6 +427,7 @@ public void afterStart() { @Override public void afterStop() { containerShutdownLatch.countDown(); + synchronized (lock) { if (state == State.IN_REBALANCE) { LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId); @@ -426,11 +441,12 @@ public void afterStop() { @Override public void afterFailure(Throwable t) { + containerException = t; containerShutdownLatch.countDown(); + synchronized (lock) { LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), t); state = State.STOPPING; - containerException = t; jobCoordinator.stop(); } } diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 417fc18518..5c4723b2d6 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -878,9 +878,11 @@ class SamzaContainer( * @throws SamzaException, Thrown when the container has already been stopped or failed */ def shutdown(): Unit = { - if (status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED) { - throw new IllegalContainerStateException("Cannot shutdown a container with status " + status) + if (status == SamzaContainerStatus.FAILED || status == SamzaContainerStatus.STOPPED) { + warn("Shutdown is no-op since the container is already in state: " + status) + return } + shutdownRunLoop() } @@ -1182,14 +1184,3 @@ class SamzaContainer( } } } - -/** - * Exception thrown when the SamzaContainer tries to transition to an illegal state. - * {@link SamzaContainerStatus} has more details on the state transitions. 
- * - * @param s String, Message associated with the exception - * @param t Throwable, Wrapped error/exception thrown, if any. - */ -class IllegalContainerStateException(s: String, t: Throwable) extends SamzaException(s, t) { - def this(s: String) = this(s, null) -} diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 673015aa56..93b157ad14 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -49,7 +49,7 @@ import org.powermock.api.mockito.PowerMockito; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -392,14 +392,15 @@ public void testOnNewJobModelShouldResultInValidStateTransitions() throws Except ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); - StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); + StreamProcessor streamProcessor = new TestableStreamProcessor(config, new HashMap<>(), null, + lifecycleListener, mockJobCoordinator, mockSamzaContainer); - streamProcessor.container = mockSamzaContainer; streamProcessor.state = State.IN_REBALANCE; Mockito.doNothing().when(mockSamzaContainer).run(); streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", new JobModel(new MapConfig(), new HashMap<>())); + Mockito.verify(mockSamzaContainer, Mockito.times(1)).setContainerListener(any()); Mockito.verify(mockSamzaContainer, 
Mockito.atMost(1)).run(); } diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java new file mode 100644 index 0000000000..2dc76a440f --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.test.framework; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.application.TaskApplicationDescriptor; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.job.ApplicationStatus; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.task.ClosableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.test.operator.data.PageView; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class FaultInjectionTest extends StreamApplicationIntegrationTestHarness { + private static final String PAGE_VIEWS = "page-views"; + + @Test + public void testRaceCondition() throws InterruptedException { + int taskShutdownInMs = (int) (Math.random() * 10000); + + CountDownLatch containerShutdownLatch = new CountDownLatch(1); + Map configs = new HashMap<>(); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.zk.ZkCoordinationUtilsFactory"); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); + configs.put(FaultInjectionStreamApp.INPUT_TOPIC_NAME_PROP, "page-views"); + configs.put(TaskConfig.INPUT_STREAMS(), 
"kafka.page-views"); + configs.put(ZkConfig.ZK_CONNECT, zkConnect()); + configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "5000"); + + // we purposefully randomize the task.shutdown.ms to make sure we can consistently verify if status is unsuccessfulFinish + // even though the reason for failure can either be container exception or container shutdown timing out. + configs.put("task.shutdown.ms", Integer.toString(taskShutdownInMs)); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + + createTopic(PAGE_VIEWS, 2); + + // create events for the following user activity. + // userId: (viewId, pageId, (adIds)) + // u1: (v1, p1, (a1)), (v2, p2, (a3)) + // u2: (v3, p1, (a1)), (v4, p3, (a5)) + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); + + FaultInjectionStreamApp app = new FaultInjectionStreamApp(); + FaultInjectionStreamApp.containerShutdownLatch = containerShutdownLatch; + RunApplicationContext context = + runApplication(app, "fault-injection-app", configs); + + containerShutdownLatch.await(); + context.getRunner().kill(); + context.getRunner().waitForFinish(); + assertEquals(ApplicationStatus.UnsuccessfulFinish, context.getRunner().status()); + } + + private static class FaultInjectionStreamApp implements TaskApplication { + public static final String SYSTEM = "kafka"; + public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName"; + private static transient CountDownLatch containerShutdownLatch; + + @Override + public void describe(TaskApplicationDescriptor appDesc) { + Config config = appDesc.getConfig(); + String inputTopic = config.get(INPUT_TOPIC_NAME_PROP); + + final JsonSerdeV2 serde = new JsonSerdeV2<>(PageView.class); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); + KafkaInputDescriptor isd = ksd.getInputDescriptor(inputTopic, serde); + appDesc.addInputStream(isd); + 
appDesc.setTaskFactory((StreamTaskFactory) () -> new FaultInjectionTask(containerShutdownLatch)); + } + + private static class FaultInjectionTask implements StreamTask, ClosableTask { + private final transient CountDownLatch containerShutdownLatch; + + public FaultInjectionTask(CountDownLatch containerShutdownLatch) { + this.containerShutdownLatch = containerShutdownLatch; + } + + @Override + public void close() throws Exception { + containerShutdownLatch.countDown(); + } + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) + throws Exception { + throw new RuntimeException("Failed"); + } + } + } +} diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java index 7f132826ac..0c3d755975 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java @@ -35,7 +35,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.SamzaApplication; import org.apache.samza.config.Config; import org.apache.samza.config.KafkaConfig; import org.apache.samza.config.MapConfig; @@ -48,7 +48,7 @@ import scala.Option$; /** - * Harness for writing integration tests for {@link StreamApplication}s. + * Harness for writing integration tests for {@link SamzaApplication}s. * *

This provides the following features for its sub-classes: *

    @@ -74,7 +74,7 @@ * State persistence: {@link #tearDown()} clears all associated state (including topics and metadata) in Kafka and * Zookeeper. Hence, the state is not durable across invocations of {@link #tearDown()}
    * - * Execution model: {@link StreamApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s. + * Execution model: {@link SamzaApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s. * Similarly, embedded Kafka servers and Zookeeper servers are run as their own threads. * {@link #produceMessage(String, int, String, String)} and {@link #consumeMessages(Collection, int)} are blocking calls. * @@ -217,7 +217,7 @@ public List> consumeMessages(Collection t * @return RunApplicationContext which contains objects created within runApplication, to be used for verification * if necessary */ - protected RunApplicationContext runApplication(StreamApplication streamApplication, + protected RunApplicationContext runApplication(SamzaApplication streamApplication, String appName, Map overriddenConfigs) { Map configMap = new HashMap<>(); diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index b249d4d287..2ee17c0144 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -617,6 +617,9 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine // Trigger re-balancing phase, by manually adding a new processor. 
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]); + + // Reset the task shutdown ms for 3rd application to give it ample time to shutdown cleanly + configMap.put(TaskConfig.SHUTDOWN_MS(), TASK_SHUTDOWN_MS); Config applicationConfig3 = new MapConfig(configMap); CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); @@ -629,13 +632,14 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); processedMessagesLatch3.await(); + appRunner1.waitForFinish(); + appRunner2.waitForFinish(); /** * If the processing has started in the third stream processor, then other two stream processors should be stopped. */ - // TODO: This is a bug! Status should be unsuccessful finish. - assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status()); - assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status()); + assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner1.status()); + assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner2.status()); appRunner3.kill(); appRunner3.waitForFinish();