Skip to content

Commit

Permalink
[#noissue] Added function names, idleness in flink
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed May 12, 2023
1 parent 89fe796 commit fff9e0e
Showing 1 changed file with 87 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,142 +15,113 @@
*/
package com.navercorp.pinpoint.flink;

/**
* @author minwoo.jung
*/

import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo;
import com.navercorp.pinpoint.flink.dao.hbase.StatisticsDao;
import com.navercorp.pinpoint.flink.function.AgentStatTimestampAssigner;
import com.navercorp.pinpoint.flink.function.ApplicationStatBoWindow;
import com.navercorp.pinpoint.flink.function.ApplicationStatBoFilter;
import com.navercorp.pinpoint.flink.function.ApplicationStatKeySelector;
import com.navercorp.pinpoint.flink.process.TBaseFlatMapper;
import com.navercorp.pinpoint.flink.receiver.TcpSourceFunction;
import com.navercorp.pinpoint.flink.vo.RawData;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Serializable;
import java.time.Duration;

/**
* @author minwoo.jung
* @author youngjin.kim2
*/
public class StatStreamingVer2Job implements Serializable {
private final static Logger logger = LogManager.getLogger(StatStreamingVer2Job.class);
private static final Logger logger = LogManager.getLogger(StatStreamingVer2Job.class);

private static final String JOB_NAME = "Aggregation Stat Data";

public static void main(String[] args) throws Exception {
ParameterTool parameters = ParameterTool.fromArgs(args);
new StatStreamingVer2Job().start(parameters);
final ParameterTool paramTool = ParameterTool.fromArgs(args);

try {
new StatStreamingVer2Job().start(paramTool);
} catch (Exception e) {
logger.error("Error occurred while starting \"{}\" job.", JOB_NAME, e);
throw e;
}
}

public void start(ParameterTool parameters) throws Exception {
logger.info("start Aggregation Stat Data job with job parameter. : " + parameters.toMap() );
final Bootstrap bootstrap = Bootstrap.getInstance(parameters.toMap());
public void start(ParameterTool paramTool) throws Exception {
logger.info("Starting \"{}\" job with job parameter: {}", JOB_NAME, paramTool.toMap());

final Bootstrap bootstrap = Bootstrap.getInstance(paramTool.toMap());
final TcpSourceFunction source = bootstrap.getTcpSourceFunction();
final TBaseFlatMapper mapper = bootstrap.getTbaseFlatMapper();
final ApplicationStatBoWindow window = new ApplicationStatBoWindow();
final StatisticsDao sink = bootstrap.getStatisticsDao();

// set data source
final TcpSourceFunction tcpSourceFunction = bootstrap.getTcpSourceFunction();
final FlowParameters flowParams = new FlowParameters(paramTool);
final StreamExecutionEnvironment env = bootstrap.createStreamExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
DataStreamSource<RawData> rawData = env.addSource(tcpSourceFunction);

//0. generation rawdata
final SingleOutputStreamOperator<Tuple3<String, JoinStatBo, Long>> statOperator = rawData.flatMap(bootstrap.getTbaseFlatMapper());

//1-1 save data processing application stat raw data
final StatisticsDao statisticsDao = bootstrap.getStatisticsDao();
DataStream<Tuple3<String, JoinStatBo, Long>> applicationStatAggregationData = statOperator.filter(new ApplicationStatBoFilter())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, JoinStatBo, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new AgentStatTimestampAssigner()))
.keyBy(new ApplicationStatKeySelector())
.window(TumblingEventTimeWindows.of(Time.milliseconds(ApplicationStatBoWindow.WINDOW_SIZE)))
.allowedLateness(Time.milliseconds(ApplicationStatBoWindow.ALLOWED_LATENESS))
.apply(new ApplicationStatBoWindow());
applicationStatAggregationData.addSink(statisticsDao);

// 1-2. aggregate application stat data
// statOperator.filter(new FilterFunction<Tuple3<String, JoinStatBo, Long>>() {
// @Override
// public boolean filter(Tuple3<String, JoinStatBo, Long> value) throws Exception {
// if (value.f1 instanceof JoinApplicationStatBo) {
// logger.info("1-2 application stat aggre window function : " + value.f1);
// return true;
// }
//
// return false;
// }
// })
// .assignTimestampsAndWatermarks(new Timestamp())
// .keyBy(0)
// .window(TumblingEventTimeWindows.of(Time.seconds(120)))
// .apply(new WindowFunction<Tuple3<String, JoinStatBo, Long>, Tuple3<String, JoinStatBo, Long>, Tuple, TimeWindow>() {
// @Override
// public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, JoinStatBo, Long>> values, Collector<Tuple3<String, JoinStatBo, Long>> out) throws Exception {
// try {
// JoinApplicationStatBo joinApplicationStatBo = join(values);
// logger.info("1-2 application stat aggre window function : " + joinApplicationStatBo);
// out.collect(new Tuple3<>(joinApplicationStatBo.getId(), joinApplicationStatBo, joinApplicationStatBo.getTimestamp()));
// } catch (Exception e) {
// logger.error("window function error", e);
// }
// }
//
// private JoinApplicationStatBo join(Iterable<Tuple3<String, JoinStatBo, Long>> values) {
// List<JoinApplicationStatBo> joinApplicaitonStatBoList = new ArrayList<JoinApplicationStatBo>();
// for (Tuple3<String, JoinStatBo, Long> value : values) {
// joinApplicaitonStatBoList.add((JoinApplicationStatBo) value.f1);
// }
// return JoinApplicationStatBo.joinApplicationStatBo(joinApplicaitonStatBoList);
//
// }
// }).writeUsingOutputFormat(statisticsDao);


// 2. agrregage agent stat
// statOperator.filter(new FilterFunction<Tuple3<String, JoinStatBo, Long>>() {
// @Override
// public boolean filter(Tuple3<String, JoinStatBo, Long> value) throws Exception {
// if (value.f1 instanceof JoinAgentStatBo) {
// logger.info("2 application stat aggre window function : " + value.f1);
// return true;
// }
//
// return false;
// }
// })
// .assignTimestampsAndWatermarks(new Timestamp())
// .keyBy(0)
// .window(TumblingEventTimeWindows.of(Time.seconds(120)))
//
// .apply(new WindowFunction<Tuple3<String, JoinStatBo, Long>, Tuple3<String, JoinStatBo, Long>, Tuple, TimeWindow>() {
//
// @Override
// public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, JoinStatBo, Long>> values, Collector<Tuple3<String, JoinStatBo, Long>> out) throws Exception {
// try {
// JoinAgentStatBo joinAgentStatBo = join(values);
// logger.info("2 agent stat aggre window function : " + joinAgentStatBo);
// out.collect(new Tuple3<>(joinAgentStatBo.getId(), joinAgentStatBo, joinAgentStatBo.getTimestamp()));
// } catch (Exception e) {
// logger.error("window function error", e);
// }
// }
//
// private JoinAgentStatBo join(Iterable<Tuple3<String, JoinStatBo, Long>> values) {
// List<JoinAgentStatBo> joinAgentStatBoList = new ArrayList<JoinAgentStatBo>();
// for (Tuple3<String, JoinStatBo, Long> value : values) {
// joinAgentStatBoList.add((JoinAgentStatBo) value.f1);
// }
//
// return JoinAgentStatBo.joinAgentStatBo(joinAgentStatBoList);
// }
// })
// .writeUsingOutputFormat(statisticsDao);

env.execute("Aggregation Stat Data");
env.getConfig().setGlobalJobParameters(paramTool);
env
.addSource(source).name("TcpSourceFunction")
.flatMap(mapper).name("TBaseFlatMapper")
.filter(el -> el.f1 instanceof JoinApplicationStatBo).name("OnlyJoinApplicationStatBo")
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple3<String, JoinStatBo, Long>>forBoundedOutOfOrderness(flowParams.getOutOfOrderness())
.withIdleness(flowParams.getIdleness())
.withTimestampAssigner((el, t) -> el.f2))
.keyBy(el -> el.f0)
.window(TumblingEventTimeWindows.of(flowParams.getWindowSize()))
.allowedLateness(flowParams.getAllowedLateness())
.apply(window).name("ApplicationStatBoWindow")
.addSink(sink).name("StatisticsDao");
env.execute(JOB_NAME);
}

private static class FlowParameters {

private static final int DEFAULT_OUT_OF_ORDERNESS_MILLIS = 0;
private static final int DEFAULT_IDLENESS_MILLIS = 10000;
private static final int DEFAULT_WINDOW_SIZE_MILLIS = ApplicationStatBoWindow.WINDOW_SIZE;
private static final int DEFAULT_ALLOWED_LATENESS_MILLIS = ApplicationStatBoWindow.ALLOWED_LATENESS;

private static final String PARAM_PREFIX = "pinpoint.flink.";
private static final String PARAM_OUT_OF_ORDERNESS = PARAM_PREFIX + "outOfOrdernessMillis";
private static final String PARAM_IDLENESS = PARAM_PREFIX + "idlenessMillis";
private static final String PARAM_WINDOW_SIZE = PARAM_PREFIX + "windowSizeMillis";
private static final String PARAM_ALLOWED_LATENESS = PARAM_PREFIX + "allowedLatenessMillis";

private final int outOfOrdernessMillis;
private final int idlenessMillis;
private final int windowSizeMillis;
private final int allowedLatenessMillis;

FlowParameters(ParameterTool params) {
this.outOfOrdernessMillis = params.getInt(PARAM_OUT_OF_ORDERNESS, DEFAULT_OUT_OF_ORDERNESS_MILLIS);
this.idlenessMillis = params.getInt(PARAM_IDLENESS, DEFAULT_IDLENESS_MILLIS);
this.windowSizeMillis = params.getInt(PARAM_WINDOW_SIZE, DEFAULT_WINDOW_SIZE_MILLIS);
this.allowedLatenessMillis = params.getInt(PARAM_ALLOWED_LATENESS, DEFAULT_ALLOWED_LATENESS_MILLIS);
}

Duration getOutOfOrderness() {
return Duration.ofMillis(outOfOrdernessMillis);
}

Duration getIdleness() {
return Duration.ofMillis(idlenessMillis);
}

Time getWindowSize() {
return Time.milliseconds(windowSizeMillis);
}

Time getAllowedLateness() {
return Time.milliseconds(allowedLatenessMillis);
}

}

}

0 comments on commit fff9e0e

Please sign in to comment.