-
Notifications
You must be signed in to change notification settings - Fork 467
/
MultiLangDaemon.java
174 lines (156 loc) · 6.68 KB
/
MultiLangDaemon.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.multilang;
import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
/**
* Main app that launches the worker that runs the multi-language record processor.
*
* Requires a properties file containing configuration for this daemon and the KCL. A properties file should at minimum
* define these properties:
*
* <pre>
* # The script that abides by the multi-language protocol. This script will
* # be executed by the MultiLangDaemon, which will communicate with this script
* # over STDIN and STDOUT according to the multi-language protocol.
* executableName = sampleapp.py
*
* # The name of an Amazon Kinesis stream to process.
* streamName = words
*
* # Used by the KCL as the name of this application. Will be used as the name
* # of an Amazon DynamoDB table which will store the lease and checkpoint
* # information for workers with this application name.
* applicationName = PythonKCLSample
*
* # Users can change the credentials provider the KCL will use to retrieve credentials.
* # The DefaultAWSCredentialsProviderChain checks several other providers, which is
* # described here:
* # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
* AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
* </pre>
*/
public class MultiLangDaemon implements Callable<Integer> {

    private static final Log LOG = LogFactory.getLog(MultiLangDaemon.class);

    /** The worker this daemon runs; assigned once at construction. */
    private final Worker worker;

    /**
     * Creates a daemon that runs a worker built from the given configuration, record processor factory and
     * executor service.
     *
     * @param configuration The KCL config to use.
     * @param recordProcessorFactory A record processor factory to create record processors that abide by the multi-lang
     *        protocol.
     * @param workerThreadPool The executor service to run the daemon in.
     */
    public MultiLangDaemon(KinesisClientLibConfiguration configuration,
            MultiLangRecordProcessorFactory recordProcessorFactory,
            ExecutorService workerThreadPool) {
        this(buildWorker(recordProcessorFactory, configuration, workerThreadPool));
    }

    /**
     * Builds a worker through {@link Worker.Builder} from the supplied factory, configuration and executor service.
     *
     * @param recordProcessorFactory The factory handed to the worker builder.
     * @param configuration The KCL config handed to the worker builder.
     * @param workerThreadPool The executor service handed to the worker builder.
     * @return The built worker.
     */
    private static Worker buildWorker(IRecordProcessorFactory recordProcessorFactory,
            KinesisClientLibConfiguration configuration, ExecutorService workerThreadPool) {
        return new Worker.Builder()
                .recordProcessorFactory(recordProcessorFactory)
                .config(configuration)
                .execService(workerThreadPool)
                .build();
    }

    /**
     * Creates a daemon around an already constructed worker.
     *
     * @param worker A worker to use instead of the default worker.
     */
    public MultiLangDaemon(Worker worker) {
        this.worker = worker;
    }

    /**
     * Utility for describing how to run this app.
     *
     * The optional message is printed on its own line, followed by the usage line. Everything is emitted through a
     * single {@code println} call on the given stream.
     *
     * @param stream Where to output the usage info.
     * @param messageToPrepend An optional error message to describe why the usage is being printed. May be
     *        {@code null}, in which case only the usage line is printed.
     */
    public static void printUsage(PrintStream stream, String messageToPrepend) {
        StringBuilder builder = new StringBuilder();
        if (messageToPrepend != null) {
            // Keep the message and the usage line apart; without the separator they run together.
            builder.append(messageToPrepend).append(System.lineSeparator());
        }
        builder.append(String.format("java %s <properties file>", MultiLangDaemon.class.getCanonicalName()));
        stream.println(builder.toString());
    }

    /**
     * Runs the worker on the calling thread.
     *
     * @return 0 if {@code worker.run()} returned normally, 1 if it threw anything.
     */
    @Override
    public Integer call() throws Exception {
        int exitCode = 0;
        try {
            worker.run();
        } catch (Throwable t) {
            // Top-level boundary: any failure is logged and converted into a non-zero exit code.
            LOG.error("Caught throwable while processing data.", t);
            exitCode = 1;
        }
        return exitCode;
    }

    /**
     * Loads the daemon configuration, registers a shutdown hook that requests a worker shutdown, runs the daemon on
     * the configured executor service and exits the JVM with the daemon's exit code.
     *
     * @param args Accepts a single argument, that argument is a properties file which provides KCL configuration as
     *        well as the name of an executable.
     */
    public static void main(String[] args) {
        if (args.length == 0) {
            printUsage(System.err, "You must provide a properties file");
            System.exit(1);
            return;
        }

        MultiLangDaemonConfig config;
        try {
            config = new MultiLangDaemonConfig(args[0]);
        } catch (IOException e) {
            // A path was supplied but could not be loaded: report the real cause instead of claiming it is missing.
            LOG.error("Unable to load properties file " + args[0], e);
            printUsage(System.err, "Unable to load properties file '" + args[0] + "': " + e.getMessage());
            System.exit(1);
            return;
        } catch (IllegalArgumentException e) {
            printUsage(System.err, e.getMessage());
            System.exit(1);
            return;
        }

        ExecutorService executorService = config.getExecutorService();

        // Daemon
        final MultiLangDaemon daemon = new MultiLangDaemon(
                config.getKinesisClientLibConfiguration(),
                config.getRecordProcessorFactory(),
                executorService);

        final long shutdownGraceMillis = config.getKinesisClientLibConfiguration().getShutdownGraceMillis();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                LOG.info("Process terminated, will initiate shutdown.");
                try {
                    Future<Void> fut = daemon.worker.requestShutdown();
                    // Wait at most the configured grace period for the requested shutdown to finish.
                    fut.get(shutdownGraceMillis, TimeUnit.MILLISECONDS);
                    LOG.info("Process shutdown is complete.");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the interruption is not lost.
                    Thread.currentThread().interrupt();
                    LOG.error("Encountered an error during shutdown.", e);
                } catch (ExecutionException | TimeoutException e) {
                    LOG.error("Encountered an error during shutdown.", e);
                }
            }
        });

        Future<Integer> future = executorService.submit(daemon);
        try {
            System.exit(future.get());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before falling through to the failure exit.
            Thread.currentThread().interrupt();
            LOG.error("Encountered an error while running daemon", e);
        } catch (ExecutionException e) {
            LOG.error("Encountered an error while running daemon", e);
        }
        System.exit(1);
    }
}