-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Home
胡贵 edited this page Jun 12, 2015
·
3 revisions
名称简写说明: JobTracker 简称 jt, TaskTracker检查 tt, JobClient检查 jc
####JobTracker启动 项目需要引入 lts-job-tracker.jar, 或者 使用maven构建,将lts所有的jar包上传到本地的maven仓库中。后面我会将这个上传到maven中央仓库中。
final JobTracker jobTracker = new JobTracker();
// 节点信息配置
jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); // zookeeper 作为注册中心
// jobTracker.setRegistryAddress("redis://127.0.0.1:6379"); // redis 做注册中心
// jobTracker.setListenPort(35002); // 默认 35001
jobTracker.setClusterName("test_cluster"); // 集群名称(jt,tt,jc 的集群名称必须一样,才会是一个集群)
jobTracker.addMasterChangeListener(new MasterChangeListenerImpl()); // jt master 节点监听器
// 设置业务日志记录
jobTracker.addConfig("job.logger", "mongo");
// jobTracker.addConfig("job.logger", "mysql");
// 任务队列用mongo
jobTracker.addConfig("job.queue", "mongo");
// mongo 配置
jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); // 多个地址用逗号分割
jobTracker.addConfig("mongo.database", "lts");
jobTracker.setOldDataHandler(new OldDataDeletePolicy());
// 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient
// jobTracker.addConfig("zk.client", "zkclient");
// 启动节点
jobTracker.start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
jobTracker.stop();
}
}));
####TaskTracker启动
final TaskTracker taskTracker = new TaskTracker();
taskTracker.setJobRunnerClass(TestJobRunner.class);
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
// taskTracker.setRegistryAddress("redis://127.0.0.1:6379");
taskTracker.setNodeGroup("test_trade_TaskTracker");
taskTracker.setClusterName("test_cluster");
taskTracker.setWorkThreads(20); // 工作线程
// taskTracker.setFailStorePath(Constants.USER_HOME);
taskTracker.addMasterChangeListener(new MasterChangeListenerImpl());
// taskTracker.setBizLoggerLevel(Level.INFO); // 业务日志级别
taskTracker.start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
taskTracker.stop();
}
}));
public class TestJobRunner implements JobRunner {
@Override
public void run(Job job) throws Throwable {
System.out.println(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
+ " 我要执行:" + job + "shopId=" + job.getParam("shopId"));
BizLogger bizLogger = LtsLoggerFactory.getBizLogger();
// 会发送到 LTS (JobTracker上)
bizLogger.info("测试,业务日志啊啊啊啊啊");
// 这里是用户的业务逻辑
try {
System.out.println("我要睡个1s");
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
####JobClient启动
final JobClient jobClient = new RetryJobClient();
// final JobClient jobClient = new JobClient();
jobClient.setNodeGroup("test_jobClient");
jobClient.setClusterName("test_cluster");
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
// jobClient.setRegistryAddress("redis://127.0.0.1:6379");
// 任务重试保存地址,默认用户目录下
// jobClient.setFailStorePath(Constants.USER_HOME);
jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl()); // 任务完成监听器
jobClient.addMasterChangeListener(new MasterChangeListenerImpl());
// jobClient.addConfig("job.fail.store", "leveldb"); // 默认
// jobClient.addConfig("job.fail.store", "berkeleydb");
// jobClient.addConfig("job.fail.store", "rocksdb");
jobClient.addConfig("job.submit.concurrency.size", "20"); // 并发提交任务线程数
jobClient.start();