Skip to content

Commit 4773574

Browse files
SAMZA-2693: Make Samza log4j appenders agnostic of where they are running (apache#1532)
API/usage changes: 1. (functionality change) In worker containers, the logs for fetching the initial job model (in SamzaContainer.readJobModel) will no longer be sent to StreamAppender, because the config will not be initialized yet in the appender. 2. (test change) MockSystemAdmin.createdStreamName was changed to MockSystemAdmin.createdStreamSpec. The stream name can be accessed with MockSystemAdmin.createdStreamSpec.getPhysicalName().
1 parent c570a3d commit 4773574

File tree

15 files changed

+525
-444
lines changed

15 files changed

+525
-444
lines changed

build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ project(":samza-log4j2_$scalaSuffix") {
449449
compile project(":samza-core_$scalaSuffix")
450450
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
451451
testCompile "junit:junit:$junitVersion"
452+
testCompile "org.mockito:mockito-core:$mockitoVersion"
452453
}
453454
}
454455

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.logging;
20+
21+
import java.util.concurrent.atomic.AtomicReference;
22+
import com.google.common.annotations.VisibleForTesting;
23+
import org.apache.samza.config.Config;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
28+
/**
29+
* Holds information to be used by loggers. For example, some custom Samza log4j/log4j2 logging appenders need system
30+
* configs for initialization, so this allows the configs to be passed to those appenders.
31+
*/
32+
public class LoggingContextHolder {
33+
private static final Logger LOG = LoggerFactory.getLogger(LoggingContextHolder.class);
34+
public static final LoggingContextHolder INSTANCE = new LoggingContextHolder();
35+
36+
private final AtomicReference<Config> config = new AtomicReference<>();
37+
38+
@VisibleForTesting
39+
LoggingContextHolder() {
40+
}
41+
42+
/**
43+
* Set the config to be used by Samza loggers.
44+
* Only the config used in the first call to this method will be used. After the first call, this method will do
45+
* nothing.
46+
*/
47+
public void setConfig(Config config) {
48+
if (!this.config.compareAndSet(null, config)) {
49+
LOG.warn("Attempted to set config, but it was already set");
50+
}
51+
}
52+
53+
public Config getConfig() {
54+
return this.config.get();
55+
}
56+
}

samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
4545
import org.apache.samza.diagnostics.DiagnosticsManager;
4646
import org.apache.samza.job.model.JobModel;
47+
import org.apache.samza.logging.LoggingContextHolder;
4748
import org.apache.samza.metadatastore.MetadataStore;
4849
import org.apache.samza.metrics.MetricsRegistryMap;
4950
import org.apache.samza.metrics.MetricsReporter;
@@ -81,14 +82,14 @@ public static void run(
8182
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
8283
String jobName, String jobId, String containerId, Optional<String> execEnvContainerId,
8384
JobModel jobModel) {
85+
Config config = jobModel.getConfig();
8486

85-
// populate MDC for logging
87+
// logging setup: MDC, logging context
8688
MDC.put("containerName", "samza-container-" + containerId);
8789
MDC.put("jobName", jobName);
8890
MDC.put("jobId", jobId);
91+
LoggingContextHolder.INSTANCE.setConfig(jobModel.getConfig());
8992

90-
91-
Config config = jobModel.getConfig();
9293
DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, execEnvContainerId, config);
9394
run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel, config, buildExternalContext(config));
9495

samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.samza.config.ShellCommandConfig;
3232
import org.apache.samza.container.SamzaContainer;
3333
import org.apache.samza.job.model.JobModel;
34+
import org.apache.samza.logging.LoggingContextHolder;
3435
import org.apache.samza.util.SamzaUncaughtExceptionHandler;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
@@ -50,18 +51,22 @@ public static void main(String[] args) throws Exception {
5051
}));
5152

5253
String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
53-
log.info(String.format("Got container ID: %s", containerId));
5454
System.out.println(String.format("Container ID: %s", containerId));
5555

5656
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
57-
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
5857
System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
5958

6059
Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
6160

6261
int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
6362
JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
6463
Config config = jobModel.getConfig();
64+
65+
// this call is also in ContainerLaunchUtil, but adding this here allows more logs to get handled by Samza loggers
66+
LoggingContextHolder.INSTANCE.setConfig(config);
67+
log.info(String.format("Got container ID: %s", containerId));
68+
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
69+
6570
JobConfig jobConfig = new JobConfig(config);
6671
String jobName = jobConfig.getName()
6772
.orElseThrow(() -> new SamzaException(String.format("Config %s is missing", JobConfig.JOB_NAME)));

samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala

+4-11
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStrea
2626
import org.apache.samza.coordinator.server.{HttpServer, JobServlet, LocalityServlet}
2727
import org.apache.samza.coordinator.stream.messages.{SetContainerHostMapping, SetTaskContainerMapping, SetTaskModeMapping, SetTaskPartitionMapping}
2828
import org.apache.samza.job.model.JobModel
29+
import org.apache.samza.logging.LoggingContextHolder
2930
import org.apache.samza.metadatastore.MetadataStore
3031
import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
3132
import org.apache.samza.serializers.model.SamzaObjectMapper
@@ -40,15 +41,8 @@ import java.util.concurrent.atomic.AtomicReference
4041
* given a Config object.
4142
*/
4243
object JobModelManager extends Logging {
43-
4444
val SOURCE = "JobModelManager"
4545

46-
/**
47-
* a volatile value to store the current instantiated <code>JobModelManager</code>
48-
*/
49-
@volatile var currentJobModelManager: JobModelManager = _
50-
val serializedJobModelRef = new AtomicReference[Array[Byte]]
51-
5246
/**
5347
* Currently used only in the ApplicationMaster for yarn deployment model.
5448
* Does the following:
@@ -78,15 +72,14 @@ object JobModelManager extends Logging {
7872
val jobModel = jobModelHelper.newJobModel(config, changelogPartitionMapping)
7973
val jobModelToServe = new JobModel(jobModel.getConfig, jobModel.getContainers)
8074
val serializedJobModelToServe = SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModelToServe)
81-
serializedJobModelRef.set(serializedJobModelToServe)
8275

8376
val clusterManagerConfig = new ClusterManagerConfig(config)
8477
val server = new HttpServer(port = clusterManagerConfig.getCoordinatorUrlPort)
85-
server.addServlet("/", new JobServlet(serializedJobModelRef))
78+
server.addServlet("/", new JobServlet(new AtomicReference[Array[Byte]](serializedJobModelToServe)))
8679
server.addServlet("/locality", new LocalityServlet(localityManager))
8780

88-
currentJobModelManager = new JobModelManager(jobModelToServe, server)
89-
currentJobModelManager
81+
LoggingContextHolder.INSTANCE.setConfig(jobModel.getConfig)
82+
new JobModelManager(jobModelToServe, server)
9083
} finally {
9184
systemAdmins.stop()
9285
// Not closing coordinatorStreamStore, since {@code ClusterBasedJobCoordinator} uses it to read container locality through {@code JobModel}.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.logging;
20+
21+
import org.apache.samza.config.Config;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
import org.mockito.Mock;
25+
import org.mockito.MockitoAnnotations;
26+
27+
import static org.junit.Assert.assertEquals;
28+
import static org.junit.Assert.assertNull;
29+
import static org.mockito.Mockito.mock;
30+
31+
32+
public class TestLoggingContextHolder {
33+
@Mock
34+
private Config config;
35+
36+
private LoggingContextHolder loggingContextHolder;
37+
38+
@Before
39+
public void setup() {
40+
MockitoAnnotations.initMocks(this);
41+
this.loggingContextHolder = new LoggingContextHolder();
42+
}
43+
44+
@Test
45+
public void testGet() {
46+
assertNull(this.loggingContextHolder.getConfig());
47+
48+
this.loggingContextHolder.setConfig(this.config);
49+
assertEquals(this.config, this.loggingContextHolder.getConfig());
50+
}
51+
52+
@Test
53+
public void testSetMultiple() {
54+
this.loggingContextHolder.setConfig(this.config);
55+
Config config0 = mock(Config.class);
56+
this.loggingContextHolder.setConfig(config0);
57+
// should still have first config
58+
assertEquals(this.config, this.loggingContextHolder.getConfig());
59+
}
60+
}

samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala

+4-6
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,10 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
268268
"0" -> new ContainerModel("0", tasks),
269269
"1" -> new ContainerModel("1", tasks))
270270
val jobModel = new JobModel(config, containers)
271-
def jobModelGenerator(): Array[Byte] = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
272271
val server = new HttpServer
273272
val coordinator = new JobModelManager(jobModel, server)
274-
JobModelManager.serializedJobModelRef.set(jobModelGenerator())
275-
coordinator.server.addServlet("/*", new JobServlet(JobModelManager.serializedJobModelRef))
273+
val serializedJobModel = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
274+
coordinator.server.addServlet("/*", new JobServlet(new AtomicReference[Array[Byte]](serializedJobModel)))
276275
try {
277276
coordinator.start
278277
assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
@@ -293,11 +292,10 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
293292
"0" -> new ContainerModel("0", tasks),
294293
"1" -> new ContainerModel("1", tasks))
295294
val jobModel = new JobModel(config, containers)
296-
def jobModelGenerator(): Array[Byte] = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
297295
val server = new HttpServer
298296
val coordinator = new JobModelManager(jobModel, server)
299-
JobModelManager.serializedJobModelRef.set(jobModelGenerator())
300-
val mockJobServlet = new MockJobServlet(2, JobModelManager.serializedJobModelRef)
297+
val serializedJobModel = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
298+
val mockJobServlet = new MockJobServlet(2, new AtomicReference[Array[Byte]](serializedJobModel))
301299
coordinator.server.addServlet("/*", mockJobServlet)
302300
try {
303301
coordinator.start

samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala

+7-9
Original file line numberDiff line numberDiff line change
@@ -101,28 +101,26 @@ class TestJobModelManager extends FlatSpec with PrivateMethodTester {
101101
// We want the mocksystemconsumer to use the same instance across runs
102102
MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
103103

104-
val coordinator = getTestJobModelManager(new MapConfig((config ++ otherConfigs).asJava))
104+
val jobModelManager = getTestJobModelManager(new MapConfig((config ++ otherConfigs).asJava))
105105
val expectedJobModel = new JobModel(new MapConfig(config.asJava), containers.asJava)
106106

107-
// Verify that the atomicReference is initialized
108-
assertNotNull(JobModelManager.serializedJobModelRef.get())
109107
val expectedContainerModels = new util.TreeMap[String, ContainerModel](expectedJobModel.getContainers)
110-
val jobModel = SamzaObjectMapper.getObjectMapper.readValue(JobModelManager.serializedJobModelRef.get(), classOf[JobModel])
108+
val jobModel = jobModelManager.jobModel
111109
val actualContainerModels = new util.TreeMap[String, ContainerModel](jobModel.getContainers)
112110
assertEquals(expectedContainerModels, actualContainerModels)
113111

114-
coordinator.start
115-
val expectedConfig: Config = coordinator.jobModel.getConfig
112+
jobModelManager.start
113+
val expectedConfig: Config = jobModelManager.jobModel.getConfig
116114
val actualConfig: Config = new MapConfig(config.asJava)
117115
assertTrue(expectedConfig.entrySet().containsAll(actualConfig.entrySet()))
118-
assertEquals(expectedJobModel.getContainers, coordinator.jobModel.getContainers)
116+
assertEquals(expectedJobModel.getContainers, jobModelManager.jobModel.getContainers)
119117

120-
val response = HttpUtil.read(coordinator.server.getUrl)
118+
val response = HttpUtil.read(jobModelManager.server.getUrl)
121119
// Verify that the JobServlet is serving the correct jobModel
122120
val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(response, classOf[JobModel])
123121
assertEquals(expectedJobModel.getContainers, jobModelFromCoordinatorUrl.getContainers)
124122

125-
coordinator.stop
123+
jobModelManager.stop
126124
}
127125

128126
@Test

0 commit comments

Comments
 (0)