Skip to content

Commit

Permalink
[update][core] Remove unused codes
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao committed Oct 31, 2024
1 parent 42bbf2b commit aaba2b1
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,21 +148,21 @@ public static String getSnapshot(Communication communication)
sb.append(" All Task WaitReaderTime ");
sb.append(PerfTrace.unitTime(communication.getLongCounter(WAIT_READER_TIME)));
sb.append(" | ");
if (communication.getLongCounter(CommunicationTool.TRANSFORMER_USED_TIME) > 0
|| communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
|| communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
|| communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
if (communication.getLongCounter(TRANSFORMER_USED_TIME) > 0
|| communication.getLongCounter(TRANSFORMER_SUCCEED_RECORDS) > 0
|| communication.getLongCounter(TRANSFORMER_FAILED_RECORDS) > 0
|| communication.getLongCounter(TRANSFORMER_FILTER_RECORDS) > 0) {
sb.append("Transformer Success ");
sb.append(String.format("%d records", communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS)));
sb.append(String.format("%d records", communication.getLongCounter(TRANSFORMER_SUCCEED_RECORDS)));
sb.append(" | ");
sb.append("Transformer Error ");
sb.append(String.format("%d records", communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS)));
sb.append(String.format("%d records", communication.getLongCounter(TRANSFORMER_FAILED_RECORDS)));
sb.append(" | ");
sb.append("Transformer Filter ");
sb.append(String.format("%d records", communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)));
sb.append(String.format("%d records", communication.getLongCounter(TRANSFORMER_FILTER_RECORDS)));
sb.append(" | ");
sb.append("Transformer usedTime ");
sb.append(PerfTrace.unitTime(communication.getLongCounter(CommunicationTool.TRANSFORMER_USED_TIME)));
sb.append(PerfTrace.unitTime(communication.getLongCounter(TRANSFORMER_USED_TIME)));
sb.append(" | ");
}
sb.append("Percentage ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,156 +135,143 @@ public void start()
long lastReportTimeStamp = 0;
Communication lastTaskGroupContainerCommunication = new Communication();

// try {
while (true) {
//1.判断task状态
boolean failedOrKilled = false;
Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
Integer taskId = entry.getKey();
Communication taskCommunication = entry.getValue();
if (!taskCommunication.isFinished()) {
continue;
}
TaskExecutor taskExecutor = removeTask(runTasks, taskId);

//上面从runTasks里移除了,因此对应在monitor里移除
taskMonitor.removeTask(taskId);

//失败,看task是否支持fail over,重试次数未超过最大限制
if (taskCommunication.getState() == State.FAILED) {
taskFailedExecutorMap.put(taskId, taskExecutor);
assert taskExecutor != null;
if (taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes) {
taskExecutor.shutdown(); //关闭老的executor
containerCommunicator.resetCommunication(taskId); //将task的状态重置
Configuration taskConfig = taskConfigMap.get(taskId);
taskQueue.add(taskConfig); //重新加入任务列表
}
else {
failedOrKilled = true;
break;
}
while (true) {
//1.判断task状态
boolean failedOrKilled = false;
Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
Integer taskId = entry.getKey();
Communication taskCommunication = entry.getValue();
if (!taskCommunication.isFinished()) {
continue;
}
TaskExecutor taskExecutor = removeTask(runTasks, taskId);

//上面从runTasks里移除了,因此对应在monitor里移除
taskMonitor.removeTask(taskId);

//失败,看task是否支持fail over,重试次数未超过最大限制
if (taskCommunication.getState() == State.FAILED) {
taskFailedExecutorMap.put(taskId, taskExecutor);
assert taskExecutor != null;
if (taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes) {
taskExecutor.shutdown(); //关闭老的executor
containerCommunicator.resetCommunication(taskId); //将task的状态重置
Configuration taskConfig = taskConfigMap.get(taskId);
taskQueue.add(taskConfig); //重新加入任务列表
}
else if (taskCommunication.getState() == State.KILLED) {
else {
failedOrKilled = true;
break;
}
else if (taskCommunication.getState() == State.SUCCEEDED) {
Long taskStartTime = taskStartTimeMap.get(taskId);
if (taskStartTime != null) {
long usedTime = System.currentTimeMillis() - taskStartTime;
LOG.debug("TaskGroup[{}] TaskId[{}] succeeded, used [{}]ms",
this.taskGroupId, taskId, usedTime);
taskStartTimeMap.remove(taskId);
taskConfigMap.remove(taskId);
}
}
else if (taskCommunication.getState() == State.KILLED) {
failedOrKilled = true;
break;
}
else if (taskCommunication.getState() == State.SUCCEEDED) {
Long taskStartTime = taskStartTimeMap.get(taskId);
if (taskStartTime != null) {
long usedTime = System.currentTimeMillis() - taskStartTime;
LOG.debug("TaskGroup[{}] TaskId[{}] succeeded, used [{}]ms",
this.taskGroupId, taskId, usedTime);
taskStartTimeMap.remove(taskId);
taskConfigMap.remove(taskId);
}
}
}

// 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误
if (failedOrKilled) {
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
// 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误
if (failedOrKilled) {
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

throw AddaxException.asAddaxException(RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
}
throw AddaxException.asAddaxException(RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
}

//3.有任务未执行,且正在运行的任务数小于最大通道限制
Iterator<Configuration> iterator = taskQueue.iterator();
while (iterator.hasNext() && runTasks.size() < channelNumber) {
Configuration taskConfig = iterator.next();
Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
int attemptCount = 1;
TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
if (lastExecutor != null) {
attemptCount = lastExecutor.getAttemptCount() + 1;
long now = System.currentTimeMillis();
long failedTime = lastExecutor.getTimeStamp();
if (now - failedTime < taskRetryIntervalInMs) { //未到等待时间,继续留在队列
continue;
}
if (!lastExecutor.isShutdown()) { //上次失败的task仍未结束
if (now - failedTime > taskMaxWaitInMs) {
markCommunicationFailed(taskId);
reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
throw AddaxException.asAddaxException(ErrorCode.WAIT_TIME_EXCEED, "The task fail over wait timed out.");
}
else {
lastExecutor.shutdown(); //try to close again
continue;
}
//3.有任务未执行,且正在运行的任务数小于最大通道限制
Iterator<Configuration> iterator = taskQueue.iterator();
while (iterator.hasNext() && runTasks.size() < channelNumber) {
Configuration taskConfig = iterator.next();
Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
int attemptCount = 1;
TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
if (lastExecutor != null) {
attemptCount = lastExecutor.getAttemptCount() + 1;
long now = System.currentTimeMillis();
long failedTime = lastExecutor.getTimeStamp();
if (now - failedTime < taskRetryIntervalInMs) { //未到等待时间,继续留在队列
continue;
}
if (!lastExecutor.isShutdown()) { //上次失败的task仍未结束
if (now - failedTime > taskMaxWaitInMs) {
markCommunicationFailed(taskId);
reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
throw AddaxException.asAddaxException(ErrorCode.WAIT_TIME_EXCEED, "The task fail over wait timed out.");
}
else {
LOG.debug("TaskGroup[{}] TaskId[{}] AttemptCount[{}] has already shutdown",
this.taskGroupId, taskId, lastExecutor.getAttemptCount());
lastExecutor.shutdown(); //try to close again
continue;
}
}
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();
else {
LOG.debug("TaskGroup[{}] TaskId[{}] AttemptCount[{}] has already shutdown",
this.taskGroupId, taskId, lastExecutor.getAttemptCount());
}
}
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();

iterator.remove();
runTasks.add(taskExecutor);
iterator.remove();
runTasks.add(taskExecutor);

//上面,增加task到runTasks列表,因此在monitor里注册。
taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
//上面,增加task到runTasks列表,因此在monitor里注册。
taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));

taskFailedExecutorMap.remove(taskId);
LOG.debug("TaskGroup[{}] TaskId[{}] AttemptCount[{}] has started",
this.taskGroupId, taskId, attemptCount);
}
taskFailedExecutorMap.remove(taskId);
LOG.debug("TaskGroup[{}] TaskId[{}] AttemptCount[{}] has started",
this.taskGroupId, taskId, attemptCount);
}

//4.任务列表为空,executor已结束, 搜集状态为success--->成功
if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
// 成功的情况下,也需要汇报一次。否则在任务结束非常快的情况下,采集的信息将会不准确
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
//4.任务列表为空,executor已结束, 搜集状态为success--->成功
if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
// 成功的情况下,也需要汇报一次。否则在任务结束非常快的情况下,采集的信息将会不准确
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

LOG.debug("The taskGroup[{}] has completed it's tasks.", this.taskGroupId);
break;
}
LOG.debug("The taskGroup[{}] has completed it's tasks.", this.taskGroupId);
break;
}

// 5.如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报
long now = System.currentTimeMillis();
if (now - lastReportTimeStamp > reportIntervalInMillSec) {
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
// 5.如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报
long now = System.currentTimeMillis();
if (now - lastReportTimeStamp > reportIntervalInMillSec) {
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

lastReportTimeStamp = now;
lastReportTimeStamp = now;

//taskMonitor对于正在运行的task,每reportIntervalInMillSec进行检查
for (TaskExecutor taskExecutor : runTasks) {
taskMonitor.report(taskExecutor.getTaskId(), this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
}
}
try {
TimeUnit.MILLISECONDS.sleep(sleepIntervalInMillSec);
//taskMonitor对于正在运行的task,每reportIntervalInMillSec进行检查
for (TaskExecutor taskExecutor : runTasks) {
taskMonitor.report(taskExecutor.getTaskId(), this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
}
catch (InterruptedException e) {
Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
}
try {
TimeUnit.MILLISECONDS.sleep(sleepIntervalInMillSec);
}
catch (InterruptedException e) {
Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();

if (nowTaskGroupContainerCommunication.getThrowable() == null) {
nowTaskGroupContainerCommunication.setThrowable(e);
}
nowTaskGroupContainerCommunication.setState(State.FAILED);
this.containerCommunicator.report(nowTaskGroupContainerCommunication);
throw AddaxException.asAddaxException(RUNTIME_ERROR, e);
if (nowTaskGroupContainerCommunication.getThrowable() == null) {
nowTaskGroupContainerCommunication.setThrowable(e);
}
nowTaskGroupContainerCommunication.setState(State.FAILED);
this.containerCommunicator.report(nowTaskGroupContainerCommunication);
throw AddaxException.asAddaxException(RUNTIME_ERROR, e);
}
}

//6.最后还要汇报一次
reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
// }
// catch (Throwable e) {
// Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
//
// if (nowTaskGroupContainerCommunication.getThrowable() == null) {
// nowTaskGroupContainerCommunication.setThrowable(e);
// }
// nowTaskGroupContainerCommunication.setState(State.FAILED);
// this.containerCommunicator.report(nowTaskGroupContainerCommunication);
//
// throw AddaxException.asAddaxException(RUNTIME_ERROR, e);
// }
//6.最后还要汇报一次
reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
}

private Map<Integer, Configuration> buildTaskConfigMap(List<Configuration> configurations)
Expand Down

0 comments on commit aaba2b1

Please sign in to comment.