Skip to content

Commit 9d03431

Browse files
committed
[#noissue] Added function names, idleness in flink
1 parent 89fe796 commit 9d03431

File tree

1 file changed

+91
-116
lines changed

1 file changed

+91
-116
lines changed

flink/src/main/java/com/navercorp/pinpoint/flink/StatStreamingVer2Job.java

+91-116
Original file line number | Diff line number | Diff line change
@@ -15,142 +15,117 @@
1515
*/
1616
package com.navercorp.pinpoint.flink;
1717

18-
/**
19-
* @author minwoo.jung
20-
*/
21-
18+
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo;
2219
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo;
2320
import com.navercorp.pinpoint.flink.dao.hbase.StatisticsDao;
24-
import com.navercorp.pinpoint.flink.function.AgentStatTimestampAssigner;
2521
import com.navercorp.pinpoint.flink.function.ApplicationStatBoWindow;
26-
import com.navercorp.pinpoint.flink.function.ApplicationStatBoFilter;
27-
import com.navercorp.pinpoint.flink.function.ApplicationStatKeySelector;
22+
import com.navercorp.pinpoint.flink.process.TBaseFlatMapper;
2823
import com.navercorp.pinpoint.flink.receiver.TcpSourceFunction;
29-
import com.navercorp.pinpoint.flink.vo.RawData;
3024
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
25+
import org.apache.flink.api.common.functions.FilterFunction;
26+
import org.apache.flink.api.java.functions.KeySelector;
3127
import org.apache.flink.api.java.tuple.Tuple3;
3228
import org.apache.flink.api.java.utils.ParameterTool;
33-
import org.apache.flink.streaming.api.datastream.DataStream;
34-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
35-
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
3629
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3730
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
3831
import org.apache.flink.streaming.api.windowing.time.Time;
39-
import org.apache.logging.log4j.Logger;
4032
import org.apache.logging.log4j.LogManager;
33+
import org.apache.logging.log4j.Logger;
4134

4235
import java.io.Serializable;
36+
import java.time.Duration;
4337

38+
/**
39+
* @author minwoo.jung
40+
* @author youngjin.kim2
41+
*/
4442
public class StatStreamingVer2Job implements Serializable {
45-
private final static Logger logger = LogManager.getLogger(StatStreamingVer2Job.class);
43+
private static final Logger logger = LogManager.getLogger(StatStreamingVer2Job.class);
44+
45+
private static final String JOB_NAME = "Aggregation Stat Data";
4646

4747
public static void main(String[] args) throws Exception {
48-
ParameterTool parameters = ParameterTool.fromArgs(args);
49-
new StatStreamingVer2Job().start(parameters);
48+
final ParameterTool paramTool = ParameterTool.fromArgs(args);
49+
50+
try {
51+
new StatStreamingVer2Job().start(paramTool);
52+
} catch (Exception e) {
53+
logger.error("Error occurred while starting \"{}\" job.", JOB_NAME, e);
54+
throw e;
55+
}
5056
}
5157

52-
public void start(ParameterTool parameters) throws Exception {
53-
logger.info("start Aggregation Stat Data job with job parameter. : " + parameters.toMap() );
54-
final Bootstrap bootstrap = Bootstrap.getInstance(parameters.toMap());
58+
public void start(ParameterTool paramTool) throws Exception {
59+
logger.info("Starting \"{}\" job with job parameter: {}", JOB_NAME, paramTool.toMap());
60+
61+
final Bootstrap bootstrap = Bootstrap.getInstance(paramTool.toMap());
62+
final TcpSourceFunction source = bootstrap.getTcpSourceFunction();
63+
final TBaseFlatMapper mapper = bootstrap.getTbaseFlatMapper();
64+
final FilterFunction<Tuple3<String, JoinStatBo, Long>> filter = el -> (el.f1 instanceof JoinApplicationStatBo);
65+
final KeySelector<Tuple3<String, JoinStatBo, Long>, String> keySelector = el -> el.f0;
66+
final ApplicationStatBoWindow window = new ApplicationStatBoWindow();
67+
final StatisticsDao sink = bootstrap.getStatisticsDao();
5568

56-
// set data source
57-
final TcpSourceFunction tcpSourceFunction = bootstrap.getTcpSourceFunction();
69+
final FlowParameters flowParams = new FlowParameters(paramTool);
5870
final StreamExecutionEnvironment env = bootstrap.createStreamExecutionEnvironment();
59-
env.getConfig().setGlobalJobParameters(parameters);
60-
DataStreamSource<RawData> rawData = env.addSource(tcpSourceFunction);
61-
62-
//0. generation rawdata
63-
final SingleOutputStreamOperator<Tuple3<String, JoinStatBo, Long>> statOperator = rawData.flatMap(bootstrap.getTbaseFlatMapper());
64-
65-
//1-1 save data processing application stat raw data
66-
final StatisticsDao statisticsDao = bootstrap.getStatisticsDao();
67-
DataStream<Tuple3<String, JoinStatBo, Long>> applicationStatAggregationData = statOperator.filter(new ApplicationStatBoFilter())
68-
.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, JoinStatBo, Long>>forMonotonousTimestamps()
69-
.withTimestampAssigner(new AgentStatTimestampAssigner()))
70-
.keyBy(new ApplicationStatKeySelector())
71-
.window(TumblingEventTimeWindows.of(Time.milliseconds(ApplicationStatBoWindow.WINDOW_SIZE)))
72-
.allowedLateness(Time.milliseconds(ApplicationStatBoWindow.ALLOWED_LATENESS))
73-
.apply(new ApplicationStatBoWindow());
74-
applicationStatAggregationData.addSink(statisticsDao);
75-
76-
// 1-2. aggregate application stat data
77-
// statOperator.filter(new FilterFunction<Tuple3<String, JoinStatBo, Long>>() {
78-
// @Override
79-
// public boolean filter(Tuple3<String, JoinStatBo, Long> value) throws Exception {
80-
// if (value.f1 instanceof JoinApplicationStatBo) {
81-
// logger.info("1-2 application stat aggre window function : " + value.f1);
82-
// return true;
83-
// }
84-
//
85-
// return false;
86-
// }
87-
// })
88-
// .assignTimestampsAndWatermarks(new Timestamp())
89-
// .keyBy(0)
90-
// .window(TumblingEventTimeWindows.of(Time.seconds(120)))
91-
// .apply(new WindowFunction<Tuple3<String, JoinStatBo, Long>, Tuple3<String, JoinStatBo, Long>, Tuple, TimeWindow>() {
92-
// @Override
93-
// public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, JoinStatBo, Long>> values, Collector<Tuple3<String, JoinStatBo, Long>> out) throws Exception {
94-
// try {
95-
// JoinApplicationStatBo joinApplicationStatBo = join(values);
96-
// logger.info("1-2 application stat aggre window function : " + joinApplicationStatBo);
97-
// out.collect(new Tuple3<>(joinApplicationStatBo.getId(), joinApplicationStatBo, joinApplicationStatBo.getTimestamp()));
98-
// } catch (Exception e) {
99-
// logger.error("window function error", e);
100-
// }
101-
// }
102-
//
103-
// private JoinApplicationStatBo join(Iterable<Tuple3<String, JoinStatBo, Long>> values) {
104-
// List<JoinApplicationStatBo> joinApplicaitonStatBoList = new ArrayList<JoinApplicationStatBo>();
105-
// for (Tuple3<String, JoinStatBo, Long> value : values) {
106-
// joinApplicaitonStatBoList.add((JoinApplicationStatBo) value.f1);
107-
// }
108-
// return JoinApplicationStatBo.joinApplicationStatBo(joinApplicaitonStatBoList);
109-
//
110-
// }
111-
// }).writeUsingOutputFormat(statisticsDao);
112-
113-
114-
// 2. agrregage agent stat
115-
// statOperator.filter(new FilterFunction<Tuple3<String, JoinStatBo, Long>>() {
116-
// @Override
117-
// public boolean filter(Tuple3<String, JoinStatBo, Long> value) throws Exception {
118-
// if (value.f1 instanceof JoinAgentStatBo) {
119-
// logger.info("2 application stat aggre window function : " + value.f1);
120-
// return true;
121-
// }
122-
//
123-
// return false;
124-
// }
125-
// })
126-
// .assignTimestampsAndWatermarks(new Timestamp())
127-
// .keyBy(0)
128-
// .window(TumblingEventTimeWindows.of(Time.seconds(120)))
129-
//
130-
// .apply(new WindowFunction<Tuple3<String, JoinStatBo, Long>, Tuple3<String, JoinStatBo, Long>, Tuple, TimeWindow>() {
131-
//
132-
// @Override
133-
// public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, JoinStatBo, Long>> values, Collector<Tuple3<String, JoinStatBo, Long>> out) throws Exception {
134-
// try {
135-
// JoinAgentStatBo joinAgentStatBo = join(values);
136-
// logger.info("2 agent stat aggre window function : " + joinAgentStatBo);
137-
// out.collect(new Tuple3<>(joinAgentStatBo.getId(), joinAgentStatBo, joinAgentStatBo.getTimestamp()));
138-
// } catch (Exception e) {
139-
// logger.error("window function error", e);
140-
// }
141-
// }
142-
//
143-
// private JoinAgentStatBo join(Iterable<Tuple3<String, JoinStatBo, Long>> values) {
144-
// List<JoinAgentStatBo> joinAgentStatBoList = new ArrayList<JoinAgentStatBo>();
145-
// for (Tuple3<String, JoinStatBo, Long> value : values) {
146-
// joinAgentStatBoList.add((JoinAgentStatBo) value.f1);
147-
// }
148-
//
149-
// return JoinAgentStatBo.joinAgentStatBo(joinAgentStatBoList);
150-
// }
151-
// })
152-
// .writeUsingOutputFormat(statisticsDao);
153-
154-
env.execute("Aggregation Stat Data");
71+
env.getConfig().setGlobalJobParameters(paramTool);
72+
env
73+
.addSource(source).name("TcpSourceFunction")
74+
.flatMap(mapper).name("TBaseFlatMapper")
75+
.filter(filter).name("OnlyJoinApplicationStatBo")
76+
.assignTimestampsAndWatermarks(WatermarkStrategy
77+
.<Tuple3<String, JoinStatBo, Long>>forBoundedOutOfOrderness(flowParams.getOutOfOrderness())
78+
.withIdleness(flowParams.getIdleness())
79+
.withTimestampAssigner((el, t) -> el.f2))
80+
.keyBy(keySelector)
81+
.window(TumblingEventTimeWindows.of(flowParams.getWindowSize()))
82+
.allowedLateness(flowParams.getAllowedLateness())
83+
.apply(window).name("ApplicationStatBoWindow")
84+
.addSink(sink).name("StatisticsDao");
85+
env.execute(JOB_NAME);
15586
}
87+
88+
private static class FlowParameters {
89+
90+
private static final int DEFAULT_OUT_OF_ORDERNESS_MILLIS = 0;
91+
private static final int DEFAULT_IDLENESS_MILLIS = 10000;
92+
private static final int DEFAULT_WINDOW_SIZE_MILLIS = ApplicationStatBoWindow.WINDOW_SIZE;
93+
private static final int DEFAULT_ALLOWED_LATENESS_MILLIS = ApplicationStatBoWindow.ALLOWED_LATENESS;
94+
95+
private static final String PARAM_PREFIX = "pinpoint.flink.";
96+
private static final String PARAM_OUT_OF_ORDERNESS = PARAM_PREFIX + "outOfOrdernessMillis";
97+
private static final String PARAM_IDLENESS = PARAM_PREFIX + "idlenessMillis";
98+
private static final String PARAM_WINDOW_SIZE = PARAM_PREFIX + "windowSizeMillis";
99+
private static final String PARAM_ALLOWED_LATENESS = PARAM_PREFIX + "allowedLatenessMillis";
100+
101+
private final int outOfOrdernessMillis;
102+
private final int idlenessMillis;
103+
private final int windowSizeMillis;
104+
private final int allowedLatenessMillis;
105+
106+
FlowParameters(ParameterTool params) {
107+
this.outOfOrdernessMillis = params.getInt(PARAM_OUT_OF_ORDERNESS, DEFAULT_OUT_OF_ORDERNESS_MILLIS);
108+
this.idlenessMillis = params.getInt(PARAM_IDLENESS, DEFAULT_IDLENESS_MILLIS);
109+
this.windowSizeMillis = params.getInt(PARAM_WINDOW_SIZE, DEFAULT_WINDOW_SIZE_MILLIS);
110+
this.allowedLatenessMillis = params.getInt(PARAM_ALLOWED_LATENESS, DEFAULT_ALLOWED_LATENESS_MILLIS);
111+
}
112+
113+
Duration getOutOfOrderness() {
114+
return Duration.ofMillis(outOfOrdernessMillis);
115+
}
116+
117+
Duration getIdleness() {
118+
return Duration.ofMillis(idlenessMillis);
119+
}
120+
121+
Time getWindowSize() {
122+
return Time.milliseconds(windowSizeMillis);
123+
}
124+
125+
Time getAllowedLateness() {
126+
return Time.milliseconds(allowedLatenessMillis);
127+
}
128+
129+
}
130+
156131
}

0 commit comments

Comments
 (0)